From 642a440b77e4eb70053acc6a9a2cf733c875d4f9 Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Wed, 23 Aug 2023 17:22:17 +1200 Subject: [PATCH 1/3] Add skipBlock option to improve performance with event handlers --- .../src/project/versioned/v1_0_0/models.ts | 3 + .../src/project/versioned/v1_0_0/types.ts | 1 + packages/node-core/src/api.service.ts | 11 ++- .../node-core/src/configure/NodeConfig.ts | 6 ++ .../src/configure/configure.module.ts | 2 +- .../src/indexer/dictionary.service.test.ts | 1 + .../node-core/src/indexer/indexer.manager.ts | 2 +- .../node-core/src/indexer/project.service.ts | 6 ++ packages/node-core/src/indexer/test.runner.ts | 4 +- .../node-core/src/indexer/testing.service.ts | 2 +- packages/node-core/src/utils/configure.ts | 21 +++-- packages/node/src/indexer/api.service.test.ts | 14 ++-- packages/node/src/indexer/api.service.ts | 73 ++++++++++++----- .../node/src/indexer/apiPromise.connection.ts | 17 ++-- .../block-dispatcher.service.ts | 18 +++-- packages/node/src/indexer/indexer.manager.ts | 67 +++++++++------- packages/node/src/indexer/project.service.ts | 5 ++ packages/node/src/indexer/types.ts | 13 +++ .../src/indexer/unfinalizedBlocks.service.ts | 6 +- .../node/src/indexer/worker/worker.service.ts | 14 ++-- .../node/src/subcommands/testing.module.ts | 27 +------ .../node/src/subcommands/testing.service.ts | 6 +- packages/node/src/utils/project.ts | 29 +++++++ packages/node/src/utils/substrate.ts | 80 ++++++++++++++++++- packages/node/src/yargs.ts | 12 ++- packages/types/src/interfaces.ts | 19 ++++- packages/types/src/project.ts | 4 +- 27 files changed, 335 insertions(+), 128 deletions(-) diff --git a/packages/common/src/project/versioned/v1_0_0/models.ts b/packages/common/src/project/versioned/v1_0_0/models.ts index 6a8c9b4bcc..acc87547ff 100644 --- a/packages/common/src/project/versioned/v1_0_0/models.ts +++ b/packages/common/src/project/versioned/v1_0_0/models.ts @@ -51,6 +51,9 @@ export class RunnerNodeOptionsModel implements NodeOptions { @IsOptional() @IsBoolean() unfinalizedBlocks?: boolean; + @IsOptional() + @IsBoolean() + skipBlock?: boolean; } export class BlockFilterImpl implements BlockFilter { diff --git a/packages/common/src/project/versioned/v1_0_0/types.ts b/packages/common/src/project/versioned/v1_0_0/types.ts index b647ad23a9..164517b9c1 100644 --- a/packages/common/src/project/versioned/v1_0_0/types.ts +++ b/packages/common/src/project/versioned/v1_0_0/types.ts @@ -24,6 +24,7 @@ export interface NodeOptions { historical?: boolean; unsafe?: boolean; unfinalizedBlocks?: boolean; + skipBlock?: boolean; } export interface ParentProject { diff --git a/packages/node-core/src/api.service.ts b/packages/node-core/src/api.service.ts index 4e2f35275a..ae3f849fc6 100644 --- a/packages/node-core/src/api.service.ts +++ b/packages/node-core/src/api.service.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: GPL-3.0 import {ApiConnectionError, ApiErrorType} from './api.connection.error'; -import {NodeConfig} from './configure'; import {NetworkMetadataPayload} from './events'; import {ConnectionPoolService} from './indexer'; import {getLogger} from './logger'; @@ -11,25 +10,25 @@ const logger = getLogger('api'); const MAX_RECONNECT_ATTEMPTS = 5; -export interface IApi { - fetchBlocks(heights: number[], ...args: any): Promise; +export interface IApi = any[]> { + fetchBlocks(heights: number[], ...args: any): Promise; safeApi(height: number): SA; unsafeApi: A; networkMeta: NetworkMetadataPayload; } -export interface IApiConnectionSpecific extends IApi { +export interface IApiConnectionSpecific = any[]> extends IApi { handleError(error: Error): ApiConnectionError; apiConnect(): Promise; apiDisconnect(): Promise; } -export abstract class ApiService implements IApi { +export abstract class ApiService = any[]> implements IApi { constructor(protected connectionPoolService: ConnectionPoolService>) {} abstract networkMeta: NetworkMetadataPayload; - async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise { + async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise { let reconnectAttempts = 0; while (reconnectAttempts < numAttempts) { try { diff --git a/packages/node-core/src/configure/NodeConfig.ts b/packages/node-core/src/configure/NodeConfig.ts index f53bd4645b..1563bf91e6 100644 --- a/packages/node-core/src/configure/NodeConfig.ts +++ b/packages/node-core/src/configure/NodeConfig.ts @@ -59,6 +59,7 @@ export interface IConfig { readonly storeFlushInterval: number; readonly isTest?: boolean; readonly root?: string; + readonly skipBlock?: boolean; } export type MinConfig = Partial> & Pick; @@ -86,6 +87,7 @@ const DEFAULT_CONFIG = { storeGetCacheSize: 500, storeCacheAsync: true, storeFlushInterval: 5, + skipBlock: false, }; export class NodeConfig implements IConfig { @@ -273,6 +275,10 @@ export class NodeConfig implements IConfig { return !!this.scaleBatchSize; } + get skipBlock(): boolean { + return !!this._config.skipBlock; + } + get postgresCACert(): string | undefined { if (!this._config.pgCa) { return undefined; diff --git a/packages/node-core/src/configure/configure.module.ts b/packages/node-core/src/configure/configure.module.ts index db45878a8a..9bd4effb42 100644 --- a/packages/node-core/src/configure/configure.module.ts +++ b/packages/node-core/src/configure/configure.module.ts @@ -105,7 +105,7 @@ export async function registerApp

( config = NodeConfig.rebaseWithArgs(config, yargsToIConfig(argv, nameMapping), isTest); } else { if (!argv.subquery) { - logger.error('Subquery path is missing neither in cli options nor in config file'); + logger.error('Subquery path is missing in both cli options and config file'); showHelp(); process.exit(1); } diff --git a/packages/node-core/src/indexer/dictionary.service.test.ts b/packages/node-core/src/indexer/dictionary.service.test.ts index 1ef2197a21..4a707bfa2b 100644 --- a/packages/node-core/src/indexer/dictionary.service.test.ts +++ b/packages/node-core/src/indexer/dictionary.service.test.ts @@ -110,6 +110,7 @@ const nodeConfig = new NodeConfig({ subqueryName: 'asdf', networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'], dictionaryTimeout: 10, + // dictionaryResolver: 'https://kepler-auth.subquery.network' }); jest.setTimeout(10000); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 72697dd020..e93baa2578 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -34,7 +34,7 @@ export abstract class BaseIndexerManager< SA, // Api Type A, // SafeApi Type B, // Block Type - API extends IApi, + API extends IApi, DS extends BaseDataSource, CDS extends DS & BaseCustomDataSource, // Custom datasource FilterMap extends FilterTypeMap, diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index f7a4f6124b..3092e648f8 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -36,6 +36,7 @@ export abstract class BaseProjectService; + protected abstract onProjectChange(project: ISubqueryProject): void | Promise; constructor( private readonly dsProcessorService: BaseDsProcessorService, @@ -331,8 +332,13 @@ export abstract class BaseProjectService { private passedTests = 0; private failedTests = 0; constructor( - @Inject('IApi') protected readonly apiService: IApi, + @Inject('IApi') protected readonly apiService: IApi, protected readonly storeService: StoreService, protected readonly sequelize: Sequelize, protected readonly nodeConfig: NodeConfig, @@ -42,7 +42,7 @@ export class TestRunner { block: B, handler: string, indexerManager: IIndexerManager, - apiService?: IApi + apiService?: IApi ) => Promise ): Promise<{ passedTests: number; diff --git a/packages/node-core/src/indexer/testing.service.ts b/packages/node-core/src/indexer/testing.service.ts index 98fa652225..61037c0df8 100644 --- a/packages/node-core/src/indexer/testing.service.ts +++ b/packages/node-core/src/indexer/testing.service.ts @@ -59,7 +59,7 @@ export abstract class TestingService { block: B, handler: string, indexerManager: IIndexerManager, - apiService?: IApi + apiService?: IApi ): Promise { await indexerManager.indexBlock(block, this.getDsWithHandler(handler)); } diff --git a/packages/node-core/src/utils/configure.ts b/packages/node-core/src/utils/configure.ts index 56896e0630..bcd30d18e0 100644 --- a/packages/node-core/src/utils/configure.ts +++ b/packages/node-core/src/utils/configure.ts @@ -12,6 +12,7 @@ export interface ArgvOverrideOptions { unsafe?: boolean; disableHistorical?: boolean; unfinalizedBlocks?: boolean; + skipBlock?: boolean; } export function defaultSubqueryName(config: Partial): MinConfig { @@ -29,20 +30,28 @@ export function defaultSubqueryName(config: Partial): MinConfig { } as MinConfig; } +function applyArgs( + argvs: ArgvOverrideOptions, + options: RunnerNodeOptionsModel, + key: keyof Omit +) { + if (argvs[key] === undefined && options[key] !== undefined) { + argvs[key] = options[key]; + } +} + export function rebaseArgsWithManifest(argvs: ArgvOverrideOptions, rawManifest: unknown): void { const options = plainToClass(RunnerNodeOptionsModel, (rawManifest as any)?.runner?.node?.options); if (!options) { return; } + // we override them if they are not provided in args/flag - if (argvs.unsafe === undefined && options.unsafe !== undefined) { - argvs.unsafe = options.unsafe; - } if (argvs.disableHistorical === undefined && options.historical !== undefined) { // THIS IS OPPOSITE argvs.disableHistorical = !options.historical; } - if (argvs.unfinalizedBlocks === undefined && options.unfinalizedBlocks !== undefined) { - argvs.unfinalizedBlocks = options.unfinalizedBlocks; - } + applyArgs(argvs, options, 'unsafe'); + applyArgs(argvs, options, 'unfinalizedBlocks'); + applyArgs(argvs, options, 'skipBlock'); } diff --git a/packages/node/src/indexer/api.service.test.ts b/packages/node/src/indexer/api.service.test.ts index 1bc9298b2a..5f5315bcc4 100644 --- a/packages/node/src/indexer/api.service.test.ts +++ b/packages/node/src/indexer/api.service.test.ts @@ -102,7 +102,7 @@ describe('ApiService', () => { const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock; const runtimeVersion = { specVersion: 1 } as unknown as RuntimeVersion; const patchedApi = await apiService.getPatchedApi( - mockBlock, + mockBlock.block.header, runtimeVersion, ); const [patchedValidators, currentValidators] = await Promise.all([ @@ -121,7 +121,7 @@ describe('ApiService', () => { const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock; const runtimeVersion = { specVersion: 13 } as unknown as RuntimeVersion; const patchedApi = await apiService.getPatchedApi( - mockBlock, + mockBlock.block.header, runtimeVersion, ); const apiResults = await api.query.staking.erasStakers.at( @@ -151,7 +151,7 @@ describe('ApiService', () => { const apiResults = await api.rpc.state.getRuntimeVersion(earlyBlockhash); // step 2, api get patched result with block height const patchedApi = await apiService.getPatchedApi( - mockBlock, + mockBlock.block.header, runtimeVersion, ); const patchedResult = await patchedApi.rpc.state.getRuntimeVersion( @@ -179,7 +179,7 @@ describe('ApiService', () => { const futureBlockhash = await api.rpc.chain.getBlockHash(6721195); // step 2, api get patched result with block height const patchedApi = await apiService.getPatchedApi( - mockBlock, + mockBlock.block.header, runtimeVersion, ); await expect( @@ -206,7 +206,7 @@ describe('ApiService', () => { const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock; const runtimeVersion = { specVersion: 28 } as unknown as RuntimeVersion; const patchedApi = await apiService.getPatchedApi( - mockBlock, + mockBlock.block.header, runtimeVersion, ); expect( @@ -379,7 +379,7 @@ describe('ApiService', () => { const runtimeVersion = { specVersion: 1 } as unknown as RuntimeVersion; const patchedApi = await apiService.getPatchedApi( - mockBlock, + mockBlock.block.header, runtimeVersion, ); await expect(patchedApi.query.system.events()).resolves.toHaveLength(2); @@ -399,7 +399,7 @@ describe('ApiService', () => { const runtimeVersion = { specVersion: 1103 } as unknown as RuntimeVersion; const patchedApi = await apiService.getPatchedApi( - mockBlock, + mockBlock.block.header, runtimeVersion, ); /* If Block number not provided should be ignored and `blockNumber` above used */ diff --git a/packages/node/src/indexer/api.service.ts b/packages/node/src/indexer/api.service.ts index 3858314a23..6f1d3efc39 100644 --- a/packages/node/src/indexer/api.service.ts +++ b/packages/node/src/indexer/api.service.ts @@ -5,7 +5,7 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { ApiPromise } from '@polkadot/api'; import { RpcMethodResult } from '@polkadot/api/types'; -import { RuntimeVersion } from '@polkadot/types/interfaces'; +import { RuntimeVersion, Header } from '@polkadot/types/interfaces'; import { AnyFunction, DefinitionRpcExt } from '@polkadot/types/types'; import { IndexerEvent, @@ -16,11 +16,11 @@ import { ConnectionPoolService, ApiService as BaseApiService, } from '@subql/node-core'; -import { SubstrateBlock } from '@subql/types'; import { SubqueryProject } from '../configure/SubqueryProject'; +import { isOnlyEventHandlers } from '../utils/project'; import * as SubstrateUtil from '../utils/substrate'; -import { ApiPromiseConnection } from './apiPromise.connection'; -import { ApiAt, BlockContent } from './types'; +import { ApiPromiseConnection, FetchFunc } from './apiPromise.connection'; +import { ApiAt, BlockContent, LightBlockContent } from './types'; const NOT_SUPPORT = (name: string) => () => { throw new Error(`${name}() is not supported`); @@ -34,10 +34,14 @@ const logger = getLogger('api'); @Injectable() export class ApiService - extends BaseApiService + extends BaseApiService< + ApiPromise, + ApiAt, + BlockContent[] | LightBlockContent[] + > implements OnApplicationShutdown { - private fetchBlocksBatches = SubstrateUtil.fetchBlocksBatches; + private fetchBlocksBatches: FetchFunc; private currentBlockHash: string; private currentBlockNumber: number; networkMeta: NetworkMetadataPayload; @@ -49,6 +53,8 @@ export class ApiService private nodeConfig: NodeConfig, ) { super(connectionPoolService); + + this.updateBlockFetching(); } async onApplicationShutdown(): Promise { @@ -81,14 +87,6 @@ export class ApiService process.exit(1); } - if (this.nodeConfig?.profiler) { - this.fetchBlocksBatches = profilerWrap( - SubstrateUtil.fetchBlocksBatches, - 'SubstrateUtil', - 'fetchBlocksBatches', - ); - } - const endpointToApiIndex: Record = {}; if (chainTypes) { @@ -162,16 +160,55 @@ export class ApiService return this; } + updateBlockFetching(): void { + const onlyEventHandlers = isOnlyEventHandlers(this.project); + const skipBlock = this.nodeConfig.skipBlock && onlyEventHandlers; + + if (this.nodeConfig.skipBlock) { + if (onlyEventHandlers) { + logger.info( + 'skipBlock is enabled, only events and block headers will be fetched.', + ); + } else { + logger.info( + `skipBlock is disabled, the project contains handlers that aren't event handlers.`, + ); + } + } else { + if (onlyEventHandlers) { + logger.warn( + 'skipBlock is disabled, the project contains only event handlers, it could be enabled to improve indexing performance.', + ); + } else { + logger.info(`skipBlock is disabled.`); + } + } + + const fetchFunc = skipBlock + ? SubstrateUtil.fetchBlocksBatchesLight + : SubstrateUtil.fetchBlocksBatches; + + if (this.nodeConfig?.profiler) { + this.fetchBlocksBatches = profilerWrap( + fetchFunc, + 'SubstrateUtil', + 'fetchBlocksBatches', + ); + } else { + this.fetchBlocksBatches = fetchFunc; + } + } + get api(): ApiPromise { return this.unsafeApi; } async getPatchedApi( - block: SubstrateBlock, + header: Header, runtimeVersion: RuntimeVersion, ): Promise { - this.currentBlockHash = block.block.hash.toString(); - this.currentBlockNumber = block.block.header.number.toNumber(); + this.currentBlockHash = header.hash.toString(); + this.currentBlockNumber = header.number.toNumber(); const api = this.api; const apiAt = (await api.at( @@ -267,7 +304,7 @@ export class ApiService heights: number[], overallSpecVer?: number, numAttempts = MAX_RECONNECT_ATTEMPTS, - ): Promise { + ): Promise { let reconnectAttempts = 0; while (reconnectAttempts < numAttempts) { try { diff --git a/packages/node/src/indexer/apiPromise.connection.ts b/packages/node/src/indexer/apiPromise.connection.ts index 06aeba1ae1..59cd4dd29c 100644 --- a/packages/node/src/indexer/apiPromise.connection.ts +++ b/packages/node/src/indexer/apiPromise.connection.ts @@ -16,7 +16,7 @@ import { IApiConnectionSpecific, } from '@subql/node-core'; import * as SubstrateUtil from '../utils/substrate'; -import { ApiAt, BlockContent } from './types'; +import { ApiAt, BlockContent, LightBlockContent } from './types'; import { createCachedProvider } from './x-provider/cachedProvider'; import { HttpProvider } from './x-provider/http'; @@ -25,10 +25,17 @@ const { version: packageVersion } = require('../../package.json'); const RETRY_DELAY = 2_500; -type FetchFunc = typeof SubstrateUtil.fetchBlocksBatches; +export type FetchFunc = + | typeof SubstrateUtil.fetchBlocksBatches + | typeof SubstrateUtil.fetchBlocksBatchesLight; export class ApiPromiseConnection - implements IApiConnectionSpecific + implements + IApiConnectionSpecific< + ApiPromise, + ApiAt, + BlockContent[] | LightBlockContent[] + > { readonly networkMeta: NetworkMetadataPayload; @@ -88,11 +95,11 @@ export class ApiPromiseConnection async fetchBlocks( heights: number[], overallSpecVer?: number, - ): Promise { + ): Promise { const blocks = await this.fetchBlocksBatches( this.unsafeApi, heights, - overallSpecVer, + // overallSpecVer, ); return blocks; } diff --git a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts index 41a8b13456..e01b45a9a4 100644 --- a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts +++ b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts @@ -20,14 +20,14 @@ import { ApiService } from '../api.service'; import { DynamicDsService } from '../dynamic-ds.service'; import { IndexerManager } from '../indexer.manager'; import { RuntimeService } from '../runtime/runtimeService'; -import { BlockContent } from '../types'; +import { BlockContent, isFullBlock, LightBlockContent } from '../types'; /** * @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing */ @Injectable() export class BlockDispatcherService - extends BlockDispatcher + extends BlockDispatcher implements OnApplicationShutdown { private runtimeService: RuntimeService; @@ -59,7 +59,9 @@ export class BlockDispatcherService poiService, project, dynamicDsService, - async (blockNums: number[]): Promise => { + async ( + blockNums: number[], + ): Promise => { const specChanged = await this.runtimeService.specChanged( blockNums[blockNums.length - 1], ); @@ -82,16 +84,16 @@ export class BlockDispatcherService this.runtimeService = runtimeService; } - protected getBlockHeight(block: BlockContent): number { + protected getBlockHeight(block: BlockContent | LightBlockContent): number { return block.block.block.header.number.toNumber(); } protected async indexBlock( - block: BlockContent, + block: BlockContent | LightBlockContent, ): Promise { - const runtimeVersion = await this.runtimeService.getRuntimeVersion( - block.block, - ); + const runtimeVersion = !isFullBlock(block) + ? undefined + : await this.runtimeService.getRuntimeVersion(block.block); return this.indexerManager.indexBlock( block, await this.projectService.getDataSources(this.getBlockHeight(block)), diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index dbc8929f70..2d4f271afe 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -23,6 +23,7 @@ import { BaseIndexerManager, } from '@subql/node-core'; import { + LightSubstrateEvent, SubstrateBlock, SubstrateBlockFilter, SubstrateDatasource, @@ -39,7 +40,7 @@ import { import { DynamicDsService } from './dynamic-ds.service'; import { ProjectService } from './project.service'; import { SandboxService } from './sandbox.service'; -import { ApiAt, BlockContent } from './types'; +import { ApiAt, BlockContent, isFullBlock, LightBlockContent } from './types'; import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; const logger = getLogger('indexer'); @@ -48,7 +49,7 @@ const logger = getLogger('indexer'); export class IndexerManager extends BaseIndexerManager< ApiAt, ApiPromise, - BlockContent, + BlockContent | LightBlockContent, ApiService, SubstrateDatasource, SubstrateCustomDataSource, @@ -88,7 +89,7 @@ export class IndexerManager extends BaseIndexerManager< @profiler() async indexBlock( - block: BlockContent, + block: BlockContent | LightBlockContent, dataSources: SubstrateDatasource[], runtimeVersion: RuntimeVersion, ): Promise { @@ -97,52 +98,62 @@ export class IndexerManager extends BaseIndexerManager< ); } - getBlockHeight(block: BlockContent): number { + getBlockHeight(block: LightBlockContent | BlockContent): number { return block.block.block.header.number.toNumber(); } - getBlockHash(block: BlockContent): string { + getBlockHash(block: LightBlockContent | BlockContent): string { return block.block.block.header.hash.toHex(); } // eslint-disable-next-line @typescript-eslint/require-await private async getApi( - block: BlockContent, + block: LightBlockContent | BlockContent, runtimeVersion: RuntimeVersion, ): Promise { - return this.apiService.getPatchedApi(block.block, runtimeVersion); + return this.apiService.getPatchedApi( + block.block.block.header, + runtimeVersion, + ); } protected async indexBlockData( - { block, events, extrinsics }: BlockContent, + blockContent: LightBlockContent | BlockContent, dataSources: SubstrateProjectDs[], getVM: (d: SubstrateProjectDs) => Promise, ): Promise { - await this.indexBlockContent(block, dataSources, getVM); + if (isFullBlock(blockContent)) { + const { block, events, extrinsics } = blockContent; + await this.indexBlockContent(block, dataSources, getVM); - // Run initialization events - const initEvents = events.filter((evt) => evt.phase.isInitialization); - for (const event of initEvents) { - await this.indexEvent(event, dataSources, getVM); - } + // Run initialization events + const initEvents = events.filter((evt) => evt.phase.isInitialization); + for (const event of initEvents) { + await this.indexEvent(event, dataSources, getVM); + } - for (const extrinsic of extrinsics) { - await this.indexExtrinsic(extrinsic, dataSources, getVM); + for (const extrinsic of extrinsics) { + await this.indexExtrinsic(extrinsic, dataSources, getVM); - // Process extrinsic events - const extrinsicEvents = events - .filter((e) => e.extrinsic?.idx === extrinsic.idx) - .sort((a, b) => a.idx - b.idx); + // Process extrinsic events + const extrinsicEvents = events + .filter((e) => e.extrinsic?.idx === extrinsic.idx) + .sort((a, b) => a.idx - b.idx); - for (const event of extrinsicEvents) { - await this.indexEvent(event, dataSources, getVM); + for (const event of extrinsicEvents) { + await this.indexEvent(event, dataSources, getVM); + } } - } - // Run finalization events - const finalizeEvents = events.filter((evt) => evt.phase.isFinalization); - for (const event of finalizeEvents) { - await this.indexEvent(event, dataSources, getVM); + // Run finalization events + const finalizeEvents = events.filter((evt) => evt.phase.isFinalization); + for (const event of finalizeEvents) { + await this.indexEvent(event, dataSources, getVM); + } + } else { + for (const event of blockContent.events) { + await this.indexEvent(event, dataSources, getVM); + } } } @@ -167,7 +178,7 @@ export class IndexerManager extends BaseIndexerManager< } private async indexEvent( - event: SubstrateEvent, + event: SubstrateEvent | LightSubstrateEvent, dataSources: SubstrateProjectDs[], getVM: (d: SubstrateProjectDs) => Promise, ): Promise { diff --git a/packages/node/src/indexer/project.service.ts b/packages/node/src/indexer/project.service.ts index ca5977f8e6..affbe1700a 100644 --- a/packages/node/src/indexer/project.service.ts +++ b/packages/node/src/indexer/project.service.ts @@ -15,6 +15,7 @@ import { import { SubstrateDatasource } from '@subql/types'; import { Sequelize } from '@subql/x-sequelize'; import { SubqueryProject } from '../configure/SubqueryProject'; +import { isOnlyEventHandlers } from '../utils/project'; import { getBlockByHeight, getTimestamp } from '../utils/substrate'; import { ApiService } from './api.service'; import { DsProcessorService } from './ds-processor.service'; @@ -68,4 +69,8 @@ export class ProjectService extends BaseProjectService< const block = await getBlockByHeight(this.apiService.api, height); return getTimestamp(block); } + + protected onProjectChange(project: SubqueryProject): void | Promise { + this.apiService.updateBlockFetching(); + } } diff --git a/packages/node/src/indexer/types.ts b/packages/node/src/indexer/types.ts index 3e58fdde34..b8a60dd3cb 100644 --- a/packages/node/src/indexer/types.ts +++ b/packages/node/src/indexer/types.ts @@ -5,6 +5,8 @@ import { ApiPromise } from '@polkadot/api'; import { ApiDecoration } from '@polkadot/api/types'; import type { HexString } from '@polkadot/util/types'; import { + BlockHeader, + LightSubstrateEvent, SubstrateBlock, SubstrateEvent, SubstrateExtrinsic, @@ -16,6 +18,17 @@ export interface BlockContent { events: SubstrateEvent[]; } +export interface LightBlockContent { + block: BlockHeader; // A subset of SubstrateBlock + events: LightSubstrateEvent[]; +} + export type BestBlocks = Record; export type ApiAt = ApiDecoration<'promise'> & { rpc: ApiPromise['rpc'] }; + +export function isFullBlock( + block: BlockContent | LightBlockContent, +): block is BlockContent { + return (block as BlockContent).extrinsics !== undefined; +} diff --git a/packages/node/src/indexer/unfinalizedBlocks.service.ts b/packages/node/src/indexer/unfinalizedBlocks.service.ts index fbeda71ab3..349102ad81 100644 --- a/packages/node/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node/src/indexer/unfinalizedBlocks.service.ts @@ -10,7 +10,7 @@ import { StoreCacheService, } from '@subql/node-core'; import { ApiService } from './api.service'; -import { BlockContent } from './types'; +import { BlockContent, LightBlockContent } from './types'; export function substrateHeaderToHeader(header: SubstrateHeader): Header { return { @@ -21,7 +21,9 @@ export function substrateHeaderToHeader(header: SubstrateHeader): Header { } @Injectable() -export class UnfinalizedBlocksService extends BaseUnfinalizedBlocksService { +export class UnfinalizedBlocksService extends BaseUnfinalizedBlocksService< + BlockContent | LightBlockContent +> { constructor( private readonly apiService: ApiService, nodeConfig: NodeConfig, diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index 0b7dc3c38d..cde6961ad7 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -14,13 +14,13 @@ import { ApiService } from '../api.service'; import { SpecVersion } from '../dictionary.service'; import { IndexerManager } from '../indexer.manager'; import { WorkerRuntimeService } from '../runtime/workerRuntimeService'; -import { BlockContent } from '../types'; +import { BlockContent, isFullBlock, LightBlockContent } from '../types'; export type FetchBlockResponse = { specVersion: number; parentHash: string }; @Injectable() export class WorkerService extends BaseWorkerService< - BlockContent, + BlockContent | LightBlockContent, FetchBlockResponse, SubstrateDatasource, { specVersion: number } @@ -41,7 +41,7 @@ export class WorkerService extends BaseWorkerService< protected async fetchChainBlock( height: number, { specVersion }, - ): Promise { + ): Promise { const specChanged = await this.workerRuntimeService.specChanged( height, specVersion, @@ -63,12 +63,12 @@ export class WorkerService extends BaseWorkerService< } protected async processFetchedBlock( - block: BlockContent, + block: BlockContent | LightBlockContent, dataSources: SubstrateDatasource[], ): Promise { - const runtimeVersion = await this.workerRuntimeService.getRuntimeVersion( - block.block, - ); + const runtimeVersion = !isFullBlock(block) + ? undefined + : await this.workerRuntimeService.getRuntimeVersion(block.block); return this.indexerManager.indexBlock(block, dataSources, runtimeVersion); } diff --git a/packages/node/src/subcommands/testing.module.ts b/packages/node/src/subcommands/testing.module.ts index c6d1e3efb9..3ca82ad8dc 100644 --- a/packages/node/src/subcommands/testing.module.ts +++ b/packages/node/src/subcommands/testing.module.ts @@ -14,9 +14,7 @@ import { TestRunner, } from '@subql/node-core'; import { ConfigureModule } from '../configure/configure.module'; -import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from '../indexer/api.service'; -import { ApiPromiseConnection } from '../indexer/apiPromise.connection'; import { DsProcessorService } from '../indexer/ds-processor.service'; import { DynamicDsService } from '../indexer/dynamic-ds.service'; import { FetchModule } from '../indexer/fetch.module'; @@ -42,30 +40,7 @@ import { MetaModule } from '../meta/meta.module'; provide: 'IProjectService', useClass: ProjectService, }, - { - provide: ApiService, - useFactory: async ( - project: SubqueryProject, - connectionPoolService: ConnectionPoolService, - eventEmitter: EventEmitter2, - config: NodeConfig, - ) => { - const apiService = new ApiService( - project, - connectionPoolService, - eventEmitter, - config, - ); - await apiService.init(); - return apiService; - }, - inject: [ - 'ISubqueryProject', - ConnectionPoolService, - EventEmitter2, - NodeConfig, - ], - }, + ApiService, SchedulerRegistry, TestRunner, { diff --git a/packages/node/src/subcommands/testing.service.ts b/packages/node/src/subcommands/testing.service.ts index f9b66f4164..8aa4e9d429 100644 --- a/packages/node/src/subcommands/testing.service.ts +++ b/packages/node/src/subcommands/testing.service.ts @@ -16,14 +16,14 @@ import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from '../indexer/api.service'; import { IndexerManager } from '../indexer/indexer.manager'; import { ProjectService } from '../indexer/project.service'; -import { ApiAt, BlockContent } from '../indexer/types'; +import { ApiAt, BlockContent, LightBlockContent } from '../indexer/types'; import { TestingModule } from './testing.module'; @Injectable() export class TestingService extends BaseTestingService< ApiPromise, ApiAt, - BlockContent, + BlockContent | LightBlockContent, SubqlProjectDs > { constructor( @@ -61,7 +61,7 @@ export class TestingService extends BaseTestingService< } async indexBlock( - block: BlockContent, + block: BlockContent | LightBlockContent, handler: string, indexerManager: IndexerManager, apiService: ApiService, diff --git a/packages/node/src/utils/project.ts b/packages/node/src/utils/project.ts index d3cdd2d545..1c2ab04fd8 100644 --- a/packages/node/src/utils/project.ts +++ b/packages/node/src/utils/project.ts @@ -11,11 +11,17 @@ import { SubstrateCustomHandler, SubstrateHandler, SubstrateHandlerKind, + isRuntimeDs, + isCustomDs, } from '@subql/common-substrate'; import { saveFile } from '@subql/node-core'; import { SubstrateDatasource } from '@subql/types'; import yaml from 'js-yaml'; import { NodeVM, VMScript } from 'vm2'; +import { + SubqueryProject, + SubstrateProjectDs, +} from '../configure/SubqueryProject'; export function isBaseHandler( handler: SubstrateHandler, @@ -112,3 +118,26 @@ export function loadChainTypesFromJs( } return rawContent; } + +function dsContainsNonEventHandlers(ds: SubstrateProjectDs): boolean { + if (isRuntimeDs(ds)) { + return !!ds.mapping.handlers.find( + (handler) => handler.kind !== SubstrateHandlerKind.Event, + ); + } else if (isCustomDs(ds)) { + // TODO this can be improved upon in the future. + return true; + } + return true; +} + +export function isOnlyEventHandlers(project: SubqueryProject): boolean { + const hasNonEventHandler = !!project.dataSources.find((ds) => + dsContainsNonEventHandlers(ds), + ); + const hasNonEventTemplate = !!project.templates.find((ds) => + dsContainsNonEventHandlers(ds), + ); + + return !hasNonEventHandler && !hasNonEventTemplate; +} diff --git a/packages/node/src/utils/substrate.ts b/packages/node/src/utils/substrate.ts index a869af0eec..929fd4a8b1 100644 --- a/packages/node/src/utils/substrate.ts +++ b/packages/node/src/utils/substrate.ts @@ -9,6 +9,7 @@ import { EventRecord, RuntimeVersion, SignedBlock, + Header, } from '@polkadot/types/interfaces'; import { BN, BN_THOUSAND, BN_TWO, bnMin } from '@polkadot/util'; import { getLogger } from '@subql/node-core'; @@ -20,11 +21,12 @@ import { SubstrateBlock, SubstrateEvent, SubstrateExtrinsic, + BlockHeader, } from '@subql/types'; import { last, merge, range } from 'lodash'; import { SubqlProjectBlockFilter } from '../configure/SubqueryProject'; import { ApiPromiseConnection } from '../indexer/apiPromise.connection'; -import { BlockContent } from '../indexer/types'; +import { BlockContent, LightBlockContent } from '../indexer/types'; const logger = getLogger('fetch'); const INTERVAL_THRESHOLD = BN_THOUSAND.div(BN_TWO); const DEFAULT_TIME = new BN(6_000); @@ -275,6 +277,30 @@ export async function getBlockByHeight( return block; } +export async function getHeaderByHeight( + api: ApiPromise, + height: number, +): Promise

{ + const blockHash = await api.rpc.chain.getBlockHash(height).catch((e) => { + logger.error(`failed to fetch BlockHash ${height}`); + throw ApiPromiseConnection.handleError(e); + }); + + const header = await api.rpc.chain.getHeader(blockHash).catch((e) => { + logger.error( + `failed to fetch Block Header hash="${blockHash}" height="${height}"`, + ); + throw ApiPromiseConnection.handleError(e); + }); + // validate block is valid + if (header.hash.toHex() !== blockHash.toHex()) { + throw new Error( + `fetched block header hash ${header.hash.toHex()} is not match with blockHash ${blockHash.toHex()} at block ${height}. This is likely a problem with the rpc provider.`, + ); + } + return header; +} + export async function fetchBlocksRange( api: ApiPromise, startHeight: number, @@ -296,6 +322,15 @@ export async function fetchBlocksArray( ); } +export async function fetchHeaderArray( + api: ApiPromise, + blockArray: number[], +): Promise { + return Promise.all( + blockArray.map(async (height) => getHeaderByHeight(api, height)), + ); +} + export async function fetchEventsRange( api: ApiPromise, hashs: BlockHash[], @@ -351,6 +386,8 @@ export async function fetchBlocksBatches( const wrappedBlock = wrapBlock(block, events.toArray(), parentSpecVersion); const wrappedExtrinsics = wrapExtrinsics(wrappedBlock, events); const wrappedEvents = wrapEvents(wrappedExtrinsics, events, wrappedBlock); + + wrappedBlock.block.header; return { block: wrappedBlock, extrinsics: wrappedExtrinsics, @@ -359,6 +396,47 @@ export async function fetchBlocksBatches( }); } +// TODO why is fetchBlocksBatches a breadth first funciton rather than depth? +export async function fetchLightBlock( + api: ApiPromise, + height: number, +): Promise { + const blockHash = await api.rpc.chain.getBlockHash(height).catch((e) => { + logger.error(`failed to fetch BlockHash ${height}`); + throw ApiPromiseConnection.handleError(e); + }); + + const [header, events] = await Promise.all([ + api.rpc.chain.getHeader(blockHash).catch((e) => { + logger.error( + `failed to fetch Block Header hash="${blockHash}" height="${height}"`, + ); + throw ApiPromiseConnection.handleError(e); + }), + api.query.system.events.at(blockHash).catch((e) => { + logger.error(`failed to fetch events at block ${blockHash}`); + throw ApiPromiseConnection.handleError(e); + }), + ]); + + const blockHeader: BlockHeader = { + block: { header }, + events: events.toArray(), + }; + + return { + block: blockHeader, + events: events.map((evt, idx) => merge(evt, { idx, block: blockHeader })), + }; +} + +export async function fetchBlocksBatchesLight( + api: ApiPromise, + blockArray: number[], +): Promise { + return Promise.all(blockArray.map((height) => fetchLightBlock(api, height))); +} + export function calcInterval(api: ApiPromise): BN { return bnMin( A_DAY, diff --git a/packages/node/src/yargs.ts b/packages/node/src/yargs.ts index 4b2e451fc6..eb430803a5 100644 --- a/packages/node/src/yargs.ts +++ b/packages/node/src/yargs.ts @@ -162,9 +162,9 @@ export const yargsOptions = yargs(hideBin(process.argv)) }, 'disable-historical': { demandOption: false, - default: false, describe: 'Disable storing historical state entities', type: 'boolean', + // NOTE: don't set a default for this. It will break apply args from manifest. The default should be set in NodeConfig }, 'log-level': { demandOption: false, @@ -267,6 +267,13 @@ export const yargsOptions = yargs(hideBin(process.argv)) type: 'boolean', default: false, }, + skipBlock: { + demandOption: false, + describe: + 'If the project contains only event handlers and only accesses the events or block header then you can enable this option to reduce RPC requests and have a slight performance increase. This will be automatically disabled if handlers other than EventHandlers are detected.', + type: 'boolean', + // NOTE: don't set a default for this. It will break apply args from manifest. The default should be set in NodeConfig + }, timeout: { demandOption: false, describe: @@ -275,14 +282,15 @@ export const yargsOptions = yargs(hideBin(process.argv)) }, 'unfinalized-blocks': { demandOption: false, - default: false, describe: 'Enable to fetch and index unfinalized blocks', type: 'boolean', + // NOTE: don't set a default for this. It will break apply args from manifest. The default should be set in NodeConfig }, unsafe: { type: 'boolean', demandOption: false, describe: 'Allows usage of any built-in module within the sandbox', + // NOTE: don't set a default for this. It will break apply args from manifest. The default should be set in NodeConfig }, workers: { alias: 'w', diff --git a/packages/types/src/interfaces.ts b/packages/types/src/interfaces.ts index 97171cf56c..c588617ca9 100644 --- a/packages/types/src/interfaces.ts +++ b/packages/types/src/interfaces.ts @@ -3,7 +3,7 @@ import {AnyTuple, Codec} from '@polkadot/types-codec/types'; import {GenericExtrinsic} from '@polkadot/types/extrinsic'; -import {EventRecord, SignedBlock} from '@polkadot/types/interfaces'; +import {EventRecord, SignedBlock, Header} from '@polkadot/types/interfaces'; import {IEvent} from '@polkadot/types/types'; export interface Entity { @@ -44,9 +44,24 @@ export interface SubstrateExtrinsic { success: boolean; } -export interface SubstrateEvent extends TypedEventRecord { +interface BaseSubstrateEvent extends TypedEventRecord { // index in the block idx: number; +} + +// A subset of SubstrateBlock with just the header +export interface BlockHeader { + block: { + header: Header; + }; + events: EventRecord[]; +} + +export interface LightSubstrateEvent extends BaseSubstrateEvent { + block: BlockHeader; +} + +export interface SubstrateEvent extends BaseSubstrateEvent { extrinsic?: SubstrateExtrinsic; block: SubstrateBlock; } diff --git a/packages/types/src/project.ts b/packages/types/src/project.ts index 3d9e13d73f..2552604682 100644 --- a/packages/types/src/project.ts +++ b/packages/types/src/project.ts @@ -3,7 +3,7 @@ import {ApiPromise} from '@polkadot/api'; import {AnyTuple, RegistryTypes} from '@polkadot/types/types'; -import {SubstrateBlock, SubstrateEvent, SubstrateExtrinsic} from './interfaces'; +import {LightSubstrateEvent, SubstrateBlock, SubstrateEvent, SubstrateExtrinsic} from './interfaces'; export enum SubstrateDatasourceKind { Runtime = 'substrate/Runtime', @@ -17,7 +17,7 @@ export enum SubstrateHandlerKind { export type RuntimeHandlerInputMap = { [SubstrateHandlerKind.Block]: SubstrateBlock; - [SubstrateHandlerKind.Event]: SubstrateEvent; + [SubstrateHandlerKind.Event]: SubstrateEvent | LightSubstrateEvent; [SubstrateHandlerKind.Call]: SubstrateExtrinsic; }; From eac2aaf696615db5315506bd8a4aaa08bee20ef9 Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Fri, 25 Aug 2023 09:25:01 +1200 Subject: [PATCH 2/3] Uncomment spec version --- packages/node/src/indexer/apiPromise.connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node/src/indexer/apiPromise.connection.ts b/packages/node/src/indexer/apiPromise.connection.ts index 59cd4dd29c..577cccbe84 100644 --- a/packages/node/src/indexer/apiPromise.connection.ts +++ b/packages/node/src/indexer/apiPromise.connection.ts @@ -99,7 +99,7 @@ export class ApiPromiseConnection const blocks = await this.fetchBlocksBatches( this.unsafeApi, heights, - // overallSpecVer, + overallSpecVer, ); return blocks; } From 45bb00ccdf2c1ae6f257e95ae1e81893c0137f9f Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Mon, 28 Aug 2023 15:19:45 +1200 Subject: [PATCH 3/3] Update changelog --- packages/common/CHANGELOG.md | 5 ++++- packages/node-core/CHANGELOG.md | 2 ++ packages/node/CHANGELOG.md | 4 ++++ packages/types/CHANGELOG.md | 2 ++ 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/common/CHANGELOG.md b/packages/common/CHANGELOG.md index bec6efea98..1d663d377e 100644 --- a/packages/common/CHANGELOG.md +++ b/packages/common/CHANGELOG.md @@ -7,8 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Changed - move block filters to common (#1969) + ### Added - Add spec for project upgrades (#1797) +- skipBlock node runner option (#1968) ## [2.6.0] - 2023-08-25 ### Changed @@ -305,7 +307,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - init commit -[Unreleased]: https://github.com/subquery/subql/compare/common/2.5.0...HEAD +[Unreleased]: https://github.com/subquery/subql/compare/common/2.6.0...HEAD +[2.6.0]: https://github.com/subquery/subql/compare/common/2.5.0...common/2.6.0 [2.5.0]: https://github.com/subquery/subql/compare/common/2.4.0...common/2.5.0 [2.4.0]: https://github.com/subquery/subql/compare/common/2.3.0...common/2.4.0 [2.3.0]: https://github.com/subquery/subql/compare/common/2.2.2...common/2.3.0 diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index 7b9d06fa8d..cabd9e147e 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -10,9 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Move more code from node to node-core. Including configure module, workers (#1797) +- Update api service generics to support multiple block types (#1968) ### Added - Project upgrades feature and many other changes to support it (#1797) +- skipBlock option to NodeConfig (#1968) ## [4.2.3] - 2023-08-17 ### Fixed diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index 5959c6a20d..73210b17a8 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Project upgrades feature which allows upgrading projects at specific heights (#1797) +- Support for skipBlock and LightBlock (#1968) + +### Fixed +- Project node runner options being overwritten by yargs defaults (#1967) ## [2.12.2] - 2023-08-17 ### Fixed diff --git a/packages/types/CHANGELOG.md b/packages/types/CHANGELOG.md index 83df75294b..56aa80e503 100644 --- a/packages/types/CHANGELOG.md +++ b/packages/types/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- New LightBlock type (#1968) ## [2.2.0] - 2023-08-16 ### Added