Skip to content

Commit

Permalink
Merge pull request #36 from wri/feat/TM-1520-unified-db-bulk-upload
Browse files Browse the repository at this point in the history
[TM-1520] POC Unified DB Bulk Upload
  • Loading branch information
roguenet authored Jan 7, 2025
2 parents a5b2ebd + 39c5b09 commit 594ce2d
Show file tree
Hide file tree
Showing 21 changed files with 769 additions and 196 deletions.
72 changes: 17 additions & 55 deletions apps/unified-database-service/src/airtable/airtable.processor.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
import { Processor, WorkerHost } from "@nestjs/bullmq";
import {
InternalServerErrorException,
LoggerService,
NotFoundException,
NotImplementedException,
Scope
} from "@nestjs/common";
import { InternalServerErrorException, LoggerService, NotImplementedException, Scope } from "@nestjs/common";
import { TMLogService } from "@terramatch-microservices/common/util/tm-log.service";
import { Job } from "bullmq";
import { UpdateEntitiesData } from "./airtable.service";
import { ConfigService } from "@nestjs/config";
import Airtable from "airtable";
import { Project } from "@terramatch-microservices/database/entities";
import { ProjectEntity } from "./entities";
import { AirtableEntity } from "./entities/airtable-entity";
import { Model } from "sequelize-typescript";
import { FieldSet } from "airtable/lib/field_set";
import { Records } from "airtable/lib/records";

const AIRTABLE_ENTITIES = {
project: ProjectEntity
Expand Down Expand Up @@ -52,58 +41,31 @@ export class AirtableProcessor extends WorkerHost {
}
}

private async updateEntities({ entityType, entityUuid }: UpdateEntitiesData) {
this.logger.log(`Beginning entity update: ${JSON.stringify({ entityType, entityUuid })}`);
private async updateEntities({ entityType }: UpdateEntitiesData) {
this.logger.log(`Beginning entity update: ${JSON.stringify({ entityType })}`);

const airtableEntity = AIRTABLE_ENTITIES[entityType];
if (airtableEntity == null) {
throw new InternalServerErrorException(`Entity mapping not found for entity type ${entityType}`);
}

const id = await this.findAirtableEntity(airtableEntity, entityUuid);
const record = await airtableEntity.findOne(entityUuid);
// bogus offset and page size. The big early PPC records are blowing up the JS memory heap. Need
// to make some performance improvements to how the project airtable entity fetches records, pulling
// in fewer included associations. That will be a refactor for the next ticket.
const records = await airtableEntity.findMany(3, 200);
const airtableRecords = await Promise.all(
records.map(async record => ({ fields: await airtableEntity.mapDbEntity(record) }))
);
try {
await this.base(airtableEntity.TABLE_NAME).update(id, await airtableEntity.mapDbEntity(record));
// @ts-expect-error The types for this lib haven't caught up with its support for upserts
// https://github.com/Airtable/airtable.js/issues/348
await this.base(airtableEntity.TABLE_NAME).update(airtableRecords, {
performUpsert: { fieldsToMergeOn: ["uuid"] }
});
} catch (error) {
this.logger.error(
`Entity update failed: ${JSON.stringify({
entityType,
entityUuid,
error
})}`
);
this.logger.error(`Entity update failed: ${JSON.stringify({ entityType, error, airtableRecords }, null, 2)}`);
throw error;
}
this.logger.log(`Entity update complete: ${JSON.stringify({ entityType, entityUuid })}`);
}

private async findAirtableEntity<T extends Model<T>>(entity: AirtableEntity<T>, entityUuid: string) {
let records: Records<FieldSet>;
try {
records = await this.base(entity.TABLE_NAME)
.select({
maxRecords: 2,
fields: [entity.UUID_COLUMN],
filterByFormula: `{${entity.UUID_COLUMN}} = '${entityUuid}'`
})
.firstPage();
} catch (error) {
this.logger.error(
`Error finding entity in Airtable: ${JSON.stringify({ table: entity.TABLE_NAME, entityUuid, error })}`
);
throw new NotFoundException(`No ${entity.TABLE_NAME} with UUID ${entityUuid} found in Airtable`);
}

if (records.length === 0) {
this.logger.error(`No ${entity.TABLE_NAME} with UUID ${entityUuid} found in Airtable`);
throw new NotFoundException(`No ${entity.TABLE_NAME} with UUID ${entityUuid} found in Airtable`);
} else if (records.length > 1) {
this.logger.error(`More than one ${entity.TABLE_NAME} with UUID ${entityUuid} found in Airtable`);
throw new InternalServerErrorException(
`More than one ${entity.TABLE_NAME} with UUID ${entityUuid} found in Airtable`
);
}

return records[0].id;
this.logger.log(`Entity update complete: ${JSON.stringify({ entityType })}`);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ export type EntityType = (typeof ENTITY_TYPES)[number];

export type UpdateEntitiesData = {
entityType: EntityType;
entityUuid: string;
};

@Injectable()
Expand All @@ -18,8 +17,8 @@ export class AirtableService {
constructor(@InjectQueue("airtable") private readonly airtableQueue: Queue) {}

// TODO (NJC) This method will probably go away entirely, or at least change drastically after this POC
async updateAirtableJob(entityType: EntityType, entityUuid: string) {
const data: UpdateEntitiesData = { entityType, entityUuid };
async updateAirtableJob(entityType: EntityType) {
const data: UpdateEntitiesData = { entityType };

this.logger.log(`Adding entity update to queue: ${JSON.stringify(data)}`);
await this.airtableQueue.add("updateEntities", data);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,100 @@
import { Model } from "sequelize-typescript";
import { isArray } from "lodash";
import { Model, ModelType } from "sequelize-typescript";
import { cloneDeep, isArray, isObject, uniq } from "lodash";
import { Attributes } from "sequelize";

/**
 * Describes how one DB model type (a sequelize-typescript `Model`) is synced to an Airtable table.
 */
export type AirtableEntity<T extends Model<T>> = {
// Name of the Airtable table; the processor passes this to the Airtable base to select the table.
TABLE_NAME: string;
// Name of the Airtable column that holds the entity's UUID.
UUID_COLUMN: string;
// Converts a DB entity into the object sent to Airtable as the record's fields.
mapDbEntity: (entity: T) => Promise<object>;
// Loads a single DB entity by its UUID.
findOne: (uuid: string) => Promise<T>;
// Loads a page of DB entities. Note the parameter order: page size first, then offset.
findMany: (pageSize: number, offset: number) => Promise<T[]>;
};

/**
 * An eager-load definition that can be merged with other definitions targeting the same model or
 * association (see mergeInclude). Two definitions are considered the same target when they share
 * a non-null `model` or a non-null `association`.
 */
export type MergeableInclude = {
// Model class to eager load; one of the two keys used to match definitions when merging.
model?: ModelType<unknown, unknown>;
// Association name to eager load; the other key used to match definitions when merging.
association?: string;
// Columns to pull from the included model. Leave undefined to pull all columns.
attributes?: string[];
// Nested eager-load definitions, merged recursively.
include?: MergeableInclude[];
};

/**
 * A ColumnMapping takes one of three forms:
 *  - a bare DB column name, used when the Airtable column has the same name as the DB column
 *  - a tuple of [dbColumn, airtableColumn]
 *  - a more descriptive object that computes the Airtable value via `valueMap`
 */
export type ColumnMapping<T extends Model<T>> =
| keyof Attributes<T>
| [keyof Attributes<T>, string]
| {
// Name of the Airtable column that receives the mapped value
airtableColumn: string;
// Include if this mapping should include a particular DB column in the DB query
dbColumn?: keyof Attributes<T>;
// Include if this mapping should eager load an association on the DB query
include?: MergeableInclude[];
// Produces the value written to the Airtable column for the given entity
valueMap: (entity: T) => Promise<null | string | number | boolean | Date>;
};

/**
 * Collects the DB columns that the given mappings need selected in the DB query.
 *
 * - bare column name: the column itself
 * - [dbColumn, airtableColumn] tuple: the first element
 * - descriptive object: its optional `dbColumn`
 *
 * Mappings that contribute no DB column (a descriptive object without `dbColumn`) are dropped.
 *
 * @param columns the column mappings for an entity
 * @returns the DB column names, in mapping order
 */
export const selectAttributes = <T extends Model<T>>(columns: ColumnMapping<T>[]) =>
  columns
    .map(mapping => {
      if (Array.isArray(mapping)) return mapping[0];
      // Anything that is neither a tuple nor a descriptive object is a bare DB column name.
      return typeof mapping === "object" && mapping !== null ? mapping.dbColumn : mapping;
    })
    .filter(dbColumn => dbColumn != null);

/**
 * Recursively merges MergeableIncludes to arrive at a cohesive set of IncludeOptions for a Sequelize find
 * query.
 *
 * Written as a reducer: it folds `include` into `includes` and returns the accumulator. Two
 * definitions target the same include when they share a non-null `model` or a non-null
 * `association`.
 *
 * @param includes the accumulated includes; mutated in place
 * @param include the definition to merge in; never mutated (it is deep-cloned when copied)
 * @returns the same `includes` array that was passed in
 */
const mergeInclude = (includes: MergeableInclude[], include: MergeableInclude): MergeableInclude[] => {
  const existing = includes.find(
    ({ model, association }) =>
      (model != null && model === include.model) || (association != null && association === include.association)
  );

  if (existing == null) {
    // Use clone deep here so that if this include gets modified in the future, it doesn't mutate the
    // original definition.
    includes.push(cloneDeep(include));
    return includes;
  }

  // If the existing include has no attributes array it already pulls all columns; nothing to widen.
  if (existing.attributes != null) {
    if (include.attributes == null) {
      // The new mapping needs every column, so the merged include must drop its restriction.
      // (Deleting from `include` instead would be a no-op and leave the query too narrow.)
      delete existing.attributes;
    } else {
      // We don't need cloneDeep here because attributes is a simple string array.
      existing.attributes = uniq([...existing.attributes, ...include.attributes]);
    }
  }

  if (include.include != null) {
    existing.include =
      existing.include == null
        ? // Clone so later merges into `existing` don't mutate the original definition.
          cloneDeep(include.include)
        : include.include.reduce(mergeInclude, existing.include);
  }

  return includes;
};

/**
 * Builds the merged list of eager-load definitions required by the given column mappings.
 *
 * Only the descriptive object form of a mapping can carry `include` definitions; bare column
 * names and tuples are skipped. Each definition is folded into the result via mergeInclude.
 *
 * @param columns the column mappings for an entity
 * @returns the merged includes (empty when no mapping declares any)
 */
export const selectIncludes = <T extends Model<T>>(columns: ColumnMapping<T>[]) => {
  let merged: MergeableInclude[] = [];

  for (const mapping of columns) {
    if (isArray(mapping) || !isObject(mapping) || mapping.include == null) continue;

    for (const definition of mapping.include) {
      merged = mergeInclude(merged, definition);
    }
  }

  return merged;
};

export const mapEntityColumns = async <T extends Model<T>>(entity: T, columns: ColumnMapping<T>[]) => {
const airtableObject = {};
for (const mapping of columns) {
const airtableColumn = isArray(mapping) ? mapping[1] : mapping.airtableColumn;
airtableObject[airtableColumn] = isArray(mapping) ? entity[mapping[0]] : await mapping.valueMap(entity);
const airtableColumn = isArray(mapping)
? mapping[1]
: isObject(mapping)
? mapping.airtableColumn
: (mapping as string);
airtableObject[airtableColumn] = isArray(mapping)
? entity[mapping[0]]
: isObject(mapping)
? await mapping.valueMap(entity)
: entity[mapping];
}

return airtableObject;
Expand Down
Loading

0 comments on commit 594ce2d

Please sign in to comment.