Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable cache #2568

Merged
merged 41 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a58f4e1
WIP implment non cache metadata
stwiname Sep 29, 2024
58672be
WIP extracting out interfaces and implementing non-cache versions
stwiname Oct 6, 2024
838ae83
Fix remaining build errors and update interfaces to abstract between …
yoozo Oct 20, 2024
358baff
flag `cache-disable` and Modifying inheritance relationships
yoozo Oct 21, 2024
10a4a57
rename storeStore to modelProvider
yoozo Oct 22, 2024
64a90cf
rename #storeModel to #modelProvider
yoozo Oct 22, 2024
33ad32c
update cacheProvider flushData
yoozo Oct 25, 2024
3a39e78
Merge branch 'main' into disable-cache
yoozo Oct 30, 2024
d2af4bb
remane storeModelProvider: IStoreModelProvider
yoozo Oct 31, 2024
0d51561
Rename the directory file to storeModelProvider; rename MetadataMod…
yoozo Nov 1, 2024
b428e6e
node package update provider StoreCacheService to IStoreModelProvider
yoozo Nov 1, 2024
200361b
fix  storeService dependency loop and repeat db transaction creation
yoozo Nov 3, 2024
c53eaf3
remove isCachePolicy and update enable-cache flag
yoozo Nov 4, 2024
fe23d05
fix poi Inject issues
yoozo Nov 4, 2024
95b56c7
fix plainModel _block_height
yoozo Nov 7, 2024
9306d35
model unit test
yoozo Nov 8, 2024
c975ee5
unit test
yoozo Nov 10, 2024
b5774f0
Merge branch 'main' into disable-cache
yoozo Nov 11, 2024
708cb04
fix build issues
yoozo Nov 11, 2024
162ea87
fix model block range
yoozo Nov 13, 2024
4058698
Fix the issue of duplicate data writing in the db store.
yoozo Nov 14, 2024
71affcb
Simplify the logic
yoozo Nov 14, 2024
f7023ea
There may be an issue with the store field in the poi table.
yoozo Nov 14, 2024
4d09b2d
set disable cache hook
yoozo Nov 15, 2024
4c9a450
db store unit test
yoozo Nov 15, 2024
0ce7161
some change
yoozo Nov 15, 2024
b2b5d19
fix unit test
yoozo Nov 16, 2024
311e2fb
rename test file
yoozo Nov 18, 2024
5ad72ee
fix health check api
yoozo Nov 18, 2024
8943a29
remove currentTxCache
yoozo Nov 18, 2024
2a7d87e
rename model test
yoozo Nov 18, 2024
337a427
function cacheProviderFlushData
yoozo Nov 19, 2024
6356909
db store csv export
yoozo Nov 19, 2024
a77b6ec
some improve
yoozo Nov 19, 2024
b84f8c6
Fix the issue where data is written incorrectly after being deleted a…
yoozo Nov 19, 2024
afaa536
remove batch size and beforeBulkDestroy hook
yoozo Nov 20, 2024
719c49f
cacheModel delete test
yoozo Nov 20, 2024
24340e5
change log
yoozo Nov 21, 2024
cd2375b
fix test
yoozo Nov 21, 2024
023c10a
fix unit test
yoozo Nov 21, 2024
c42541a
unit test fix
yoozo Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface IConfig {
readonly csvOutDir?: string;
readonly monitorOutDir: string;
readonly monitorFileSize?: number;
readonly cacheDisable?: boolean;
yoozo marked this conversation as resolved.
Show resolved Hide resolved
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> & Pick<IConfig, 'subquery'>;
Expand Down Expand Up @@ -328,6 +329,10 @@ export class NodeConfig<C extends IConfig = IConfig> implements IConfig {
return this._config.monitorFileSize ?? this._config.proofOfIndex ? defaultMonitorFileSize : 0;
}

get cacheDisable(): boolean {
return this._config.cacheDisable || false;
}

merge(config: Partial<IConfig>): this {
assign(this._config, config);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import {SchedulerRegistry} from '@nestjs/schedule';
import {Sequelize} from '@subql/x-sequelize';
import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {CacheMetadataModel, IStoreModelProvider, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {NodeConfig} from './NodeConfig';
import {IProjectUpgradeService, ProjectUpgradeService, upgradableSubqueryProject} from './ProjectUpgrade.service';

Expand Down Expand Up @@ -289,7 +289,7 @@ describe('Project Upgrades', () => {
describe('Upgradable subquery project', () => {
let upgradeService: ProjectUpgradeService<ISubqueryProject>;
let project: ISubqueryProject & IProjectUpgradeService<ISubqueryProject>;
let storeCache: StoreCacheService;
let storeCache: IStoreModelProvider;

beforeEach(async () => {
storeCache = new StoreCacheService({} as any, {} as any, {} as any, new SchedulerRegistry());
Expand Down
37 changes: 18 additions & 19 deletions packages/node-core/src/configure/ProjectUpgrade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {ParentProject} from '@subql/types-core';
import {Sequelize, Transaction} from '@subql/x-sequelize';
import {findLast, last, parseInt} from 'lodash';
import {SchemaMigrationService} from '../db';
import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {IMetadata, IStoreModelProvider, ISubqueryProject, StoreService} from '../indexer';
import {getLogger} from '../logger';
import {exitWithError, monitorWrite} from '../process';
import {getStartHeight, mainThreadOnly} from '../utils';
Expand Down Expand Up @@ -107,7 +107,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
#currentHeight: number;
#currentProject: P;

#storeCache?: StoreCacheService;
#modelProvider?: IStoreModelProvider;
#initialized = false;

private config?: NodeConfig;
Expand Down Expand Up @@ -150,16 +150,10 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
return;
}
this.#initialized = true;
this.#storeCache = storeService.storeCache;
this.#modelProvider = storeService.modelProvider;
this.config = config;

this.migrationService = new SchemaMigrationService(
sequelize,
storeService,
storeService.storeCache._flushCache.bind(storeService.storeCache),
schema,
config
);
this.migrationService = new SchemaMigrationService(sequelize, storeService, schema, config);

const indexedDeployments = await this.getDeploymentsMetadata();

Expand All @@ -172,7 +166,9 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
} catch (e: any) {
if (e instanceof EntryNotFoundError) {
throw new Error(
`Unable to find project for height ${this.#currentHeight}. If the project start height is increased it will not jump to that block. Please either reindex or specify blocks to bypass.`,
`Unable to find project for height ${
this.#currentHeight
}. If the project start height is increased it will not jump to that block. Please either reindex or specify blocks to bypass.`,
{cause: e}
);
}
Expand Down Expand Up @@ -207,9 +203,12 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
return this.#currentHeight;
}

private get metadata(): CacheMetadataModel {
assert(this.#storeCache?.metadata, 'Project Upgrades service has not been initialized, unable to update metadata');
return this.#storeCache.metadata;
private get metadata(): IMetadata {
assert(
this.#modelProvider?.metadata,
'Project Upgrades service has not been initialized, unable to update metadata'
);
return this.#modelProvider.metadata;
}

async rewind(
Expand Down Expand Up @@ -243,7 +242,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
nextId = iterator.next();
}
// this remove any deployments in metadata beyond target height
await this.removeIndexedDeployments(targetBlockHeight);
await this.removeIndexedDeployments(targetBlockHeight, transaction);
}

private async migrate(
Expand All @@ -256,7 +255,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
assert(this.migrationService, 'MigrationService is undefined');
if (this.config.allowSchemaMigration) {
await this.migrationService.run(project.schema, newProject.schema, transaction);
this.metadata.setIncrement('schemaMigrationCount');
await this.metadata.setIncrement('schemaMigrationCount', undefined, transaction);
}
}
}
Expand Down Expand Up @@ -450,11 +449,11 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject

deployments[blockHeight] = id;

this.metadata.set('deployments', JSON.stringify(deployments));
await this.metadata.set('deployments', JSON.stringify(deployments));
}

// Remove metadata deployments beyond this blockHeight
async removeIndexedDeployments(blockHeight: number): Promise<void> {
async removeIndexedDeployments(blockHeight: number, tx?: Transaction): Promise<void> {
const deployments = await this.getDeploymentsMetadata();

// remove all future block heights
Expand All @@ -466,6 +465,6 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
}
});

this.metadata.set('deployments', JSON.stringify(deployments));
await this.metadata.set('deployments', JSON.stringify(deployments), tx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,7 @@ async function setup(
await storeService.initCoreTables(schemaName);
await initDbSchema(schemaName, storeService);

return new SchemaMigrationService(
sequelize,
storeService,
storeCache._flushCache.bind(storeCache),
schemaName,
config
);
return new SchemaMigrationService(sequelize, storeService, schemaName, config);
}

function loadGqlSchema(fileName: string): GraphQLSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import {SUPPORT_DB} from '@subql/common';
import {getAllEntitiesRelations, GraphQLModelsType, GraphQLRelationsType} from '@subql/utils';
import {ModelStatic, Sequelize, Transaction} from '@subql/x-sequelize';
import {Sequelize, Transaction} from '@subql/x-sequelize';
import {GraphQLSchema} from 'graphql';
import {NodeConfig} from '../../configure';
import {StoreService} from '../../indexer';
import {isCachePolicy, StoreService} from '../../indexer';
import {getLogger} from '../../logger';
import {sortModels} from '../sync-helper';
import {Migration} from './migration';
Expand All @@ -29,7 +29,6 @@ export class SchemaMigrationService {
constructor(
private sequelize: Sequelize,
private storeService: StoreService,
private flushCache: (flushAll?: boolean) => Promise<void>,
private dbSchema: string,
private config: NodeConfig,
private dbType: SUPPORT_DB = SUPPORT_DB.postgres
Expand Down Expand Up @@ -115,9 +114,9 @@ export class SchemaMigrationService {
const sortedAddedModels = alignModelOrder<GraphQLModelsType[]>(sortedSchemaModels, addedModels);
const sortedModifiedModels = alignModelOrder<ModifiedModels>(sortedSchemaModels, modifiedModels);

// Flush any pending data before running the migration
await this.flushCache(true);

if (isCachePolicy(this.storeService.modelProvider)) {
await this.storeService.modelProvider.flushData(true);
}
const migrationAction = await Migration.create(
this.sequelize,
this.storeService,
Expand Down Expand Up @@ -186,10 +185,12 @@ export class SchemaMigrationService {
const modelChanges = await migrationAction.run(transaction);

// Update any relevant application state so the right models are used
this.storeService.storeCache.updateModels(modelChanges);
this.storeService.modelProvider.updateModels(modelChanges);
await this.storeService.updateModels(this.dbSchema, getAllEntitiesRelations(nextSchema));

await this.flushCache();
if (isCachePolicy(this.storeService.modelProvider)) {
await this.storeService.modelProvider.flushData(true);
}
} catch (e: any) {
logger.error(e, 'Failed to execute Schema Migration');
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import assert from 'assert';

import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {hexToU8a, u8aEq} from '@subql/utils';
import {Transaction} from '@subql/x-sequelize';
import {NodeConfig, IProjectUpgradeService} from '../../configure';
import {AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload} from '../../events';
import {getLogger} from '../../logger';
import {exitWithError, monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
import {IQueue, mainThreadOnly} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {PoiBlock, PoiSyncService} from '../poi';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {CachePoiModel} from '../storeCache/cachePoi';
import {IStoreModelProvider} from '../storeModelProvider';
import {IPoi} from '../storeModelProvider/poi';
import {IBlock, IProjectService, ISubqueryProject} from '../types';

const logger = getLogger('BaseBlockDispatcherService');
Expand Down Expand Up @@ -63,7 +64,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
private projectUpgradeService: IProjectUpgradeService,
protected queue: Q,
protected storeService: StoreService,
private storeCacheService: StoreCacheService,
private storeModelProvider: IStoreModelProvider,
private poiSyncService: PoiSyncService,
protected monitorService?: MonitorServiceInterface
) {
Expand All @@ -75,7 +76,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
async init(onDynamicDsCreated: (height: number) => void): Promise<void> {
this._onDynamicDsCreated = onDynamicDsCreated;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.setProcessedBlockCount((await this.storeCacheService.metadata.find('processedBlockCount', 0))!);
this.setProcessedBlockCount((await this.storeModelProvider.metadata.find('processedBlockCount', 0))!);
}

get queueSize(): number {
Expand Down Expand Up @@ -157,7 +158,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
@mainThreadOnly()
protected async preProcessBlock(height: number): Promise<void> {
monitorCreateBlockStart(height);
this.storeService.setBlockHeight(height);
await this.storeService.setBlockHeight(height);

await this.projectUpgradeService.setCurrentHeight(height);

Expand Down Expand Up @@ -198,10 +199,10 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
throw e;
}
} else {
this.updateStoreMetadata(height);
await this.updateStoreMetadata(height, undefined, this.storeService.transaction);

const operationHash = this.storeService.getOperationMerkleRoot();
this.createPOI(height, blockHash, operationHash);
await this.createPOI(height, blockHash, operationHash, this.storeService.transaction);

if (dynamicDsCreated) {
this.onDynamicDsCreated(height);
Expand All @@ -215,21 +216,42 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
this.setLatestProcessedHeight(height);
}

if (this.nodeConfig.storeCacheAsync) {
// Flush all completed block data and don't wait
await this.storeCacheService.flushAndWaitForCapacity(false)?.catch((e) => {
exitWithError(new Error(`Flushing cache failed`, {cause: e}), logger);
});
} else {
// Flush all data from cache and wait
await this.storeCacheService.flushCache(false);
}

if (!this.projectService.hasDataSourcesAfterHeight(height)) {
const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
await this.storeCacheService.flushCache(false);
exitWithError(msg, logger, 0);
}
await this.storeModelProvider.applyPendingChanges(
height,
!this.projectService.hasDataSourcesAfterHeight(height),
this.storeService.transaction
);

// if (this.storeModelService instanceof StorCeacheService) {
yoozo marked this conversation as resolved.
Show resolved Hide resolved
// if (this.nodeConfig.storeCacheAsync) {
// // Flush all completed block data and don't wait
// await this.storeModelService.flushAndWaitForCapacity(false)?.catch((e) => {
// exitWithError(new Error(`Flushing cache failed`, { cause: e }), logger);
// });
// } else {
// // Flush all data from cache and wait
// await this.storeModelService.flushCache(false);
// }

// if (!this.projectService.hasDataSourcesAfterHeight(height)) {
// const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
// await this.storeModelService.flushCache(false);
// exitWithError(msg, logger, 0);
// }
// } else if (this.storeModelService instanceof PlainStoreModelService) {
// const tx = this.storeService.transaction;
// if (!tx) {
// exitWithError(new Error('Transaction not found'), logger, 1);
// }
// await tx.commit();

// if (!this.projectService.hasDataSourcesAfterHeight(height)) {
// const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
// exitWithError(msg, logger, 0);
// }
// } else {
// exitWithError(new Error('Unknown store model service'), logger, 1);
// }
}

@OnEvent(AdminEvent.rewindTarget)
Expand Down Expand Up @@ -258,7 +280,12 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
* @param operationHash
* @private
*/
private createPOI(height: number, blockHash: string, operationHash: Uint8Array): void {
private async createPOI(
height: number,
blockHash: string,
operationHash: Uint8Array,
tx?: Transaction
): Promise<void> {
if (!this.nodeConfig.proofOfIndex) {
return;
}
Expand All @@ -268,30 +295,33 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB

const poiBlock = PoiBlock.create(height, blockHash, operationHash, this.project.id);
// This is the first creation of POI
this.poi.bulkUpsert([poiBlock]);
this.storeCacheService.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}]);
await this.poi.bulkUpsert([poiBlock], tx);
await this.storeModelProvider.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}], tx);
this.eventEmitter.emit(PoiEvent.PoiTarget, {
height,
timestamp: Date.now(),
});
}

@mainThreadOnly()
private updateStoreMetadata(height: number, updateProcessed = true): void {
const meta = this.storeCacheService.metadata;
private async updateStoreMetadata(height: number, updateProcessed = true, tx?: Transaction): Promise<void> {
const meta = this.storeModelProvider.metadata;
// Update store metadata
meta.setBulk([
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
]);
await meta.setBulk(
[
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
],
tx
);
// Db Metadata increase BlockCount, in memory ref to block-dispatcher _processedBlockCount
if (updateProcessed) {
meta.setIncrement('processedBlockCount');
await meta.setIncrement('processedBlockCount', undefined, tx);
}
}

private get poi(): CachePoiModel {
const poi = this.storeCacheService.poi;
private get poi(): IPoi {
const poi = this.storeModelProvider.poi;
if (!poi) {
throw new Error('Poi service expected poi repo but it was not found');
}
Expand Down
Loading
Loading