Skip to content

Commit

Permalink
Fix schema migration count being incremented on every restart (#2476)
Browse files Browse the repository at this point in the history
* Fix schema migration count being incremented on every restart

* Update changelog

* Fix removed models not being removed from the store cache

* Fix test
  • Loading branch information
stwiname authored Jul 9, 2024
1 parent e85e7fc commit 6ac9778
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 107 deletions.
7 changes: 5 additions & 2 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- Enable ts strict model
- Enable ts strict model

### Fixed
- Incrementing the schemaMigration count on every start (#2476)

## [10.10.0] - 2024-07-01
### Changed
- Bump version with `@subql/common`
Expand Down
47 changes: 24 additions & 23 deletions packages/node-core/src/configure/ProjectUpgrade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {getAllEntitiesRelations} from '@subql/utils';
import {Sequelize, Transaction} from '@subql/x-sequelize';
import {findLast, last, parseInt} from 'lodash';
import {SchemaMigrationService} from '../db';
import {ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {getLogger} from '../logger';
import {exitWithError, monitorWrite} from '../process';
import {getStartHeight, mainThreadOnly} from '../utils';
Expand Down Expand Up @@ -109,21 +109,26 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
#currentProject: P;

#storeCache?: StoreCacheService;
#storeService?: StoreService;
#schema?: string;
#initialized = false;

private config?: NodeConfig;
private onProjectUpgrade?: OnProjectUpgradeCallback<P>;
private migrationService?: SchemaMigrationService;

private constructor(private _projects: BlockHeightMap<P>, currentHeight: number, private _isRewindable = true) {
private constructor(
private _projects: BlockHeightMap<P>,
currentHeight: number,
private _isRewindable = true
) {
logger.info(
`Projects: ${JSON.stringify(
[..._projects.getAll().entries()].reduce((acc, curr) => {
acc[curr[0]] = curr[1].id;
return acc;
}, {} as Record<number, string>),
[..._projects.getAll().entries()].reduce(
(acc, curr) => {
acc[curr[0]] = curr[1].id;
return acc;
},
{} as Record<number, string>
),
undefined,
2
)}`
Expand All @@ -147,8 +152,6 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
}
this.#initialized = true;
this.#storeCache = storeService.storeCache;
this.#storeService = storeService;
this.#schema = schema;
this.config = config;

this.migrationService = new SchemaMigrationService(
Expand Down Expand Up @@ -194,6 +197,11 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
return this.#currentHeight;
}

private get metadata(): CacheMetadataModel {
assert(this.#storeCache?.metadata, 'Project Upgrades service has not been initialized, unable to update metadata');
return this.#storeCache.metadata;
}

async rewind(
targetBlockHeight: number,
lastProcessedHeight: number,
Expand All @@ -216,7 +224,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject

if (currentProject && nextProject) {
if (this.config?.dbSchema) {
await storeService.init(getAllEntitiesRelations(currentProject.schema), this.config.dbSchema);
await storeService.init(this.config.dbSchema);
}
await this.migrate(currentProject, nextProject, transaction);
}
Expand All @@ -237,12 +245,8 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
if (!this.config.unfinalizedBlocks) {
assert(this.migrationService, 'MigrationService is undefined');
if (this.config.allowSchemaMigration) {
const modifiedModels = await this.migrationService.run(project.schema, newProject.schema, transaction);
if (modifiedModels) {
this.#storeCache?.updateModels(modifiedModels);
assert(this.#schema, 'Schema is undefined');
await this.#storeService?.updateModels(this.#schema, getAllEntitiesRelations(newProject.schema));
}
await this.migrationService.run(project.schema, newProject.schema, transaction);
this.metadata.setIncrement('schemaMigrationCount');
}
}
}
Expand Down Expand Up @@ -410,8 +414,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
}

private async getDeploymentsMetadata(): Promise<Record<number, string>> {
assert(this.#storeCache?.metadata, 'Project Upgrades service has not been initialized, unable to update metadata');
const deploymentsRaw = await this.#storeCache?.metadata.find('deployments');
const deploymentsRaw = await this.metadata.find('deployments');

if (!deploymentsRaw) return {};

Expand All @@ -420,7 +423,6 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject

@mainThreadOnly()
async updateIndexedDeployments(id: string, blockHeight: number): Promise<void> {
assert(this.#storeCache?.metadata, 'Project Upgrades service has not been initialized, unable to update metadata');
const deployments = await this.getDeploymentsMetadata();

// If the last deployment is the same as the one we're updating to theres no need to do anything
Expand All @@ -438,12 +440,11 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject

deployments[blockHeight] = id;

this.#storeCache?.metadata.set('deployments', JSON.stringify(deployments));
this.metadata.set('deployments', JSON.stringify(deployments));
}

// Remove metadata deployments beyond this blockHeight
async removeIndexedDeployments(blockHeight: number): Promise<void> {
assert(this.#storeCache?.metadata, 'Project Upgrades service has not been initialized, unable to update metadata');
const deployments = await this.getDeploymentsMetadata();

// remove all future block heights
Expand All @@ -455,6 +456,6 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
}
});

this.#storeCache?.metadata.set('deployments', JSON.stringify(deployments));
this.metadata.set('deployments', JSON.stringify(deployments));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async function setup(
await sequelize.createSchema(`"${schemaName}"`, {});

await storeService.initCoreTables(schemaName);
await initDbSchema(project, schemaName, storeService);
await initDbSchema(schemaName, storeService);

return new SchemaMigrationService(
sequelize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,7 @@ export class SchemaMigrationService {
}
}

async run(
currentSchema: GraphQLSchema | null,
nextSchema: GraphQLSchema,
transaction?: Transaction
): Promise<ModelStatic<any>[] | void> {
async run(currentSchema: GraphQLSchema | null, nextSchema: GraphQLSchema, transaction?: Transaction): Promise<void> {
const schemaDifference = SchemaMigrationService.schemaComparator(currentSchema, nextSchema);
const {
addedEnums,
Expand Down Expand Up @@ -184,7 +180,14 @@ export class SchemaMigrationService {
for (const enumValue of removedEnums) {
migrationAction.dropEnum(enumValue);
}
return migrationAction.run(transaction);

const modelChanges = await migrationAction.run(transaction);

// Update any relevant application state so the right models are used
this.storeService.storeCache.updateModels(modelChanges);
await this.storeService.updateModels(this.dbSchema, getAllEntitiesRelations(nextSchema));

await this.flushCache();
} catch (e: any) {
logger.error(e, 'Failed to execute Schema Migration');
throw e;
Expand Down
16 changes: 11 additions & 5 deletions packages/node-core/src/db/migration-service/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ type RemovedIndexes = Record<string, IndexesOptions[]>;
const logger = getLogger('db-manager');

export class Migration {
private sequelizeModels: ModelStatic<any>[] = [];
/* Models that are added or modified during the migration */
private modifiedModels: ModelStatic<any>[] = [];
private removedModels: string[] = [];
/*
mainQueries are used for executions, that are not reliant on any prior db operations
extraQueries are executions, that are reliant on certain db operations, e.g. comments on foreignKeys or comments on tables, should be executed only after the table has been created
Expand Down Expand Up @@ -103,7 +105,7 @@ export class Migration {
);
}

async run(transaction: Transaction | undefined): Promise<ModelStatic<any>[]> {
async run(transaction?: Transaction): Promise<{modifiedModels: ModelStatic<any>[]; removedModels: string[]}> {
const effectiveTransaction = transaction ?? (await this.sequelize.transaction());

if (this.historical) {
Expand Down Expand Up @@ -137,7 +139,10 @@ export class Migration {

this.afterHandleCockroachIndex();

return this.sequelizeModels;
return {
modifiedModels: this.modifiedModels,
removedModels: this.removedModels,
};
}

private prepareModelAttributesAndIndexes(model: GraphQLModelsType): {
Expand All @@ -160,8 +165,8 @@ export class Migration {

private addModelToSequelizeCache(sequelizeModel: ModelStatic<any>): void {
const modelName = sequelizeModel.name;
if (!this.sequelizeModels.find((m) => m.name === modelName)) {
this.sequelizeModels.push(sequelizeModel);
if (!this.modifiedModels.find((m) => m.name === modelName)) {
this.modifiedModels.push(sequelizeModel);
}
}

Expand Down Expand Up @@ -235,6 +240,7 @@ export class Migration {
// should prioritise dropping the triggers
this.mainQueries.unshift(syncHelper.dropNotifyTrigger(this.schemaName, tableName));
this.mainQueries.push(`DROP TABLE IF EXISTS "${this.schemaName}"."${tableName}";`);
this.removedModels.push(model.name);
}

createColumn(model: GraphQLModelsType, field: GraphQLEntityField): void {
Expand Down
19 changes: 3 additions & 16 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,7 @@ import {IProjectUpgradeService, NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {getLogger} from '../logger';
import {exitWithError, monitorWrite} from '../process';
import {
getExistingProjectSchema,
getStartHeight,
hasValue,
initDbSchema,
initHotSchemaReload,
mainThreadOnly,
reindex,
} from '../utils';
import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex} from '../utils';
import {BlockHeightMap} from '../utils/blockHeightMap';
import {BaseDsProcessorService} from './ds-processor.service';
import {DynamicDsService} from './dynamic-ds.service';
Expand All @@ -41,7 +33,7 @@ class NotInitError extends Error {
export abstract class BaseProjectService<
API extends IApi,
DS extends BaseDataSource,
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>,
> implements IProjectService<DS>
{
private _schema?: string;
Expand Down Expand Up @@ -129,7 +121,6 @@ export abstract class BaseProjectService<

// These need to be init before upgrade and unfinalized services because they may cause rewinds.
await this.initDbSchema();
await this.initHotSchemaReload();

if (this.nodeConfig.proofOfIndex) {
// Prepare for poi migration and creation
Expand Down Expand Up @@ -188,12 +179,8 @@ export abstract class BaseProjectService<
return schema;
}

private async initHotSchemaReload(): Promise<void> {
await initHotSchemaReload(this.schema, this.storeService);
}

private async initDbSchema(): Promise<void> {
await initDbSchema(this.project, this.schema, this.storeService);
await initDbSchema(this.schema, this.storeService);
}

private async ensureMetadata(): Promise<void> {
Expand Down
72 changes: 40 additions & 32 deletions packages/node-core/src/indexer/store.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
GraphQLModelsType,
} from '@subql/utils';
import {IndexesOptions, ModelAttributes, ModelStatic, Op, QueryTypes, Sequelize, Transaction} from '@subql/x-sequelize';
import {camelCase, flatten, upperFirst} from 'lodash';
import {camelCase, flatten, last, upperFirst} from 'lodash';
import {NodeConfig} from '../configure';
import {
BTREE_GIST_EXTENSION_EXIST_QUERY,
Expand Down Expand Up @@ -167,22 +167,52 @@ export class StoreService {

this._metadataModel = this.storeCache.metadata;

await this.initHotSchemaReloadQueries(schema);

this.metadataModel.set('historicalStateEnabled', this.historical);
this.metadataModel.setIncrement('schemaMigrationCount');
}

async init(modelsRelations: GraphQLModelsRelationsEnums, schema: string): Promise<void> {
this._modelsRelations = modelsRelations;

async init(schema: string): Promise<void> {
try {
await this.syncSchema(schema);
const tx = await this.sequelize.transaction();
if (this.historical) {
const [results] = await this.sequelize.query(BTREE_GIST_EXTENSION_EXIST_QUERY);
if (results.length === 0) {
throw new Error('Btree_gist extension is required to enable historical data, contact DB admin for support');
}
}
/*
On SyncSchema, if no schema migration is introduced, it would consider current schema to be null, and go all db operations again
every start up is a migration
*/
const schemaMigrationService = new SchemaMigrationService(
this.sequelize,
this,
this.storeCache._flushCache.bind(this.storeCache),
schema,
this.config
);

await schemaMigrationService.run(null, this.subqueryProject.schema, tx);

const deploymentsRaw = await this.metadataModel.find('deployments');
const deployments = deploymentsRaw ? JSON.parse(deploymentsRaw) : {};

// Check if the deployment change or a local project is running
// WARNING:This assumes that the root is the same as the id for local project, there are no checks for this and it could change at any time
if (
this.subqueryProject.id === this.subqueryProject.root ||
last(Object.values(deployments)) !== this.subqueryProject.id
) {
// TODO this should run with the same db transaction as the migration
this.metadataModel.setIncrement('schemaMigrationCount');
}
} catch (e: any) {
exitWithError(new Error(`Having a problem when syncing schema`, {cause: e}), logger);
}
await this.updateModels(schema, modelsRelations);
}

async initHotSchemaReloadQueries(schema: string): Promise<void> {
private async initHotSchemaReloadQueries(schema: string): Promise<void> {
if (this.dbType === SUPPORT_DB.cockRoach) {
logger.warn(`Hot schema reload feature is not supported with ${this.dbType}`);
return;
Expand All @@ -207,34 +237,12 @@ export class StoreService {
}
}

async syncSchema(schema: string): Promise<void> {
const tx = await this.sequelize.transaction();
if (this.historical) {
const [results] = await this.sequelize.query(BTREE_GIST_EXTENSION_EXIST_QUERY);
if (results.length === 0) {
throw new Error('Btree_gist extension is required to enable historical data, contact DB admin for support');
}
}
/*
On SyncSchema, if no schema migration is introduced, it would consider current schema to be null, and go all db operations again
every start up is a migration
*/
const schemaMigrationService = new SchemaMigrationService(
this.sequelize,
this,
this.storeCache._flushCache.bind(this.storeCache),
schema,
this.config
);
await schemaMigrationService.run(null, this.subqueryProject.schema, tx);
}

async updateModels(schema: string, modelsRelations: GraphQLModelsRelationsEnums): Promise<void> {
this._modelsRelations = modelsRelations;
try {
this._modelIndexedFields = await this.getAllIndexFields(schema);
} catch (e: any) {
exitWithError(new Error(`Having a problem when get indexed fields`, {cause: e}), logger);
exitWithError(new Error(`Having a problem when getting indexed fields`, {cause: e}), logger);
}
}

Expand Down Expand Up @@ -269,7 +277,7 @@ export class StoreService {
sequelizeModel.addHook('beforeValidate', (attributes, options) => {
attributes.__block_range = [this.blockHeight, null];
});
// TODO, remove id and block_range constrain, check id manually
// TODO, remove id and block_range constraint, check id manually
// see https://github.com/subquery/subql/issues/1542
}

Expand Down
Loading

0 comments on commit 6ac9778

Please sign in to comment.