Skip to content

Commit

Permalink
Disable cache (#2568)
Browse files Browse the repository at this point in the history
* WIP implment non cache metadata

* WIP extracting out interfaces and implementing non-cache versions

* Fix remaining build errors and update interfaces to abstract between flushing cache/managing db transaction

* flag `cache-disable`  and  Modifying inheritance relationships

* rename storeStore to modelProvider

* rename #storeModel to #modelProvider

* update cacheProvider flushData

* remane storeModelProvider: IStoreModelProvider

* Rename the directory file to storeModelProvider;   rename MetadataModel to MetadataEntity

* node package update provider StoreCacheService to IStoreModelProvider

* fix  storeService dependency loop and repeat db transaction creation

* remove isCachePolicy and update enable-cache flag

* fix poi Inject issues

* fix plainModel _block_height

* model unit test

* unit test

* fix build issues

* fix model block range

* Fix the issue of duplicate data writing in the db store.

* Simplify the logic

* There may be an issue with the store field in the poi table.

* set disable cache hook

* db store unit test

* some change

* fix unit test

* rename test file

* fix health check api

* remove currentTxCache

* rename model test

* function cacheProviderFlushData

* db store csv export

* some improve

* Fix the issue where data is written incorrectly after being deleted and recreated in cache mode.

* remove batch size and beforeBulkDestroy hook

* cacheModel delete test

* change log

* fix test

* fix unit test

* unit test fix

---------

Co-authored-by: Tate <[email protected]>
  • Loading branch information
stwiname and yoozo authored Nov 21, 2024
1 parent 0103f5e commit 91cbc63
Show file tree
Hide file tree
Showing 77 changed files with 1,471 additions and 569 deletions.
4 changes: 4 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- lazy loading for monitor service (#2583)
- Add an `--enable-cache` flag, allowing you to choose between DB or cache for IO operations.

### Fixed
- When using a GET query to retrieve an entity, it will include a “store” field.

### Fixed
- When configuring multiple endpoints, poor network conditions may lead to block crawling delays. (#2572)
Expand Down
5 changes: 5 additions & 0 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface IConfig {
readonly csvOutDir?: string;
readonly monitorOutDir: string;
readonly monitorFileSize?: number;
readonly enableCache?: boolean;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> & Pick<IConfig, 'subquery'>;
Expand Down Expand Up @@ -328,6 +329,10 @@ export class NodeConfig<C extends IConfig = IConfig> implements IConfig {
return this._config.monitorFileSize ?? this._config.proofOfIndex ? defaultMonitorFileSize : 0;
}

get enableCache(): boolean {
return this._config.enableCache ?? true;
}

merge(config: Partial<IConfig>): this {
assign(this._config, config);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import {SchedulerRegistry} from '@nestjs/schedule';
import {Sequelize} from '@subql/x-sequelize';
import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {CacheMetadataModel, IStoreModelProvider, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {NodeConfig} from './NodeConfig';
import {IProjectUpgradeService, ProjectUpgradeService, upgradableSubqueryProject} from './ProjectUpgrade.service';

Expand Down Expand Up @@ -289,7 +289,7 @@ describe('Project Upgrades', () => {
describe('Upgradable subquery project', () => {
let upgradeService: ProjectUpgradeService<ISubqueryProject>;
let project: ISubqueryProject & IProjectUpgradeService<ISubqueryProject>;
let storeCache: StoreCacheService;
let storeCache: IStoreModelProvider;

beforeEach(async () => {
storeCache = new StoreCacheService({} as any, {} as any, {} as any, new SchedulerRegistry());
Expand Down
37 changes: 18 additions & 19 deletions packages/node-core/src/configure/ProjectUpgrade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {ParentProject} from '@subql/types-core';
import {Sequelize, Transaction} from '@subql/x-sequelize';
import {findLast, last, parseInt} from 'lodash';
import {SchemaMigrationService} from '../db';
import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {IMetadata, IStoreModelProvider, ISubqueryProject, StoreService} from '../indexer';
import {getLogger} from '../logger';
import {exitWithError, monitorWrite} from '../process';
import {getStartHeight, mainThreadOnly} from '../utils';
Expand Down Expand Up @@ -107,7 +107,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
#currentHeight: number;
#currentProject: P;

#storeCache?: StoreCacheService;
#modelProvider?: IStoreModelProvider;
#initialized = false;

private config?: NodeConfig;
Expand Down Expand Up @@ -150,16 +150,10 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
return;
}
this.#initialized = true;
this.#storeCache = storeService.storeCache;
this.#modelProvider = storeService.modelProvider;
this.config = config;

this.migrationService = new SchemaMigrationService(
sequelize,
storeService,
storeService.storeCache._flushCache.bind(storeService.storeCache),
schema,
config
);
this.migrationService = new SchemaMigrationService(sequelize, storeService, schema, config);

const indexedDeployments = await this.getDeploymentsMetadata();

Expand All @@ -172,7 +166,9 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
} catch (e: any) {
if (e instanceof EntryNotFoundError) {
throw new Error(
`Unable to find project for height ${this.#currentHeight}. If the project start height is increased it will not jump to that block. Please either reindex or specify blocks to bypass.`,
`Unable to find project for height ${
this.#currentHeight
}. If the project start height is increased it will not jump to that block. Please either reindex or specify blocks to bypass.`,
{cause: e}
);
}
Expand Down Expand Up @@ -207,9 +203,12 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
return this.#currentHeight;
}

private get metadata(): CacheMetadataModel {
assert(this.#storeCache?.metadata, 'Project Upgrades service has not been initialized, unable to update metadata');
return this.#storeCache.metadata;
private get metadata(): IMetadata {
assert(
this.#modelProvider?.metadata,
'Project Upgrades service has not been initialized, unable to update metadata'
);
return this.#modelProvider.metadata;
}

async rewind(
Expand Down Expand Up @@ -243,7 +242,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
nextId = iterator.next();
}
// this remove any deployments in metadata beyond target height
await this.removeIndexedDeployments(targetBlockHeight);
await this.removeIndexedDeployments(targetBlockHeight, transaction);
}

private async migrate(
Expand All @@ -256,7 +255,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
assert(this.migrationService, 'MigrationService is undefined');
if (this.config.allowSchemaMigration) {
await this.migrationService.run(project.schema, newProject.schema, transaction);
this.metadata.setIncrement('schemaMigrationCount');
await this.metadata.setIncrement('schemaMigrationCount', undefined, transaction);
}
}
}
Expand Down Expand Up @@ -450,11 +449,11 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject

deployments[blockHeight] = id;

this.metadata.set('deployments', JSON.stringify(deployments));
await this.metadata.set('deployments', JSON.stringify(deployments));
}

// Remove metadata deployments beyond this blockHeight
async removeIndexedDeployments(blockHeight: number): Promise<void> {
async removeIndexedDeployments(blockHeight: number, tx?: Transaction): Promise<void> {
const deployments = await this.getDeploymentsMetadata();

// remove all future block heights
Expand All @@ -466,6 +465,6 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
}
});

this.metadata.set('deployments', JSON.stringify(deployments));
await this.metadata.set('deployments', JSON.stringify(deployments), tx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,7 @@ async function setup(
await storeService.initCoreTables(schemaName);
await initDbSchema(schemaName, storeService);

return new SchemaMigrationService(
sequelize,
storeService,
storeCache._flushCache.bind(storeCache),
schemaName,
config
);
return new SchemaMigrationService(sequelize, storeService, schemaName, config);
}

function loadGqlSchema(fileName: string): GraphQLSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import {SUPPORT_DB} from '@subql/common';
import {getAllEntitiesRelations, GraphQLModelsType, GraphQLRelationsType} from '@subql/utils';
import {ModelStatic, Sequelize, Transaction} from '@subql/x-sequelize';
import {Sequelize, Transaction} from '@subql/x-sequelize';
import {GraphQLSchema} from 'graphql';
import {NodeConfig} from '../../configure';
import {StoreService} from '../../indexer';
import {cacheProviderFlushData, StoreService} from '../../indexer';
import {getLogger} from '../../logger';
import {sortModels} from '../sync-helper';
import {Migration} from './migration';
Expand All @@ -29,7 +29,6 @@ export class SchemaMigrationService {
constructor(
private sequelize: Sequelize,
private storeService: StoreService,
private flushCache: (flushAll?: boolean) => Promise<void>,
private dbSchema: string,
private config: NodeConfig,
private dbType: SUPPORT_DB = SUPPORT_DB.postgres
Expand Down Expand Up @@ -115,8 +114,7 @@ export class SchemaMigrationService {
const sortedAddedModels = alignModelOrder<GraphQLModelsType[]>(sortedSchemaModels, addedModels);
const sortedModifiedModels = alignModelOrder<ModifiedModels>(sortedSchemaModels, modifiedModels);

// Flush any pending data before running the migration
await this.flushCache(true);
await cacheProviderFlushData(this.storeService.modelProvider, true);

const migrationAction = await Migration.create(
this.sequelize,
Expand Down Expand Up @@ -186,10 +184,10 @@ export class SchemaMigrationService {
const modelChanges = await migrationAction.run(transaction);

// Update any relevant application state so the right models are used
this.storeService.storeCache.updateModels(modelChanges);
this.storeService.modelProvider.updateModels(modelChanges);
await this.storeService.updateModels(this.dbSchema, getAllEntitiesRelations(nextSchema));

await this.flushCache();
await cacheProviderFlushData(this.storeService.modelProvider, true);
} catch (e: any) {
logger.error(e, 'Failed to execute Schema Migration');
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import assert from 'assert';

import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {hexToU8a, u8aEq} from '@subql/utils';
import {Transaction} from '@subql/x-sequelize';
import {NodeConfig, IProjectUpgradeService} from '../../configure';
import {AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload} from '../../events';
import {getLogger} from '../../logger';
import {exitWithError, monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
import {IQueue, mainThreadOnly} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {PoiBlock, PoiSyncService} from '../poi';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {CachePoiModel} from '../storeCache/cachePoi';
import {IStoreModelProvider} from '../storeModelProvider';
import {IPoi} from '../storeModelProvider/poi';
import {IBlock, IProjectService, ISubqueryProject} from '../types';

const logger = getLogger('BaseBlockDispatcherService');
Expand Down Expand Up @@ -63,7 +64,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
private projectUpgradeService: IProjectUpgradeService,
protected queue: Q,
protected storeService: StoreService,
private storeCacheService: StoreCacheService,
private storeModelProvider: IStoreModelProvider,
private poiSyncService: PoiSyncService,
protected monitorService?: MonitorServiceInterface
) {
Expand All @@ -75,7 +76,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
async init(onDynamicDsCreated: (height: number) => void): Promise<void> {
this._onDynamicDsCreated = onDynamicDsCreated;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.setProcessedBlockCount((await this.storeCacheService.metadata.find('processedBlockCount', 0))!);
this.setProcessedBlockCount((await this.storeModelProvider.metadata.find('processedBlockCount', 0))!);
}

get queueSize(): number {
Expand Down Expand Up @@ -157,7 +158,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
@mainThreadOnly()
protected async preProcessBlock(height: number): Promise<void> {
monitorCreateBlockStart(height);
this.storeService.setBlockHeight(height);
await this.storeService.setBlockHeight(height);

await this.projectUpgradeService.setCurrentHeight(height);

Expand Down Expand Up @@ -198,10 +199,10 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
throw e;
}
} else {
this.updateStoreMetadata(height);
await this.updateStoreMetadata(height, undefined, this.storeService.transaction);

const operationHash = this.storeService.getOperationMerkleRoot();
this.createPOI(height, blockHash, operationHash);
await this.createPOI(height, blockHash, operationHash, this.storeService.transaction);

if (dynamicDsCreated) {
this.onDynamicDsCreated(height);
Expand All @@ -215,21 +216,11 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
this.setLatestProcessedHeight(height);
}

if (this.nodeConfig.storeCacheAsync) {
// Flush all completed block data and don't wait
await this.storeCacheService.flushAndWaitForCapacity(false)?.catch((e) => {
exitWithError(new Error(`Flushing cache failed`, {cause: e}), logger);
});
} else {
// Flush all data from cache and wait
await this.storeCacheService.flushCache(false);
}

if (!this.projectService.hasDataSourcesAfterHeight(height)) {
const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
await this.storeCacheService.flushCache(false);
exitWithError(msg, logger, 0);
}
await this.storeModelProvider.applyPendingChanges(
height,
!this.projectService.hasDataSourcesAfterHeight(height),
this.storeService.transaction
);
}

@OnEvent(AdminEvent.rewindTarget)
Expand Down Expand Up @@ -258,7 +249,12 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
* @param operationHash
* @private
*/
private createPOI(height: number, blockHash: string, operationHash: Uint8Array): void {
private async createPOI(
height: number,
blockHash: string,
operationHash: Uint8Array,
tx?: Transaction
): Promise<void> {
if (!this.nodeConfig.proofOfIndex) {
return;
}
Expand All @@ -268,30 +264,33 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB

const poiBlock = PoiBlock.create(height, blockHash, operationHash, this.project.id);
// This is the first creation of POI
this.poi.bulkUpsert([poiBlock]);
this.storeCacheService.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}]);
await this.poi.bulkUpsert([poiBlock], tx);
await this.storeModelProvider.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}], tx);
this.eventEmitter.emit(PoiEvent.PoiTarget, {
height,
timestamp: Date.now(),
});
}

@mainThreadOnly()
private updateStoreMetadata(height: number, updateProcessed = true): void {
const meta = this.storeCacheService.metadata;
private async updateStoreMetadata(height: number, updateProcessed = true, tx?: Transaction): Promise<void> {
const meta = this.storeModelProvider.metadata;
// Update store metadata
meta.setBulk([
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
]);
await meta.setBulk(
[
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
],
tx
);
// Db Metadata increase BlockCount, in memory ref to block-dispatcher _processedBlockCount
if (updateProcessed) {
meta.setIncrement('processedBlockCount');
await meta.setIncrement('processedBlockCount', undefined, tx);
}
}

private get poi(): CachePoiModel {
const poi = this.storeCacheService.poi;
private get poi(): IPoi {
const poi = this.storeModelProvider.poi;
if (!poi) {
throw new Error('Poi service expected poi repo but it was not found');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {exitWithError, monitorWrite} from '../../process';
import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IStoreModelProvider} from '../storeModelProvider';
import {IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';

Expand Down Expand Up @@ -45,7 +45,7 @@ export abstract class BlockDispatcher<B, DS>
projectService: IProjectService<DS>,
projectUpgradeService: IProjectUpgradeService,
storeService: StoreService,
storeCacheService: StoreCacheService,
storeModelProvider: IStoreModelProvider,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
fetchBlocksBatches: BatchBlockFetcher<B>
Expand All @@ -58,7 +58,7 @@ export abstract class BlockDispatcher<B, DS>
projectUpgradeService,
new Queue(nodeConfig.batchSize * 3),
storeService,
storeCacheService,
storeModelProvider,
poiSyncService
);
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {EventEmitter2} from '@nestjs/event-emitter';
import {IProjectUpgradeService, NodeConfig} from '../../configure';
import {PoiSyncService} from '../poi';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {StoreCacheService} from '../storeModelProvider';
import {IProjectService, ISubqueryProject} from '../types';
import {WorkerBlockDispatcher} from './worker-block-dispatcher';

Expand Down
Loading

0 comments on commit 91cbc63

Please sign in to comment.