diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md
index 3490d98807..d9c2f26ea8 100644
--- a/packages/node-core/CHANGELOG.md
+++ b/packages/node-core/CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 ### Added
 - lazy loading for monitor service (#2583)
+- Add an `--enable-cache` flag, allowing you to choose between cache-backed and direct DB access for store IO operations.
+
 ### Fixed
+- Entities returned from store `get` queries no longer include an internal `store` field.
 - When configuring multiple endpoints, poor network conditions may lead to block crawling delays. (#2572)

diff --git a/packages/node-core/src/configure/NodeConfig.ts b/packages/node-core/src/configure/NodeConfig.ts
index 8429a8796f..c8ecd3baf2 100644
--- a/packages/node-core/src/configure/NodeConfig.ts
+++ b/packages/node-core/src/configure/NodeConfig.ts
@@ -56,6 +56,7 @@ export interface IConfig {
   readonly csvOutDir?: string;
   readonly monitorOutDir: string;
   readonly monitorFileSize?: number;
+  readonly enableCache?: boolean;
 }
 
 export type MinConfig = Partial<Omit<IConfig, 'subquery'>> & Pick<IConfig, 'subquery'>;
@@ -328,6 +329,10 @@ export class NodeConfig implements IConfig {
     return this._config.monitorFileSize ?? this._config.proofOfIndex ? defaultMonitorFileSize : 0;
   }
 
+  get enableCache(): boolean {
+    return this._config.enableCache ?? true;
+  }
+
   merge(config: Partial<IConfig>): this {
     assign(this._config, config);
     return this;
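In effect, the new flag only switches which `IStoreModelProvider` implementation gets wired up (see the `core.module.ts` factory later in this diff). A minimal sketch of the selection rule, using stand-in classes rather than the real services:

```ts
// Sketch of the `--enable-cache` selection rule; CacheProvider and
// PlainProvider are stand-ins for StoreCacheService / PlainStoreModelService.
interface IOConfig {
  enableCache?: boolean;
}

class CacheProvider {}
class PlainProvider {}

// Mirrors NodeConfig.enableCache: an unset flag falls back to `true`,
// so existing deployments keep the cached behaviour by default.
function resolveProvider(config: IOConfig): CacheProvider | PlainProvider {
  return (config.enableCache ?? true) ? new CacheProvider() : new PlainProvider();
}

console.log(resolveProvider({}) instanceof CacheProvider); // true
console.log(resolveProvider({enableCache: false}) instanceof PlainProvider); // true
```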
diff --git a/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts b/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts
index 36f87a7f07..6921b6cf39 100644
--- a/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts
+++ b/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts
@@ -3,7 +3,7 @@
 
 import {SchedulerRegistry} from '@nestjs/schedule';
 import {Sequelize} from '@subql/x-sequelize';
-import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
+import {CacheMetadataModel, IStoreModelProvider, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
 import {NodeConfig} from './NodeConfig';
 import {IProjectUpgradeService, ProjectUpgradeService, upgradableSubqueryProject} from './ProjectUpgrade.service';
 
@@ -289,7 +289,7 @@ describe('Project Upgrades', () => {
   describe('Upgradable subquery project', () => {
     let upgradeService: ProjectUpgradeService;
     let project: ISubqueryProject & IProjectUpgradeService;
-    let storeCache: StoreCacheService;
+    let storeCache: IStoreModelProvider;
 
     beforeEach(async () => {
       storeCache = new StoreCacheService({} as any, {} as any, {} as any, new SchedulerRegistry());

diff --git a/packages/node-core/src/configure/ProjectUpgrade.service.ts b/packages/node-core/src/configure/ProjectUpgrade.service.ts
index 6bd66e80e7..09f8d8a535 100644
--- a/packages/node-core/src/configure/ProjectUpgrade.service.ts
+++ b/packages/node-core/src/configure/ProjectUpgrade.service.ts
@@ -7,7 +7,7 @@ import {ParentProject} from '@subql/types-core';
 import {Sequelize, Transaction} from '@subql/x-sequelize';
 import {findLast, last, parseInt} from 'lodash';
 import {SchemaMigrationService} from '../db';
-import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
+import {IMetadata, IStoreModelProvider, ISubqueryProject, StoreService} from '../indexer';
 import {getLogger} from '../logger';
 import {exitWithError, monitorWrite} from '../process';
 import {getStartHeight, mainThreadOnly} from '../utils';
@@ -107,7 +107,7 @@ export class ProjectUpgradeService
-  async removeIndexedDeployments(blockHeight: number): Promise<void> {
+  async removeIndexedDeployments(blockHeight: number, tx?: Transaction): Promise<void> {
     const deployments = await this.getDeploymentsMetadata();
 
     // remove all future block heights
@@ -466,6 +465,6 @@ export class ProjectUpgradeService

diff --git a/packages/node-core/src/db/migration-service/SchemaMigration.service.ts b/packages/node-core/src/db/migration-service/SchemaMigration.service.ts
--- a/packages/node-core/src/db/migration-service/SchemaMigration.service.ts
+++ b/packages/node-core/src/db/migration-service/SchemaMigration.service.ts
@@ … @@ export class SchemaMigrationService {
   constructor(
     private sequelize: Sequelize,
     private storeService: StoreService,
-    private flushCache: (flushAll?: boolean) => Promise<void>,
     private dbSchema: string,
     private config: NodeConfig,
     private dbType: SUPPORT_DB = SUPPORT_DB.postgres
@@ -115,8 +114,7 @@ export class SchemaMigrationService {
     const sortedAddedModels = alignModelOrder(sortedSchemaModels, addedModels);
     const sortedModifiedModels = alignModelOrder(sortedSchemaModels, modifiedModels);
 
-    // Flush any pending data before running the migration
-    await this.flushCache(true);
+    await cacheProviderFlushData(this.storeService.modelProvider, true);
 
     const migrationAction = await Migration.create(
       this.sequelize,
@@ -186,10 +184,10 @@ export class SchemaMigrationService {
       const modelChanges = await migrationAction.run(transaction);
 
       // Update any relevant application state so the right models are used
-      this.storeService.storeCache.updateModels(modelChanges);
+      this.storeService.modelProvider.updateModels(modelChanges);
       await this.storeService.updateModels(this.dbSchema, getAllEntitiesRelations(nextSchema));
 
-      await this.flushCache();
+      await cacheProviderFlushData(this.storeService.modelProvider, true);
     } catch (e: any) {
       logger.error(e, 'Failed to execute Schema Migration');
       throw e;

diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts
index 5ddb106d14..f3d98557da 100644
--- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts
+++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts
@@ -5,17 +5,18 @@ import assert from 'assert';
 
 import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
 import {hexToU8a, u8aEq} from '@subql/utils';
+import {Transaction} from '@subql/x-sequelize';
 import {NodeConfig, IProjectUpgradeService} from '../../configure';
 import {AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload} from '../../events';
 import {getLogger} from '../../logger';
-import {exitWithError, monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
+import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
 import {IQueue, mainThreadOnly} from '../../utils';
 import {MonitorServiceInterface} from '../monitor.service';
 import {PoiBlock, PoiSyncService} from '../poi';
 import {SmartBatchService} from '../smartBatch.service';
 import {StoreService} from '../store.service';
-import {StoreCacheService} from '../storeCache';
-import {CachePoiModel} from '../storeCache/cachePoi';
+import {IStoreModelProvider} from '../storeModelProvider';
+import {IPoi} from '../storeModelProvider/poi';
 import {IBlock, IProjectService, ISubqueryProject} from '../types';
 
 const logger = getLogger('BaseBlockDispatcherService');
@@ -63,7 +64,7 @@ export abstract class BaseBlockDispatcher implements IB
     private projectUpgradeService: IProjectUpgradeService,
     protected queue: Q,
     protected storeService: StoreService,
-    private storeCacheService: StoreCacheService,
+    private storeModelProvider: IStoreModelProvider,
     private poiSyncService: PoiSyncService,
     protected monitorService?: MonitorServiceInterface
   ) {
@@ -75,7 +76,7 @@ export abstract class BaseBlockDispatcher implements IB
   async init(onDynamicDsCreated: (height: number) => void): Promise<void> {
     this._onDynamicDsCreated = onDynamicDsCreated;
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    this.setProcessedBlockCount((await this.storeCacheService.metadata.find('processedBlockCount', 0))!);
+    this.setProcessedBlockCount((await this.storeModelProvider.metadata.find('processedBlockCount', 0))!);
   }
 
   get queueSize(): number {
@@ -157,7 +158,7 @@ export abstract class BaseBlockDispatcher implements IB
   @mainThreadOnly()
   protected async preProcessBlock(height: number): Promise<void> {
     monitorCreateBlockStart(height);
-    this.storeService.setBlockHeight(height);
+    await this.storeService.setBlockHeight(height);
 
     await this.projectUpgradeService.setCurrentHeight(height);
 
@@ -198,10 +199,10 @@ export abstract class BaseBlockDispatcher implements IB
         throw e;
       }
     } else {
-      this.updateStoreMetadata(height);
+      await this.updateStoreMetadata(height, undefined, this.storeService.transaction);
 
       const operationHash = this.storeService.getOperationMerkleRoot();
-      this.createPOI(height, blockHash, operationHash);
+      await this.createPOI(height, blockHash, operationHash, this.storeService.transaction);
 
       if (dynamicDsCreated) {
         this.onDynamicDsCreated(height);
@@ -215,21 +216,11 @@ export abstract class BaseBlockDispatcher implements IB
       this.setLatestProcessedHeight(height);
     }
 
-    if (this.nodeConfig.storeCacheAsync) {
-      // Flush all completed block data and don't wait
-      await this.storeCacheService.flushAndWaitForCapacity(false)?.catch((e) => {
-        exitWithError(new Error(`Flushing cache failed`, {cause: e}), logger);
-      });
-    } else {
-      // Flush all data from cache and wait
-      await this.storeCacheService.flushCache(false);
-    }
-
-    if (!this.projectService.hasDataSourcesAfterHeight(height)) {
-      const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
-      await this.storeCacheService.flushCache(false);
-      exitWithError(msg, logger, 0);
-    }
+    await this.storeModelProvider.applyPendingChanges(
+      height,
+      !this.projectService.hasDataSourcesAfterHeight(height),
+      this.storeService.transaction
+    );
   }
 
   @OnEvent(AdminEvent.rewindTarget)
@@ -258,7 +249,12 @@ export abstract class BaseBlockDispatcher implements IB
    * @param operationHash
    * @private
    */
-  private createPOI(height: number, blockHash: string, operationHash: Uint8Array): void {
+  private async createPOI(
+    height: number,
+    blockHash: string,
+    operationHash: Uint8Array,
+    tx?: Transaction
+  ): Promise<void> {
     if (!this.nodeConfig.proofOfIndex) {
       return;
     }
@@ -268,8 +264,8 @@ export abstract class BaseBlockDispatcher implements IB
     const poiBlock = PoiBlock.create(height, blockHash, operationHash, this.project.id);
     // This is the first creation of POI
-    this.poi.bulkUpsert([poiBlock]);
-    this.storeCacheService.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}]);
+    await this.poi.bulkUpsert([poiBlock], tx);
+    await this.storeModelProvider.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}], tx);
     this.eventEmitter.emit(PoiEvent.PoiTarget, {
       height,
       timestamp: Date.now(),
@@ -277,21 +273,24 @@ export abstract class BaseBlockDispatcher implements IB
   @mainThreadOnly()
-  private updateStoreMetadata(height: number, updateProcessed = true): void {
-    const meta = this.storeCacheService.metadata;
+  private async updateStoreMetadata(height: number, updateProcessed = true, tx?: Transaction): Promise<void> {
+    const meta = this.storeModelProvider.metadata;
     // Update store metadata
-    meta.setBulk([
-      {key: 'lastProcessedHeight', value: height},
-      {key: 'lastProcessedTimestamp', value: Date.now()},
-    ]);
+    await meta.setBulk(
+      [
+        {key: 'lastProcessedHeight', value: height},
+        {key: 'lastProcessedTimestamp', value: Date.now()},
+      ],
+      tx
+    );
     // Db Metadata increase BlockCount, in memory ref to block-dispatcher _processedBlockCount
     if (updateProcessed) {
-      meta.setIncrement('processedBlockCount');
+      await meta.setIncrement('processedBlockCount', undefined, tx);
     }
   }
 
-  private get poi(): CachePoiModel {
-    const poi = this.storeCacheService.poi;
+  private get poi(): IPoi {
+    const poi = this.storeModelProvider.poi;
     if (!poi) {
       throw new Error('Poi service expected poi repo but it was not found');
     }
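The dispatcher change above collapses the old dual flush path (`storeCacheAsync` vs. synchronous flush) into a single `applyPendingChanges` call. A condensed, hypothetical reading of the resulting per-block lifecycle for the plain provider; the provider internals are outside this diff, so `ProviderSketch` only models the shape used by `BaseBlockDispatcher`:

```ts
import {Sequelize, Transaction} from '@subql/x-sequelize';

// Stand-in for the relevant slice of IStoreModelProvider.
interface ProviderSketch {
  applyPendingChanges(height: number, dataSourcesCompleted: boolean, tx?: Transaction): Promise<void>;
}

// Plain-provider reading of the flow: one DB transaction per block, opened in
// StoreService.setBlockHeight() and committed once the block's changes apply,
// so every handler write for `height` becomes visible atomically.
async function processBlockSketch(sequelize: Sequelize, provider: ProviderSketch, height: number): Promise<void> {
  const tx = await sequelize.transaction(); // what setBlockHeight() now does
  // ...indexer handlers run here, each store call passing `tx`...
  await provider.applyPendingChanges(height, /* dataSourcesCompleted */ false, tx); // commit point
}
```

For the cache provider the same call flushes (or defers flushing) the in-memory cache instead, which is why the dispatcher no longer needs to branch on `storeCacheAsync` itself.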
diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
index 83071a864e..01ca467238 100644
--- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
+++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
@@ -14,7 +14,7 @@ import {exitWithError, monitorWrite} from '../../process';
 import {profilerWrap} from '../../profiler';
 import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils';
 import {StoreService} from '../store.service';
-import {StoreCacheService} from '../storeCache';
+import {IStoreModelProvider} from '../storeModelProvider';
 import {IProjectService, ISubqueryProject} from '../types';
 import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';
 
@@ -45,7 +45,7 @@ export abstract class BlockDispatcher
     projectService: IProjectService<DS>,
     projectUpgradeService: IProjectUpgradeService,
     storeService: StoreService,
-    storeCacheService: StoreCacheService,
+    storeModelProvider: IStoreModelProvider,
     poiSyncService: PoiSyncService,
     project: ISubqueryProject,
     fetchBlocksBatches: BatchBlockFetcher
@@ -58,7 +58,7 @@ export abstract class BlockDispatcher
       projectUpgradeService,
       new Queue(nodeConfig.batchSize * 3),
       storeService,
-      storeCacheService,
+      storeModelProvider,
       poiSyncService
     );
     this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');

diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts
index a1ce276196..cc8e5da01e 100644
--- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts
+++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts
@@ -5,7 +5,7 @@ import {EventEmitter2} from '@nestjs/event-emitter';
 import {IProjectUpgradeService, NodeConfig} from '../../configure';
 import {PoiSyncService} from '../poi';
 import {StoreService} from '../store.service';
-import {StoreCacheService} from '../storeCache';
+import {StoreCacheService} from '../storeModelProvider';
 import {IProjectService, ISubqueryProject} from '../types';
 import {WorkerBlockDispatcher} from './worker-block-dispatcher';

diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts
index 00b1e86c71..8b105e1272 100644
--- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts
+++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts
@@ -15,7 +15,7 @@ import {monitorWrite} from '../../process';
 import {AutoQueue, isTaskFlushedError} from '../../utils';
 import {MonitorServiceInterface} from '../monitor.service';
 import {StoreService} from '../store.service';
-import {StoreCacheService} from '../storeCache';
+import {IStoreModelProvider} from '../storeModelProvider';
 import {ISubqueryProject, IProjectService} from '../types';
 import {isBlockUnavailableError} from '../worker/utils';
 import {BaseBlockDispatcher} from './base-block-dispatcher';
@@ -58,7 +58,7 @@ export abstract class WorkerBlockDispatcher
     projectService: IProjectService<DS>,
     projectUpgradeService: IProjectUpgradeService,
     storeService: StoreService,
-    storeCacheService: StoreCacheService,
+    storeModelProvider: IStoreModelProvider,
     poiSyncService: PoiSyncService,
     project: ISubqueryProject,
     private createIndexerWorker: () => Promise<W>,
@@ -72,7 +72,7 @@ export abstract class WorkerBlockDispatcher
       projectUpgradeService,
       initAutoQueue(nodeConfig.workers, nodeConfig.batchSize, nodeConfig.timeout, 'Worker'),
       storeService,
-      storeCacheService,
+      storeModelProvider,
       poiSyncService,
       monitorService
     );
diff --git a/packages/node-core/src/indexer/core.module.ts b/packages/node-core/src/indexer/core.module.ts
index fe26a601d9..1f4cb87c3f 100644
--- a/packages/node-core/src/indexer/core.module.ts
+++ b/packages/node-core/src/indexer/core.module.ts
@@ -2,7 +2,11 @@
 // SPDX-License-Identifier: GPL-3.0
 
 import {Module} from '@nestjs/common';
+import {EventEmitter2} from '@nestjs/event-emitter';
+import {SchedulerRegistry} from '@nestjs/schedule';
+import {Sequelize} from '@subql/x-sequelize';
 import {AdminController, AdminListener} from '../admin/admin.controller';
+import {NodeConfig} from '../configure';
 import {IndexingBenchmarkService, PoiBenchmarkService} from './benchmark.service';
 import {ConnectionPoolService} from './connectionPool.service';
 import {ConnectionPoolStateManager} from './connectionPoolState.manager';
@@ -11,7 +15,7 @@ import {MonitorService} from './monitor.service';
 import {PoiService, PoiSyncService} from './poi';
 import {SandboxService} from './sandbox.service';
 import {StoreService} from './store.service';
-import {StoreCacheService} from './storeCache';
+import {IStoreModelProvider, PlainStoreModelService, StoreCacheService} from './storeModelProvider';
 
 @Module({
   providers: [
@@ -25,7 +29,20 @@ import {StoreCacheService} from './storeCache';
     PoiService,
     PoiSyncService,
     StoreService,
-    StoreCacheService,
+    {
+      provide: 'IStoreModelProvider',
+      useFactory: (
+        nodeConfig: NodeConfig,
+        eventEmitter: EventEmitter2,
+        schedulerRegistry: SchedulerRegistry,
+        sequelize: Sequelize
+      ): IStoreModelProvider => {
+        return nodeConfig.enableCache
+          ? new StoreCacheService(sequelize, nodeConfig, eventEmitter, schedulerRegistry)
+          : new PlainStoreModelService(sequelize, nodeConfig);
+      },
+      inject: [NodeConfig, EventEmitter2, SchedulerRegistry, Sequelize],
+    },
     AdminListener,
   ],
   controllers: [AdminController],
@@ -37,7 +54,7 @@ import {StoreCacheService} from './storeCache';
     PoiService,
     PoiSyncService,
     StoreService,
-    StoreCacheService,
+    'IStoreModelProvider',
     InMemoryCacheService,
   ],
 })
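Because the provider is now registered under a string token rather than a class, downstream services resolve it with `@Inject`. A short usage sketch; `MyIndexerService` is a made-up consumer (PoiService and StoreService in this diff follow the same pattern), and the import path assumes a file alongside `core.module.ts`:

```ts
import {Inject, Injectable} from '@nestjs/common';
import {IStoreModelProvider} from './storeModelProvider';

@Injectable()
class MyIndexerService {
  constructor(@Inject('IStoreModelProvider') private modelProvider: IStoreModelProvider) {}

  async lastHeight(): Promise<number | undefined> {
    // Identical call against either implementation; only the flush/commit
    // semantics behind it differ between cache and plain providers.
    return this.modelProvider.metadata.find('lastProcessedHeight');
  }
}
```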
diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts
index 3d242cda57..6c7ca7be7d 100644
--- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts
+++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts
@@ -2,7 +2,7 @@
 // SPDX-License-Identifier: GPL-3.0
 
 import {DatasourceParams, DynamicDsService} from './dynamic-ds.service';
-import {CacheMetadataModel} from './storeCache';
+import {CacheMetadataModel} from './storeModelProvider';
 import {ISubqueryProject} from './types';
 
 class TestDynamicDsService extends DynamicDsService {
@@ -69,7 +69,7 @@ describe('DynamicDsService', () => {
     const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]);
     await service.init(meta);
 
-    await service.resetDynamicDatasource(2);
+    await service.resetDynamicDatasource(2, null as any);
 
     await expect(meta.find('dynamicDatasources')).resolves.toEqual([testParam1, testParam2]);
     await expect(service.getDynamicDatasources()).resolves.toEqual([testParam1, testParam2]);
@@ -79,7 +79,7 @@ describe('DynamicDsService', () => {
     const meta = mockMetadata([testParam1, testParam2]);
     await service.init(meta);
 
-    meta.set('dynamicDatasources', [testParam1, testParam2, testParam3, testParam4]);
+    await meta.set('dynamicDatasources', [testParam1, testParam2, testParam3, testParam4]);
 
     await expect(service.getDynamicDatasources()).resolves.toEqual([testParam1, testParam2]);
     await expect(service.getDynamicDatasources(true)).resolves.toEqual([

diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts
index 8203dc96f9..9dae8d6245 100644
--- a/packages/node-core/src/indexer/dynamic-ds.service.ts
+++ b/packages/node-core/src/indexer/dynamic-ds.service.ts
@@ -1,10 +1,11 @@
 // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
 // SPDX-License-Identifier: GPL-3.0
 
+import {Transaction} from '@subql/x-sequelize';
 import {cloneDeep} from 'lodash';
 import {getLogger} from '../logger';
 import {exitWithError} from '../process';
-import {CacheMetadataModel} from './storeCache/cacheMetadata';
+import {IMetadata} from './storeModelProvider';
 import {ISubqueryProject} from './types';
 
 const logger = getLogger('dynamic-ds');
@@ -26,7 +27,7 @@ export interface IDynamicDsService {
 
 export abstract class DynamicDsService implements IDynamicDsService {
-  private _metadata?: CacheMetadataModel;
+  private _metadata?: IMetadata;
   private _datasources?: DS[];
   private _datasourceParams?: DatasourceParams[];
 
@@ -34,7 +35,7 @@ export abstract class DynamicDsService {
-  async init(metadata: CacheMetadataModel): Promise<void> {
+  async init(metadata: IMetadata): Promise<void> {
     this._metadata = metadata;
 
     await this.getDynamicDatasources(true);
@@ -47,7 +48,7 @@ export abstract class DynamicDsService {
-  async resetDynamicDatasource(targetHeight: number): Promise<void> {
+  async resetDynamicDatasource(targetHeight: number, tx: Transaction): Promise<void> {
     if (this._datasourceParams && this._datasourceParams.length !== 0) {
       const filteredDs = this._datasourceParams.filter((ds) => ds.startBlock <= targetHeight);
-      this.metadata.set(METADATA_KEY, filteredDs, tx);
+      await this.metadata.set(METADATA_KEY, filteredDs, tx);
       await this.loadDynamicDatasources(filteredDs);
     }
   }
 
-  async createDynamicDatasource(params: DatasourceParams): Promise<DS> {
+  // TODO make tx required
+  async createDynamicDatasource(params: DatasourceParams, tx?: Transaction): Promise<DS> {
     try {
       const ds = await this.getDatasource(params);
 
-      this.metadata.setNewDynamicDatasource(params);
+      await this.metadata.setNewDynamicDatasource(params, tx);
 
       logger.info(`Created new dynamic datasource from template: "${params.templateName}"`);
diff --git a/packages/node-core/src/indexer/entities/Metadata.entity.ts b/packages/node-core/src/indexer/entities/Metadata.entity.ts
index e02224dee5..b3d7056442 100644
--- a/packages/node-core/src/indexer/entities/Metadata.entity.ts
+++ b/packages/node-core/src/indexer/entities/Metadata.entity.ts
@@ -39,10 +39,10 @@ export interface Metadata {
   value: MetadataKeys[keyof MetadataKeys];
 }
 
-export interface MetadataModel extends Model<Metadata>, Metadata {}
+interface MetadataEntity extends Model<Metadata>, Metadata {}
 
 export type MetadataRepo = typeof Model & {
-  new (values?: unknown, options?: BuildOptions): MetadataModel;
+  new (values?: unknown, options?: BuildOptions): MetadataEntity;
 };
 
 async function checkSchemaMetadata(sequelize: Sequelize, schema: string, chainId: string): Promise<boolean> {

diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts
index 33bea47473..2e4d4eda6e 100644
--- a/packages/node-core/src/indexer/fetch.service.spec.ts
+++ b/packages/node-core/src/indexer/fetch.service.spec.ts
@@ -135,7 +135,7 @@ const getDictionaryService = () =>
     initDictionaries: () => {
       /* TODO */
     },
-  } as any as DictionaryService);
+  }) as any as DictionaryService;
 
 const getBlockDispatcher = () => {
   const inst = {

diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts
index b2b4c4fbd0..65a76e7ba2 100644
--- a/packages/node-core/src/indexer/fetch.service.ts
+++ b/packages/node-core/src/indexer/fetch.service.ts
@@ -15,7 +15,7 @@ import {IBlockDispatcher} from './blockDispatcher';
 import {mergeNumAndBlocksToNums} from './dictionary';
 import {DictionaryService} from './dictionary/dictionary.service';
 import {mergeNumAndBlocks} from './dictionary/utils';
-import {StoreCacheService} from './storeCache';
+import {IStoreModelProvider} from './storeModelProvider';
 import {BypassBlocks, Header, IBlock, IProjectService} from './types';
 import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service';
 
@@ -51,7 +51,7 @@ export abstract class BaseFetchService

diff --git a/packages/node-core/src/indexer/poi/poi.service.spec.ts b/packages/node-core/src/indexer/poi/poi.service.spec.ts
@@ -72,11 +73,7 @@ describe('PoiService', () => {
     } as any;
 
     const module: TestingModule = await Test.createTestingModule({
-      providers: [
-        PoiService,
-        {provide: NodeConfig, useValue: nodeConfig},
-        {provide: StoreCacheService, useValue: storeCache},
-      ],
+      providers: [PoiService, {provide: 'IStoreModelProvider', useValue: storeCache}],
     }).compile();
 
     service = module.get(PoiService);
@@ -158,7 +155,7 @@ describe('PoiService', () => {
     } as any;
 
     await service.rewind(targetBlockHeight, transaction);
-    expect(storeCache.metadata.bulkRemove).toHaveBeenCalledWith(['lastCreatedPoiHeight']);
+    expect(storeCache.metadata.bulkRemove).toHaveBeenCalledWith(['lastCreatedPoiHeight'], transaction);
   });
 });

diff --git a/packages/node-core/src/indexer/poi/poi.service.ts b/packages/node-core/src/indexer/poi/poi.service.ts
index e4cc050d3f..9f0fb8b890 100644
--- a/packages/node-core/src/indexer/poi/poi.service.ts
+++ b/packages/node-core/src/indexer/poi/poi.service.ts
@@ -1,7 +1,7 @@
 // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
 // SPDX-License-Identifier: GPL-3.0
 
-import {Injectable, OnApplicationShutdown} from '@nestjs/common';
+import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common';
 import {u8aToHex} from '@subql/utils';
 import {Op, QueryTypes, Transaction} from '@subql/x-sequelize';
 import {NodeConfig} from '../../configure';
@@ -9,9 +9,8 @@ import {sqlIterator} from '../../db';
 import {getLogger} from '../../logger';
 import {PoiRepo} from '../entities';
 import {ProofOfIndex, ProofOfIndexHuman, SyncedProofOfIndex} from '../entities/Poi.entity';
-import {PlainPoiModel} from '../poi';
-import {StoreCacheService} from '../storeCache';
-import {CachePoiModel} from '../storeCache/cachePoi';
+import {IStoreModelProvider} from '../storeModelProvider';
+import {IPoi, CachePoiModel, PlainPoiModel} from '../storeModelProvider/poi';
 
 const logger = getLogger('PoiService');
 
@@ -23,12 +22,9 @@ const logger = getLogger('PoiService');
 @Injectable()
 export class PoiService implements OnApplicationShutdown {
   private isShutdown = false;
-  private _poiRepo?: CachePoiModel;
+  private _poiRepo?: IPoi;
 
-  constructor(
-    protected readonly nodeConfig: NodeConfig,
-    private storeCache: StoreCacheService
-  ) {}
+  constructor(@Inject('IStoreModelProvider') private storeModelProvider: IStoreModelProvider) {}
 
   onApplicationShutdown(): void {
     this.isShutdown = true;
@@ -44,7 +40,7 @@ export class PoiService implements OnApplicationShutdown {
     };
   }
 
-  get poiRepo(): CachePoiModel {
+  get poiRepo(): IPoi {
     if (!this._poiRepo) {
       throw new Error(`No poi repo inited`);
     }
@@ -52,7 +48,12 @@ export class PoiService implements OnApplicationShutdown {
   }
 
   get plainPoiRepo(): PlainPoiModel {
-    return this.poiRepo.plainPoiModel;
+    if (this.poiRepo instanceof CachePoiModel) {
+      return this.poiRepo.plainPoiModel;
+    } else if (this.poiRepo instanceof PlainPoiModel) {
+      return this.poiRepo;
+    }
+    throw new Error(`No plainPoiRepo inited`);
   }
 
   /**
@@ -61,11 +62,11 @@ export class PoiService implements OnApplicationShutdown {
    * @param schema
    */
   async init(schema: string): Promise<void> {
-    this._poiRepo = this.storeCache.poi ?? undefined;
+    this._poiRepo = this.storeModelProvider.poi ?? undefined;
     if (!this._poiRepo) {
       return;
     }
-    const latestSyncedPoiHeight = await this.storeCache.metadata.find('latestSyncedPoiHeight');
+    const latestSyncedPoiHeight = await this.storeModelProvider.metadata.find('latestSyncedPoiHeight');
     if (latestSyncedPoiHeight === undefined) {
       await this.migratePoi(schema);
     }
@@ -94,7 +95,7 @@ export class PoiService implements OnApplicationShutdown {
       });
 
       // Drop previous keys in metadata
-      this.storeCache.metadata.bulkRemove(['blockOffset', 'latestPoiWithMmr', 'lastPoiHeight']);
+      await this.storeModelProvider.metadata.bulkRemove(['blockOffset', 'latestPoiWithMmr', 'lastPoiHeight']);
 
       const queries: string[] = [];
 
@@ -166,7 +167,7 @@ export class PoiService implements OnApplicationShutdown {
   async rewind(targetBlockHeight: number, transaction: Transaction): Promise<void> {
     await batchDeletePoi(this.poiRepo.model, transaction, targetBlockHeight);
 
-    const lastSyncedPoiHeight = await this.storeCache.metadata.find('latestSyncedPoiHeight');
+    const lastSyncedPoiHeight = await this.storeModelProvider.metadata.find('latestSyncedPoiHeight');
 
     if (lastSyncedPoiHeight !== undefined && lastSyncedPoiHeight > targetBlockHeight) {
       const genesisPoi = await this.poiRepo.model.findOne({
@@ -176,12 +177,12 @@ export class PoiService implements OnApplicationShutdown {
       // This indicates reindex height is less than genesis poi height
       // And genesis poi has been remove from `batchDeletePoi`
       if (!genesisPoi) {
-        this.storeCache.metadata.bulkRemove(['latestSyncedPoiHeight']);
+        await this.storeModelProvider.metadata.bulkRemove(['latestSyncedPoiHeight'], transaction);
       } else {
-        this.storeCache.metadata.set('latestSyncedPoiHeight', targetBlockHeight);
+        await this.storeModelProvider.metadata.set('latestSyncedPoiHeight', targetBlockHeight, transaction);
      }
     }
-    this.storeCache.metadata.bulkRemove(['lastCreatedPoiHeight']);
+    await this.storeModelProvider.metadata.bulkRemove(['lastCreatedPoiHeight'], transaction);
   }
 }
 
@@ -192,9 +193,8 @@ async function batchDeletePoi(
   targetBlockHeight: number,
   batchSize = 10000
 ): Promise<void> {
-  let completed = false;
   // eslint-disable-next-line no-constant-condition
-  while (!completed) {
+  while (true) {
     try {
       const recordsToDelete = await model.findAll({
         transaction,
@@ -207,7 +207,6 @@ async function batchDeletePoi(
       });
       if (recordsToDelete.length === 0) {
         break;
-        completed = true;
       }
       logger.debug(`Found Poi recordsToDelete ${recordsToDelete.length}`);
       if (recordsToDelete.length) {

diff --git a/packages/node-core/src/indexer/poi/poiSync.service.spec.ts b/packages/node-core/src/indexer/poi/poiSync.service.spec.ts
index 580ca00b31..fa30f84b18 100644
--- a/packages/node-core/src/indexer/poi/poiSync.service.spec.ts
+++ b/packages/node-core/src/indexer/poi/poiSync.service.spec.ts
@@ -6,8 +6,9 @@ import {delay} from '@subql/common';
 import {Sequelize} from '@subql/x-sequelize';
 import {range} from 'lodash';
 import {NodeConfig} from '../../configure';
-import {MetadataFactory, PlainPoiModel, PoiFactory, ProofOfIndex} from '../../indexer';
+import {MetadataFactory, PoiFactory, ProofOfIndex} from '../../indexer';
 import {Queue} from '../../utils';
+import {PlainPoiModel} from '../storeModelProvider/poi';
 import {ISubqueryProject} from '../types';
 import {PoiSyncService} from './poiSync.service';

diff --git a/packages/node-core/src/indexer/poi/poiSync.service.ts b/packages/node-core/src/indexer/poi/poiSync.service.ts
index 75f59c4c51..93254b036d 100644
--- a/packages/node-core/src/indexer/poi/poiSync.service.ts
+++ b/packages/node-core/src/indexer/poi/poiSync.service.ts
@@ -14,9 +14,9 @@ import {exitWithError} from '../../process';
 import {hasValue, Queue} from '../../utils';
 import {Metadata, MetadataFactory, MetadataRepo} from '../entities';
 import {PoiFactory, ProofOfIndex, SyncedProofOfIndex} from '../entities/Poi.entity';
+import {PlainPoiModel} from '../storeModelProvider/poi';
 import {ISubqueryProject} from '../types';
 import {PoiBlock} from './PoiBlock';
-import {PlainPoiModel} from './poiModel';
 
 const GENESIS_PARENT_HASH = hexToU8a('0x00');
 const logger = getLogger('PoiSyncService');
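The `instanceof` dispatch in `PoiService.plainPoiRepo` above exists because both providers now expose some `IPoi`, but only the cached one wraps a plain model. A simplified sketch of that relationship; the names mirror the diff but the signatures are reduced to the one method the dispatcher uses:

```ts
// Reduced stand-ins for IPoi / CachePoiModel / PlainPoiModel.
interface PoiLike {
  bulkUpsert(blocks: unknown[], tx?: unknown): Promise<void>;
}

class PlainPoi implements PoiLike {
  async bulkUpsert(): Promise<void> {
    /* writes straight through, within the block's transaction */
  }
}

class CachedPoi implements PoiLike {
  readonly plainPoiModel = new PlainPoi(); // escape hatch used by plainPoiRepo
  async bulkUpsert(): Promise<void> {
    /* buffers in memory until the cache flushes */
  }
}

// Mirrors PoiService.plainPoiRepo: unwrap when cached, pass through when plain.
function plainPoiOf(repo: PoiLike): PlainPoi {
  if (repo instanceof CachedPoi) return repo.plainPoiModel;
  if (repo instanceof PlainPoi) return repo;
  throw new Error('No plainPoiRepo inited');
}
```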
diff --git a/packages/node-core/src/indexer/project.service.spec.ts b/packages/node-core/src/indexer/project.service.spec.ts
index 4870290b72..0585b3bd8b 100644
--- a/packages/node-core/src/indexer/project.service.spec.ts
+++ b/packages/node-core/src/indexer/project.service.spec.ts
@@ -7,6 +7,7 @@ import {NodeConfig, ProjectUpgradeService} from '../configure';
 import {BaseDsProcessorService} from './ds-processor.service';
 import {DynamicDsService} from './dynamic-ds.service';
 import {BaseProjectService} from './project.service';
+import {StoreService} from './store.service';
 import {Header, ISubqueryProject} from './types';
 import {
   BaseUnfinalizedBlocksService,
@@ -309,26 +310,33 @@ describe('BaseProjectService', () => {
       init: jest.fn(),
       initCoreTables: jest.fn(),
       historical: true,
-      storeCache: {
+      modelProvider: {
         metadata: {
-          findMany: jest.fn(() => ({})),
-          find: jest.fn((key: string) => {
+          findMany: jest.fn(async () => Promise.resolve({})),
+          find: jest.fn(async (key: string): Promise<any> => {
+            let result: any;
             switch (key) {
               case METADATA_LAST_FINALIZED_PROCESSED_KEY:
-                return lastFinalizedHeight;
+                result = lastFinalizedHeight;
+                break;
               case METADATA_UNFINALIZED_BLOCKS_KEY:
-                return JSON.stringify(unfinalizedBlocks);
+                result = JSON.stringify(unfinalizedBlocks);
+                break;
               case 'lastProcessedHeight':
-                return startBlock - 1;
+                result = startBlock - 1;
+                break;
               case 'deployments':
-                return JSON.stringify({1: '1'});
+                result = JSON.stringify({1: '1'});
+                break;
               default:
-                return undefined;
+                result = undefined;
+                break;
             }
+            return Promise.resolve(result);
           }),
           set: jest.fn(),
           flush: jest.fn(),
-        },
+        } as any,
       resetCache: jest.fn(),
       flushCache: jest.fn(),
       _flushCache: jest.fn(),
@@ -359,7 +367,7 @@ describe('BaseProjectService', () => {
         resetDynamicDatasource: jest.fn(),
       } as unknown as DynamicDsService, // dynamicDsService
       new EventEmitter2(), // eventEmitter
-      new TestUnfinalizedBlocksService(nodeConfig, storeService.storeCache) // unfinalizedBlocksService
+      new TestUnfinalizedBlocksService(nodeConfig, storeService.modelProvider) // unfinalizedBlocksService
     );
   };

diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts
index 1ab12404ca..0ab35f227b 100644
--- a/packages/node-core/src/indexer/project.service.ts
+++ b/packages/node-core/src/indexer/project.service.ts
@@ -19,6 +19,7 @@ import {MetadataKeys} from './entities';
 import {PoiSyncService} from './poi';
 import {PoiService} from './poi/poi.service';
 import {StoreService} from './store.service';
+import {cacheProviderFlushData} from './storeModelProvider';
 import {ISubqueryProject, IProjectService, BypassBlocks} from './types';
 import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service';
 
@@ -33,7 +34,7 @@ class NotInitError extends Error {
 export abstract class BaseProjectService<
   API extends IApi,
   DS extends BaseDataSource,
-  UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService
+  UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService,
 > implements IProjectService<DS>
 {
   private _schema?: string;
@@ -110,7 +111,7 @@ export abstract class BaseProjectService<
     await this.storeService.initCoreTables(this._schema);
     await this.ensureMetadata();
     // DynamicDsService is dependent on metadata so we need to ensure it exists first
-    await this.dynamicDsService.init(this.storeService.storeCache.metadata);
+    await this.dynamicDsService.init(this.storeService.modelProvider.metadata);
 
     /**
      * WARNING: The order of the following steps is very important.
@@ -149,7 +150,7 @@ export abstract class BaseProjectService<
       }
 
       // Flush any pending operations to set up DB
-      await this.storeService.storeCache.flushCache(true);
+      await cacheProviderFlushData(this.storeService.modelProvider, true);
     } else {
       assert(startHeight, 'ProjectService must be initalized with a start height in workers');
       this.projectUpgradeService.initWorker(startHeight, this.handleProjectChange.bind(this));
@@ -198,7 +199,7 @@ export abstract class BaseProjectService<
   }
 
   private async ensureMetadata(): Promise<void> {
-    const metadata = this.storeService.storeCache.metadata;
+    const metadata = this.storeService.modelProvider.metadata;
 
     this.eventEmitter.emit(IndexerEvent.NetworkMetadata, this.apiService.networkMeta);
@@ -272,7 +273,7 @@ export abstract class BaseProjectService<
   }
 
   protected async getLastProcessedHeight(): Promise<number | undefined> {
-    return this.storeService.storeCache.metadata.find('lastProcessedHeight');
+    return this.storeService.modelProvider.metadata.find('lastProcessedHeight');
   }
 
   private async nextProcessHeight(): Promise<number | undefined> {
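`cacheProviderFlushData` is imported from `./storeModelProvider` but its body is not part of this diff. A plausible reading, given that it replaces direct `storeCache.flushCache()` calls here and in `SchemaMigrationService`, is a cache-only flush that is a no-op for the plain provider (which persists through per-block transactions instead); treat this as an assumption, not the shipped implementation:

```ts
import type {IStoreModelProvider} from './storeModelProvider';
import {BaseCacheService} from './storeModelProvider/baseCache.service';

// Hypothetical sketch of the helper's shape.
async function cacheProviderFlushDataSketch(provider: IStoreModelProvider, forceFlush?: boolean): Promise<void> {
  if (provider instanceof BaseCacheService) {
    await provider.flushData(forceFlush); // renamed from flushCache() in this diff
  }
  // Plain provider: nothing to flush; writes already sit in the block transaction.
}
```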
diff --git a/packages/node-core/src/indexer/store.service.test.ts b/packages/node-core/src/indexer/store.service.test.ts
new file mode 100644
index 0000000000..1b42fac89b
--- /dev/null
+++ b/packages/node-core/src/indexer/store.service.test.ts
@@ -0,0 +1,262 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: GPL-3.0
+
+import {EventEmitter2} from '@nestjs/event-emitter';
+import {buildSchemaFromString} from '@subql/utils';
+import {Sequelize, QueryTypes} from '@subql/x-sequelize';
+import {NodeConfig} from '../configure';
+import {DbOption} from '../db';
+import {StoreService} from './store.service';
+import {CachedModel, PlainStoreModelService, StoreCacheService} from './storeModelProvider';
+
+const option: DbOption = {
+  host: process.env.DB_HOST ?? '127.0.0.1',
+  port: process.env.DB_PORT ? Number(process.env.DB_PORT) : 5432,
+  username: process.env.DB_USER ?? 'postgres',
+  password: process.env.DB_PASS ?? 'postgres',
+  database: process.env.DB_DATABASE ?? 'postgres',
+  timezone: 'utc',
+};
+
+jest.setTimeout(60000);
+
+const testSchemaName = 'test_model_store';
+const schema = buildSchemaFromString(`
+  type Account @entity {
+    id: ID! # Account address
+    balance: Int
+  }
+`);
+
+describe('Check whether the db store and cache store are consistent.', () => {
+  let sequelize: Sequelize;
+  let storeService: StoreService;
+
+  beforeAll(async () => {
+    sequelize = new Sequelize(
+      `postgresql://${option.username}:${option.password}@${option.host}:${option.port}/${option.database}`,
+      option
+    );
+    await sequelize.authenticate();
+
+    await sequelize.query(`CREATE SCHEMA ${testSchemaName};`);
+    const nodeConfig = new NodeConfig({subquery: 'test', proofOfIndex: true, enableCache: false});
+    const project = {network: {chainId: '1'}, schema} as any;
+    const dbModel = new PlainStoreModelService(sequelize, nodeConfig);
+    storeService = new StoreService(sequelize, nodeConfig, dbModel, project);
+    await storeService.initCoreTables(testSchemaName);
+    await storeService.init(testSchemaName);
+  });
+
+  afterAll(async () => {
+    await sequelize.query(`DROP SCHEMA ${testSchemaName} CASCADE;`);
+    await sequelize.close();
+  });
+
+  afterEach(async () => {
+    if (storeService.transaction) {
+      await storeService.modelProvider.applyPendingChanges(1, false, storeService.transaction);
+    }
+  });
+
+  it('Same block, Execute the set method multiple times.', async () => {
+    await storeService.setBlockHeight(1);
+
+    const accountEntity = {id: 'block-001', balance: 100};
+
+    // account not exist.
+    let account = await storeService.getStore().get('Account', accountEntity.id);
+    expect(account).toBeUndefined();
+
+    // insert account success.
+    await storeService.getStore().set('Account', accountEntity.id, accountEntity as any);
+    account = await storeService.getStore().get('Account', accountEntity.id);
+    expect(account).toEqual(accountEntity);
+
+    // block range check.
+    const [dbData] = await sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts" WHERE id = :id`, {
+      replacements: {id: accountEntity.id},
+      type: QueryTypes.SELECT,
+      transaction: storeService.transaction,
+    });
+    expect(dbData._block_range).toEqual([
+      {value: '1', inclusive: true},
+      {value: null, inclusive: false},
+    ]);
+
+    // update account success.
+    const account001 = {id: 'block-001', balance: 10000};
+    await storeService.getStore().set('Account', account001.id, account001 as any);
+    const account001After = await storeService.getStore().get('Account', account001.id);
+    expect(account001After).toEqual(account001);
+    console.log({accountAfter: account001After, accountEntityAfter: account001});
+
+    // only one record in db and block range check.
+    const allDatas = await sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts"`, {
+      type: QueryTypes.SELECT,
+      transaction: storeService.transaction,
+    });
+    expect(allDatas).toHaveLength(1);
+    expect(allDatas[0]._block_range).toEqual([
+      {value: '1', inclusive: true},
+      {value: null, inclusive: false},
+    ]);
+
+    const account002 = {id: 'block-002', balance: 100};
+    await storeService.getStore().bulkCreate('Account', [account002, account001]);
+    const account002After = await storeService.getStore().get('Account', account002.id);
+    expect(account002After).toEqual(account002);
+
+    // two records in db and block range check.
+    const allDatas2 = await sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts"`, {
+      type: QueryTypes.SELECT,
+      transaction: storeService.transaction,
+    });
+    expect(allDatas2).toHaveLength(2);
+    expect(allDatas2[0]._block_range).toEqual([
+      {value: '1', inclusive: true},
+      {value: null, inclusive: false},
+    ]);
+    expect(allDatas2[1]._block_range).toEqual([
+      {value: '1', inclusive: true},
+      {value: null, inclusive: false},
+    ]);
+  }, 30000);
+
+  it('_block_range update check', async () => {
+    await storeService.setBlockHeight(1000);
+
+    // insert new account.
+    const account1000Data = {id: 'block-1000', balance: 999};
+    await storeService.getStore().set('Account', account1000Data.id, account1000Data as any);
+    const account1000 = await storeService.getStore().get('Account', account1000Data.id);
+    expect(account1000).toEqual(account1000Data);
+
+    const allDatas = await sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts"`, {
+      type: QueryTypes.SELECT,
+      transaction: storeService.transaction,
+    });
+    expect(allDatas).toHaveLength(3);
+
+    // set old account.
+    const account002 = {id: 'block-002', balance: 222222};
+    await storeService.getStore().set('Account', account002.id, account002 as any);
+    const account002After = await storeService.getStore().get('Account', account002.id);
+    expect(account002After).toEqual(account002);
+    expect((account002After as any).balance).toEqual(222222);
+
+    const allDatas2 = await sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts"`, {
+      type: QueryTypes.SELECT,
+      transaction: storeService.transaction,
+    });
+    expect(allDatas2).toHaveLength(4);
+
+    // check block range.
+    const account002Datas = allDatas2.filter((v) => v.id === account002.id);
+    expect(account002Datas).toHaveLength(2);
+    expect(account002Datas.map((v) => v._block_range).sort((a, b) => b[0].value - a[0].value)).toEqual([
+      [
+        {value: '1000', inclusive: true},
+        {value: null, inclusive: false},
+      ],
+      [
+        {value: '1', inclusive: true},
+        {value: '1000', inclusive: false},
+      ],
+    ]);
+  }, 100000);
+});
+
+describe('Cache Provider', () => {
+  let sequelize: Sequelize;
+  let storeService: StoreService;
+  let cacheModel: StoreCacheService;
+  let Account: CachedModel;
+
+  beforeAll(async () => {
+    sequelize = new Sequelize(
+      `postgresql://${option.username}:${option.password}@${option.host}:${option.port}/${option.database}`,
+      option
+    );
+    await sequelize.authenticate();
+
+    await sequelize.query(`CREATE SCHEMA ${testSchemaName};`);
+    const nodeConfig = new NodeConfig({
+      subquery: 'test',
+      proofOfIndex: true,
+      enableCache: false,
+      storeCacheAsync: true,
+      storeCacheThreshold: 1,
+      storeCacheUpperLimit: 1,
+      storeFlushInterval: 0,
+    });
+    const project = {network: {chainId: '1'}, schema} as any;
+    cacheModel = new StoreCacheService(sequelize, nodeConfig, new EventEmitter2(), null as any);
+    storeService = new StoreService(sequelize, nodeConfig, cacheModel, project);
+    await storeService.initCoreTables(testSchemaName);
+    await storeService.init(testSchemaName);
+    Account = cacheModel.getModel('Account') as CachedModel;
+  });
+
+  afterAll(async () => {
+    await sequelize.query(`DROP SCHEMA ${testSchemaName} CASCADE;`);
+    await sequelize.close();
+  });
+
+  async function cacheFlush(blockHeight: number, handle: (blockHeight: number) => Promise<void>) {
+    const tx = await sequelize.transaction();
+    tx.afterCommit(() => {
+      Account.clear(blockHeight);
+    });
+    await storeService.setBlockHeight(blockHeight);
+    await handle(blockHeight);
+    await Account.runFlush(tx, blockHeight);
+    await tx.commit();
+  }
+
+  it('For existing data, a delete-create-delete sequence should leave two entries for that record in the database.', async () => {
+    const getAllAccounts = () =>
+      sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts"`, {
+        type: QueryTypes.SELECT,
+      });
+
+    const accountEntity1 = {id: 'accountEntity-001', balance: 100};
+    await cacheFlush(1, async (blockHeight) => {
+      await Account.set(accountEntity1.id, accountEntity1, blockHeight);
+    });
+
+    // database check
+    let allDatas = await getAllAccounts();
+    expect(allDatas).toHaveLength(1);
+
+    // next block 999
+    const accountEntity2 = {id: 'accountEntity-002', balance: 9999};
+    await cacheFlush(999, async (blockHeight) => {
+      await Account.remove(accountEntity1.id, blockHeight);
+      const oldAccount = await Account.get(accountEntity1.id);
+      expect(oldAccount).toBeUndefined();
+
+      await Account.set(accountEntity2.id, accountEntity2, blockHeight);
+    });
+
+    allDatas = await getAllAccounts();
+    expect(allDatas).toHaveLength(2);
+
+    // next block 99999
+    await cacheFlush(99999, async (blockHeight) => {
+      // last block, accountEntity1 should be deleted.
+      const oldAccount1 = await Account.get(accountEntity1.id);
+      expect(oldAccount1).toBeUndefined();
+
+      let oldAccount2 = await Account.get(accountEntity2.id);
+      expect(oldAccount2.balance).toEqual(accountEntity2.balance);
+
+      await Account.remove(accountEntity2.id, blockHeight);
+      oldAccount2 = await Account.get(accountEntity2.id);
+      expect(oldAccount2).toBeUndefined();
+
+      await Account.set(accountEntity2.id, {id: 'accountEntity-002', balance: 999999} as any, blockHeight);
+      oldAccount2 = await Account.get(accountEntity2.id);
+      expect(oldAccount2.balance).toEqual(999999);
+    });
+
+    allDatas = await getAllAccounts();
+    expect(allDatas).toHaveLength(3);
+  });
+});
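What the assertions in this test encode about historical indexing: a row is never updated in place; the live row's `_block_range` is closed at the new height and a fresh row is opened. Illustrative literals only, but the shapes match the test's raw query results:

```ts
// Shape of a historical row as returned by the raw queries in the test above.
type BlockRangeBound = {value: string | null; inclusive: boolean};
type HistoricalRow = {id: string; balance: number; _block_range: [BlockRangeBound, BlockRangeBound]};

// 'block-002' after being created at height 1 and re-set at height 1000:
const rowsForBlock002: HistoricalRow[] = [
  // superseded version, valid for [1, 1000)
  {id: 'block-002', balance: 100, _block_range: [{value: '1', inclusive: true}, {value: '1000', inclusive: false}]},
  // live version, valid from 1000 onwards (open-ended upper bound)
  {id: 'block-002', balance: 222222, _block_range: [{value: '1000', inclusive: true}, {value: null, inclusive: false}]},
];
```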
diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts
index 8c1c86d05e..a48119a815 100644
--- a/packages/node-core/src/indexer/store.service.ts
+++ b/packages/node-core/src/indexer/store.service.ts
@@ -13,7 +13,16 @@ import {
   hexToU8a,
   GraphQLModelsType,
 } from '@subql/utils';
-import {IndexesOptions, ModelAttributes, ModelStatic, Op, QueryTypes, Sequelize, Transaction} from '@subql/x-sequelize';
+import {
+  IndexesOptions,
+  ModelAttributes,
+  ModelStatic,
+  Op,
+  QueryTypes,
+  Sequelize,
+  Transaction,
+  Deferrable,
+} from '@subql/x-sequelize';
 import {camelCase, flatten, last, upperFirst} from 'lodash';
 import {NodeConfig} from '../configure';
 import {
@@ -29,8 +38,7 @@ import {exitWithError} from '../process';
 import {camelCaseObjectKey, customCamelCaseGraphqlKey} from '../utils';
 import {MetadataFactory, MetadataRepo, PoiFactory, PoiFactoryDeprecate, PoiRepo} from './entities';
 import {Store} from './store';
-import {CacheMetadataModel} from './storeCache';
-import {StoreCacheService} from './storeCache/storeCache.service';
+import {IMetadata, IStoreModelProvider, PlainStoreModelService} from './storeModelProvider';
 import {StoreOperations} from './StoreOperations';
 import {ISubqueryProject} from './types';
 
@@ -59,17 +67,19 @@ export class StoreService {
   private _metaDataRepo?: MetadataRepo;
   private _historical?: boolean;
   private _dbType?: SUPPORT_DB;
-  private _metadataModel?: CacheMetadataModel;
+  private _metadataModel?: IMetadata;
   private _schema?: string;
   // Should be updated each block
   private _blockHeight?: number;
   private _operationStack?: StoreOperations;
   private _lastTimeDbSizeChecked?: number;
 
+  #transaction?: Transaction;
+
   constructor(
     private sequelize: Sequelize,
     private config: NodeConfig,
-    readonly storeCache: StoreCacheService,
+    @Inject('IStoreModelProvider') readonly modelProvider: IStoreModelProvider,
     @Inject('ISubqueryProject') private subqueryProject: ISubqueryProject
   ) {}
 
@@ -106,12 +116,16 @@ export class StoreService {
     return this._historical;
   }
 
+  get transaction(): Transaction | undefined {
+    return this.#transaction;
+  }
+
   async syncDbSize(): Promise<number> {
     if (!this._lastTimeDbSizeChecked || Date.now() - this._lastTimeDbSizeChecked > DB_SIZE_CACHE_TIMEOUT) {
       this._lastTimeDbSizeChecked = Date.now();
       return getDbSizeAndUpdateMetadata(this.sequelize, this.schema);
     } else {
-      return this.storeCache.metadata.find('dbSize').then((cachedDbSize) => {
+      return this.modelProvider.metadata.find('dbSize').then((cachedDbSize) => {
         if (cachedDbSize !== undefined) {
           return cachedDbSize;
         } else {
@@ -127,7 +141,7 @@ export class StoreService {
     return this._dbType;
   }
 
-  private get metadataModel(): CacheMetadataModel {
+  private get metadataModel(): IMetadata {
     assert(this._metadataModel, new NoInitError());
     return this._metadataModel;
   }
@@ -163,13 +177,13 @@ export class StoreService {
     }
     logger.info(`Historical state is ${this.historical ? 'enabled' : 'disabled'}`);
 
-    this.storeCache.init(this.historical, this.dbType === SUPPORT_DB.cockRoach, this.metaDataRepo, this.poiRepo);
+    this.modelProvider.init(this.historical, this.dbType === SUPPORT_DB.cockRoach, this.metaDataRepo, this.poiRepo);
 
-    this._metadataModel = this.storeCache.metadata;
+    this._metadataModel = this.modelProvider.metadata;
 
     await this.initHotSchemaReloadQueries(schema);
 
-    this.metadataModel.set('historicalStateEnabled', this.historical);
+    await this.metadataModel.set('historicalStateEnabled', this.historical);
   }
 
   async init(schema: string): Promise<void> {
@@ -185,13 +199,7 @@ export class StoreService {
       On SyncSchema, if no schema migration is introduced, it would consider current schema to be null,
       and go all db operations again every start up is a migration
       */
-      const schemaMigrationService = new SchemaMigrationService(
-        this.sequelize,
-        this,
-        this.storeCache._flushCache.bind(this.storeCache),
-        schema,
-        this.config
-      );
+      const schemaMigrationService = new SchemaMigrationService(this.sequelize, this, schema, this.config);
 
       await schemaMigrationService.run(null, this.subqueryProject.schema, tx);
 
@@ -205,7 +213,7 @@ export class StoreService {
         last(Object.values(deployments)) !== this.subqueryProject.id
       ) {
         // TODO this should run with the same db transaction as the migration
-        this.metadataModel.setIncrement('schemaMigrationCount');
+        await this.metadataModel.setIncrement('schemaMigrationCount');
       }
     } catch (e: any) {
       exitWithError(new Error(`Having a problem when syncing schema`, {cause: e}), logger);
@@ -277,6 +285,14 @@ export class StoreService {
       sequelizeModel.addHook('beforeValidate', (attributes, options) => {
         attributes.__block_range = [this.blockHeight, null];
       });
+
+      if (!this.config.enableCache) {
+        sequelizeModel.addHook('beforeBulkCreate', (instances, options) => {
+          instances.forEach((item) => {
+            item.__block_range = [this.blockHeight, null];
+          });
+        });
+      }
       // TODO, remove id and block_range constraint, check id manually
       // see https://github.com/subquery/subql/issues/1542
     }
@@ -345,8 +361,18 @@ export class StoreService {
       return !disableHistorical;
     }
   }
 
-  setBlockHeight(blockHeight: number): void {
+  async setBlockHeight(blockHeight: number): Promise<void> {
     this._blockHeight = blockHeight;
+
+    if (this.modelProvider instanceof PlainStoreModelService) {
+      assert(!this.#transaction, new Error(`Transaction already exists for height: ${blockHeight}`));
+
+      this.#transaction = await this.sequelize.transaction({
+        deferrable: this._historical || this.dbType === SUPPORT_DB.cockRoach ? undefined : Deferrable.SET_DEFERRED(),
+      });
+      this.#transaction.afterCommit(() => (this.#transaction = undefined));
+    }
+
     if (this.config.proofOfIndex) {
       this.operationStack = new StoreOperations(this.modelsRelations.models);
     }
@@ -422,14 +448,14 @@ group by
     // This should only been called from CLI, blockHeight in storeService never been set and is required for `beforeFind` hook
     // Height no need to change for rewind during indexing
     if (this._blockHeight === undefined) {
-      this.setBlockHeight(targetBlockHeight);
+      await this.setBlockHeight(targetBlockHeight);
     }
     for (const model of Object.values(this.sequelize.models)) {
       if ('__block_range' in model.getAttributes()) {
         await batchDeleteAndThenUpdate(this.sequelize, model, transaction, targetBlockHeight);
       }
     }
-    this.metadataModel.set('lastProcessedHeight', targetBlockHeight);
+    await this.metadataModel.set('lastProcessedHeight', targetBlockHeight, transaction);
     // metadataModel will be flushed in reindex.ts#reindex()
   }
 
@@ -458,7 +484,7 @@ export class StoreService {
   }
 
   getStore(): Store {
-    return new Store(this.config, this.storeCache, this);
+    return new Store(this.config, this.modelProvider, this);
   }
 }

diff --git a/packages/node-core/src/indexer/store/entity.ts b/packages/node-core/src/indexer/store/entity.ts
index 4b256b5c65..a1cd391990 100644
--- a/packages/node-core/src/indexer/store/entity.ts
+++ b/packages/node-core/src/indexer/store/entity.ts
@@ -7,9 +7,11 @@ export class EntityClass implements Entity {
   id: string;
   // Name needs to be private so that it can be converted to a generated entity
   #name: string;
+  #store: Store;
 
-  constructor(name: string, properties: {id: string} & any, private store: Store) {
+  constructor(name: string, properties: {id: string} & any, store: Store) {
     this.#name = name;
+    this.#store = store;
     this.id = properties.id;
     Object.assign(this, properties);
   }
@@ -21,6 +23,6 @@ export class EntityClass implements Entity {
   }
 
   async save(): Promise<void> {
-    return this.store.set(this.#name, this.id, this);
+    return this.#store.set(this.#name, this.id, this);
   }
 }
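The `#store` change above is what the CHANGELOG's Fixed entry refers to: a TypeScript `private` constructor parameter still becomes an enumerable own property, so the injected store surfaced on entities returned from `get` queries, while a JS `#field` never does. A minimal repro, independent of the real `Store` class:

```ts
// Before: TS parameter property leaks as a plain own property.
class Leaky {
  constructor(private store: object, props: {id: string}) {
    Object.assign(this, props);
  }
}

// After: a JS private field is invisible to enumeration and serialization.
class Sealed {
  #store: object;
  constructor(store: object, props: {id: string}) {
    this.#store = store;
    Object.assign(this, props);
  }
}

console.log('store' in new Leaky({}, {id: '1'})); // true: leaks into query results
console.log('store' in new Sealed({}, {id: '1'})); // false
```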
-22,12 +25,12 @@ type Context = { export class Store implements IStore { /* These need to explicily be private using JS style private properties in order to not leak these in the sandbox */ #config: NodeConfig; - #storeCache: StoreCacheService; + #modelProvider: IStoreModelProvider; #context: Context; - constructor(config: NodeConfig, storeCache: StoreCacheService, context: Context) { + constructor(config: NodeConfig, modelProvider: IStoreModelProvider, context: Context) { this.#config = config; - this.#storeCache = storeCache; + this.#modelProvider = modelProvider; this.#context = context; } @@ -39,7 +42,7 @@ export class Store implements IStore { async get(entity: string, id: string): Promise { try { - const raw = await this.#storeCache.getModel(entity).get(id); + const raw = await this.#modelProvider.getModel(entity).get(id, this.#context.transaction); monitorWrite(() => `-- [Store][get] Entity ${entity} ID ${id}, data: ${handledStringify(raw)}`); return EntityClass.create(entity, raw, this); } catch (e) { @@ -59,7 +62,9 @@ export class Store implements IStore { this.#queryLimitCheck('getByField', entity, options); - const raw = await this.#storeCache.getModel(entity).getByField(field, value, options); + const raw = await this.#modelProvider + .getModel(entity) + .getByFields([Array.isArray(value) ? [field, 'in', value] : [field, '=', value]], options); monitorWrite(() => `-- [Store][getByField] Entity ${entity}, data: ${handledStringify(raw)}`); return raw.map((v) => EntityClass.create(entity, v, this)) as T[]; } catch (e) { @@ -83,7 +88,7 @@ export class Store implements IStore { this.#queryLimitCheck('getByFields', entity, options); - const raw = await this.#storeCache.getModel(entity).getByFields(filter, options); + const raw = await this.#modelProvider.getModel(entity).getByFields(filter, options, this.#context.transaction); monitorWrite(() => `-- [Store][getByFields] Entity ${entity}, data: ${handledStringify(raw)}`); return raw.map((v) => EntityClass.create(entity, v, this)) as T[]; } catch (e) { @@ -95,7 +100,9 @@ export class Store implements IStore { try { const indexed = this.#context.isIndexedHistorical(entity, field as string); assert(indexed, `to query by field ${String(field)}, a unique index must be created on model ${entity}`); - const raw = await this.#storeCache.getModel(entity).getOneByField(field, value); + const [raw] = await this.#modelProvider + .getModel(entity) + .getByFields([Array.isArray(value) ? 
[field, 'in', value] : [field, '=', value]], {limit: 1}); monitorWrite(() => `-- [Store][getOneByField] Entity ${entity}, data: ${handledStringify(raw)}`); return EntityClass.create(entity, raw, this); } catch (e) { @@ -103,10 +110,9 @@ export class Store implements IStore { } } - // eslint-disable-next-line @typescript-eslint/require-await async set(entity: string, _id: string, data: Entity): Promise { try { - this.#storeCache.getModel(entity).set(_id, data, this.#context.blockHeight); + await this.#modelProvider.getModel(entity).set(_id, data, this.#context.blockHeight, this.#context.transaction); monitorWrite( () => `-- [Store][set] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}` ); @@ -115,10 +121,10 @@ export class Store implements IStore { throw new Error(`Failed to set Entity ${entity} with _id ${_id}: ${e}`); } } - // eslint-disable-next-line @typescript-eslint/require-await + async bulkCreate(entity: string, data: Entity[]): Promise { try { - this.#storeCache.getModel(entity).bulkCreate(data, this.#context.blockHeight); + await this.#modelProvider.getModel(entity).bulkCreate(data, this.#context.blockHeight, this.#context.transaction); for (const item of data) { this.#context.operationStack?.put(OperationType.Set, entity, item); } @@ -131,10 +137,11 @@ export class Store implements IStore { } } - // eslint-disable-next-line @typescript-eslint/require-await async bulkUpdate(entity: string, data: Entity[], fields?: string[]): Promise { try { - this.#storeCache.getModel(entity).bulkUpdate(data, this.#context.blockHeight, fields); + await this.#modelProvider + .getModel(entity) + .bulkUpdate(data, this.#context.blockHeight, fields, this.#context.transaction); for (const item of data) { this.#context.operationStack?.put(OperationType.Set, entity, item); } @@ -146,20 +153,20 @@ export class Store implements IStore { throw new Error(`Failed to bulkCreate Entity ${entity}: ${e}`); } } - // eslint-disable-next-line @typescript-eslint/require-await + async remove(entity: string, id: string): Promise { try { - this.#storeCache.getModel(entity).remove(id, this.#context.blockHeight); + await this.#modelProvider.getModel(entity).bulkRemove([id], this.#context.blockHeight, this.#context.transaction); this.#context.operationStack?.put(OperationType.Remove, entity, id); monitorWrite(() => `-- [Store][remove] Entity ${entity}, height: ${this.#context.blockHeight}, id: ${id}`); } catch (e) { throw new Error(`Failed to remove Entity ${entity} with id ${id}: ${e}`); } } - // eslint-disable-next-line @typescript-eslint/require-await + async bulkRemove(entity: string, ids: string[]): Promise { try { - this.#storeCache.getModel(entity).bulkRemove(ids, this.#context.blockHeight); + await this.#modelProvider.getModel(entity).bulkRemove(ids, this.#context.blockHeight, this.#context.transaction); for (const id of ids) { this.#context.operationStack?.put(OperationType.Remove, entity, id); diff --git a/packages/node-core/src/indexer/storeCache/baseCache.service.ts b/packages/node-core/src/indexer/storeModelProvider/baseCache.service.ts similarity index 79% rename from packages/node-core/src/indexer/storeCache/baseCache.service.ts rename to packages/node-core/src/indexer/storeModelProvider/baseCache.service.ts index bc01a30ea9..29d7c69aad 100644 --- a/packages/node-core/src/indexer/storeCache/baseCache.service.ts +++ b/packages/node-core/src/indexer/storeModelProvider/baseCache.service.ts @@ -6,9 +6,14 @@ import Pino from 'pino'; import {getLogger} from '../../logger'; 
import {profiler} from '../../profiler'; import {timeout} from '../../utils/promise'; +import {BaseStoreModelService} from './baseStoreModel.service'; +import {ICachedModelControl} from './types'; @Injectable() -export abstract class BaseCacheService implements BeforeApplicationShutdown { +export abstract class BaseCacheService + extends BaseStoreModelService + implements BeforeApplicationShutdown +{ private pendingFlush?: Promise; private queuedFlush?: Promise; protected logger: Pino.Logger; @@ -17,14 +22,14 @@ export abstract class BaseCacheService implements BeforeApplicationShutdown { abstract _resetCache(): Promise | void; abstract isFlushable(): boolean; abstract get flushableRecords(): number; - abstract flushExportStores(): Promise; protected constructor(loggerName: string) { + super(); this.logger = getLogger(loggerName); } @profiler() - async flushCache(forceFlush?: boolean): Promise { + async flushData(forceFlush?: boolean): Promise { const flushCacheGuarded = async (forceFlush?: boolean): Promise => { // When we force flush, this will ensure not interrupt current block flushing, // Force flush will continue after last block flush tx committed. @@ -49,14 +54,13 @@ export abstract class BaseCacheService implements BeforeApplicationShutdown { return this.queuedFlush; } - async resetCache(): Promise { + async resetData(): Promise { await this._resetCache(); } async beforeApplicationShutdown(): Promise { - await timeout(this.flushCache(true), 60, 'Before shutdown flush cache timeout'); + await timeout(this.flushData(true), 60, 'Before shutdown flush cache timeout'); this.logger.info(`Force flush cache successful!`); - await this.flushExportStores(); - this.logger.info(`Force flush exports successful!`); + await super.beforeApplicationShutdown(); } } diff --git a/packages/node-core/src/indexer/storeModelProvider/baseStoreModel.service.ts b/packages/node-core/src/indexer/storeModelProvider/baseStoreModel.service.ts new file mode 100644 index 0000000000..524662894d --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/baseStoreModel.service.ts @@ -0,0 +1,55 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {BeforeApplicationShutdown} from '@nestjs/common'; +import {getLogger} from '@subql/node-core/logger'; +import {ModelStatic} from '@subql/x-sequelize'; +import {MetadataRepo, PoiRepo} from '../entities'; +import {METADATA_ENTITY_NAME} from './metadata/utils'; +import {BaseEntity, IModel} from './model'; +import {POI_ENTITY_NAME} from './poi'; +import {Exporter} from './types'; + +const logger = getLogger('BaseStoreModelService'); +export abstract class BaseStoreModelService> implements BeforeApplicationShutdown { + protected historical = true; + protected poiRepo?: PoiRepo; + protected metadataRepo?: MetadataRepo; + protected cachedModels: Record = {}; + protected useCockroachDb?: boolean; + protected exports: Exporter[] = []; + + protected abstract createModel(entity: string): M; + + init(historical: boolean, useCockroachDb: boolean, meta: MetadataRepo, poi?: PoiRepo): void { + this.historical = historical; + this.metadataRepo = meta; + this.poiRepo = poi; + this.useCockroachDb = useCockroachDb; + } + + getModel(entity: string): IModel { + if (entity === METADATA_ENTITY_NAME) { + throw new Error('Please use getMetadataModel instead'); + } + if (entity === POI_ENTITY_NAME) { + throw new Error('Please use getPoiModel instead'); + } + if (!this.cachedModels[entity]) { + this.cachedModels[entity] = 
this.createModel(entity); + } + return this.cachedModels[entity] as IModel; + } + + updateModels({modifiedModels, removedModels}: {modifiedModels: ModelStatic[]; removedModels: string[]}): void { + modifiedModels.forEach((m) => { + this.cachedModels[m.name] = this.createModel(m.name); + }); + removedModels.forEach((r) => delete this.cachedModels[r]); + } + + async beforeApplicationShutdown(): Promise { + await Promise.all(this.exports.map((f) => f.shutdown())); + logger.info(`Force flush exports successful!`); + } +} diff --git a/packages/node-core/src/indexer/storeCache/cacheable.ts b/packages/node-core/src/indexer/storeModelProvider/cacheable.ts similarity index 100% rename from packages/node-core/src/indexer/storeCache/cacheable.ts rename to packages/node-core/src/indexer/storeModelProvider/cacheable.ts diff --git a/packages/node-core/src/indexer/storeCache/csvStore.service.spec.ts b/packages/node-core/src/indexer/storeModelProvider/csvStore.service.spec.ts similarity index 100% rename from packages/node-core/src/indexer/storeCache/csvStore.service.spec.ts rename to packages/node-core/src/indexer/storeModelProvider/csvStore.service.spec.ts diff --git a/packages/node-core/src/indexer/storeCache/csvStore.service.ts b/packages/node-core/src/indexer/storeModelProvider/csvStore.service.ts similarity index 96% rename from packages/node-core/src/indexer/storeCache/csvStore.service.ts rename to packages/node-core/src/indexer/storeModelProvider/csvStore.service.ts index 116310ac55..7b7ddadca8 100644 --- a/packages/node-core/src/indexer/storeCache/csvStore.service.ts +++ b/packages/node-core/src/indexer/storeModelProvider/csvStore.service.ts @@ -14,7 +14,10 @@ export class CsvStoreService implements Exporter { private stringifyStream: Stringifier; private readonly writeStream: fs.WriteStream; - constructor(private modelName: string, private outputPath: string) { + constructor( + private modelName: string, + private outputPath: string + ) { this.writeStream = fs.createWriteStream(this.getCsvFilePath(), {flags: 'a'}); this.stringifyStream = stringify({header: !this.fileExist}).on('error', (err) => { diff --git a/packages/node-core/src/indexer/storeModelProvider/index.ts b/packages/node-core/src/indexer/storeModelProvider/index.ts new file mode 100644 index 0000000000..58bfa8248b --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/index.ts @@ -0,0 +1,9 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export * from './storeCache.service'; +export * from './types'; +export * from './model'; +export * from './metadata'; +export * from './storeModel.service'; +export * from './utils'; diff --git a/packages/node-core/src/indexer/storeCache/cacheMetadata.spec.ts b/packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.spec.ts similarity index 100% rename from packages/node-core/src/indexer/storeCache/cacheMetadata.spec.ts rename to packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.spec.ts diff --git a/packages/node-core/src/indexer/storeCache/cacheMetadata.test.ts b/packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.test.ts similarity index 97% rename from packages/node-core/src/indexer/storeCache/cacheMetadata.test.ts rename to packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.test.ts index e1025fc87c..73594431de 100644 --- a/packages/node-core/src/indexer/storeCache/cacheMetadata.test.ts +++ 
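
Editor's note: the new `BaseStoreModelService` above gives both the cache-backed and DB-backed providers the same lazy, per-entity model registry. A minimal sketch of that pattern under assumed, simplified types (`IModel` here is reduced to a single method; the real interface is in `model/model.ts`):

```ts
// Reduced IModel for illustration only.
interface IModel<T> {
  get(id: string): Promise<T | undefined>;
}

abstract class LazyModelRegistry {
  private cached: Record<string, IModel<unknown>> = {};

  // Subclasses decide what a "model" is: a CachedModel, a PlainModel, etc.
  protected abstract createModel(entity: string): IModel<unknown>;

  // Models are created on first use and reused afterwards, which is why
  // updateModels above only needs to invalidate the affected entries.
  getModel<T>(entity: string): IModel<T> {
    this.cached[entity] ??= this.createModel(entity);
    return this.cached[entity] as IModel<T>;
  }
}
```
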
b/packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.test.ts
@@ -2,8 +2,8 @@
 // SPDX-License-Identifier: GPL-3.0

 import {Sequelize} from '@subql/x-sequelize';
-import {MetadataFactory, MetadataKeys, MetadataRepo} from '../';
-import {DbOption} from '../../';
+import {MetadataFactory, MetadataKeys, MetadataRepo} from '../..';
+import {DbOption} from '../../..';
 import {CacheMetadataModel} from './cacheMetadata';

 const option: DbOption = {
diff --git a/packages/node-core/src/indexer/storeCache/cacheMetadata.ts b/packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.ts
similarity index 77%
rename from packages/node-core/src/indexer/storeCache/cacheMetadata.ts
rename to packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.ts
index 781f3a4067..d070a131c1 100644
--- a/packages/node-core/src/indexer/storeCache/cacheMetadata.ts
+++ b/packages/node-core/src/indexer/storeModelProvider/metadata/cacheMetadata.ts
@@ -3,19 +3,18 @@
 import assert from 'assert';
 import {Transaction} from '@subql/x-sequelize';
-import {hasValue} from '../../utils';
-import {DatasourceParams} from '../dynamic-ds.service';
-import {Metadata, MetadataKeys, MetadataRepo} from '../entities';
-import {Cacheable} from './cacheable';
-import {ICachedModelControl} from './types';
-
-type MetadataKey = keyof MetadataKeys;
-const incrementKeys: MetadataKey[] = ['processedBlockCount', 'schemaMigrationCount'];
-type IncrementalMetadataKey = 'processedBlockCount' | 'schemaMigrationCount';
+import {hasValue} from '../../../utils';
+import {DatasourceParams} from '../../dynamic-ds.service';
+import {Metadata, MetadataKeys, MetadataRepo} from '../../entities';
+import {Cacheable} from '../cacheable';
+import {ICachedModelControl} from '../types';
+import {IMetadata} from './metadata';
+import {MetadataKey, incrementKeys, IncrementalMetadataKey, INCREMENT_QUERY, APPEND_DS_QUERY} from './utils';

 const specialKeys: MetadataKey[] = [...incrementKeys, 'dynamicDatasources'];

-export class CacheMetadataModel extends Cacheable implements ICachedModelControl {
+export class CacheMetadataModel extends Cacheable implements IMetadata, ICachedModelControl {
   private setCache: Partial<MetadataKeys> = {};
   // Needed for dynamic datasources
   private getCache: Partial<MetadataKeys> = {};
@@ -83,7 +85,8 @@ export class CacheMetadataModel extends Cacheable implements ICachedModelControl
     return result;
   }

-  set<K extends MetadataKey>(key: K, value: MetadataKeys[K]): void {
+  // eslint-disable-next-line @typescript-eslint/require-await
+  async set<K extends MetadataKey>(key: K, value: MetadataKeys[K], tx?: Transaction): Promise<void> {
     if (this.setCache[key] === undefined) {
       this.flushableRecordCounter += 1;
     }
@@ -94,15 +97,18 @@ export class CacheMetadataModel extends Cacheable implements ICachedModelControl
     }
   }

-  setBulk(metadata: Metadata[]): void {
+  // eslint-disable-next-line @typescript-eslint/require-await
+  async setBulk(metadata: Metadata[]): Promise<void> {
     metadata.map((m) => this.set(m.key, m.value));
   }

-  setIncrement(key: IncrementalMetadataKey, amount = 1): void {
+  // eslint-disable-next-line @typescript-eslint/require-await
+  async setIncrement(key: IncrementalMetadataKey, amount = 1): Promise<void> {
     this.setCache[key] = (this.setCache[key] ??
0) + amount; } - setNewDynamicDatasource(item: DatasourceParams): void { + // eslint-disable-next-line @typescript-eslint/require-await + async setNewDynamicDatasource(item: DatasourceParams): Promise { this.datasourceUpdates.push(item); } @@ -111,16 +117,7 @@ export class CacheMetadataModel extends Cacheable implements ICachedModelControl assert(this.model.sequelize, `Sequelize is not available on ${this.model.name}`); - await this.model.sequelize.query( - ` - INSERT INTO ${schemaTable} (key, value, "createdAt", "updatedAt") - VALUES ('${key}', '0'::jsonb, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (key) DO - UPDATE SET value = (COALESCE(${schemaTable}.value->>0)::int + '${amount}')::text::jsonb, - "updatedAt" = CURRENT_TIMESTAMP - WHERE ${schemaTable}.key = '${key}';`, - tx && {transaction: tx} - ); + await this.model.sequelize.query(INCREMENT_QUERY(schemaTable, key, amount), tx && {transaction: tx}); } private async appendDynamicDatasources(items: DatasourceParams[], tx?: Transaction): Promise { @@ -128,22 +125,7 @@ export class CacheMetadataModel extends Cacheable implements ICachedModelControl assert(this.model.sequelize, `Sequelize is not available on ${this.model.name}`); - const VALUE = '"value"'; - - const makeSet = (item: DatasourceParams, value: string, index = 1): string => - `jsonb_set(${value}, array[(jsonb_array_length(${VALUE}) + ${index})::text], '${JSON.stringify( - item - )}'::jsonb, true)`; - - await this.model.sequelize.query( - ` - UPDATE ${schemaTable} - SET ${VALUE} = ${items.reduce((acc, item, index) => makeSet(item, acc, index + 1), VALUE)}, - "updatedAt" = CURRENT_TIMESTAMP - WHERE ${schemaTable}.key = 'dynamicDatasources'; - `, - tx && {transaction: tx} - ); + await this.model.sequelize.query(APPEND_DS_QUERY(schemaTable, items), tx && {transaction: tx}); } private async handleSpecialKeys(tx?: Transaction): Promise { @@ -185,7 +167,7 @@ export class CacheMetadataModel extends Cacheable implements ICachedModelControl async runFlush(tx: Transaction, blockHeight?: number): Promise { const ops = Object.entries(this.setCache) .filter(([key]) => !specialKeys.includes(key as MetadataKey)) - .map(([key, value]) => ({key, value} as Metadata)); + .map(([key, value]) => ({key, value}) as Metadata); const lastProcessedHeightIdx = ops.findIndex((k) => k.key === 'lastProcessedHeight'); if (blockHeight !== undefined && lastProcessedHeightIdx >= 0) { @@ -209,7 +191,8 @@ export class CacheMetadataModel extends Cacheable implements ICachedModelControl // This is current only use for migrate Poi // If concurrent change to cache, please add mutex if needed - bulkRemove(keys: K[]): void { + // eslint-disable-next-line @typescript-eslint/require-await + async bulkRemove(keys: K[], tx?: Transaction): Promise { this.removeCache.push(...keys); for (const key of keys) { delete this.setCache[key]; diff --git a/packages/node-core/src/indexer/storeCache/index.ts b/packages/node-core/src/indexer/storeModelProvider/metadata/index.ts similarity index 58% rename from packages/node-core/src/indexer/storeCache/index.ts rename to packages/node-core/src/indexer/storeModelProvider/metadata/index.ts index 2cda5d9c25..94df86710c 100644 --- a/packages/node-core/src/indexer/storeCache/index.ts +++ b/packages/node-core/src/indexer/storeModelProvider/metadata/index.ts @@ -1,7 +1,5 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -export * from './storeCache.service'; -export * from './types'; -export * from './cacheModel'; export * from 
'./cacheMetadata'; +export {IMetadata} from './metadata'; diff --git a/packages/node-core/src/indexer/storeModelProvider/metadata/metadata.ts b/packages/node-core/src/indexer/storeModelProvider/metadata/metadata.ts new file mode 100644 index 0000000000..45b0108463 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/metadata/metadata.ts @@ -0,0 +1,83 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import assert from 'assert'; +import {hasValue} from '@subql/node-core/utils'; +import {Op, Transaction} from '@subql/x-sequelize'; +import {DatasourceParams} from '../../dynamic-ds.service'; +import {Metadata, MetadataKeys, MetadataRepo} from '../../entities'; +import {APPEND_DS_QUERY, INCREMENT_QUERY} from './utils'; + +export type MetadataKey = keyof MetadataKeys; +export const incrementKeys: MetadataKey[] = ['processedBlockCount', 'schemaMigrationCount']; +export type IncrementalMetadataKey = 'processedBlockCount' | 'schemaMigrationCount'; + +export interface IMetadata { + readonly model: MetadataRepo; + find(key: K, fallback?: MetadataKeys[K]): Promise; + findMany(keys: readonly K[]): Promise>; + + set(key: K, value: MetadataKeys[K], tx?: Transaction): Promise; + setBulk(metadata: Metadata[], tx?: Transaction): Promise; + setIncrement(key: IncrementalMetadataKey, amount?: number, tx?: Transaction): Promise; + setNewDynamicDatasource(item: DatasourceParams, tx?: Transaction): Promise; + + bulkRemove(keys: K[], tx?: Transaction): Promise; + + flush?(tx: Transaction, blockHeight: number): Promise; +} + +export class MetadataModel implements IMetadata { + constructor(readonly model: MetadataRepo) {} + + async find(key: K, fallback?: MetadataKeys[K]): Promise { + const record = await this.model.findByPk(key); + + return hasValue(record) ? 
(record.toJSON().value as MetadataKeys[K]) : fallback; + } + + async findMany(keys: readonly K[]): Promise> { + const entries = await this.model.findAll({ + where: { + key: keys, + }, + }); + + return entries.reduce((arr, curr) => { + arr[curr.key as K] = curr.value as MetadataKeys[K]; + return arr; + }, {} as Partial); + } + + async set(key: K, value: MetadataKeys[K], tx?: Transaction): Promise { + return this.setBulk([{key, value}], tx); + } + + async setBulk(metadata: Metadata[], tx?: Transaction): Promise { + await this.model.bulkCreate(metadata, {transaction: tx, updateOnDuplicate: ['key', 'value']}); + } + + async setIncrement(key: IncrementalMetadataKey, amount = 1, tx?: Transaction): Promise { + const schemaTable = this.model.getTableName(); + + assert(this.model.sequelize, `Sequelize is not available on ${this.model.name}`); + assert(incrementKeys.includes(key), `Key ${key} is not incrementable`); + + await this.model.sequelize.query(INCREMENT_QUERY(schemaTable, key, amount), tx && {transaction: tx}); + } + + async setNewDynamicDatasource(item: DatasourceParams, tx?: Transaction): Promise { + const schemaTable = this.model.getTableName(); + + assert(this.model.sequelize, `Sequelize is not available on ${this.model.name}`); + + await this.model.sequelize.query(APPEND_DS_QUERY(schemaTable, [item]), tx && {transaction: tx}); + } + + async bulkRemove(keys: K[], tx: Transaction): Promise { + await this.model.destroy({ + where: {key: {[Op.in]: keys}}, + transaction: tx, + }); + } +} diff --git a/packages/node-core/src/indexer/storeModelProvider/metadata/utils.ts b/packages/node-core/src/indexer/storeModelProvider/metadata/utils.ts new file mode 100644 index 0000000000..e08b909279 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/metadata/utils.ts @@ -0,0 +1,38 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {DatasourceParams} from '../../dynamic-ds.service'; +import {MetadataKeys} from '../../entities'; + +export type MetadataKey = keyof MetadataKeys; +export const incrementKeys: MetadataKey[] = ['processedBlockCount', 'schemaMigrationCount']; +export type IncrementalMetadataKey = 'processedBlockCount' | 'schemaMigrationCount'; + +export const METADATA_ENTITY_NAME = '_metadata'; + +type SchemaTable = string | {tableName: string; schema: string; delimiter: string}; + +export function INCREMENT_QUERY(schemaTable: SchemaTable, key: MetadataKey, amount = 1): string { + return `INSERT INTO ${schemaTable} (key, value, "createdAt", "updatedAt") + VALUES ('${key}', '0'::jsonb, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (key) DO + UPDATE SET value = (COALESCE(${schemaTable}.value->>0)::int + '${amount}')::text::jsonb, + "updatedAt" = CURRENT_TIMESTAMP + WHERE ${schemaTable}.key = '${key}';`; +} + +export function APPEND_DS_QUERY(schemaTable: SchemaTable, items: DatasourceParams[]): string { + const VALUE = '"value"'; + + const makeSet = (item: DatasourceParams, value: string, index = 1): string => + `jsonb_set(${value}, array[(jsonb_array_length(${VALUE}) + ${index})::text], '${JSON.stringify( + item + )}'::jsonb, true)`; + + return ` + UPDATE ${schemaTable} + SET ${VALUE} = ${items.reduce((acc, item, index) => makeSet(item, acc, index + 1), VALUE)}, + "updatedAt" = CURRENT_TIMESTAMP + WHERE ${schemaTable}.key = 'dynamicDatasources'; + `; +} diff --git a/packages/node-core/src/indexer/storeCache/cacheModel.spec.ts b/packages/node-core/src/indexer/storeModelProvider/model/cacheModel.spec.ts similarity 
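
For reference, `INCREMENT_QUERY` above renders a single jsonb upsert, so incrementing a counter is one round trip. What it expands to for `processedBlockCount` with `amount = 5` (the `app."_metadata"` table name is an example):

```ts
// Derived by substituting into the template above; not a separate query in the codebase.
const sql = `INSERT INTO app."_metadata" (key, value, "createdAt", "updatedAt")
    VALUES ('processedBlockCount', '0'::jsonb, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
    ON CONFLICT (key) DO
    UPDATE SET value = (COALESCE(app."_metadata".value->>0)::int + '5')::text::jsonb,
    "updatedAt" = CURRENT_TIMESTAMP
    WHERE app."_metadata".key = 'processedBlockCount';`;
```

Note that the insert branch seeds the key at `'0'`; only the conflict branch applies the increment.
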
index 86% rename from packages/node-core/src/indexer/storeCache/cacheModel.spec.ts rename to packages/node-core/src/indexer/storeModelProvider/model/cacheModel.spec.ts index f72eff89ac..5c1cb03b80 100644 --- a/packages/node-core/src/indexer/storeCache/cacheModel.spec.ts +++ b/packages/node-core/src/indexer/storeModelProvider/model/cacheModel.spec.ts @@ -3,7 +3,7 @@ import {delay} from '@subql/common'; import {Sequelize} from '@subql/x-sequelize'; -import {NodeConfig} from '../../configure'; +import {NodeConfig} from '../../../configure'; import {CachedModel} from './cacheModel'; jest.mock('@subql/x-sequelize', () => { @@ -106,7 +106,7 @@ describe('cacheModel', () => { // Set an initial model and flush it blockHeight = 1; - testModel.set( + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -126,7 +126,7 @@ describe('cacheModel', () => { // updated height to 2 blockHeight = 2; - testModel.set( + await testModel.set( 'entity1_id_0x01', { ...entity1, @@ -144,7 +144,7 @@ describe('cacheModel', () => { await delay(0.2); const entity2 = await testModel.get('entity1_id_0x01'); - testModel.set( + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -161,7 +161,7 @@ describe('cacheModel', () => { it('can call getByFields, with entities updated in the same block', async () => { blockHeight = 2; - testModel.set( + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -187,7 +187,7 @@ describe('cacheModel', () => { }); it('cannot mutate data in the cache without calling methods', async () => { - testModel.set( + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -210,13 +210,13 @@ describe('cacheModel', () => { /* getBy methods use set cache */ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const entity3 = (await testModel.getOneByField('field1', 2))!; + const [entity3] = (await testModel.getByFields([['field1', '=', 2]], {limit: 1}))!; expect(entity3?.field1).toEqual(2); // Mutate field directly entity3.field1 = -2; - const entity4 = await testModel.getOneByField('field1', 2); + const [entity4] = await testModel.getByFields([['field1', '=', 2]], {limit: 1}); expect(entity4?.field1).toEqual(2); }); }); @@ -229,16 +229,16 @@ describe('cacheModel', () => { testModel = new CachedModel(sequelize.model('entity1'), true, {} as NodeConfig, () => i++); }); - it('throws when trying to set undefined', () => { - expect(() => testModel.set('0x01', undefined as any, 1)).toThrow(); - expect(() => testModel.set('0x01', null as any, 1)).toThrow(); + it('throws when trying to set undefined', async () => { + await expect(() => testModel.set('0x01', undefined as any, 1)).rejects.toThrow(); + await expect(() => testModel.set('0x01', null as any, 1)).rejects.toThrow(); }); // it should keep same behavior as hook we used it('when get data after flushed, it should exclude block range', async () => { const spyDbGet = jest.spyOn(testModel.model, 'findOne'); const sypOnApplyBlockRange = jest.spyOn(testModel as any, 'applyBlockRange'); - testModel.set( + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -264,8 +264,8 @@ describe('cacheModel', () => { // Some edge cases for set get and remove describe('set, remove and get', () => { it('In different block, remove and set, should able to get', async () => { - testModel.remove('entity1_id_0x01', 4); - testModel.set( + await testModel.remove('entity1_id_0x01', 4); + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -277,9 +277,9 @@ describe('cacheModel', () => { 
expect(result?.field1).toBe(5); }); - it('In same block, remove then set, should able to get', () => { - testModel.remove('entity1_id_0x01', 1); - testModel.set( + it('In same block, remove then set, should able to get', async () => { + await testModel.remove('entity1_id_0x01', 1); + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -287,7 +287,7 @@ describe('cacheModel', () => { }, 1 ); - const result = testModel.get('entity1_id_0x01'); + const result = await testModel.get('entity1_id_0x01'); // data should be erased from removeCache expect((testModel as any).removeCache.entity1_id_0x01).toBeUndefined(); @@ -295,8 +295,8 @@ describe('cacheModel', () => { }); it('In different block, remove and set, then remove again, should get nothing', async () => { - testModel.remove('entity1_id_0x01', 4); - testModel.set( + await testModel.remove('entity1_id_0x01', 4); + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -304,7 +304,7 @@ describe('cacheModel', () => { }, 6 ); - testModel.remove('entity1_id_0x01', 8); + await testModel.remove('entity1_id_0x01', 8); const result = await testModel.get('entity1_id_0x01'); expect((testModel as any).removeCache.entity1_id_0x01).toBeDefined(); // should match with last removed @@ -318,8 +318,8 @@ describe('cacheModel', () => { }); it('In same block, remove and set, then remove again, should get nothing', async () => { - testModel.remove('entity1_id_0x01', 1); - testModel.set( + await testModel.remove('entity1_id_0x01', 1); + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -327,7 +327,7 @@ describe('cacheModel', () => { }, 1 ); - testModel.remove('entity1_id_0x01', 1); + await testModel.remove('entity1_id_0x01', 1); const result = await testModel.get('entity1_id_0x01'); expect((testModel as any).removeCache.entity1_id_0x01).toBeDefined(); @@ -338,8 +338,8 @@ describe('cacheModel', () => { expect(result).toBeUndefined(); }); - it('clean flushable records when applyBlockRange, if found set and removed happened in the same height', () => { - testModel.set( + it('clean flushable records when applyBlockRange, if found set and removed happened in the same height', async () => { + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -347,9 +347,9 @@ describe('cacheModel', () => { }, 1 ); - testModel.remove('entity1_id_0x01', 1); + await testModel.remove('entity1_id_0x01', 1); - testModel.set( + await testModel.set( 'entity1_id_0x02', { id: 'entity1_id_0x02', @@ -370,8 +370,8 @@ describe('cacheModel', () => { expect(records[0].id).toBe('entity1_id_0x02'); }); - it('clean flushable records when applyBlockRange, pass if set and remove in the different height', () => { - testModel.set( + it('clean flushable records when applyBlockRange, pass if set and remove in the different height', async () => { + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -379,9 +379,9 @@ describe('cacheModel', () => { }, 1 ); - testModel.remove('entity1_id_0x01', 2); + await testModel.remove('entity1_id_0x01', 2); - testModel.set( + await testModel.set( 'entity1_id_0x02', { id: 'entity1_id_0x02', @@ -399,7 +399,7 @@ describe('cacheModel', () => { }); it('getFromCache could filter out removed data', async () => { - testModel.set( + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -407,9 +407,9 @@ describe('cacheModel', () => { }, 1 ); - testModel.remove('entity1_id_0x01', 1); + await testModel.remove('entity1_id_0x01', 1); - testModel.set( + await testModel.set( 'entity1_id_0x02', { id: 
'entity1_id_0x02', @@ -418,7 +418,7 @@ describe('cacheModel', () => { 2 ); const spyFindAll = jest.spyOn(testModel.model, 'findAll'); - const result = await testModel.getByField('field1', 1, {offset: 0, limit: 50}); + const result = await testModel.getByFields([['field1', '=', 1]], {offset: 0, limit: 50}); expect(spyFindAll).toHaveBeenCalledTimes(1); expect(result).toStrictEqual([ @@ -427,7 +427,7 @@ describe('cacheModel', () => { }); it('getFromCache with removed and set again data', async () => { - testModel.set( + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -435,8 +435,8 @@ describe('cacheModel', () => { }, 1 ); - testModel.remove('entity1_id_0x01', 1); - testModel.set( + await testModel.remove('entity1_id_0x01', 1); + await testModel.set( 'entity1_id_0x01', { id: 'entity1_id_0x01', @@ -445,12 +445,12 @@ describe('cacheModel', () => { 1 ); const spyFindAll = jest.spyOn(testModel.model, 'findAll'); - const result = await testModel.getByField('field1', 1, {offset: 0, limit: 50}); + const result = await testModel.getByFields([['field1', '=', 1]], {offset: 0, limit: 50}); expect(spyFindAll).toHaveBeenCalledTimes(1); expect(result).toStrictEqual([{id: 'entity1_id_0x01', field1: 1}]); // Should not include any previous recorded value - const result3 = await testModel.getByField('field1', 3, {offset: 0, limit: 50}); + const result3 = await testModel.getByFields([['field1', '=', 3]], {offset: 0, limit: 50}); // Expect only mocked expect(result3).toStrictEqual([]); }); diff --git a/packages/node-core/src/indexer/storeCache/cacheModel.test.ts b/packages/node-core/src/indexer/storeModelProvider/model/cacheModel.test.ts similarity index 97% rename from packages/node-core/src/indexer/storeCache/cacheModel.test.ts rename to packages/node-core/src/indexer/storeModelProvider/model/cacheModel.test.ts index 52aead2e55..a7163ecfe5 100644 --- a/packages/node-core/src/indexer/storeCache/cacheModel.test.ts +++ b/packages/node-core/src/indexer/storeModelProvider/model/cacheModel.test.ts @@ -4,7 +4,7 @@ import {GraphQLModelsType} from '@subql/utils'; import {Sequelize, DataTypes, QueryTypes} from '@subql/x-sequelize'; import {cloneDeep, padStart} from 'lodash'; -import {DbOption, modelsTypeToModelAttributes, NodeConfig} from '../../'; +import {DbOption, modelsTypeToModelAttributes, NodeConfig} from '../../..'; import {CachedModel} from './cacheModel'; const option: DbOption = { @@ -74,7 +74,7 @@ describe('cacheMetadata integration', () => { // Pre-populate some data and flush it do the db let n = 0; while (n < 100) { - cacheModel.set( + await cacheModel.set( `entity1_id_0x${formatIdNumber(n)}`, { id: `entity1_id_0x${formatIdNumber(n)}`, @@ -90,7 +90,7 @@ describe('cacheMetadata integration', () => { // Updates to existing data let m = 20; while (m < 30) { - cacheModel.set( + await cacheModel.set( `entity1_id_0x${formatIdNumber(m)}`, { id: `entity1_id_0x${formatIdNumber(m)}`, @@ -104,7 +104,7 @@ describe('cacheMetadata integration', () => { // New data let o = 100; while (o < 130) { - cacheModel.set( + await cacheModel.set( `entity1_id_0x${formatIdNumber(o)}`, { id: `entity1_id_0x${formatIdNumber(o)}`, @@ -117,16 +117,17 @@ describe('cacheMetadata integration', () => { }); it('gets one item correctly', async () => { + const getOneBy = (field: 'id', value: string) => cacheModel.getByFields([[field, '=', value]], {limit: 1}); // Db value - const res0 = await cacheModel.getOneByField('id', 'entity1_id_0x001'); + const [res0] = await getOneBy('id', 'entity1_id_0x001'); 
expect(res0).toEqual({id: 'entity1_id_0x001', field1: 1}); // Cache value - const res1 = await cacheModel.getOneByField('id', 'entity1_id_0x020'); + const [res1] = await getOneBy('id', 'entity1_id_0x020'); expect(res1).toEqual({id: 'entity1_id_0x020', field1: 0}); // Cache value - const res2 = await cacheModel.getOneByField('id', 'entity1_id_0x021'); + const [res2] = await getOneBy('id', 'entity1_id_0x021'); expect(res2).toEqual({id: 'entity1_id_0x021', field1: 1}); }); @@ -454,7 +455,7 @@ describe('cacheModel integration', () => { // Update the value res1?.delegators.push({delegator: '0x03', amount: BigInt(9000000000000000000000n)}); - cacheModel.set(`0x01`, res1!, 2); + await cacheModel.set(`0x01`, res1!, 2); await flush(3); const res2 = await cacheModel.get('0x01'); console.log(JSON.stringify(res2)); @@ -504,7 +505,7 @@ describe('cacheModel integration', () => { amount: BigInt(6000000000000000000000n), nested: {testItem: 'test', amount: BigInt(6000000000000000000000n)}, }); - cacheModel.set(`0x01`, res1!, 4); + await cacheModel.set(`0x01`, res1!, 4); await flush(5); const [rows2] = await sequelize.query(`SELECT delegators FROM ${schema}."testModels" LIMIT 1;`, { type: QueryTypes.SELECT, @@ -544,7 +545,7 @@ describe('cacheModel integration', () => { ], randomNArray: undefined, }; - cacheModel.set(`0x02`, data0x02, 6); + await cacheModel.set(`0x02`, data0x02, 6); await flush(7); const res5 = ( await cacheModel.model.findOne({ diff --git a/packages/node-core/src/indexer/storeCache/cacheModel.ts b/packages/node-core/src/indexer/storeModelProvider/model/cacheModel.ts similarity index 86% rename from packages/node-core/src/indexer/storeCache/cacheModel.ts rename to packages/node-core/src/indexer/storeModelProvider/model/cacheModel.ts index 712c5eba92..cc1699fb85 100644 --- a/packages/node-core/src/indexer/storeCache/cacheModel.ts +++ b/packages/node-core/src/indexer/storeModelProvider/model/cacheModel.ts @@ -2,24 +2,16 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import {FieldOperators, FieldsExpression, GetOptions} from '@subql/types-core'; +import {FieldsExpression, GetOptions} from '@subql/types-core'; import {CreationAttributes, Model, ModelStatic, Op, Sequelize, Transaction} from '@subql/x-sequelize'; -import {Fn} from '@subql/x-sequelize/types/utils'; import {flatten, uniq, cloneDeep, orderBy, unionBy} from 'lodash'; -import {NodeConfig} from '../../configure'; -import {Cacheable} from './cacheable'; -import {CsvStoreService} from './csvStore.service'; -import {SetValueModel} from './setValueModel'; -import { - ICachedModelControl, - RemoveValue, - SetData, - ICachedModel, - GetData, - FilteredHeightRecords, - SetValue, - Exporter, -} from './types'; +import {NodeConfig} from '../../../configure'; +import {Cacheable} from '../cacheable'; +import {CsvStoreService} from '../csvStore.service'; +import {SetValueModel} from '../setValueModel'; +import {ICachedModelControl, RemoveValue, SetData, GetData, FilteredHeightRecords, SetValue, Exporter} from '../types'; +import {BaseEntity, IModel} from './model'; +import {getFullOptions, operatorsMap} from './utils'; const getCacheOptions = { max: 500, // default value @@ -27,28 +19,9 @@ const getCacheOptions = { updateAgeOnGet: true, // we want to keep most used record in cache longer }; -const operatorsMap: Record = { - '=': Op.eq, - '!=': Op.ne, - in: Op.in, - '!in': Op.notIn, -}; - -const defaultOptions: Required> = { - offset: 0, - limit: 100, - orderBy: 'id', - orderDirection: 'ASC', -}; - -export class CachedModel< - 
T extends {id: string; __block_range?: (number | null)[] | Fn} = { - id: string; - __block_range?: (number | null)[] | Fn; - } - > +export class CachedModel extends Cacheable - implements ICachedModel, ICachedModelControl + implements IModel, ICachedModelControl { // Null value indicates its not defined in the db private getCache: GetData; @@ -138,7 +111,7 @@ export class CachedModel< * There is also no way to flush data here, * flushing will only flush data before the current block so its still required to consider the setCache * */ - async getByFields(filters: FieldsExpression[], options: GetOptions = defaultOptions): Promise { + async getByFields(filters: FieldsExpression[], options?: GetOptions): Promise { filters.forEach(([field, operator]) => { assert( operatorsMap[operator], @@ -152,10 +125,7 @@ export class CachedModel< // If projects use inefficient store methods, thats on them. // Ensure we have all the options - const fullOptions: Required> = { - ...defaultOptions, - ...options, - }; + const fullOptions = getFullOptions(options); await this.mutex.waitForUnlock(); @@ -171,7 +141,7 @@ export class CachedModel< .map((value) => value.getLatest()?.data) .map((value) => cloneDeep(value)) as T[]; - const offsetCacheData = cacheData.slice(options.offset); + const offsetCacheData = cacheData.slice(fullOptions.offset); // Return early if cache covers all the data if (offsetCacheData.length > fullOptions.limit) { @@ -199,24 +169,8 @@ export class CachedModel< return combined; } - async getByField( - field: keyof T, - value: T[keyof T] | T[keyof T][], - options: GetOptions = defaultOptions - ): Promise { - return this.getByFields([Array.isArray(value) ? [field, 'in', value] : [field, '=', value]], options); - } - - async getOneByField(field: keyof T, value: T[keyof T]): Promise { - const [res] = await this.getByField(field, value, { - ...defaultOptions, - limit: 1, - }); - - return res; - } - - set(id: string, data: T, blockHeight: number): void { + // eslint-disable-next-line @typescript-eslint/require-await + async set(id: string, data: T, blockHeight: number): Promise { if (data === undefined || data === null) { throw new Error('Cannot set undefined or null data. 
If you wish to remove data, use remove()'); } @@ -230,26 +184,28 @@ export class CachedModel< if (this.removeCache[id] && this.removeCache[id].removedAtBlock === blockHeight) { delete this.removeCache[id]; } + this.flushableRecordCounter += 1; } - bulkCreate(data: T[], blockHeight: number): void { + async bulkCreate(data: T[], blockHeight: number): Promise { for (const entity of data) { - this.set(entity.id, entity, blockHeight); + await this.set(entity.id, entity, blockHeight); } } - bulkUpdate(data: T[], blockHeight: number, fields?: string[] | undefined): void { + async bulkUpdate(data: T[], blockHeight: number, fields?: string[]): Promise { //TODO, remove fields if (fields) { throw new Error(`Currently not supported: update by fields`); } for (const entity of data) { - this.set(entity.id, entity, blockHeight); + await this.set(entity.id, entity, blockHeight); } } - remove(id: string, blockHeight: number): void { + // eslint-disable-next-line @typescript-eslint/require-await + async remove(id: string, blockHeight: number): Promise { // we don't need to check whether id is already removed, // because it could be removed->create-> removed again, // the operationIndex should always be the latest operation @@ -268,8 +224,8 @@ export class CachedModel< } } - bulkRemove(ids: string[], blockHeight: number): void { - ids.map((id) => this.remove(id, blockHeight)); + async bulkRemove(ids: string[], blockHeight: number): Promise { + await Promise.all(ids.map((id) => this.remove(id, blockHeight))); } get isFlushable(): boolean { @@ -354,13 +310,16 @@ export class CachedModel< } private filterRemoveRecordByHeight(blockHeight: number, lessEqt: boolean): Record { - return Object.entries(this.removeCache).reduce((acc, [key, value]) => { - if (lessEqt ? value.removedAtBlock <= blockHeight : value.removedAtBlock > blockHeight) { - acc[key] = value; - } + return Object.entries(this.removeCache).reduce( + (acc, [key, value]) => { + if (lessEqt ? value.removedAtBlock <= blockHeight : value.removedAtBlock > blockHeight) { + acc[key] = value; + } - return acc; - }, {} as Record); + return acc; + }, + {} as Record + ); } private filterRecordsWithHeight(blockHeight: number): FilteredHeightRecords { diff --git a/packages/node-core/src/indexer/storeModelProvider/model/index.ts b/packages/node-core/src/indexer/storeModelProvider/model/index.ts new file mode 100644 index 0000000000..c32b7b677c --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/model/index.ts @@ -0,0 +1,5 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export * from './cacheModel'; +export {IModel, BaseEntity} from './model'; diff --git a/packages/node-core/src/indexer/storeModelProvider/model/model.test.ts b/packages/node-core/src/indexer/storeModelProvider/model/model.test.ts new file mode 100644 index 0000000000..7932162a18 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/model/model.test.ts @@ -0,0 +1,120 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {DbOption} from '@subql/node-core/db'; +import {DataTypes, Sequelize} from '@subql/x-sequelize'; +import _ from 'lodash'; +import {NodeConfig} from '../../../configure'; +import {CachedModel} from './cacheModel'; +import {PlainModel} from './model'; + +const option: DbOption = { + host: process.env.DB_HOST ?? '127.0.0.1', + port: process.env.DB_PORT ? Number(process.env.DB_PORT) : 5432, + username: process.env.DB_USER ?? 
'postgres', + password: process.env.DB_PASS ?? 'postgres', + database: process.env.DB_DATABASE ?? 'postgres', + timezone: 'utc', +}; + +jest.setTimeout(50_000); + +describe('Model provider consistency test', () => { + let sequelize: Sequelize; + let schema: string; + let model: any; + + beforeAll(async () => { + sequelize = new Sequelize( + `postgresql://${option.username}:${option.password}@${option.host}:${option.port}/${option.database}`, + option + ); + await sequelize.authenticate(); + + schema = '"model-test-1"'; + await sequelize.createSchema(schema, {}); + + const modelFactory = sequelize.define( + 'testModel', + { + id: { + type: DataTypes.STRING, + primaryKey: true, + }, + field1: DataTypes.INTEGER, + }, + {timestamps: false, schema: schema} + ); + model = await modelFactory.sync(); + }); + + afterAll(async () => { + await sequelize.dropSchema(schema, {logging: false}); + await sequelize.close(); + }); + + describe('disable historical', () => { + let plainModel: PlainModel<{id: string; field1: number}>; + let cacheModel: CachedModel<{id: string; field1: number}>; + let i = 0; + beforeAll(() => { + plainModel = new PlainModel(model, false); + cacheModel = new CachedModel(model, false, new NodeConfig({} as any), () => i++); + }); + + it('insert data', async () => { + const id = '1'; + const data = {id, field1: 1}; + + await plainModel.set(id, data, 1); + await cacheModel.set(id, data, 1); + + const result = await plainModel.get(id, undefined as any); + const cacheResult = await cacheModel.get(id); + + expect(result).toEqual(data); + expect(cacheResult).toEqual(result); + }); + + it('select data', async () => { + const id2 = '2'; + const data2 = {id: id2, field1: 2}; + await plainModel.set(id2, data2, 1); + await cacheModel.set(id2, data2, 1); + const result2 = await plainModel.getByFields([['id', '=', id2]], {limit: 1}, undefined as any); + const cacheResult2 = await cacheModel.getByFields([['id', '=', id2]], {limit: 1}); + expect(result2.length).toEqual(1); + expect(cacheResult2).toEqual(result2); + + const result3 = await plainModel.getByFields([], {limit: 2}, undefined as any); + const cacheResult3 = await cacheModel.getByFields([], {limit: 2}); + expect(result3.length).toEqual(2); + expect(cacheResult3).toEqual(result3); + }); + + it('update data', async () => { + const datas = [ + {id: '1', field1: 1}, + {id: '2', field1: 2}, + {id: '3', field1: 3}, + ]; + await plainModel.bulkUpdate(datas, 1); + await cacheModel.bulkUpdate(datas, 1); + + const result3 = await plainModel.getByFields([], {limit: 10}, undefined as any); + const cacheResult3 = await cacheModel.getByFields([], {limit: 10}); + expect(result3.length).toEqual(3); + expect(cacheResult3).toEqual(result3); + }); + + it('delete data', async () => { + await plainModel.bulkRemove(['1'], 1); + await cacheModel.bulkRemove(['1'], 1); + + const result3 = await plainModel.getByFields([], {limit: 10}, undefined as any); + const cacheResult3 = await cacheModel.getByFields([], {limit: 10}); + expect(result3.length).toEqual(2); + expect(cacheResult3).toEqual(result3); + }); + }); +}); diff --git a/packages/node-core/src/indexer/storeModelProvider/model/model.ts b/packages/node-core/src/indexer/storeModelProvider/model/model.ts new file mode 100644 index 0000000000..015cdc5e62 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/model/model.ts @@ -0,0 +1,159 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {FieldsExpression, GetOptions} from 
'@subql/types-core';
+import {Op, Model, ModelStatic, Transaction, CreationAttributes, Sequelize} from '@subql/x-sequelize';
+import {Fn} from '@subql/x-sequelize/types/utils';
+import _ from 'lodash';
+import {CsvStoreService} from '../csvStore.service';
+import {Exporter} from '../types';
+import {getFullOptions, operatorsMap} from './utils';
+
+export type BaseEntity = {id: string; __block_range?: (number | null)[] | Fn};
+
+export interface IModel<T extends BaseEntity> {
+  get(id: string, tx?: Transaction): Promise<T | undefined>;
+
+  getByFields(filters: FieldsExpression<T>[], options: GetOptions<T>, tx?: Transaction): Promise<T[]>;
+
+  set: (id: string, data: T, blockHeight: number, tx?: Transaction) => Promise<void>;
+  bulkCreate(data: T[], blockHeight: number, tx?: Transaction): Promise<void>;
+  bulkUpdate(data: T[], blockHeight: number, fields?: string[], tx?: Transaction): Promise<void>;
+
+  bulkRemove(ids: string[], blockHeight: number, tx?: Transaction): Promise<void>;
+}
+
+// All operations must be carried out within a transaction.
+export class PlainModel<T extends BaseEntity = BaseEntity> implements IModel<T> {
+  private exporters: Exporter[] = [];
+
+  constructor(
+    readonly model: ModelStatic<Model<T, T>>,
+    private readonly historical = true
+  ) {}
+
+  async get(id: string, tx?: Transaction): Promise<T | undefined> {
+    const record = await this.model.findOne({
+      // https://github.com/sequelize/sequelize/issues/15179
+      where: {id} as any,
+      transaction: tx,
+    });
+
+    return record?.toJSON();
+  }
+
+  async getByFields(filters: FieldsExpression<T>[], options: GetOptions<T>, tx?: Transaction): Promise<T[]> {
+    const fullOptions = getFullOptions(options);
+    // Query DB with all params
+    const records = await this.model.findAll({
+      where: {
+        [Op.and]: [...filters.map(([field, operator, value]) => ({[field]: {[operatorsMap[operator]]: value}}))] as any, // Types not working properly
+      },
+      limit: fullOptions.limit,
+      offset: fullOptions.offset,
+      order: [[fullOptions.orderBy as string, fullOptions.orderDirection]],
+      transaction: tx,
+    });
+
+    return records.map((r) => r.toJSON());
+  }
+
+  async set(id: string, data: T, blockHeight: number, tx?: Transaction): Promise<void> {
+    if (id !== data.id) {
+      throw new Error(`Id doesn't match with data`);
+    }
+
+    await this.bulkUpdate([data], blockHeight, undefined, tx);
+  }
+
+  async bulkCreate(data: T[], blockHeight: number, tx?: Transaction): Promise<void> {
+    await this.bulkUpdate(data, blockHeight, undefined, tx);
+  }
+
+  async bulkUpdate(data: T[], blockHeight: number, fields?: string[], tx?: Transaction): Promise<void> {
+    if (!data.length) return;
+
+    // TODO, understand why this happens, it's also in the store cache
+    if (fields) {
+      throw new Error(`Currently not supported: update by fields`);
+    }
+
+    if (this.historical) {
+      // To prevent the scenario of repeated created-deleted-created, which may result in multiple entries.
+ await this.model.destroy({ + where: { + id: data.map((v) => v.id), + [Op.and]: this.sequelize.where( + this.sequelize.fn('lower', this.sequelize.col('_block_range')), + Op.eq, + blockHeight + ), + } as any, + transaction: tx, + }); + await this.markAsDeleted( + data.map((v) => v.id), + blockHeight, + tx + ); + } + + await this.model.bulkCreate(data as CreationAttributes>[], { + transaction: tx, + updateOnDuplicate: Object.keys(data[0]) as unknown as (keyof T)[], + }); + + if (tx) { + this.exporters.forEach((store: Exporter) => { + tx.afterCommit(async () => { + await store.export(data); + }); + }); + } else { + Promise.all(this.exporters.map(async (store: Exporter) => store.export(data))); + } + } + + async bulkRemove(ids: string[], blockHeight: number, tx?: Transaction): Promise { + if (!ids.length) return; + if (!this.historical) { + await this.model.destroy({where: {id: ids} as any, transaction: tx}); + } else { + await this.markAsDeleted(ids, blockHeight, tx); + } + } + + private get sequelize(): Sequelize { + const sequelize = this.model.sequelize; + + if (!sequelize) { + throw new Error(`Sequelize is not available on ${this.model.name}`); + } + + return sequelize; + } + + private async markAsDeleted(ids: string[], blockHeight: number, tx?: Transaction): Promise { + await this.model.update( + { + __block_range: this.sequelize.fn( + 'int8range', + this.sequelize.fn('lower', this.sequelize.col('_block_range')), + blockHeight + ), + } as any, + { + hooks: false, + where: { + id: ids, + __block_range: {[Op.contains]: blockHeight}, + } as any, + transaction: tx, + } + ); + } + + addExporterStore(exporter: CsvStoreService): void { + this.exporters.push(exporter); + } +} diff --git a/packages/node-core/src/indexer/storeModelProvider/model/utils.ts b/packages/node-core/src/indexer/storeModelProvider/model/utils.ts new file mode 100644 index 0000000000..5c1aa9c564 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/model/utils.ts @@ -0,0 +1,26 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {FieldOperators, GetOptions} from '@subql/types-core'; +import {Op} from '@subql/x-sequelize'; + +export const operatorsMap: Record = { + '=': Op.eq, + '!=': Op.ne, + in: Op.in, + '!in': Op.notIn, +}; + +const defaultOptions: Required> = { + offset: 0, + limit: 100, + orderBy: 'id', + orderDirection: 'ASC', +}; + +// Ensure we have all the options +export const getFullOptions = (options?: GetOptions): Required> => + ({ + ...(defaultOptions as GetOptions), + ...options, + }) as Required>; diff --git a/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts b/packages/node-core/src/indexer/storeModelProvider/poi/cachePoi.spec.ts similarity index 99% rename from packages/node-core/src/indexer/storeCache/cachePoi.spec.ts rename to packages/node-core/src/indexer/storeModelProvider/poi/cachePoi.spec.ts index 9c135c9dbb..f31ad5e3a7 100644 --- a/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts +++ b/packages/node-core/src/indexer/storeModelProvider/poi/cachePoi.spec.ts @@ -3,7 +3,7 @@ import {delay} from '@subql/common'; import {Op, Sequelize} from '@subql/x-sequelize'; -import {PoiRepo} from '../entities'; +import {PoiRepo} from '../../entities'; import {CachePoiModel} from './cachePoi'; const mockPoiRepo = (): PoiRepo => { diff --git a/packages/node-core/src/indexer/storeCache/cachePoi.ts b/packages/node-core/src/indexer/storeModelProvider/poi/cachePoi.ts similarity index 86% rename from 
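
Editor's note: the historical branch of `PlainModel.bulkUpdate` above is a close-then-insert versioning scheme: `markAsDeleted` caps the open `_block_range` of the previous row at the current height, and `bulkCreate` inserts a fresh row for the new value. A minimal sketch of that bookkeeping, assuming a simplified in-memory shape (names are illustrative):

```ts
// Each id may have several versioned rows; the "current" one has an open-ended range.
type Versioned<T> = T & {blockRange: [start: number, end: number | null]};

function upsertHistorical<T extends {id: string}>(
  rows: Versioned<T>[],
  next: T,
  height: number
): Versioned<T>[] {
  for (const row of rows) {
    // markAsDeleted's job: close the open range of any existing version of this id.
    if (row.id === next.id && row.blockRange[1] === null) {
      row.blockRange[1] = height;
    }
  }
  // bulkCreate's job: the new version starts at this height and stays open.
  return [...rows, {...next, blockRange: [height, null]}];
}
```
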
packages/node-core/src/indexer/storeCache/cachePoi.ts rename to packages/node-core/src/indexer/storeModelProvider/poi/cachePoi.ts index c839cb4f89..c098563c48 100644 --- a/packages/node-core/src/indexer/storeCache/cachePoi.ts +++ b/packages/node-core/src/indexer/storeModelProvider/poi/cachePoi.ts @@ -4,14 +4,14 @@ import {DEFAULT_FETCH_RANGE} from '@subql/common'; import {u8aToBuffer} from '@subql/utils'; import {Transaction} from '@subql/x-sequelize'; -import {getLogger} from '../../logger'; -import {PoiRepo, ProofOfIndex} from '../entities'; -import {PlainPoiModel, PoiInterface} from '../poi/poiModel'; -import {Cacheable} from './cacheable'; -import {ICachedModelControl} from './types'; +import {getLogger} from '../../../logger'; +import {PoiRepo, ProofOfIndex} from '../../entities'; +import {Cacheable} from '../cacheable'; +import {ICachedModelControl} from '../types'; +import {IPoi, PlainPoiModel} from './poi'; const logger = getLogger('PoiCache'); -export class CachePoiModel extends Cacheable implements ICachedModelControl, PoiInterface { +export class CachePoiModel extends Cacheable implements IPoi, ICachedModelControl { private setCache: Record = {}; flushableRecordCounter = 0; plainPoiModel: PlainPoiModel; @@ -21,7 +21,8 @@ export class CachePoiModel extends Cacheable implements ICachedModelControl, Poi this.plainPoiModel = new PlainPoiModel(model); } - bulkUpsert(proofs: ProofOfIndex[]): void { + // eslint-disable-next-line @typescript-eslint/require-await + async bulkUpsert(proofs: ProofOfIndex[]): Promise { for (const proof of proofs) { if (proof.chainBlockHash !== null) { proof.chainBlockHash = u8aToBuffer(proof.chainBlockHash); diff --git a/packages/node-core/src/indexer/storeModelProvider/poi/index.ts b/packages/node-core/src/indexer/storeModelProvider/poi/index.ts new file mode 100644 index 0000000000..8c876af2bd --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/poi/index.ts @@ -0,0 +1,5 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export * from './cachePoi'; +export * from './poi'; diff --git a/packages/node-core/src/indexer/poi/poiModel.ts b/packages/node-core/src/indexer/storeModelProvider/poi/poi.ts similarity index 90% rename from packages/node-core/src/indexer/poi/poiModel.ts rename to packages/node-core/src/indexer/storeModelProvider/poi/poi.ts index d31f998510..5be2d37662 100644 --- a/packages/node-core/src/indexer/poi/poiModel.ts +++ b/packages/node-core/src/indexer/storeModelProvider/poi/poi.ts @@ -4,12 +4,14 @@ import {DEFAULT_FETCH_RANGE} from '@subql/common'; import {u8aToBuffer} from '@subql/utils'; import {Op, Transaction} from '@subql/x-sequelize'; -import {BlockRangeDtoInterface} from '../../admin'; -import {PoiRepo, ProofOfIndex} from '../entities'; +import {BlockRangeDtoInterface} from '../../../admin'; +import {PoiRepo, ProofOfIndex} from '../../entities'; -export interface PoiInterface { - model: PoiRepo; - bulkUpsert(proofs: ProofOfIndex[], tx?: Transaction): Promise | void; +export const POI_ENTITY_NAME = '_poi'; + +export interface IPoi { + model: PoiRepo; // TODO remove + bulkUpsert(proofs: ProofOfIndex[], tx?: Transaction): Promise; /** * Gets the 100 blocks <= to the start height where there is an operation. * This can be used to determine the last blocks that had data to index. 
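
Editor's note on the Poi models above: both implementations normalise `chainBlockHash` to a Buffer before writing, since raw `Uint8Array` hashes arrive from the indexer while the column stores bytes. A sketch of that normalisation step in isolation (`ProofLike` is a trimmed-down stand-in for `ProofOfIndex`):

```ts
import {u8aToBuffer} from '@subql/utils';

type ProofLike = {id: number; chainBlockHash: Uint8Array | null};

function normaliseProofs(proofs: ProofLike[]): ProofLike[] {
  return proofs.map((proof) => ({
    ...proof,
    // Mirrors the bulkUpsert logic above: only non-null hashes are converted.
    chainBlockHash: proof.chainBlockHash !== null ? u8aToBuffer(proof.chainBlockHash) : null,
  }));
}
```
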
@@ -19,14 +21,14 @@ export interface PoiInterface { // When using cockroach db, poi id is store in bigint format, and sequelize toJSON() can not convert id correctly (to string) // This will ensure after toJSON Poi id converted to number -export function ensureProofOfIndexId(poi: ProofOfIndex): ProofOfIndex { +function ensureProofOfIndexId(poi: ProofOfIndex): ProofOfIndex { if (typeof poi?.id === 'string') { poi.id = Number(poi.id); } return poi; } -export class PlainPoiModel implements PoiInterface { +export class PlainPoiModel implements IPoi { constructor(readonly model: PoiRepo) {} async getFirst(): Promise { diff --git a/packages/node-core/src/indexer/storeCache/setValueModel.spec.ts b/packages/node-core/src/indexer/storeModelProvider/setValueModel.spec.ts similarity index 100% rename from packages/node-core/src/indexer/storeCache/setValueModel.spec.ts rename to packages/node-core/src/indexer/storeModelProvider/setValueModel.spec.ts diff --git a/packages/node-core/src/indexer/storeCache/setValueModel.ts b/packages/node-core/src/indexer/storeModelProvider/setValueModel.ts similarity index 100% rename from packages/node-core/src/indexer/storeCache/setValueModel.ts rename to packages/node-core/src/indexer/storeModelProvider/setValueModel.ts diff --git a/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.spec.ts similarity index 68% rename from packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts rename to packages/node-core/src/indexer/storeModelProvider/storeCache.service.spec.ts index 8b0cba5f7c..87efc16a10 100644 --- a/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts +++ b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.spec.ts @@ -6,10 +6,13 @@ import {SchedulerRegistry} from '@nestjs/schedule'; import {Sequelize} from '@subql/x-sequelize'; import {NodeConfig} from '../../configure'; import {delay} from '../../utils'; +import {BaseEntity} from './model'; import {StoreCacheService} from './storeCache.service'; const eventEmitter = new EventEmitter2(); +type TestEntity = BaseEntity & {field1: string}; + jest.mock('@subql/x-sequelize', () => { const mSequelize = { authenticate: jest.fn(), @@ -56,26 +59,26 @@ jest.mock('@subql/x-sequelize', () => { jest.setTimeout(200000); describe('Store Cache Service historical', () => { - let storeService: StoreCacheService; + let storeCacheService: StoreCacheService; const sequelize = new Sequelize(); const nodeConfig: NodeConfig = {} as any; beforeEach(() => { - storeService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry()); + storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry()); }); it('could init store cache service and init cache for models', () => { - storeService.getModel('entity1'); - expect((storeService as any).cachedModels.entity1).toBeDefined(); - expect((storeService as any).cachedModels.entity2).toBeUndefined(); + storeCacheService.getModel('entity1'); + expect((storeCacheService as any).cachedModels.entity1).toBeDefined(); + expect((storeCacheService as any).cachedModels.entity2).toBeUndefined(); }); it('could set cache for multiple entities, also get from it', async () => { - const entity1Model = storeService.getModel('entity1'); - const entity2Model = storeService.getModel('entity2'); + const entity1Model = storeCacheService.getModel('entity1'); + const entity2Model = storeCacheService.getModel('entity2'); 
-    entity1Model.set(
+    await entity1Model.set(
       'entity1_id_0x01',
       {
         id: 'entity1_id_0x01',
@@ -83,7 +86,7 @@ describe('Store Cache Service historical', () => {
       },
       1
     );
-    entity2Model.set(
+    await entity2Model.set(
       'entity2_id_0x02',
       {
         id: 'entity2_id_0x02',
@@ -93,7 +96,7 @@ describe('Store Cache Service historical', () => {
     );
 
     // check from cache
-    expect((storeService as any).cachedModels.entity1.setCache.entity1_id_0x01).toBeDefined();
+    expect((storeCacheService as any).cachedModels.entity1.setCache.entity1_id_0x01).toBeDefined();
     const entity1Block1 = (await entity1Model.get('entity1_id_0x01')) as any;
     const entity2Block2 = (await entity2Model.get('entity2_id_0x02')) as any;
     expect(entity1Block1.field1).toBe('set at block 1');
@@ -102,9 +105,9 @@ describe('Store Cache Service historical', () => {
 
   // TODO move this test to cacheModel
   it('set at different block height, will create historical records', async () => {
-    const appleModel = storeService.getModel('apple');
+    const appleModel = storeCacheService.getModel('apple');
 
-    appleModel.set(
+    await appleModel.set(
       'apple-01',
       {
         id: 'apple-01',
@@ -113,10 +116,10 @@ describe('Store Cache Service historical', () => {
       1
     );
 
-    const appleEntity_b1 = (await appleModel.get('apple-01')) as any;
-    expect(appleEntity_b1.field1).toBe('set apple at block 1');
+    const appleEntity_b1 = await appleModel.get('apple-01');
+    expect(appleEntity_b1!.field1).toBe('set apple at block 1');
     // Add new record, should create historical records for same id entity
-    appleModel.set(
+    await appleModel.set(
       'apple-01',
       {
         id: 'apple-01',
@@ -128,10 +131,10 @@ describe('Store Cache Service historical', () => {
     expect(appleEntity_b5.field1).toBe('updated apple at block 5');
 
     // Been pushed two records, the latest index should point to 1
-    expect((storeService as any).cachedModels.apple.setCache['apple-01']._latestIndex).toBe(1);
+    expect((storeCacheService as any).cachedModels.apple.setCache['apple-01']._latestIndex).toBe(1);
 
     // Historical values
-    const historicalValue = (storeService as any).cachedModels.apple.setCache['apple-01'].historicalValues;
+    const historicalValue = (storeCacheService as any).cachedModels.apple.setCache['apple-01'].historicalValues;
     // should close the range index 0 historical record
     expect(historicalValue[0].startHeight).toBe(1);
     expect(historicalValue[0].endHeight).toBe(5);
@@ -142,21 +145,21 @@
 });
 
 describe('Store Cache flush with order', () => {
-  let storeService: StoreCacheService;
+  let storeCacheService: StoreCacheService;
   const sequelize = new Sequelize();
   const nodeConfig: NodeConfig = {} as any;
 
   beforeEach(() => {
-    storeService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry());
-    storeService.init(false, true, {} as any, undefined);
+    storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry());
+    storeCacheService.init(false, true, {} as any, undefined);
   });
 
-  it('when set/remove multiple model entities, operation index should added to record in sequential order', () => {
-    const entity1Model = storeService.getModel('entity1');
-    const entity2Model = storeService.getModel('entity2');
+  it('when set/remove multiple model entities, operation index should be added to record in sequential order', async () => {
+    const entity1Model = storeCacheService.getModel('entity1');
+    const entity2Model = storeCacheService.getModel('entity2');
 
-    entity1Model.set(
+    await entity1Model.set(
       'entity1_id_0x01',
       {
         id: 'entity1_id_0x01',
@@ -164,7 +167,7 @@ describe('Store Cache flush with order', () => {
       },
       1
    );
-    entity2Model.set(
+    await entity2Model.set(
      'entity2_id_0x02',
       {
         id: 'entity2_id_0x02',
@@ -172,28 +175,28 @@
       },
       2
     );
-    entity1Model.remove('entity1_id_0x01', 3);
-    const entity1 = (storeService as any).cachedModels.entity1;
+    await entity1Model.bulkRemove(['entity1_id_0x01'], 3);
+    const entity1 = (storeCacheService as any).cachedModels.entity1;
     expect(entity1.removeCache.entity1_id_0x01.operationIndex).toBe(3);
   });
 });
 
 describe('Store Cache flush with non-historical', () => {
-  let storeService: StoreCacheService;
+  let storeCacheService: StoreCacheService;
   const sequelize = new Sequelize();
   const nodeConfig: NodeConfig = {disableHistorical: true} as any;
 
   beforeEach(() => {
-    storeService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry());
-    storeService.init(false, false, {} as any, undefined);
+    storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry());
+    storeCacheService.init(false, false, {} as any, undefined);
   });
 
   it('Same Id with multiple operations, when flush it should always pick up the latest operation', async () => {
-    const entity1Model = storeService.getModel('entity1');
+    const entity1Model = storeCacheService.getModel('entity1');
 
     //create Id 1
-    entity1Model.set(
+    await entity1Model.set(
       'entity1_id_0x01',
       {
         id: 'entity1_id_0x01',
@@ -202,11 +205,11 @@ describe('Store Cache flush with non-historical', () => {
       1
     );
     // remove Id 1 and 2
-    entity1Model.remove('entity1_id_0x02', 2);
-    entity1Model.remove('entity1_id_0x01', 3);
+    await entity1Model.bulkRemove(['entity1_id_0x02'], 2);
+    await entity1Model.bulkRemove(['entity1_id_0x01'], 3);
 
     // recreate id 1 again
-    entity1Model.set(
+    await entity1Model.set(
       'entity1_id_0x01',
       {
         id: 'entity1_id_0x01',
@@ -234,7 +237,7 @@
 });
 
 describe('Store cache upper threshold', () => {
-  let storeService: StoreCacheService;
+  let storeCacheService: StoreCacheService;
   const sequelize = new Sequelize();
   const nodeConfig = {
@@ -243,15 +246,15 @@
   } as NodeConfig;
 
   beforeEach(() => {
-    storeService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry());
-    storeService.init(false, false, {findByPk: () => Promise.resolve({toJSON: () => 1})} as any, undefined);
+    storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry());
+    storeCacheService.init(false, false, {findByPk: () => Promise.resolve({toJSON: () => 1})} as any, undefined);
   });
 
   it('doesnt wait for flushing cache when threshold not met', async () => {
-    const entity1Model = storeService.getModel('entity1');
+    const entity1Model = storeCacheService.getModel('entity1');
 
     for (let i = 0; i < 5; i++) {
-      entity1Model.set(
+      await entity1Model.set(
         `entity1_id_0x0${i}`,
         {
           id: `entity1_id_0x0${i}`,
@@ -262,7 +265,7 @@
     }
 
     const start = new Date().getTime();
-    await storeService.flushAndWaitForCapacity(true);
+    await storeCacheService.flushAndWaitForCapacity(true);
     const end = new Date().getTime();
 
     // Should be less than 1s, we're not waiting
@@ -270,10 +273,10 @@
   });
 
   it('waits for flushing when threshold is met', async () => {
-    const entity1Model = storeService.getModel('entity1');
+    const entity1Model = storeCacheService.getModel('entity1');
 
     for (let i = 0; i < 15; i++) {
-      entity1Model.set(
+      await entity1Model.set(
         `entity1_id_0x0${i}`,
         {
           id: `entity1_id_0x0${i}`,
@@ -284,7 +287,7 @@
     }
 
     const start = new Date().getTime();
-    await storeService.flushAndWaitForCapacity(true);
+    await storeCacheService.flushAndWaitForCapacity(true);
     const end = new Date().getTime();
 
     // Should be more than 1s, we set the db tx.commit to take 1s
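The spec changes above track the underlying API change: cached model writes are now Promise-based, and single-entity `remove` has been folded into `bulkRemove`. For illustration, a minimal migration sketch (the handler name and `TransferEntity` shape are hypothetical, and it assumes `IModel` is re-exported from `@subql/node-core`):

```ts
import {IModel} from '@subql/node-core';

type TransferEntity = {id: string; field1: string};

// Before this change the cache API was fire-and-forget:
//   model.set(id, data, height); model.remove(id, height);
// Now every write is awaited, and removes go through bulkRemove.
async function handleTransfer(model: IModel<TransferEntity>, height: number): Promise<void> {
  await model.set('transfer-0x01', {id: 'transfer-0x01', field1: 'value'}, height);
  await model.bulkRemove(['transfer-0x01'], height);
}
```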
diff --git a/packages/node-core/src/indexer/storeCache/storeCache.service.ts b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.ts
similarity index 71%
rename from packages/node-core/src/indexer/storeCache/storeCache.service.ts
rename to packages/node-core/src/indexer/storeModelProvider/storeCache.service.ts
index feefb2c0c7..8b65628ba3 100644
--- a/packages/node-core/src/indexer/storeCache/storeCache.service.ts
+++ b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.ts
@@ -14,26 +14,21 @@
 import {exitWithError} from '../../process';
 import {profiler} from '../../profiler';
 import {MetadataRepo, PoiRepo} from '../entities';
 import {BaseCacheService} from './baseCache.service';
-import {CacheMetadataModel} from './cacheMetadata';
-import {CachedModel} from './cacheModel';
-import {CachePoiModel} from './cachePoi';
 import {CsvStoreService} from './csvStore.service';
-import {Exporter, ICachedModel, ICachedModelControl} from './types';
+import {CacheMetadataModel} from './metadata';
+import {METADATA_ENTITY_NAME} from './metadata/utils';
+import {CachedModel} from './model';
+import {CachePoiModel, POI_ENTITY_NAME} from './poi';
+import {ICachedModelControl, IStoreModelProvider} from './types';
 
 const logger = getLogger('StoreCacheService');
 
 @Injectable()
-export class StoreCacheService extends BaseCacheService {
-  private cachedModels: Record = {};
-  private metadataRepo?: MetadataRepo;
-  private poiRepo?: PoiRepo;
+export class StoreCacheService extends BaseCacheService implements IStoreModelProvider {
   private readonly storeCacheThreshold: number;
   private readonly cacheUpperLimit: number;
-  private _historical = true;
-  private _useCockroachDb?: boolean;
   private _storeOperationIndex = 0;
   private _lastFlushedOperationIndex = 0;
-  private exports: Exporter[] = [];
 
   constructor(
     private sequelize: Sequelize,
@@ -51,16 +46,13 @@ export class StoreCacheService extends BaseCacheService {
   }
 
   init(historical: boolean, useCockroachDb: boolean, meta: MetadataRepo, poi?: PoiRepo): void {
-    this._useCockroachDb = useCockroachDb;
-    this._historical = historical;
-    this.metadataRepo = meta;
-    this.poiRepo = poi;
+    super.init(historical, useCockroachDb, meta, poi);
 
     if (this.config.storeFlushInterval > 0) {
       this.schedulerRegistry.addInterval(
         'storeFlushInterval',
         setInterval(() => {
-          this.flushCache(true).catch((e) => logger.warn(`storeFlushInterval failed ${e.message}`));
+          this.flushData(true).catch((e) => logger.warn(`storeFlushInterval failed ${e.message}`));
         }, this.config.storeFlushInterval * 1000)
       );
     }
@@ -80,25 +72,12 @@ export class StoreCacheService extends BaseCacheService {
     return this._storeOperationIndex;
   }
 
-  getModel(entity: string): ICachedModel {
-    if (entity === '_metadata') {
-      throw new Error('Please use getMetadataModel instead');
-    }
-    if (entity === '_poi') {
-      throw new Error('Please use getPoiModel instead');
-    }
-    if (!this.cachedModels[entity]) {
-      this.cachedModels[entity] = this.createModel(entity);
-    }
-    return this.cachedModels[entity] as unknown as ICachedModel;
-  }
-
-  private createModel(entityName: string): CachedModel {
+  protected createModel(entityName: string): CachedModel {
     const model = this.sequelize.model(entityName);
     assert(model, `model ${entityName} not exists`);
 
     const cachedModel = new CachedModel(
       model,
-      this._historical,
+      this.historical,
       this.config,
       this.getNextStoreOperationIndex.bind(this)
     );
@@ -110,44 +89,31 @@
     return cachedModel;
   }
 
-  addExporter(cachedModel: CachedModel, exporterStore: CsvStoreService): void {
+  private addExporter(cachedModel: CachedModel, exporterStore: CsvStoreService): void {
     cachedModel.addExporterStore(exporterStore);
     this.exports.push(exporterStore);
   }
 
-  async flushExportStores(): Promise {
-    await Promise.all(this.exports.map((f) => f.shutdown()));
-  }
-
-  updateModels({modifiedModels, removedModels}: {modifiedModels: ModelStatic[]; removedModels: string[]}): void {
-    modifiedModels.forEach((m) => {
-      this.cachedModels[m.name] = this.createModel(m.name);
-    });
-    removedModels.forEach((r) => delete this.cachedModels[r]);
-  }
-
   get metadata(): CacheMetadataModel {
-    const entity = '_metadata';
-    if (!this.cachedModels[entity]) {
+    if (!this.cachedModels[METADATA_ENTITY_NAME]) {
       if (!this.metadataRepo) {
         throw new Error('Metadata entity has not been set on store cache');
       }
-      this.cachedModels[entity] = new CacheMetadataModel(this.metadataRepo);
+      this.cachedModels[METADATA_ENTITY_NAME] = new CacheMetadataModel(this.metadataRepo);
     }
-    return this.cachedModels[entity] as unknown as CacheMetadataModel;
+    return this.cachedModels[METADATA_ENTITY_NAME] as unknown as CacheMetadataModel;
   }
 
   get poi(): CachePoiModel | null {
-    const entity = '_poi';
-    if (!this.cachedModels[entity]) {
+    if (!this.cachedModels[POI_ENTITY_NAME]) {
      if (!this.poiRepo) {
         return null;
         // throw new Error('Poi entity has not been set on store cache');
       }
-      this.cachedModels[entity] = new CachePoiModel(this.poiRepo);
+      this.cachedModels[POI_ENTITY_NAME] = new CachePoiModel(this.poiRepo);
     }
-    return this.cachedModels[entity] as unknown as CachePoiModel;
+    return this.cachedModels[POI_ENTITY_NAME] as unknown as CachePoiModel;
   }
 
   private async flushRelationalModelsInOrder(updatableModels: ICachedModelControl[], tx: Transaction): Promise {
@@ -168,14 +134,14 @@
     this.logger.debug('Flushing cache');
     // With historical disabled we defer the constraints check so that it doesn't matter what order entities are modified
     const tx = await this.sequelize.transaction({
-      deferrable: this._historical || this._useCockroachDb ? undefined : Deferrable.SET_DEFERRED(),
+      deferrable: this.historical || this.useCockroachDb ? undefined : Deferrable.SET_DEFERRED(),
     });
     try {
       // Get the block height of all data we want to flush up to
       const blockHeight = await this.metadata.find('lastProcessedHeight');
       // Get models that have data to flush
       const updatableModels = Object.values(this.cachedModels).filter((m) => m.isFlushable);
-      if (this._useCockroachDb) {
+      if (this.useCockroachDb) {
         // 1. Independent(no associations) models can flush simultaneously
         await Promise.all(
           updatableModels.filter((m) => !m.hasAssociations).map((model) => model.flush(tx, blockHeight))
         );
@@ -205,7 +171,7 @@
 
   async flushAndWaitForCapacity(forceFlush?: boolean): Promise {
     const flushableRecords = this.flushableRecords;
-    const pendingFlush = this.flushCache(forceFlush);
+    const pendingFlush = this.flushData(forceFlush);
 
     if (flushableRecords >= this.cacheUpperLimit) {
       await pendingFlush;
@@ -224,4 +190,22 @@
     });
     return numOfRecords >= this.storeCacheThreshold;
   }
+
+  async applyPendingChanges(height: number, dataSourcesCompleted: boolean): Promise {
+    if (this.config.storeCacheAsync) {
+      // Flush all completed block data and don't wait
+      await this.flushAndWaitForCapacity(false)?.catch((e) => {
+        exitWithError(new Error(`Flushing cache failed`, {cause: e}), logger);
+      });
+    } else {
+      // Flush all data from cache and wait
+      await this.flushData(false);
+    }
+
+    if (dataSourcesCompleted) {
+      const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
+      await this.flushData(false);
+      exitWithError(msg, logger, 0);
+    }
+  }
 }
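`applyPendingChanges` gives the cache provider a single post-block hook: in async mode (`storeCacheAsync`) it kicks off a flush and only blocks when the cache is over capacity; otherwise it flushes and waits. A rough sketch of a caller, for illustration only (the `onBlockProcessed` wrapper is hypothetical; the real wiring lives in the block dispatchers, and the import assumes `IStoreModelProvider` is re-exported from `@subql/node-core`):

```ts
import {IStoreModelProvider} from '@subql/node-core';

// After each block: record progress, then let the provider decide whether to
// flush now (sync mode) or defer flushing until capacity demands it (async mode).
async function onBlockProcessed(
  provider: IStoreModelProvider,
  height: number,
  dataSourcesCompleted: boolean
): Promise<void> {
  await provider.metadata.set('lastProcessedHeight', height);
  await provider.applyPendingChanges(height, dataSourcesCompleted);
}
```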
diff --git a/packages/node-core/src/indexer/storeModelProvider/storeModel.service.ts b/packages/node-core/src/indexer/storeModelProvider/storeModel.service.ts
new file mode 100644
index 0000000000..0d20af80fe
--- /dev/null
+++ b/packages/node-core/src/indexer/storeModelProvider/storeModel.service.ts
@@ -0,0 +1,81 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: GPL-3.0
+
+import {Injectable} from '@nestjs/common';
+import {Sequelize, Transaction} from '@subql/x-sequelize';
+import {NodeConfig} from '../../configure';
+import {getLogger} from '../../logger';
+import {exitWithError} from '../../process';
+import {BaseStoreModelService} from './baseStoreModel.service';
+import {CsvStoreService} from './csvStore.service';
+import {MetadataModel} from './metadata/metadata';
+import {METADATA_ENTITY_NAME} from './metadata/utils';
+import {IModel} from './model';
+import {PlainModel} from './model/model';
+import {PlainPoiModel, POI_ENTITY_NAME} from './poi';
+import {IStoreModelProvider} from './types';
+
+const logger = getLogger('PlainStoreModelService');
+
+@Injectable()
+export class PlainStoreModelService extends BaseStoreModelService implements IStoreModelProvider {
+  constructor(
+    private sequelize: Sequelize,
+    private config: NodeConfig
+  ) {
+    super();
+  }
+
+  get metadata(): MetadataModel {
+    if (!this.cachedModels[METADATA_ENTITY_NAME]) {
+      if (!this.metadataRepo) {
+        throw new Error('Metadata entity has not been set');
+      }
+      this.cachedModels[METADATA_ENTITY_NAME] = new MetadataModel(this.metadataRepo) as any;
+    }
+
+    return this.cachedModels[METADATA_ENTITY_NAME] as unknown as MetadataModel;
+  }
+
+  get poi(): PlainPoiModel | null {
+    if (!this.cachedModels[POI_ENTITY_NAME]) {
+      if (!this.poiRepo) {
+        return null;
+        // throw new Error('Poi entity has not been set on store cache');
+      }
+      this.cachedModels[POI_ENTITY_NAME] = new PlainPoiModel(this.poiRepo) as any;
+    }
+
+    return this.cachedModels[POI_ENTITY_NAME] as unknown as PlainPoiModel;
+  }
+
+  protected createModel(entityName: string): IModel {
+    const model = this.sequelize.model(entityName);
+
+    const plainModel = new PlainModel(model, this.historical);
+
+    if (this.config.csvOutDir) {
+      const exporterStore = new CsvStoreService(entityName, this.config.csvOutDir);
+      this.addExporter(plainModel, exporterStore);
+    }
+
+    return plainModel;
+  }
+
+  private addExporter(model: PlainModel, exporterStore: CsvStoreService): void {
+    model.addExporterStore(exporterStore);
+    this.exports.push(exporterStore);
+  }
+
+  async applyPendingChanges(height: number, dataSourcesCompleted: boolean, tx: Transaction): Promise {
+    if (!tx) {
+      exitWithError(new Error('Transaction not found'), logger, 1);
+    }
+    await tx.commit();
+
+    if (dataSourcesCompleted) {
+      const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
+      exitWithError(msg, logger, 0);
+    }
+  }
+}
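Where `StoreCacheService` buffers writes and flushes in batches, `PlainStoreModelService` writes through to the database and expects the caller to supply one transaction per block, which `applyPendingChanges` then commits. A sketch of that flow under the assumption that the caller owns the Sequelize transaction (the function name is hypothetical and handler details are elided):

```ts
import {Sequelize} from '@subql/x-sequelize';
import {IStoreModelProvider} from '@subql/node-core';

async function indexBlockWithPlainStore(
  sequelize: Sequelize,
  provider: IStoreModelProvider,
  height: number
): Promise<void> {
  // One transaction per block; entity writes made via provider.getModel(...)
  // during handler execution would pass this same tx through.
  const tx = await sequelize.transaction();
  await provider.metadata.set('lastProcessedHeight', height, tx);
  // For the plain provider this commits the block's transaction.
  await provider.applyPendingChanges(height, false, tx);
}
```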
diff --git a/packages/node-core/src/indexer/storeCache/types.ts b/packages/node-core/src/indexer/storeModelProvider/types.ts
similarity index 70%
rename from packages/node-core/src/indexer/storeCache/types.ts
rename to packages/node-core/src/indexer/storeModelProvider/types.ts
index 360e0aed73..22dcb2232f 100644
--- a/packages/node-core/src/indexer/storeCache/types.ts
+++ b/packages/node-core/src/indexer/storeModelProvider/types.ts
@@ -1,25 +1,31 @@
 // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
 // SPDX-License-Identifier: GPL-3.0
 
-import {FieldsExpression, GetOptions} from '@subql/types-core';
-import {Transaction} from '@subql/x-sequelize';
+import {ENUM, ModelStatic, Transaction} from '@subql/x-sequelize';
 import {LRUCache} from 'lru-cache';
+import {MetadataRepo, PoiRepo} from '../entities';
+import {IMetadata} from './metadata';
+import {BaseEntity, IModel} from './model';
+import {IPoi} from './poi';
 import {SetValueModel} from './setValueModel';
 
 export type HistoricalModel = {__block_range: any};
 
-export interface ICachedModel {
-  get: (id: string) => Promise;
-  // limit always defined from store
-  getByField: (field: keyof T, value: T[keyof T] | T[keyof T][], options?: GetOptions) => Promise;
-  getByFields: (filter: FieldsExpression[], options?: GetOptions) => Promise;
-  getOneByField: (field: keyof T, value: T[keyof T]) => Promise;
-  set: (id: string, data: T, blockHeight: number) => void;
-  bulkCreate: (data: T[], blockHeight: number) => void;
-  bulkUpdate: (data: T[], blockHeight: number, fields?: string[]) => void;
-  remove: (id: string, blockHeight: number) => void;
-  bulkRemove: (ids: string[], blockHeight: number) => void;
+export interface IStoreModelProvider {
+  poi: IPoi | null;
+  metadata: IMetadata;
+
+  init(historical: boolean, useCockroachDb: boolean, meta: MetadataRepo, poi?: PoiRepo): void;
+
+  getModel(entity: string): IModel;
+
+  // addExporter(entity: string, exporterStore: CsvStoreService): void;
+
+  applyPendingChanges(height: number, dataSourcesCompleted: boolean, tx?: Transaction): Promise;
+
+  updateModels({modifiedModels, removedModels}: {modifiedModels: ModelStatic[]; removedModels: string[]}): void;
 }
+
 export interface ICachedModelControl {
   isFlushable: boolean;
   hasAssociations?: boolean;
diff --git a/packages/node-core/src/indexer/storeModelProvider/utils.ts b/packages/node-core/src/indexer/storeModelProvider/utils.ts
new file mode 100644
index 0000000000..0ca72be4cb
--- /dev/null
+++ b/packages/node-core/src/indexer/storeModelProvider/utils.ts
@@ -0,0 +1,16 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: GPL-3.0
+
+import {StoreCacheService} from './storeCache.service';
+import {IStoreModelProvider} from './types';
+
+export async function cacheProviderFlushData(modelProvider: IStoreModelProvider, forceFlush?: boolean) {
+  if (modelProvider instanceof StoreCacheService) {
+    await modelProvider.flushData(forceFlush);
+  }
+}
+export async function cacheProviderResetData(modelProvider: IStoreModelProvider) {
+  if (modelProvider instanceof StoreCacheService) {
+    await modelProvider.resetData();
+  }
+}
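These two guards exist because most call sites now receive an `IStoreModelProvider` and cannot assume the cache implementation: flushing and resetting only make sense for `StoreCacheService`, so both helpers are deliberate no-ops for the plain provider. A usage sketch (the wrapper names are illustrative; `test.runner.ts` below uses `cacheProviderFlushData` the same way):

```ts
import {cacheProviderFlushData, cacheProviderResetData, IStoreModelProvider} from '@subql/node-core';

// Safe on either provider: flushes buffered writes for the cache-backed
// implementation, does nothing for the plain DB-backed one.
async function persistBeforeShutdown(provider: IStoreModelProvider): Promise<void> {
  await cacheProviderFlushData(provider, true);
}

// Likewise, dropping buffered-but-unflushed data only applies to the cache.
async function discardBeforeReindex(provider: IStoreModelProvider): Promise<void> {
  await cacheProviderResetData(provider);
}
```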
diff --git a/packages/node-core/src/indexer/test.runner.spec.ts b/packages/node-core/src/indexer/test.runner.spec.ts
index 1a977c03c6..c166f90ce2 100644
--- a/packages/node-core/src/indexer/test.runner.spec.ts
+++ b/packages/node-core/src/indexer/test.runner.spec.ts
@@ -27,7 +27,7 @@ describe('TestRunner', () => {
     storeServiceMock = {
       setBlockHeight: jest.fn(),
       getStore: jest.fn().mockReturnValue({}),
-      storeCache: mockStoreCache,
+      modelProvider: mockStoreCache,
     };
 
     sandboxMock = {
@@ -88,7 +88,7 @@ describe('TestRunner', () => {
     (testRunner as any).storeService = {
       getStore: () => storeMock,
       setBlockHeight: jest.fn(),
-      storeCache: mockStoreCache,
+      modelProvider: mockStoreCache,
     } as any;
 
     await testRunner.runTest(testMock, sandboxMock, indexBlock);
@@ -123,7 +123,7 @@ describe('TestRunner', () => {
     (testRunner as any).storeService = {
       getStore: () => storeMock,
       setBlockHeight: jest.fn(),
-      storeCache: mockStoreCache,
+      modelProvider: mockStoreCache,
     } as any;
 
     await testRunner.runTest(testMock, sandboxMock, indexBlock);
@@ -180,7 +180,7 @@ describe('TestRunner', () => {
     (testRunner as any).storeService = {
       getStore: () => storeMock,
       setBlockHeight: jest.fn(),
-      storeCache: mockStoreCache,
+      modelProvider: mockStoreCache,
     } as any;
 
     await testRunner.runTest(testMock, sandboxMock, indexBlock);
diff --git a/packages/node-core/src/indexer/test.runner.ts b/packages/node-core/src/indexer/test.runner.ts
index 84de108059..37df47af63 100644
--- a/packages/node-core/src/indexer/test.runner.ts
+++ b/packages/node-core/src/indexer/test.runner.ts
@@ -11,6 +11,7 @@ import {NodeConfig} from '../configure/NodeConfig';
 import {getLogger} from '../logger';
 import {TestSandbox} from './sandbox';
 import {StoreService} from './store.service';
+import {cacheProviderFlushData} from './storeModelProvider';
 import {IBlock, IIndexerManager} from './types';
 
 const logger = getLogger('test-runner');
@@ -56,9 +57,9 @@ export class TestRunner {
     logger.debug('Fetching block');
     const [block] = await this.apiService.fetchBlocks([test.blockHeight]);
 
-    this.storeService.setBlockHeight(test.blockHeight);
+    await this.storeService.setBlockHeight(test.blockHeight);
     // Ensure a block height is set so that data is flushed correctly
-    this.storeService.storeCache.metadata.set('lastProcessedHeight', test.blockHeight - 1);
+    await this.storeService.modelProvider.metadata.set('lastProcessedHeight', test.blockHeight - 1);
 
     const store = this.storeService.getStore();
     sandbox.freeze(store, 'store');
@@ -70,7 +71,7 @@ export class TestRunner {
     try {
       await indexBlock(block, test.handler, this.indexerManager, this.apiService);
-      await this.storeService.storeCache.flushCache(true);
+      await cacheProviderFlushData(this.storeService.modelProvider, true);
     } catch (e: any) {
       logger.warn(`Test: ${test.name} field due to runtime error`, e);
       this.failedTestSummary = {
@@ -98,7 +99,7 @@ export class TestRunner {
     } else {
       Object.keys(actualEntity).forEach((attr) => {
         // EntityClass has private store on it, don't need to check it.
-        if (attr === 'store') return;
+        if (attr === '#store') return;
 
         const expectedAttr = (expectedEntity as Record)[attr] ?? null;
         const actualAttr = (actualEntity as Record)[attr] ?? null;
@@ -136,7 +137,7 @@ export class TestRunner {
       }
     }
 
-    await this.storeService.storeCache.flushCache(true);
+    await cacheProviderFlushData(this.storeService.modelProvider, true);
     logger.info(
       `Test: ${test.name} completed with ${chalk.green(`${this.passedTests} passed`)} and ${chalk.red(
         `${this.failedTests} failed`
diff --git a/packages/node-core/src/indexer/types.ts b/packages/node-core/src/indexer/types.ts
index 45f21d1ed3..3b22684897 100644
--- a/packages/node-core/src/indexer/types.ts
+++ b/packages/node-core/src/indexer/types.ts
@@ -16,7 +16,7 @@ export interface ISubqueryProject<
   N extends IProjectNetworkConfig = IProjectNetworkConfig,
   DS extends BaseDataSource = BaseDataSource,
   T extends BaseTemplateDataSource = BaseTemplateDataSource,
-  C = unknown
+  C = unknown,
 > extends Omit, 'schema' | 'version' | 'name' | 'specVersion' | 'description'> {
   readonly schema: GraphQLSchema;
   applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise) => Promise;
diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts
index 68907abd00..a8fc670fe4 100644
--- a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts
+++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts
@@ -5,7 +5,7 @@
 import {EventEmitter2} from '@nestjs/event-emitter';
 import {SchedulerRegistry} from '@nestjs/schedule';
 import {Header, IBlock} from '../indexer';
-import {StoreCacheService, CacheMetadataModel} from './storeCache';
+import {StoreCacheService, CacheMetadataModel} from './storeModelProvider';
 import {
   METADATA_LAST_FINALIZED_PROCESSED_KEY,
   METADATA_UNFINALIZED_BLOCKS_KEY,
@@ -151,7 +151,7 @@ describe('UnfinalizedBlocksService', () => {
 
     // After this the call stack is something like:
     // indexerManager -> blockDispatcher -> project -> project -> reindex -> blockDispatcher.resetUnfinalizedBlocks
-    unfinalizedBlocksService.resetUnfinalizedBlocks();
+    await unfinalizedBlocksService.resetUnfinalizedBlocks();
 
     expect((unfinalizedBlocksService as any).unfinalizedBlocks).toEqual([]);
   });
@@ -251,7 +251,7 @@ describe('UnfinalizedBlocksService', () => {
 
     storeCache.init(true, false, {} as any, undefined);
 
-    storeCache.metadata.set(
+    await storeCache.metadata.set(
       METADATA_UNFINALIZED_BLOCKS_KEY,
       JSON.stringify([
         {blockHeight: 90, blockHash: '0xabcd'},
@@ -259,7 +259,7 @@ describe('UnfinalizedBlocksService', () => {
         {blockHeight: 92, blockHash: '0xabc92'},
       ])
     );
-    storeCache.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, 90);
+    await storeCache.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, 90);
 
     const unfinalizedBlocksService2 = new UnfinalizedBlocksService({unfinalizedBlocks: false} as any, storeCache);
     const reindex = jest.fn().mockReturnValue(Promise.resolve());
diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts
index f523d8f985..51d20f3ecc 100644
--- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts
+++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: GPL-3.0
 
 import assert from 'assert';
+import {Transaction} from '@subql/x-sequelize';
 import {isEqual, last} from 'lodash';
 import {NodeConfig} from '../configure';
 import {Header, IBlock} from '../indexer/types';
@@ -10,7 +11,7 @@
 import {exitWithError} from '../process';
 import {mainThreadOnly} from '../utils';
 import {ProofOfIndex} from './entities';
 import {PoiBlock} from './poi';
-import {StoreCacheService} from './storeCache';
+import {IStoreModelProvider} from './storeModelProvider';
 
 const logger = getLogger('UnfinalizedBlocks');
 
@@ -27,8 +28,8 @@ export interface IUnfinalizedBlocksService extends IUnfinalizedBlocksServiceU
   init(reindex: (targetHeight: number) => Promise): Promise;
   processUnfinalizedBlocks(block: IBlock | undefined): Promise;
   processUnfinalizedBlockHeader(header: Header | undefined): Promise;
-  resetUnfinalizedBlocks(): void;
-  resetLastFinalizedVerifiedHeight(): void;
+  resetUnfinalizedBlocks(tx?: Transaction): Promise;
+  resetLastFinalizedVerifiedHeight(tx?: Transaction): Promise;
   getMetadataUnfinalizedBlocks(): Promise;
 }
 
@@ -69,7 +70,10 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
     return this._finalizedHeader;
   }
 
-  constructor(protected readonly nodeConfig: NodeConfig, protected readonly storeCache: StoreCacheService) {}
+  constructor(
+    protected readonly nodeConfig: NodeConfig,
+    protected readonly storeModelProvider: IStoreModelProvider
+  ) {}
 
   async init(reindex: (targetHeight: number) => Promise): Promise {
     logger.info(`Unfinalized blocks is ${this.nodeConfig.unfinalizedBlocks ? 'enabled' : 'disabled'}`);
@@ -91,8 +95,8 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
         logger.info(`Successful rewind to block ${rewindHeight}!`);
         return rewindHeight;
       } else {
-        this.resetUnfinalizedBlocks();
-        this.resetLastFinalizedVerifiedHeight();
+        await this.resetUnfinalizedBlocks();
+        await this.resetLastFinalizedVerifiedHeight();
       }
     }
   }
@@ -103,14 +107,14 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
 
   async processUnfinalizedBlockHeader(header?: Header): Promise {
     if (header) {
-      this.registerUnfinalizedBlock(header);
+      await this.registerUnfinalizedBlock(header);
     }
 
     const forkedHeader = await this.hasForked();
     if (!forkedHeader) {
       // Remove blocks that are now confirmed finalized
-      this.deleteFinalizedBlock();
+      await this.deleteFinalizedBlock();
     } else {
       // Get the last unfinalized block that is now finalized
       return this.getLastCorrectFinalizedBlock(forkedHeader);
@@ -130,7 +134,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
     this.finalizedHeader = header;
   }
 
-  private registerUnfinalizedBlock(header: Header): void {
+  private async registerUnfinalizedBlock(header: Header): Promise {
     if (header.blockHeight <= this.finalizedBlockNumber) return;
 
     // Ensure order
@@ -143,14 +147,14 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
     }
 
     this.unfinalizedBlocks.push(header);
-    this.saveUnfinalizedBlocks(this.unfinalizedBlocks);
+    await this.saveUnfinalizedBlocks(this.unfinalizedBlocks);
   }
 
-  private deleteFinalizedBlock(): void {
+  private async deleteFinalizedBlock(): Promise {
     if (this.lastCheckedBlockHeight !== undefined && this.lastCheckedBlockHeight < this.finalizedBlockNumber) {
       this.removeFinalized(this.finalizedBlockNumber);
-      this.saveLastFinalizedVerifiedHeight(this.finalizedBlockNumber);
-      this.saveUnfinalizedBlocks(this.unfinalizedBlocks);
+      await this.saveLastFinalizedVerifiedHeight(this.finalizedBlockNumber);
+      await this.saveUnfinalizedBlocks(this.unfinalizedBlocks);
     }
     this.lastCheckedBlockHeight = this.finalizedBlockNumber;
   }
@@ -239,7 +243,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
 
   // Finds the last POI that had a correct block hash, this is used with the Eth sdk
   protected async findFinalizedUsingPOI(header: Header): Promise {
-    const poiModel = this.storeCache.poi;
+    const poiModel = this.storeModelProvider.poi;
     if (!poiModel) {
       throw new Error(POI_NOT_ENABLED_ERROR_MESSAGE);
     }
@@ -279,26 +283,26 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
     throw new Error('Unable to find a POI block with matching block hash');
   }
 
-  private saveUnfinalizedBlocks(unfinalizedBlocks: UnfinalizedBlocks): void {
-    return this.storeCache.metadata.set(METADATA_UNFINALIZED_BLOCKS_KEY, JSON.stringify(unfinalizedBlocks));
+  private async saveUnfinalizedBlocks(unfinalizedBlocks: UnfinalizedBlocks): Promise {
+    return this.storeModelProvider.metadata.set(METADATA_UNFINALIZED_BLOCKS_KEY, JSON.stringify(unfinalizedBlocks));
   }
 
-  private saveLastFinalizedVerifiedHeight(height: number): void {
-    return this.storeCache.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, height);
+  private async saveLastFinalizedVerifiedHeight(height: number): Promise {
+    return this.storeModelProvider.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, height);
   }
 
-  resetUnfinalizedBlocks(): void {
-    this.storeCache.metadata.set(METADATA_UNFINALIZED_BLOCKS_KEY, '[]');
+  async resetUnfinalizedBlocks(tx?: Transaction): Promise {
+    await this.storeModelProvider.metadata.set(METADATA_UNFINALIZED_BLOCKS_KEY, '[]', tx);
     this.unfinalizedBlocks = [];
   }
 
-  resetLastFinalizedVerifiedHeight(): void {
-    return this.storeCache.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, null as any);
+  async resetLastFinalizedVerifiedHeight(tx?: Transaction): Promise {
+    return this.storeModelProvider.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, null as any, tx);
  }
 
   //string should be jsonb object
   async getMetadataUnfinalizedBlocks(): Promise {
-    const val = await this.storeCache.metadata.find(METADATA_UNFINALIZED_BLOCKS_KEY);
+    const val = await this.storeModelProvider.metadata.find(METADATA_UNFINALIZED_BLOCKS_KEY);
     if (val) {
       return JSON.parse(val) as UnfinalizedBlocks;
     }
@@ -306,6 +310,6 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo
   }
 
   async getLastFinalizedVerifiedHeight(): Promise {
-    return this.storeCache.metadata.find(METADATA_LAST_FINALIZED_PROCESSED_KEY);
+    return this.storeModelProvider.metadata.find(METADATA_LAST_FINALIZED_PROCESSED_KEY);
   }
 }
diff --git a/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts
index 59a0b46556..adfd3262de 100644
--- a/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts
+++ b/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts
@@ -31,10 +31,12 @@ export class WorkerUnfinalizedBlocksService implements IUnfinalizedBlocksServ
   init(reindex: (targetHeight: number) => Promise): Promise {
     throw new Error('This method should not be called from a worker');
   }
-  resetUnfinalizedBlocks(): void {
+  // eslint-disable-next-line @typescript-eslint/require-await
+  async resetUnfinalizedBlocks(): Promise {
     throw new Error('This method should not be called from a worker');
   }
-  resetLastFinalizedVerifiedHeight(): void {
+  // eslint-disable-next-line @typescript-eslint/require-await
+  async resetLastFinalizedVerifiedHeight(): Promise {
     throw new Error('This method should not be called from a worker');
   }
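With the reset methods now async and transaction-aware, a rewind can clear the unfinalized-block metadata atomically with its other writes when the plain provider is in use; with the cache provider the `tx` argument is simply omitted. A hedged sketch (the `rewindMetadata` helper is hypothetical, and the import paths assume these types are re-exported from `@subql/node-core`):

```ts
import {Transaction} from '@subql/x-sequelize';
import {IUnfinalizedBlocksService} from '@subql/node-core';

async function rewindMetadata(
  unfinalizedBlocks: IUnfinalizedBlocksService<unknown>,
  tx?: Transaction
): Promise<void> {
  // Both calls persist through the provider's metadata model; passing `tx`
  // scopes them to the caller's transaction on the plain provider.
  await unfinalizedBlocks.resetUnfinalizedBlocks(tx);
  await unfinalizedBlocks.resetLastFinalizedVerifiedHeight(tx);
}
```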
diff --git a/packages/node-core/src/meta/health.service.ts b/packages/node-core/src/meta/health.service.ts
index e23cf542be..f6b7bdb2f2 100644
--- a/packages/node-core/src/meta/health.service.ts
+++ b/packages/node-core/src/meta/health.service.ts
@@ -22,7 +22,10 @@ export class HealthService {
   private healthTimeout: number;
   private indexerHealthy?: boolean;
 
-  constructor(protected nodeConfig: NodeConfig, private storeService: StoreService) {
+  constructor(
+    protected nodeConfig: NodeConfig,
+    private storeService: StoreService
+  ) {
     this.healthTimeout = Math.max(DEFAULT_TIMEOUT, this.nodeConfig.timeout * 1000);
     this.blockTime = Math.max(DEFAULT_BLOCK_TIME, this.nodeConfig.blockTime);
   }
@@ -39,7 +42,7 @@ export class HealthService {
     }
 
     if (healthy !== this.indexerHealthy) {
-      await this.storeService.storeCache.metadata.model.upsert({key: 'indexerHealthy', value: healthy});
+      await this.storeService.modelProvider.metadata.model.upsert({key: 'indexerHealthy', value: healthy});
       this.indexerHealthy = healthy;
     }
   }
diff --git a/packages/node-core/src/subcommands/reindex.service.ts b/packages/node-core/src/subcommands/reindex.service.ts
index a93fb2c855..d35cf42af5 100644
--- a/packages/node-core/src/subcommands/reindex.service.ts
+++ b/packages/node-core/src/subcommands/reindex.service.ts
@@ -6,7 +6,14 @@
 import {Inject, Injectable} from '@nestjs/common';
 import {BaseDataSource} from '@subql/types-core';
 import {Sequelize} from '@subql/x-sequelize';
 import {NodeConfig, ProjectUpgradeService} from '../configure';
-import {CacheMetadataModel, IUnfinalizedBlocksService, StoreService, ISubqueryProject, PoiService} from '../indexer';
+import {
+  IUnfinalizedBlocksService,
+  StoreService,
+  ISubqueryProject,
+  PoiService,
+  IMetadata,
+  cacheProviderFlushData,
+} from '../indexer';
 import {DynamicDsService} from '../indexer/dynamic-ds.service';
 import {getLogger} from '../logger';
 import {exitWithError, monitorWrite} from '../process';
@@ -17,7 +24,7 @@
 const logger = getLogger('Reindex');
 
 @Injectable()
 export class ReindexService {
-  private _metadataRepo?: CacheMetadataModel;
+  private _metadataRepo?: IMetadata;
   private _lastProcessedHeight?: number;
 
   constructor(
@@ -32,7 +39,7 @@ export class ReindexService
   ) {}
 
-  private get metadataRepo(): CacheMetadataModel {
+  private get metadataRepo(): IMetadata {
     assert(this._metadataRepo, 'BaseReindexService has not been init');
     return this._metadataRepo;
   }
@@ -51,7 +58,7 @@ export class ReindexService