Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable cache #2568

Merged
merged 41 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a58f4e1
WIP implment non cache metadata
stwiname Sep 29, 2024
58672be
WIP extracting out interfaces and implementing non-cache versions
stwiname Oct 6, 2024
838ae83
Fix remaining build errors and update interfaces to abstract between …
yoozo Oct 20, 2024
358baff
flag `cache-disable` and Modifying inheritance relationships
yoozo Oct 21, 2024
10a4a57
rename storeStore to modelProvider
yoozo Oct 22, 2024
64a90cf
rename #storeModel to #modelProvider
yoozo Oct 22, 2024
33ad32c
update cacheProvider flushData
yoozo Oct 25, 2024
3a39e78
Merge branch 'main' into disable-cache
yoozo Oct 30, 2024
d2af4bb
remane storeModelProvider: IStoreModelProvider
yoozo Oct 31, 2024
0d51561
Rename the directory file to storeModelProvider; rename MetadataMod…
yoozo Nov 1, 2024
b428e6e
node package update provider StoreCacheService to IStoreModelProvider
yoozo Nov 1, 2024
200361b
fix  storeService dependency loop and repeat db transaction creation
yoozo Nov 3, 2024
c53eaf3
remove isCachePolicy and update enable-cache flag
yoozo Nov 4, 2024
fe23d05
fix poi Inject issues
yoozo Nov 4, 2024
95b56c7
fix plainModel _block_height
yoozo Nov 7, 2024
9306d35
model unit test
yoozo Nov 8, 2024
c975ee5
unit test
yoozo Nov 10, 2024
b5774f0
Merge branch 'main' into disable-cache
yoozo Nov 11, 2024
708cb04
fix build issues
yoozo Nov 11, 2024
162ea87
fix model block range
yoozo Nov 13, 2024
4058698
Fix the issue of duplicate data writing in the db store.
yoozo Nov 14, 2024
71affcb
Simplify the logic
yoozo Nov 14, 2024
f7023ea
There may be an issue with the store field in the poi table.
yoozo Nov 14, 2024
4d09b2d
set disable cache hook
yoozo Nov 15, 2024
4c9a450
db store unit test
yoozo Nov 15, 2024
0ce7161
some change
yoozo Nov 15, 2024
b2b5d19
fix unit test
yoozo Nov 16, 2024
311e2fb
rename test file
yoozo Nov 18, 2024
5ad72ee
fix health check api
yoozo Nov 18, 2024
8943a29
remove currentTxCache
yoozo Nov 18, 2024
2a7d87e
rename model test
yoozo Nov 18, 2024
337a427
function cacheProviderFlushData
yoozo Nov 19, 2024
6356909
db store csv export
yoozo Nov 19, 2024
a77b6ec
some improve
yoozo Nov 19, 2024
b84f8c6
Fix the issue where data is written incorrectly after being deleted a…
yoozo Nov 19, 2024
afaa536
remove batch size and beforeBulkDestroy hook
yoozo Nov 20, 2024
719c49f
cacheModel delete test
yoozo Nov 20, 2024
24340e5
change log
yoozo Nov 21, 2024
cd2375b
fix test
yoozo Nov 21, 2024
023c10a
fix unit test
yoozo Nov 21, 2024
c42541a
unit test fix
yoozo Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions packages/node-core/src/configure/ProjectUpgrade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,20 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
private onProjectUpgrade?: OnProjectUpgradeCallback<P>;
private migrationService?: SchemaMigrationService;

private constructor(private _projects: BlockHeightMap<P>, currentHeight: number, private _isRewindable = true) {
private constructor(
private _projects: BlockHeightMap<P>,
currentHeight: number,
private _isRewindable = true
) {
logger.info(
`Projects: ${JSON.stringify(
[..._projects.getAll().entries()].reduce((acc, curr) => {
acc[curr[0]] = curr[1].id;
return acc;
}, {} as Record<number, string>),
[..._projects.getAll().entries()].reduce(
(acc, curr) => {
acc[curr[0]] = curr[1].id;
return acc;
},
{} as Record<number, string>
),
undefined,
2
)}`
Expand All @@ -150,16 +157,10 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
return;
}
this.#initialized = true;
this.#storeCache = storeService.storeCache;
this.#storeCache = storeService.storeModel;
yoozo marked this conversation as resolved.
Show resolved Hide resolved
this.config = config;

this.migrationService = new SchemaMigrationService(
sequelize,
storeService,
storeService.storeCache._flushCache.bind(storeService.storeCache),
schema,
config
);
this.migrationService = new SchemaMigrationService(sequelize, storeService, schema, config);

const indexedDeployments = await this.getDeploymentsMetadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,7 @@ async function setup(
await storeService.initCoreTables(schemaName);
await initDbSchema(schemaName, storeService);

return new SchemaMigrationService(
sequelize,
storeService,
storeCache._flushCache.bind(storeCache),
schemaName,
config
);
return new SchemaMigrationService(sequelize, storeService, schemaName, config);
}

function loadGqlSchema(fileName: string): GraphQLSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ export class SchemaMigrationService {
constructor(
private sequelize: Sequelize,
private storeService: StoreService,
private flushCache: (flushAll?: boolean) => Promise<void>,
private dbSchema: string,
private config: NodeConfig,
private dbType: SUPPORT_DB = SUPPORT_DB.postgres
Expand Down Expand Up @@ -115,7 +114,7 @@ export class SchemaMigrationService {
const sortedAddedModels = alignModelOrder<GraphQLModelsType[]>(sortedSchemaModels, addedModels);
const sortedModifiedModels = alignModelOrder<ModifiedModels>(sortedSchemaModels, modifiedModels);

await this.storeService.storeCache._flushCache(true);
await this.storeService.storeModel.flushData?.(true);
const migrationAction = await Migration.create(
this.sequelize,
this.storeService,
Expand Down Expand Up @@ -184,10 +183,10 @@ export class SchemaMigrationService {
const modelChanges = await migrationAction.run(transaction);

// Update any relevant application state so the right models are used
this.storeService.storeCache.updateModels(modelChanges);
this.storeService.storeModel.updateModels(modelChanges);
await this.storeService.updateModels(this.dbSchema, getAllEntitiesRelations(nextSchema));

await this.flushCache();
await this.storeService.storeModel.flushData?.(true);
} catch (e: any) {
logger.error(e, 'Failed to execute Schema Migration');
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB

await this.storeModelService.applyPendingChanges(height, !this.projectService.hasDataSourcesAfterHeight(height));

// if (this.storeModelService instanceof StoreCacheService) {
// if (this.storeModelService instanceof StorCeacheService) {
yoozo marked this conversation as resolved.
Show resolved Hide resolved
// if (this.nodeConfig.storeCacheAsync) {
// // Flush all completed block data and don't wait
// await this.storeModelService.flushAndWaitForCapacity(false)?.catch((e) => {
Expand Down
16 changes: 12 additions & 4 deletions packages/node-core/src/indexer/poi/poi.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ const logger = getLogger('PoiService');
@Injectable()
export class PoiService implements OnApplicationShutdown {
private isShutdown = false;
private _poiRepo?: CachePoiModel;
private _poiRepo?: IPoi;

constructor(protected readonly nodeConfig: NodeConfig, private storeCache: IStoreModelService) {}
constructor(
protected readonly nodeConfig: NodeConfig,
private storeCache: IStoreModelService
) {}

onApplicationShutdown(): void {
this.isShutdown = true;
Expand All @@ -40,15 +43,20 @@ export class PoiService implements OnApplicationShutdown {
};
}

get poiRepo(): CachePoiModel {
get poiRepo(): IPoi {
if (!this._poiRepo) {
throw new Error(`No poi repo inited`);
}
return this._poiRepo;
}

get plainPoiRepo(): PlainPoiModel {
return this.poiRepo.plainPoiModel;
if (this.poiRepo instanceof CachePoiModel) {
return this.poiRepo.plainPoiModel;
} else if (this.poiRepo instanceof PlainPoiModel) {
return this.poiRepo;
}
throw new Error(`No plainPoiRepo repo inited`);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export abstract class BaseProjectService<
await this.storeService.initCoreTables(this._schema);
await this.ensureMetadata();
// DynamicDsService is dependent on metadata so we need to ensure it exists first
await this.dynamicDsService.init(this.storeService.storeCache.metadata);
await this.dynamicDsService.init(this.storeService.storeModel.metadata);

/**
* WARNING: The order of the following steps is very important.
Expand Down Expand Up @@ -146,7 +146,7 @@ export abstract class BaseProjectService<
}

// Flush any pending operations to set up DB
await this.storeService.storeCache.flushCache(true);
await this.storeService.storeModel.flushData?.(true);
} else {
assert(startHeight, 'ProjectService must be initalized with a start height in workers');
this.projectUpgradeService.initWorker(startHeight, this.handleProjectChange.bind(this));
Expand Down Expand Up @@ -195,7 +195,7 @@ export abstract class BaseProjectService<
}

private async ensureMetadata(): Promise<void> {
const metadata = this.storeService.storeCache.metadata;
const metadata = this.storeService.storeModel.metadata;

this.eventEmitter.emit(IndexerEvent.NetworkMetadata, this.apiService.networkMeta);

Expand Down Expand Up @@ -269,7 +269,7 @@ export abstract class BaseProjectService<
}

protected async getLastProcessedHeight(): Promise<number | undefined> {
return this.storeService.storeCache.metadata.find('lastProcessedHeight');
return this.storeService.storeModel.metadata.find('lastProcessedHeight');
}

private async nextProcessHeight(): Promise<number | undefined> {
Expand Down
29 changes: 13 additions & 16 deletions packages/node-core/src/indexer/store.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class StoreService {
constructor(
private sequelize: Sequelize,
private config: NodeConfig,
readonly storeCache: IStoreModelService,
readonly storeModel: IStoreModelService,
@Inject('ISubqueryProject') private subqueryProject: ISubqueryProject<IProjectNetworkConfig>
) {}

Expand Down Expand Up @@ -125,7 +125,7 @@ export class StoreService {
this._lastTimeDbSizeChecked = Date.now();
return getDbSizeAndUpdateMetadata(this.sequelize, this.schema);
} else {
return this.storeCache.metadata.find('dbSize').then((cachedDbSize) => {
return this.storeModel.metadata.find('dbSize').then((cachedDbSize) => {
if (cachedDbSize !== undefined) {
return cachedDbSize;
} else {
Expand Down Expand Up @@ -177,9 +177,9 @@ export class StoreService {
}
logger.info(`Historical state is ${this.historical ? 'enabled' : 'disabled'}`);

this.storeCache.init(this.historical, this.dbType === SUPPORT_DB.cockRoach, this.metaDataRepo, this.poiRepo);
this.storeModel.init(this.historical, this.dbType === SUPPORT_DB.cockRoach, this.metaDataRepo, this.poiRepo);

this._metadataModel = this.storeCache.metadata;
this._metadataModel = this.storeModel.metadata;

await this.initHotSchemaReloadQueries(schema);

Expand All @@ -199,13 +199,7 @@ export class StoreService {
On SyncSchema, if no schema migration is introduced, it would consider current schema to be null, and go all db operations again
every start up is a migration
*/
const schemaMigrationService = new SchemaMigrationService(
this.sequelize,
this,
this.storeCache._flushCache.bind(this.storeCache),
schema,
this.config
);
const schemaMigrationService = new SchemaMigrationService(this.sequelize, this, schema, this.config);

await schemaMigrationService.run(null, this.subqueryProject.schema, tx);

Expand Down Expand Up @@ -330,10 +324,13 @@ export class StoreService {
{type: QueryTypes.SELECT}
);

const store = res.reduce(function (total, current) {
total[current.key] = current.value;
return total;
}, {} as {[key: string]: string | boolean});
const store = res.reduce(
function (total, current) {
total[current.key] = current.value;
return total;
},
{} as {[key: string]: string | boolean}
);

const useHistorical =
store.historicalStateEnabled === undefined ? !disableHistorical : (store.historicalStateEnabled as boolean);
Expand Down Expand Up @@ -476,7 +473,7 @@ group by
}

getStore(): Store {
return new Store(this.config, this.storeCache, this);
return new Store(this.config, this.storeModel, this);
}
}

Expand Down
24 changes: 12 additions & 12 deletions packages/node-core/src/indexer/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ type Context = {
export class Store implements IStore {
/* These need to explicily be private using JS style private properties in order to not leak these in the sandbox */
#config: NodeConfig;
#storeCache: IStoreModelService;
#storeModel: IStoreModelService;
#context: Context;

constructor(config: NodeConfig, storeCache: IStoreModelService, context: Context) {
constructor(config: NodeConfig, storeModel: IStoreModelService, context: Context) {
this.#config = config;
this.#storeCache = storeCache;
this.#storeModel = storeModel;
this.#context = context;
}

Expand All @@ -52,7 +52,7 @@ export class Store implements IStore {

async get<T extends Entity>(entity: string, id: string): Promise<T | undefined> {
try {
const raw = await this.#storeCache.getModel<T>(entity).get(id);
const raw = await this.#storeModel.getModel<T>(entity).get(id);
monitorWrite(`-- [Store][get] Entity ${entity} ID ${id}, data: ${handledStringify(raw)}`);
return EntityClass.create<T>(entity, raw, this);
} catch (e) {
Expand All @@ -72,7 +72,7 @@ export class Store implements IStore {

this.#queryLimitCheck('getByField', entity, options);

const raw = await this.#storeCache
const raw = await this.#storeModel
.getModel<T>(entity)
.getByFields([Array.isArray(value) ? [field, 'in', value] : [field, '=', value]], options);
monitorWrite(`-- [Store][getByField] Entity ${entity}, data: ${handledStringify(raw)}`);
Expand All @@ -98,7 +98,7 @@ export class Store implements IStore {

this.#queryLimitCheck('getByFields', entity, options);

const raw = await this.#storeCache.getModel<T>(entity).getByFields(filter, options);
const raw = await this.#storeModel.getModel<T>(entity).getByFields(filter, options);
monitorWrite(`-- [Store][getByFields] Entity ${entity}, data: ${handledStringify(raw)}`);
return raw.map((v) => EntityClass.create<T>(entity, v, this)) as T[];
} catch (e) {
Expand All @@ -110,7 +110,7 @@ export class Store implements IStore {
try {
const indexed = this.#context.isIndexedHistorical(entity, field as string);
assert(indexed, `to query by field ${String(field)}, a unique index must be created on model ${entity}`);
const [raw] = await this.#storeCache
const [raw] = await this.#storeModel
.getModel<T>(entity)
.getByFields([Array.isArray(value) ? [field, 'in', value] : [field, '=', value]], {limit: 1});
monitorWrite(`-- [Store][getOneByField] Entity ${entity}, data: ${handledStringify(raw)}`);
Expand All @@ -122,7 +122,7 @@ export class Store implements IStore {

async set(entity: string, _id: string, data: Entity): Promise<void> {
try {
await this.#storeCache.getModel(entity).set(_id, data, this.#context.blockHeight, this.#context.transaction);
await this.#storeModel.getModel(entity).set(_id, data, this.#context.blockHeight, this.#context.transaction);
monitorWrite(
`-- [Store][set] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}`
);
Expand All @@ -134,7 +134,7 @@ export class Store implements IStore {

async bulkCreate(entity: string, data: Entity[]): Promise<void> {
try {
await this.#storeCache.getModel(entity).bulkCreate(data, this.#context.blockHeight, this.#context.transaction);
await this.#storeModel.getModel(entity).bulkCreate(data, this.#context.blockHeight, this.#context.transaction);
for (const item of data) {
this.#context.operationStack?.put(OperationType.Set, entity, item);
}
Expand All @@ -148,7 +148,7 @@ export class Store implements IStore {

async bulkUpdate(entity: string, data: Entity[], fields?: string[]): Promise<void> {
try {
await this.#storeCache
await this.#storeModel
.getModel(entity)
.bulkUpdate(data, this.#context.blockHeight, fields, this.#context.transaction);
for (const item of data) {
Expand All @@ -164,7 +164,7 @@ export class Store implements IStore {

async remove(entity: string, id: string): Promise<void> {
try {
await this.#storeCache.getModel(entity).bulkRemove([id], this.#context.blockHeight, this.#context.transaction);
await this.#storeModel.getModel(entity).bulkRemove([id], this.#context.blockHeight, this.#context.transaction);
this.#context.operationStack?.put(OperationType.Remove, entity, id);
monitorWrite(`-- [Store][remove] Entity ${entity}, height: ${this.#context.blockHeight}, id: ${id}`);
} catch (e) {
Expand All @@ -174,7 +174,7 @@ export class Store implements IStore {

async bulkRemove(entity: string, ids: string[]): Promise<void> {
try {
await this.#storeCache.getModel(entity).bulkRemove(ids, this.#context.blockHeight, this.#context.transaction);
await this.#storeModel.getModel(entity).bulkRemove(ids, this.#context.blockHeight, this.#context.transaction);

for (const id of ids) {
this.#context.operationStack?.put(OperationType.Remove, entity, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export abstract class BaseCacheService
}

@profiler()
async flushCache(forceFlush?: boolean): Promise<void> {
async flushData(forceFlush?: boolean): Promise<void> {
const flushCacheGuarded = async (forceFlush?: boolean): Promise<void> => {
// When we force flush, this will ensure not interrupt current block flushing,
// Force flush will continue after last block flush tx committed.
Expand All @@ -55,12 +55,12 @@ export abstract class BaseCacheService
return this.queuedFlush;
}

async resetCache(): Promise<void> {
async resetData(): Promise<void> {
await this._resetCache();
}

async beforeApplicationShutdown(): Promise<void> {
await timeout(this.flushCache(true), 60, 'Before shutdown flush cache timeout');
await timeout(this.flushData(true), 60, 'Before shutdown flush cache timeout');
this.logger.info(`Force flush cache successful!`);
await this.flushExportStores();
this.logger.info(`Force flush exports successful!`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export interface IMetadata {
setNewDynamicDatasource(item: DatasourceParams, tx?: Transaction): Promise<void>;

bulkRemove<K extends MetadataKey>(keys: K[], tx?: Transaction): Promise<void>;

flush?(tx: Transaction, blockHeight: number): Promise<void>;
}

export class MetadataModel implements IMetadata {
Expand Down
Loading