Skip to content

Commit

Permalink
Add skipBlock option to improve performance with event handlers (#1968)
Browse files Browse the repository at this point in the history
* Add skipBlock option to improve performance with event handlers

* Uncomment spec version

* Update changelog
  • Loading branch information
stwiname authored Aug 28, 2023
1 parent b1270d0 commit f045e32
Show file tree
Hide file tree
Showing 31 changed files with 346 additions and 128 deletions.
5 changes: 4 additions & 1 deletion packages/common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- move block filters to common (#1969)

### Added
- Add spec for project upgrades (#1797)
- skipBlock node runner option (#1968)

## [2.6.0] - 2023-08-25
### Changed
Expand Down Expand Up @@ -305,7 +307,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- init commit

[Unreleased]: https://github.com/subquery/subql/compare/common/2.5.0...HEAD
[Unreleased]: https://github.com/subquery/subql/compare/common/2.6.0...HEAD
[2.6.0]: https://github.com/subquery/subql/compare/common/2.5.0...common/2.6.0
[2.5.0]: https://github.com/subquery/subql/compare/common/2.4.0...common/2.5.0
[2.4.0]: https://github.com/subquery/subql/compare/common/2.3.0...common/2.4.0
[2.3.0]: https://github.com/subquery/subql/compare/common/2.2.2...common/2.3.0
Expand Down
3 changes: 3 additions & 0 deletions packages/common/src/project/versioned/v1_0_0/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ export class RunnerNodeOptionsModel implements NodeOptions {
@IsOptional()
@IsBoolean()
unfinalizedBlocks?: boolean;
@IsOptional()
@IsBoolean()
skipBlock?: boolean;
}

export class BlockFilterImpl implements BlockFilter {
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/project/versioned/v1_0_0/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface NodeOptions {
historical?: boolean;
unsafe?: boolean;
unfinalizedBlocks?: boolean;
skipBlock?: boolean;
}

export interface ParentProject {
Expand Down
2 changes: 2 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- Move more code from node to node-core. Including configure module, workers (#1797)
- Update api service generics to support multiple block types (#1968)

### Added
- Project upgrades feature and many other changes to support it (#1797)
- skipBlock option to NodeConfig (#1968)

## [4.2.3] - 2023-08-17
### Fixed
Expand Down
11 changes: 5 additions & 6 deletions packages/node-core/src/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: GPL-3.0

import {ApiConnectionError, ApiErrorType} from './api.connection.error';
import {NodeConfig} from './configure';
import {NetworkMetadataPayload} from './events';
import {ConnectionPoolService} from './indexer';
import {getLogger} from './logger';
Expand All @@ -11,25 +10,25 @@ const logger = getLogger('api');

const MAX_RECONNECT_ATTEMPTS = 5;

export interface IApi<A = any, SA = any, B = any> {
fetchBlocks(heights: number[], ...args: any): Promise<B[]>;
export interface IApi<A = any, SA = any, B extends Array<any> = any[]> {
fetchBlocks(heights: number[], ...args: any): Promise<B>;
safeApi(height: number): SA;
unsafeApi: A;
networkMeta: NetworkMetadataPayload;
}

export interface IApiConnectionSpecific<A = any, SA = any, B = any> extends IApi<A, SA, B> {
export interface IApiConnectionSpecific<A = any, SA = any, B extends Array<any> = any[]> extends IApi<A, SA, B> {
handleError(error: Error): ApiConnectionError;
apiConnect(): Promise<void>;
apiDisconnect(): Promise<void>;
}

export abstract class ApiService<A = any, SA = any, B = any> implements IApi<A, SA, B> {
export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]> implements IApi<A, SA, B> {
constructor(protected connectionPoolService: ConnectionPoolService<IApiConnectionSpecific<A, SA, B>>) {}

abstract networkMeta: NetworkMetadataPayload;

async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise<B[]> {
async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise<B> {
let reconnectAttempts = 0;
while (reconnectAttempts < numAttempts) {
try {
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export interface IConfig {
readonly storeFlushInterval: number;
readonly isTest?: boolean;
readonly root?: string;
readonly skipBlock?: boolean;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> & Pick<IConfig, 'subquery'>;
Expand Down Expand Up @@ -86,6 +87,7 @@ const DEFAULT_CONFIG = {
storeGetCacheSize: 500,
storeCacheAsync: true,
storeFlushInterval: 5,
skipBlock: false,
};

export class NodeConfig implements IConfig {
Expand Down Expand Up @@ -273,6 +275,10 @@ export class NodeConfig implements IConfig {
return !!this.scaleBatchSize;
}

get skipBlock(): boolean {
return !!this._config.skipBlock;
}

get postgresCACert(): string | undefined {
if (!this._config.pgCa) {
return undefined;
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/configure/configure.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export async function registerApp<P extends ISubqueryProject>(
config = NodeConfig.rebaseWithArgs(config, yargsToIConfig(argv, nameMapping), isTest);
} else {
if (!argv.subquery) {
logger.error('Subquery path is missing neither in cli options nor in config file');
logger.error('Subquery path is missing in both cli options and config file');
showHelp();
process.exit(1);
}
Expand Down
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/dictionary.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ const nodeConfig = new NodeConfig({
subqueryName: 'asdf',
networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
dictionaryTimeout: 10,
// dictionaryResolver: 'https://kepler-auth.subquery.network'
});

jest.setTimeout(10000);
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export abstract class BaseIndexerManager<
SA, // Api Type
A, // SafeApi Type
B, // Block Type
API extends IApi<A, SA, B>,
API extends IApi<A, SA, B[]>,
DS extends BaseDataSource,
CDS extends DS & BaseCustomDataSource, // Custom datasource
FilterMap extends FilterTypeMap,
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo

protected abstract packageVersion: string;
protected abstract getBlockTimestamp(height: number): Promise<Date>;
protected abstract onProjectChange(project: ISubqueryProject<IProjectNetworkConfig, DS>): void | Promise<void>;

constructor(
private readonly dsProcessorService: BaseDsProcessorService,
Expand Down Expand Up @@ -331,8 +332,13 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo
// Reload the dynamic ds with new project
// TODO are we going to run into problems with this being non blocking
await this.dynamicDsService.getDynamicDatasources(true);

await this.onProjectChange(this.project);
});

// Called to allow handling the first project
await this.onProjectChange(this.project);

if (isMainThread) {
const lastProcessedHeight = await this.getLastProcessedHeight();

Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/indexer/test.runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class TestRunner<A, SA, B, DS> {
private passedTests = 0;
private failedTests = 0;
constructor(
@Inject('IApi') protected readonly apiService: IApi<A, SA, B>,
@Inject('IApi') protected readonly apiService: IApi<A, SA, B[]>,
protected readonly storeService: StoreService,
protected readonly sequelize: Sequelize,
protected readonly nodeConfig: NodeConfig,
Expand All @@ -42,7 +42,7 @@ export class TestRunner<A, SA, B, DS> {
block: B,
handler: string,
indexerManager: IIndexerManager<B, DS>,
apiService?: IApi<A, SA, B>
apiService?: IApi<A, SA, B[]>
) => Promise<void>
): Promise<{
passedTests: number;
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/testing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export abstract class TestingService<A, SA, B, DS extends BaseDataSource> {
block: B,
handler: string,
indexerManager: IIndexerManager<B, DS>,
apiService?: IApi<A, SA, B>
apiService?: IApi<A, SA, B[]>
): Promise<void> {
await indexerManager.indexBlock(block, this.getDsWithHandler(handler));
}
Expand Down
21 changes: 15 additions & 6 deletions packages/node-core/src/utils/configure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface ArgvOverrideOptions {
unsafe?: boolean;
disableHistorical?: boolean;
unfinalizedBlocks?: boolean;
skipBlock?: boolean;
}

export function defaultSubqueryName(config: Partial<IConfig>): MinConfig {
Expand All @@ -29,20 +30,28 @@ export function defaultSubqueryName(config: Partial<IConfig>): MinConfig {
} as MinConfig;
}

function applyArgs(
argvs: ArgvOverrideOptions,
options: RunnerNodeOptionsModel,
key: keyof Omit<ArgvOverrideOptions, 'disableHistorical'>
) {
if (argvs[key] === undefined && options[key] !== undefined) {
argvs[key] = options[key];
}
}

export function rebaseArgsWithManifest(argvs: ArgvOverrideOptions, rawManifest: unknown): void {
const options = plainToClass(RunnerNodeOptionsModel, (rawManifest as any)?.runner?.node?.options);
if (!options) {
return;
}

// we override them if they are not provided in args/flag
if (argvs.unsafe === undefined && options.unsafe !== undefined) {
argvs.unsafe = options.unsafe;
}
if (argvs.disableHistorical === undefined && options.historical !== undefined) {
// THIS IS OPPOSITE
argvs.disableHistorical = !options.historical;
}
if (argvs.unfinalizedBlocks === undefined && options.unfinalizedBlocks !== undefined) {
argvs.unfinalizedBlocks = options.unfinalizedBlocks;
}
applyArgs(argvs, options, 'unsafe');
applyArgs(argvs, options, 'unfinalizedBlocks');
applyArgs(argvs, options, 'skipBlock');
}
4 changes: 4 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Project upgrades feature which allows upgrading projects at specific heights (#1797)
- Support for skipBlock and LightBlock (#1968)

### Fixed
- Project node runner options being overwritten by yargs defaults (#1967)

## [2.12.2] - 2023-08-17
### Fixed
Expand Down
14 changes: 7 additions & 7 deletions packages/node/src/indexer/api.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe('ApiService', () => {
const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock;
const runtimeVersion = { specVersion: 1 } as unknown as RuntimeVersion;
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
const [patchedValidators, currentValidators] = await Promise.all([
Expand All @@ -121,7 +121,7 @@ describe('ApiService', () => {
const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock;
const runtimeVersion = { specVersion: 13 } as unknown as RuntimeVersion;
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
const apiResults = await api.query.staking.erasStakers.at(
Expand Down Expand Up @@ -151,7 +151,7 @@ describe('ApiService', () => {
const apiResults = await api.rpc.state.getRuntimeVersion(earlyBlockhash);
// step 2, api get patched result with block height
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
const patchedResult = await patchedApi.rpc.state.getRuntimeVersion(
Expand Down Expand Up @@ -179,7 +179,7 @@ describe('ApiService', () => {
const futureBlockhash = await api.rpc.chain.getBlockHash(6721195);
// step 2, api get patched result with block height
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
await expect(
Expand All @@ -206,7 +206,7 @@ describe('ApiService', () => {
const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock;
const runtimeVersion = { specVersion: 28 } as unknown as RuntimeVersion;
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
expect(
Expand Down Expand Up @@ -379,7 +379,7 @@ describe('ApiService', () => {
const runtimeVersion = { specVersion: 1 } as unknown as RuntimeVersion;

const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
await expect(patchedApi.query.system.events()).resolves.toHaveLength(2);
Expand All @@ -399,7 +399,7 @@ describe('ApiService', () => {
const runtimeVersion = { specVersion: 1103 } as unknown as RuntimeVersion;

const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
/* If Block number not provided should be ignored and `blockNumber` above used */
Expand Down
Loading

0 comments on commit f045e32

Please sign in to comment.