Skip to content

Commit

Permalink
Improve event iteration performance (#2601)
Browse files Browse the repository at this point in the history
* Improve event iteration performance

* Update changelog

* Address comments
  • Loading branch information
stwiname authored Nov 25, 2024
1 parent b4306c1 commit 0378dcc
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
2 changes: 2 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Improve event iteration, this improves performance with large blocks (#2601)

## [5.3.0] - 2024-11-22
### Added
Expand Down
33 changes: 29 additions & 4 deletions packages/node/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
ProcessBlockResponse,
BaseIndexerManager,
IBlock,
getLogger,
} from '@subql/node-core';
import {
LightSubstrateEvent,
Expand All @@ -39,6 +40,8 @@ import { DynamicDsService } from './dynamic-ds.service';
import { ApiAt, BlockContent, isFullBlock, LightBlockContent } from './types';
import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';

const logger = getLogger('indexer');

@Injectable()
export class IndexerManager extends BaseIndexerManager<
ApiPromise,
Expand Down Expand Up @@ -105,9 +108,32 @@ export class IndexerManager extends BaseIndexerManager<
const { block, events, extrinsics } = blockContent;
await this.indexBlockContent(block, dataSources, getVM);

// Group the events so they only need to be iterated over a single time
const groupedEvents = events.reduce(
(acc, evt, idx) => {
if (evt.phase.isInitialization) {
acc.init.push(evt);
} else if (evt.phase.isFinalization) {
acc.finalize.push(evt);
} else if (evt.extrinsic?.idx) {
const idx = evt.extrinsic.idx;
acc[idx] ??= [];
acc[idx].push(evt);
} else {
logger.warn(
`Unrecognized event type, skipping. block="${block.block.header.number.toNumber()}" eventIdx="${idx}"`,
);
}
return acc;
},
{ init: [], finalize: [] } as Record<
number | 'init' | 'finalize',
SubstrateEvent[]
>,
);

// Run initialization events
const initEvents = events.filter((evt) => evt.phase.isInitialization);
for (const event of initEvents) {
for (const event of groupedEvents.init) {
await this.indexEvent(event, dataSources, getVM);
}

Expand All @@ -125,8 +151,7 @@ export class IndexerManager extends BaseIndexerManager<
}

// Run finalization events
const finalizeEvents = events.filter((evt) => evt.phase.isFinalization);
for (const event of finalizeEvents) {
for (const event of groupedEvents.finalize) {
await this.indexEvent(event, dataSources, getVM);
}
} else {
Expand Down
36 changes: 17 additions & 19 deletions packages/node/src/utils/substrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ export function wrapExtrinsics(
wrappedBlock: SubstrateBlock,
allEvents: EventRecord[],
): SubstrateExtrinsic[] {
const groupedEvents = groupEventsByExtrinsic(allEvents);
return wrappedBlock.block.extrinsics.map((extrinsic, idx) => {
const events = filterExtrinsicEvents(idx, allEvents);
const events = groupedEvents[idx];
return {
idx,
extrinsic,
Expand All @@ -109,13 +110,22 @@ function getExtrinsicSuccess(events: EventRecord[]): boolean {
);
}

function filterExtrinsicEvents(
extrinsicIdx: number,
function groupEventsByExtrinsic(
events: EventRecord[],
): EventRecord[] {
return events.filter(
({ phase }) =>
phase.isApplyExtrinsic && phase.asApplyExtrinsic.eqn(extrinsicIdx),
): Record<number, EventRecord[]> {
return events.reduce(
(acc, event) => {
const extrinsicIdx = event.phase.isApplyExtrinsic
? event.phase.asApplyExtrinsic.toNumber()
: undefined;
if (extrinsicIdx === undefined) {
return acc;
}
acc[extrinsicIdx] ??= [];
acc[extrinsicIdx].push(event);
return acc;
},
{} as Record<number, EventRecord[]>,
);
}

Expand Down Expand Up @@ -311,18 +321,6 @@ export async function getHeaderByHeight(
return header;
}

export async function fetchBlocksRange(
api: ApiPromise,
startHeight: number,
endHeight: number,
): Promise<SignedBlock[]> {
return Promise.all(
range(startHeight, endHeight + 1).map(async (height) =>
getBlockByHeight(api, height),
),
);
}

export async function fetchBlocksArray(
api: ApiPromise,
blockArray: number[],
Expand Down

0 comments on commit 0378dcc

Please sign in to comment.