Skip to content

Commit

Permalink
Use RampQueue with workers
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Dec 1, 2024
1 parent 379b16c commit 0c969d4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
22 changes: 14 additions & 8 deletions packages/node-core/src/indexer/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,41 @@ export abstract class BaseWorkerService<
private fetchedBlocks: Record<string, IBlock<B>> = {};
private _isIndexing = false;

private queue: AutoQueue<R>;
private queue: AutoQueue<IBlock<B>>;

protected abstract fetchChainBlock(heights: number, extra: E): Promise<IBlock<B>>;
protected abstract toBlockResponse(block: B): R;
protected abstract processFetchedBlock(block: IBlock<B>, dataSources: DS[]): Promise<ProcessBlockResponse>;
// protected abstract getBlockSize(block: IBlock<B>): number;
protected abstract getBlockSize(block: IBlock<B>): number;

constructor(
private projectService: IProjectService<DS>,
private projectUpgradeService: IProjectUpgradeService,
nodeConfig: NodeConfig
) {
// this.queue = new RampQueue(this.getBlockSize.bind(this), nodeConfig.batchSize, undefined, nodeConfig.timeout, 'WorkerService');
this.queue = new AutoQueue(undefined, nodeConfig.batchSize, nodeConfig.timeout, 'Worker Service');
this.queue = new RampQueue(
this.getBlockSize.bind(this),
nodeConfig.batchSize,
undefined,
nodeConfig.timeout,
'WorkerService'
);
}

async fetchBlock(height: number, extra: E): Promise<R> {
try {
return await this.queue.put(async () => {
const block = await this.queue.put(async () => {
// If a dynamic ds is created we might be asked to fetch blocks again, use existing result
if (!this.fetchedBlocks[height]) {
const block = await this.fetchChainBlock(height, extra);
this.fetchedBlocks[height] = block;
}

const block = this.fetchedBlocks[height];
// Return info to get the runtime version, this lets the worker thread know
return this.toBlockResponse(block.block);
return this.fetchedBlocks[height];
});

// Return info to get the runtime version, this lets the worker thread know
return this.toBlockResponse(block.block);
} catch (e: any) {
if (!isTaskFlushedError(e)) {
logger.error(e, `Failed to fetch block ${height}`);
Expand Down
10 changes: 10 additions & 0 deletions packages/node/src/indexer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { ApiPromise } from '@polkadot/api';
import { ApiDecoration } from '@polkadot/api/types';
import type { HexString } from '@polkadot/util/types';
import { IBlock } from '@subql/node-core';
import {
BlockHeader,
LightSubstrateEvent,
Expand Down Expand Up @@ -32,3 +33,12 @@ export function isFullBlock(
): block is BlockContent {
return (block as BlockContent).extrinsics !== undefined;
}

export function getBlockSize(
block: IBlock<BlockContent | LightBlockContent>,
): number {
return block.block.events.reduce(
(acc, evt) => acc + evt.encodedLength,
(block.block.block as SubstrateBlock)?.encodedLength ?? 0,
);
}
13 changes: 12 additions & 1 deletion packages/node/src/indexer/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import { ApiService } from '../api.service';
import { SpecVersion } from '../dictionary';
import { IndexerManager } from '../indexer.manager';
import { WorkerRuntimeService } from '../runtime/workerRuntimeService';
import { BlockContent, isFullBlock, LightBlockContent } from '../types';
import {
BlockContent,
getBlockSize,
isFullBlock,
LightBlockContent,
} from '../types';

export type FetchBlockResponse = Header & { specVersion?: number };

Expand Down Expand Up @@ -68,6 +73,12 @@ export class WorkerService extends BaseWorkerService<
};
}

protected getBlockSize(
block: IBlock<BlockContent | LightBlockContent>,
): number {
return getBlockSize(block);
}

protected async processFetchedBlock(
block: IBlock<BlockContent | LightBlockContent>,
dataSources: SubstrateDatasource[],
Expand Down

0 comments on commit 0c969d4

Please sign in to comment.