Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add skipBlock option to improve performance with event handlers #1968

Merged
merged 3 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- move block filters to common (#1969)

### Added
- Add spec for project upgrades (#1797)
- skipBlock node runner option (#1968)

## [2.6.0] - 2023-08-25
### Changed
Expand Down Expand Up @@ -305,7 +307,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- init commit

[Unreleased]: https://github.com/subquery/subql/compare/common/2.5.0...HEAD
[Unreleased]: https://github.com/subquery/subql/compare/common/2.6.0...HEAD
[2.6.0]: https://github.com/subquery/subql/compare/common/2.5.0...common/2.6.0
[2.5.0]: https://github.com/subquery/subql/compare/common/2.4.0...common/2.5.0
[2.4.0]: https://github.com/subquery/subql/compare/common/2.3.0...common/2.4.0
[2.3.0]: https://github.com/subquery/subql/compare/common/2.2.2...common/2.3.0
Expand Down
3 changes: 3 additions & 0 deletions packages/common/src/project/versioned/v1_0_0/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ export class RunnerNodeOptionsModel implements NodeOptions {
@IsOptional()
@IsBoolean()
unfinalizedBlocks?: boolean;
@IsOptional()
@IsBoolean()
skipBlock?: boolean;
}

export class BlockFilterImpl implements BlockFilter {
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/project/versioned/v1_0_0/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface NodeOptions {
historical?: boolean;
unsafe?: boolean;
unfinalizedBlocks?: boolean;
skipBlock?: boolean;
}

export interface ParentProject {
Expand Down
2 changes: 2 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- Move more code from node to node-core. Including configure module, workers (#1797)
- Update api service generics to support multiple block types (#1968)

### Added
- Project upgrades feature and many other changes to support it (#1797)
- skipBlock option to NodeConfig (#1968)

## [4.2.3] - 2023-08-17
### Fixed
Expand Down
11 changes: 5 additions & 6 deletions packages/node-core/src/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: GPL-3.0

import {ApiConnectionError, ApiErrorType} from './api.connection.error';
import {NodeConfig} from './configure';
import {NetworkMetadataPayload} from './events';
import {ConnectionPoolService} from './indexer';
import {getLogger} from './logger';
Expand All @@ -11,25 +10,25 @@ const logger = getLogger('api');

const MAX_RECONNECT_ATTEMPTS = 5;

export interface IApi<A = any, SA = any, B = any> {
fetchBlocks(heights: number[], ...args: any): Promise<B[]>;
export interface IApi<A = any, SA = any, B extends Array<any> = any[]> {
fetchBlocks(heights: number[], ...args: any): Promise<B>;
safeApi(height: number): SA;
unsafeApi: A;
networkMeta: NetworkMetadataPayload;
}

export interface IApiConnectionSpecific<A = any, SA = any, B = any> extends IApi<A, SA, B> {
export interface IApiConnectionSpecific<A = any, SA = any, B extends Array<any> = any[]> extends IApi<A, SA, B> {
handleError(error: Error): ApiConnectionError;
apiConnect(): Promise<void>;
apiDisconnect(): Promise<void>;
}

export abstract class ApiService<A = any, SA = any, B = any> implements IApi<A, SA, B> {
export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]> implements IApi<A, SA, B> {
constructor(protected connectionPoolService: ConnectionPoolService<IApiConnectionSpecific<A, SA, B>>) {}

abstract networkMeta: NetworkMetadataPayload;

async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise<B[]> {
async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise<B> {
let reconnectAttempts = 0;
while (reconnectAttempts < numAttempts) {
try {
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export interface IConfig {
readonly storeFlushInterval: number;
readonly isTest?: boolean;
readonly root?: string;
readonly skipBlock?: boolean;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> & Pick<IConfig, 'subquery'>;
Expand Down Expand Up @@ -86,6 +87,7 @@ const DEFAULT_CONFIG = {
storeGetCacheSize: 500,
storeCacheAsync: true,
storeFlushInterval: 5,
skipBlock: false,
};

export class NodeConfig implements IConfig {
Expand Down Expand Up @@ -273,6 +275,10 @@ export class NodeConfig implements IConfig {
return !!this.scaleBatchSize;
}

get skipBlock(): boolean {
return !!this._config.skipBlock;
}

get postgresCACert(): string | undefined {
if (!this._config.pgCa) {
return undefined;
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/configure/configure.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export async function registerApp<P extends ISubqueryProject>(
config = NodeConfig.rebaseWithArgs(config, yargsToIConfig(argv, nameMapping), isTest);
} else {
if (!argv.subquery) {
logger.error('Subquery path is missing neither in cli options nor in config file');
logger.error('Subquery path is missing in both cli options and config file');
showHelp();
process.exit(1);
}
Expand Down
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/dictionary.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ const nodeConfig = new NodeConfig({
subqueryName: 'asdf',
networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
dictionaryTimeout: 10,
// dictionaryResolver: 'https://kepler-auth.subquery.network'
});

jest.setTimeout(10000);
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export abstract class BaseIndexerManager<
SA, // Api Type
A, // SafeApi Type
B, // Block Type
API extends IApi<A, SA, B>,
API extends IApi<A, SA, B[]>,
DS extends BaseDataSource,
CDS extends DS & BaseCustomDataSource, // Custom datasource
FilterMap extends FilterTypeMap,
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo

protected abstract packageVersion: string;
protected abstract getBlockTimestamp(height: number): Promise<Date>;
protected abstract onProjectChange(project: ISubqueryProject<IProjectNetworkConfig, DS>): void | Promise<void>;

constructor(
private readonly dsProcessorService: BaseDsProcessorService,
Expand Down Expand Up @@ -331,8 +332,13 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo
// Reload the dynamic ds with new project
// TODO are we going to run into problems with this being non blocking
await this.dynamicDsService.getDynamicDatasources(true);

await this.onProjectChange(this.project);
});

// Called to allow handling the first project
await this.onProjectChange(this.project);

if (isMainThread) {
const lastProcessedHeight = await this.getLastProcessedHeight();

Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/indexer/test.runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class TestRunner<A, SA, B, DS> {
private passedTests = 0;
private failedTests = 0;
constructor(
@Inject('IApi') protected readonly apiService: IApi<A, SA, B>,
@Inject('IApi') protected readonly apiService: IApi<A, SA, B[]>,
protected readonly storeService: StoreService,
protected readonly sequelize: Sequelize,
protected readonly nodeConfig: NodeConfig,
Expand All @@ -42,7 +42,7 @@ export class TestRunner<A, SA, B, DS> {
block: B,
handler: string,
indexerManager: IIndexerManager<B, DS>,
apiService?: IApi<A, SA, B>
apiService?: IApi<A, SA, B[]>
) => Promise<void>
): Promise<{
passedTests: number;
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/testing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export abstract class TestingService<A, SA, B, DS extends BaseDataSource> {
block: B,
handler: string,
indexerManager: IIndexerManager<B, DS>,
apiService?: IApi<A, SA, B>
apiService?: IApi<A, SA, B[]>
): Promise<void> {
await indexerManager.indexBlock(block, this.getDsWithHandler(handler));
}
Expand Down
21 changes: 15 additions & 6 deletions packages/node-core/src/utils/configure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface ArgvOverrideOptions {
unsafe?: boolean;
disableHistorical?: boolean;
unfinalizedBlocks?: boolean;
skipBlock?: boolean;
}

export function defaultSubqueryName(config: Partial<IConfig>): MinConfig {
Expand All @@ -29,20 +30,28 @@ export function defaultSubqueryName(config: Partial<IConfig>): MinConfig {
} as MinConfig;
}

function applyArgs(
argvs: ArgvOverrideOptions,
options: RunnerNodeOptionsModel,
key: keyof Omit<ArgvOverrideOptions, 'disableHistorical'>
) {
if (argvs[key] === undefined && options[key] !== undefined) {
argvs[key] = options[key];
}
}

export function rebaseArgsWithManifest(argvs: ArgvOverrideOptions, rawManifest: unknown): void {
const options = plainToClass(RunnerNodeOptionsModel, (rawManifest as any)?.runner?.node?.options);
if (!options) {
return;
}

// we override them if they are not provided in args/flag
if (argvs.unsafe === undefined && options.unsafe !== undefined) {
argvs.unsafe = options.unsafe;
}
if (argvs.disableHistorical === undefined && options.historical !== undefined) {
// THIS IS OPPOSITE
argvs.disableHistorical = !options.historical;
}
if (argvs.unfinalizedBlocks === undefined && options.unfinalizedBlocks !== undefined) {
argvs.unfinalizedBlocks = options.unfinalizedBlocks;
}
applyArgs(argvs, options, 'unsafe');
applyArgs(argvs, options, 'unfinalizedBlocks');
applyArgs(argvs, options, 'skipBlock');
}
4 changes: 4 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Project upgrades feature which allows upgrading projects at specific heights (#1797)
- Support for skipBlock and LightBlock (#1968)

### Fixed
- Project node runner options being overwritten by yargs defaults (#1967)

## [2.12.2] - 2023-08-17
### Fixed
Expand Down
14 changes: 7 additions & 7 deletions packages/node/src/indexer/api.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe('ApiService', () => {
const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock;
const runtimeVersion = { specVersion: 1 } as unknown as RuntimeVersion;
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
const [patchedValidators, currentValidators] = await Promise.all([
Expand All @@ -121,7 +121,7 @@ describe('ApiService', () => {
const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock;
const runtimeVersion = { specVersion: 13 } as unknown as RuntimeVersion;
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
const apiResults = await api.query.staking.erasStakers.at(
Expand Down Expand Up @@ -151,7 +151,7 @@ describe('ApiService', () => {
const apiResults = await api.rpc.state.getRuntimeVersion(earlyBlockhash);
// step 2, api get patched result with block height
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
const patchedResult = await patchedApi.rpc.state.getRuntimeVersion(
Expand Down Expand Up @@ -179,7 +179,7 @@ describe('ApiService', () => {
const futureBlockhash = await api.rpc.chain.getBlockHash(6721195);
// step 2, api get patched result with block height
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
await expect(
Expand All @@ -206,7 +206,7 @@ describe('ApiService', () => {
const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock;
const runtimeVersion = { specVersion: 28 } as unknown as RuntimeVersion;
const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
expect(
Expand Down Expand Up @@ -379,7 +379,7 @@ describe('ApiService', () => {
const runtimeVersion = { specVersion: 1 } as unknown as RuntimeVersion;

const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
await expect(patchedApi.query.system.events()).resolves.toHaveLength(2);
Expand All @@ -399,7 +399,7 @@ describe('ApiService', () => {
const runtimeVersion = { specVersion: 1103 } as unknown as RuntimeVersion;

const patchedApi = await apiService.getPatchedApi(
mockBlock,
mockBlock.block.header,
runtimeVersion,
);
/* If Block number not provided should be ignored and `blockNumber` above used */
Expand Down
Loading