From 379b16cf8d9f0301c71f8afd827ef320dbc59309 Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Thu, 28 Nov 2024 17:08:33 +1300 Subject: [PATCH 1/9] Implement dynamic block fetching based on block size --- .../blockDispatcher/base-block-dispatcher.ts | 19 +-- .../blockDispatcher/block-dispatcher.ts | 29 ++--- .../worker-block-dispatcher.ts | 71 ++++------- .../src/indexer/fetch.service.spec.ts | 4 +- .../node-core/src/indexer/fetch.service.ts | 34 ++++-- .../node-core/src/indexer/indexer.manager.ts | 4 +- .../src/indexer/smartBatch.service.ts | 115 ------------------ .../src/indexer/worker/worker.builder.ts | 7 +- .../src/indexer/worker/worker.service.ts | 11 +- .../node-core/src/indexer/worker/worker.ts | 17 --- packages/node-core/src/utils/batch-size.ts | 42 ------- .../node-core/src/utils/blockSizeBuffer.ts | 26 ---- packages/node-core/src/utils/index.ts | 4 +- .../src/utils/{ => queues}/autoQueue.spec.ts | 0 .../src/utils/{ => queues}/autoQueue.ts | 77 +----------- packages/node-core/src/utils/queues/index.ts | 6 + packages/node-core/src/utils/queues/queue.ts | 72 +++++++++++ .../src/utils/queues/rampQueue.spec.ts | 51 ++++++++ .../node-core/src/utils/queues/rampQueue.ts | 92 ++++++++++++++ .../block-dispatcher.service.ts | 11 +- packages/node/src/utils/substrate.ts | 3 +- 21 files changed, 315 insertions(+), 380 deletions(-) delete mode 100644 packages/node-core/src/indexer/smartBatch.service.ts delete mode 100644 packages/node-core/src/utils/batch-size.ts delete mode 100644 packages/node-core/src/utils/blockSizeBuffer.ts rename packages/node-core/src/utils/{ => queues}/autoQueue.spec.ts (100%) rename packages/node-core/src/utils/{ => queues}/autoQueue.ts (82%) create mode 100644 packages/node-core/src/utils/queues/index.ts create mode 100644 packages/node-core/src/utils/queues/queue.ts create mode 100644 packages/node-core/src/utils/queues/rampQueue.spec.ts create mode 100644 packages/node-core/src/utils/queues/rampQueue.ts diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 45c2794357..24825dd3b9 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -13,7 +13,6 @@ import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../ import {IQueue, mainThreadOnly} from '../../utils'; import {MonitorServiceInterface} from '../monitor.service'; import {PoiBlock, PoiSyncService} from '../poi'; -import {SmartBatchService} from '../smartBatch.service'; import {StoreService} from '../store.service'; import {IStoreModelProvider} from '../storeModelProvider'; import {IPoi} from '../storeModelProvider/poi'; @@ -32,8 +31,7 @@ export interface IBlockDispatcher { queueSize: number; freeSize: number; latestBufferedHeight: number; - smartBatchSize: number; - minimumHeapLimit: number; + batchSize: number; // Remove all enqueued blocks, used when a dynamic ds is created flushQueue(height: number): void; @@ -53,8 +51,6 @@ export abstract class BaseBlockDispatcher implements IB private _onDynamicDsCreated?: (height: number) => void; private _pendingRewindHeader?: Header; - protected smartBatchService: SmartBatchService; - constructor( protected nodeConfig: NodeConfig, protected eventEmitter: EventEmitter2, @@ -66,9 +62,7 @@ export abstract class BaseBlockDispatcher implements IB private storeModelProvider: IStoreModelProvider, private poiSyncService: PoiSyncService, 
protected monitorService?: MonitorServiceInterface - ) { - this.smartBatchService = new SmartBatchService(nodeConfig.batchSize); - } + ) {} abstract enqueueBlocks(heights: (IBlock | number)[], latestBufferHeight?: number): void | Promise; @@ -87,12 +81,9 @@ export abstract class BaseBlockDispatcher implements IB return this.queue.freeSpace; } - get smartBatchSize(): number { - return this.smartBatchService.getSafeBatchSize(); - } - - get minimumHeapLimit(): number { - return this.smartBatchService.minimumHeapRequired; + get batchSize(): number { + // TODO make this smarter + return this.nodeConfig.batchSize; } get latestProcessedHeight(): number { diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index 3962d0c074..5c92cd7d71 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -1,7 +1,6 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import {getHeapStatistics} from 'v8'; import {OnApplicationShutdown} from '@nestjs/common'; import {EventEmitter2} from '@nestjs/event-emitter'; import {Interval} from '@nestjs/schedule'; @@ -12,11 +11,12 @@ import {getBlockHeight, IBlock, PoiSyncService} from '../../indexer'; import {getLogger} from '../../logger'; import {exitWithError, monitorWrite} from '../../process'; import {profilerWrap} from '../../profiler'; -import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils'; +import {Queue, AutoQueue, RampQueue, delay, isTaskFlushedError} from '../../utils'; import {StoreService} from '../store.service'; import {IStoreModelProvider} from '../storeModelProvider'; import {IProjectService, ISubqueryProject} from '../types'; import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher'; +// import { RampQueue } from '@subql/node-core/utils/rampQueue'; const logger = getLogger('BlockDispatcherService'); @@ -37,6 +37,7 @@ export abstract class BlockDispatcher private fetching = false; private isShutdown = false; + protected abstract getBlockSize(block: IBlock): number; protected abstract indexBlock(block: IBlock): Promise; constructor( @@ -62,7 +63,13 @@ export abstract class BlockDispatcher poiSyncService ); this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process'); - this.fetchQueue = new AutoQueue(nodeConfig.batchSize * 3, nodeConfig.batchSize, nodeConfig.timeout, 'Fetch'); + this.fetchQueue = new RampQueue( + this.getBlockSize.bind(this), + nodeConfig.batchSize, + nodeConfig.batchSize * 3, + nodeConfig.timeout, + 'Fetch' + ); if (this.nodeConfig.profiler) { this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches'); } else { @@ -96,10 +103,6 @@ export abstract class BlockDispatcher this.processQueue.flush(); } - private memoryleft(): number { - return this.smartBatchService.heapMemoryLimit() - getHeapStatistics().used_heap_size; - } - @Interval(10000) queueStats(stat: 'size' | 'freeSpace' = 'freeSpace'): void { // NOTE: If the free space of the process queue is low it means that processing is the limiting factor. If it is large then fetching blocks is the limitng factor. 
@@ -107,6 +110,7 @@ export abstract class BlockDispatcher `QUEUE INFO ${stat}: Block numbers: ${this.queue[stat]}, fetch: ${this.fetchQueue[stat]}, process: ${this.processQueue[stat]}` ); } + private async fetchBlocksFromQueue(): Promise { if (this.fetching || this.isShutdown) return; @@ -128,23 +132,14 @@ export abstract class BlockDispatcher // Used to compare before and after as a way to check if queue was flushed const bufferedHeight = this._latestBufferedHeight; - - if (this.memoryleft() < 0) { - //stop fetching until memory is freed - await waitForBatchSize(this.minimumHeapLimit); - } - void this.fetchQueue .put(async () => { - if (memoryLock.isLocked()) { - await memoryLock.waitForUnlock(); - } if (typeof blockOrNum !== 'number') { // Type is of block return blockOrNum; } const [block] = await this.fetchBlocksBatches([blockOrNum]); - this.smartBatchService.addToSizeBuffer([block]); + return block; }) .then( diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts index 3031994c8e..7ffa065f65 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts @@ -2,23 +2,23 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { OnApplicationShutdown } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { Interval } from '@nestjs/schedule'; -import { last } from 'lodash'; -import { NodeConfig } from '../../configure'; -import { IProjectUpgradeService } from '../../configure/ProjectUpgrade.service'; -import { IndexerEvent } from '../../events'; -import { IBlock, PoiSyncService } from '../../indexer'; -import { getLogger } from '../../logger'; -import { monitorWrite } from '../../process'; -import { AutoQueue, isTaskFlushedError } from '../../utils'; -import { MonitorServiceInterface } from '../monitor.service'; -import { StoreService } from '../store.service'; -import { IStoreModelProvider } from '../storeModelProvider'; -import { ISubqueryProject, IProjectService, Header } from '../types'; -import { isBlockUnavailableError } from '../worker/utils'; -import { BaseBlockDispatcher } from './base-block-dispatcher'; +import {OnApplicationShutdown} from '@nestjs/common'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {Interval} from '@nestjs/schedule'; +import {last} from 'lodash'; +import {NodeConfig} from '../../configure'; +import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service'; +import {IndexerEvent} from '../../events'; +import {IBlock, PoiSyncService} from '../../indexer'; +import {getLogger} from '../../logger'; +import {monitorWrite} from '../../process'; +import {AutoQueue, isTaskFlushedError} from '../../utils'; +import {MonitorServiceInterface} from '../monitor.service'; +import {StoreService} from '../store.service'; +import {IStoreModelProvider} from '../storeModelProvider'; +import {ISubqueryProject, IProjectService, Header} from '../types'; +import {isBlockUnavailableError} from '../worker/utils'; +import {BaseBlockDispatcher} from './base-block-dispatcher'; const logger = getLogger('WorkerBlockDispatcherService'); @@ -26,8 +26,6 @@ type Worker = { processBlock: (height: number) => Promise; getStatus: () => Promise; getMemoryLeft: () => Promise; - getBlocksLoaded: () => Promise; - waitForWorkerBatchSize: (heapSizeInBytes: number) => Promise; terminate: () => Promise; }; @@ -43,11 
+41,11 @@ function initAutoQueue( export abstract class WorkerBlockDispatcher extends BaseBlockDispatcher, DS, B> - implements OnApplicationShutdown { + implements OnApplicationShutdown +{ protected workers: W[] = []; private numWorkers: number; private isShutdown = false; - private currentWorkerIndex = 0; protected abstract fetchBlock(worker: W, height: number): Promise
; @@ -115,7 +113,7 @@ export abstract class WorkerBlockDispatcher let startIndex = 0; while (startIndex < heights.length) { const workerIdx = await this.getNextWorkerIndex(); - const batchSize = Math.min(heights.length - startIndex, await this.maxBatchSize(workerIdx)); + const batchSize = heights.length - startIndex; await Promise.all( heights .slice(startIndex, startIndex + batchSize) @@ -135,6 +133,7 @@ export abstract class WorkerBlockDispatcher this.latestBufferedHeight = latestBufferHeight ?? last(heights as number[]) ?? this.latestBufferedHeight; } + // eslint-disable-next-line @typescript-eslint/require-await private async enqueueBlock(height: number, workerIdx: number): Promise { if (this.isShutdown) return; const worker = this.workers[workerIdx]; @@ -143,9 +142,6 @@ export abstract class WorkerBlockDispatcher // Used to compare before and after as a way to check if queue was flushed const bufferedHeight = this.latestBufferedHeight; - - await worker.waitForWorkerBatchSize(this.minimumHeapLimit); - const pendingBlock = this.fetchBlock(worker, height); const processBlock = async () => { @@ -159,7 +155,7 @@ export abstract class WorkerBlockDispatcher await this.preProcessBlock(header); monitorWrite(`Processing from worker #${workerIdx}`); - const { dynamicDsCreated, reindexBlockHeader } = await worker.processBlock(height); + const {dynamicDsCreated, reindexBlockHeader} = await worker.processBlock(height); await this.postProcessBlock(header, { dynamicDsCreated, @@ -211,25 +207,8 @@ export abstract class WorkerBlockDispatcher } private async getNextWorkerIndex(): Promise { - const startIndex = this.currentWorkerIndex; - do { - this.currentWorkerIndex = (this.currentWorkerIndex + 1) % this.workers.length; - const memLeft = await this.workers[this.currentWorkerIndex].getMemoryLeft(); - if (memLeft >= this.minimumHeapLimit) { - return this.currentWorkerIndex; - } - } while (this.currentWorkerIndex !== startIndex); - - // All workers have been tried and none have enough memory left. 
- // wait for any worker to free the memory before calling getNextWorkerIndex again - await Promise.race(this.workers.map((worker) => worker.waitForWorkerBatchSize(this.minimumHeapLimit))); - - return this.getNextWorkerIndex(); - } - - private async maxBatchSize(workerIdx: number): Promise { - const memLeft = await this.workers[workerIdx].getMemoryLeft(); - if (memLeft < this.minimumHeapLimit) return 0; - return this.smartBatchService.safeBatchSizeForRemainingMemory(memLeft); + return Promise.all(this.workers.map((worker) => worker.getMemoryLeft())).then((memoryLeftValues) => { + return memoryLeftValues.indexOf(Math.max(...memoryLeftValues)); + }); } } diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index 3a5d7b45ea..30ff1e2176 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -4,7 +4,6 @@ import {EventEmitter2} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry} from '@subql/types-core'; -import {range} from 'lodash'; import { BaseUnfinalizedBlocksService, BlockDispatcher, @@ -145,8 +144,7 @@ const getDictionaryService = () => const getBlockDispatcher = () => { const inst = { latestBufferedHeight: 0, - smartBatchSize: 10, - minimumHeapLimit: 1000, + batchSize: 10, freeSize: 10, enqueueBlocks: (heights: number[], latestBufferHeight: number) => { (inst as any).freeSize = inst.freeSize - heights.length; diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 65a76e7ba2..8e0abf229f 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -10,7 +10,7 @@ import {range} from 'lodash'; import {NodeConfig} from '../configure'; import {IndexerEvent} from '../events'; import {getLogger} from '../logger'; -import {delay, filterBypassBlocks, waitForBatchSize} from '../utils'; +import {delay, filterBypassBlocks} from '../utils'; import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; @@ -28,6 +28,9 @@ export abstract class BaseFetchService; + #pendingBestBlockHead?: Promise; + // If the chain doesn't have a distinction between the 2 it should return the same value for finalized and best protected abstract getFinalizedHeader(): Promise
; protected abstract getBestHeight(): Promise; @@ -77,10 +80,24 @@ export abstract class BaseFetchService { + return (this.#pendingFinalizedBlockHead ??= this.getFinalizedBlockHead().finally( + () => (this.#pendingFinalizedBlockHead = undefined) + )); + } + + // Memoizes the request by not making new ones if one is already in progress + private async memoGetBestBlockHead(): Promise { + return (this.#pendingBestBlockHead ??= this.getBestBlockHead().finally( + () => (this.#pendingBestBlockHead = undefined) + )); + } + async init(startHeight: number): Promise { const interval = await this.getChainInterval(); - await Promise.all([this.getFinalizedBlockHead(), this.getBestBlockHead()]); + await Promise.all([this.memoGetFinalizedBlockHead(), this.memoGetBestBlockHead()]); const chainLatestHeight = this.latestHeight(); if (startHeight > chainLatestHeight) { @@ -100,11 +117,11 @@ export abstract class BaseFetchService void this.getFinalizedBlockHead(), interval) + setInterval(() => void this.memoGetFinalizedBlockHead(), interval) ); this.schedulerRegistry.addInterval( 'getBestBlockHead', - setInterval(() => void this.getBestBlockHead(), interval) + setInterval(() => void this.memoGetBestBlockHead(), interval) ); await this.dictionaryService.initDictionaries(); @@ -186,12 +203,7 @@ export abstract class BaseFetchService `- Handler: ${handler.handler}, args:${handledStringify(data)}`); await this.transformAndExecuteCustomDs(ds, vm, handler, data); } diff --git a/packages/node-core/src/indexer/smartBatch.service.ts b/packages/node-core/src/indexer/smartBatch.service.ts deleted file mode 100644 index 66ccf702c1..0000000000 --- a/packages/node-core/src/indexer/smartBatch.service.ts +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import {getHeapStatistics} from 'v8'; -import {Injectable} from '@nestjs/common'; -import {formatMBtoBytes} from '../utils'; -import {BlockSizeBuffer} from '../utils/blockSizeBuffer'; - -@Injectable() -export class SmartBatchService { - private blockSizeBuffer: BlockSizeBuffer; - - constructor(private maxBatchSize: number, private minHeapRequired: number = formatMBtoBytes(128)) { - this.blockSizeBuffer = new BlockSizeBuffer(maxBatchSize); - } - - get minimumHeapRequired(): number { - return this.minHeapRequired; - } - - addToSizeBuffer(blocks: any[]): void { - // Non-null assertion because we define a capacity - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const freeSpace = this.blockSizeBuffer.freeSpace!; - if (this.blockSizeBuffer.capacity && blocks.length > freeSpace) { - this.blockSizeBuffer.takeMany(blocks.length - freeSpace); - } - blocks.forEach((block) => this.blockSizeBuffer.put(this.blockSize(block))); - } - - blockSize(block: any): number { - let size = 0; - const stack: {obj: any; prop: any}[] = [ - {obj: block, prop: null}, - {obj: null, prop: null}, - ]; // Add sentinel value - - while (stack.length > 1) { - // Check for sentinel value - const item = stack.pop(); - if (!item) continue; - const {obj, prop} = item; - const type = typeof obj; - - if (type === 'string') { - size += Buffer.byteLength(obj); - } else if (type === 'number' || type === 'boolean' || obj === null || obj === undefined) { - size += String(obj).length; - } else if (type === 'bigint') { - size += obj.toString().length; - } else if (Array.isArray(obj)) { - size += 1; // opening bracket - stack.push({obj: null, prop: null}); // sentinel - for (let i = obj.length - 1; i >= 0; i--) { - 
stack.push({obj: obj[i], prop: i}); - } - } else if (type === 'object') { - size += 1; // opening brace - stack.push({obj: null, prop: null}); // sentinel - const keys = Object.keys(obj).sort(); - for (let i = keys.length - 1; i >= 0; i--) { - const key = keys[i]; - stack.push({obj: obj[key], prop: key}); - } - } else { - throw new Error(`Cannot serialize ${type}`); - } - - if (stack[stack.length - 1].prop !== prop && obj !== null && obj !== undefined) { - // Check for undefined/null values - size += 1; // comma or closing bracket/brace - } - } - - return size; - } - - heapMemoryLimit(): number { - return getHeapStatistics().heap_size_limit - this.minHeapRequired; - } - - getSafeBatchSize(): number { - const heapUsed = getHeapStatistics().used_heap_size; - let averageBlockSize; - - try { - averageBlockSize = this.blockSizeBuffer.average(); - } catch (e) { - return this.maxBatchSize; - } - - const heapleft = this.heapMemoryLimit() - heapUsed; - - //stop fetching until memory is freed - if (heapleft <= 0) { - return 0; - } - - const safeBatchSize = Math.floor(heapleft / averageBlockSize); - return Math.min(safeBatchSize, this.maxBatchSize); - } - - safeBatchSizeForRemainingMemory(memLeft: number): number { - let averageBlockSize; - - try { - averageBlockSize = this.blockSizeBuffer.average(); - } catch (e) { - return this.maxBatchSize; - } - - const safeBatchSize = Math.floor(memLeft / averageBlockSize); - return Math.min(safeBatchSize, this.maxBatchSize); - } -} diff --git a/packages/node-core/src/indexer/worker/worker.builder.ts b/packages/node-core/src/indexer/worker/worker.builder.ts index 7febb1cb40..12b5b7d2c6 100644 --- a/packages/node-core/src/indexer/worker/worker.builder.ts +++ b/packages/node-core/src/indexer/worker/worker.builder.ts @@ -177,7 +177,12 @@ export class Worker extends WorkerIO { * @param hostFns - functions the host exposes to the worker * @param exitMain - if true, when a worker exits the host will also exit * */ - private constructor(private worker: workers.Worker, workerFns: (keyof T)[], hostFns: AsyncMethods, exitMain = true) { + private constructor( + private worker: workers.Worker, + workerFns: (keyof T)[], + hostFns: AsyncMethods, + exitMain = true + ) { super(worker, workerFns as string[], hostFns, getLogger(`worker: ${worker.threadId}`)); this.worker.on('error', (error) => { diff --git a/packages/node-core/src/indexer/worker/worker.service.ts b/packages/node-core/src/indexer/worker/worker.service.ts index 9e8581c443..1332ee0134 100644 --- a/packages/node-core/src/indexer/worker/worker.service.ts +++ b/packages/node-core/src/indexer/worker/worker.service.ts @@ -5,7 +5,7 @@ import {BaseDataSource} from '@subql/types-core'; import {IProjectUpgradeService, NodeConfig} from '../../configure'; import {getLogger} from '../../logger'; import {monitorWrite} from '../../process'; -import {AutoQueue, isTaskFlushedError, memoryLock} from '../../utils'; +import {AutoQueue, isTaskFlushedError, RampQueue} from '../../utils'; import {ProcessBlockResponse} from '../blockDispatcher'; import {Header, IBlock, IProjectService} from '../types'; import {isBlockUnavailableError} from './utils'; @@ -35,12 +35,14 @@ export abstract class BaseWorkerService< protected abstract fetchChainBlock(heights: number, extra: E): Promise>; protected abstract toBlockResponse(block: B): R; protected abstract processFetchedBlock(block: IBlock, dataSources: DS[]): Promise; + // protected abstract getBlockSize(block: IBlock): number; constructor( private projectService: IProjectService, private 
projectUpgradeService: IProjectUpgradeService, nodeConfig: NodeConfig ) { + // this.queue = new RampQueue(this.getBlockSize.bind(this), nodeConfig.batchSize, undefined, nodeConfig.timeout, 'WorkerService'); this.queue = new AutoQueue(undefined, nodeConfig.batchSize, nodeConfig.timeout, 'Worker Service'); } @@ -49,13 +51,6 @@ export abstract class BaseWorkerService< return await this.queue.put(async () => { // If a dynamic ds is created we might be asked to fetch blocks again, use existing result if (!this.fetchedBlocks[height]) { - if (memoryLock.isLocked()) { - const start = Date.now(); - await memoryLock.waitForUnlock(); - const end = Date.now(); - logger.debug(`memory lock wait time: ${end - start}ms`); - } - const block = await this.fetchChainBlock(height, extra); this.fetchedBlocks[height] = block; } diff --git a/packages/node-core/src/indexer/worker/worker.ts b/packages/node-core/src/indexer/worker/worker.ts index 7d3854ec3e..d68036b210 100644 --- a/packages/node-core/src/indexer/worker/worker.ts +++ b/packages/node-core/src/indexer/worker/worker.ts @@ -8,7 +8,6 @@ import {INestApplication} from '@nestjs/common'; import {BaseDataSource, Store, Cache} from '@subql/types-core'; import {IApiConnectionSpecific} from '../../api.service'; import {getLogger} from '../../logger'; -import {waitForBatchSize} from '../../utils'; import {ProcessBlockResponse} from '../blockDispatcher'; import {ConnectionPoolStateManager} from '../connectionPoolState.manager'; import {IDynamicDsService} from '../dynamic-ds.service'; @@ -56,10 +55,6 @@ export function getWorkerService(): S { assert(workerService, 'Worker Not initialised'); return workerService as S; } -// eslint-disable-next-line @typescript-eslint/require-await -async function getBlocksLoaded(): Promise { - return workerService.numFetchedBlocks + workerService.numFetchingBlocks; -} // eslint-disable-next-line @typescript-eslint/require-await async function getMemoryLeft(): Promise { @@ -103,10 +98,6 @@ async function numFetchingBlocks(): Promise { return workerService.numFetchingBlocks; } -async function waitForWorkerBatchSize(heapSizeInBytes: number): Promise { - await waitForBatchSize(heapSizeInBytes); -} - // Export types to be used on the parent type FetchBlock = typeof fetchBlock; type ProcessBlock = typeof processBlock; @@ -114,8 +105,6 @@ type NumFetchedBlocks = typeof numFetchedBlocks; type NumFetchingBlocks = typeof numFetchingBlocks; type GetWorkerStatus = typeof getStatus; type GetMemoryLeft = typeof getMemoryLeft; -type GetBlocksLoaded = typeof getBlocksLoaded; -type WaitForWorkerBatchSize = typeof waitForWorkerBatchSize; export type IBaseIndexerWorker = { processBlock: ProcessBlock; @@ -124,8 +113,6 @@ export type IBaseIndexerWorker = { numFetchingBlocks: NumFetchingBlocks; getStatus: GetWorkerStatus; getMemoryLeft: GetMemoryLeft; - getBlocksLoaded: GetBlocksLoaded; - waitForWorkerBatchSize: WaitForWorkerBatchSize; }; export const baseWorkerFunctions: (keyof IBaseIndexerWorker)[] = [ @@ -135,8 +122,6 @@ export const baseWorkerFunctions: (keyof IBaseIndexerWorker)[] = [ 'numFetchingBlocks', 'getStatus', 'getMemoryLeft', - 'getBlocksLoaded', - 'waitForWorkerBatchSize', ]; export function createWorkerHost< @@ -166,8 +151,6 @@ export function createWorkerHost< numFetchingBlocks, getStatus, getMemoryLeft, - getBlocksLoaded, - waitForWorkerBatchSize, }, logger ); diff --git a/packages/node-core/src/utils/batch-size.ts b/packages/node-core/src/utils/batch-size.ts deleted file mode 100644 index 4a0e93b538..0000000000 --- 
a/packages/node-core/src/utils/batch-size.ts +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import {getHeapStatistics} from 'v8'; -import {Mutex} from 'async-mutex'; -import {getLogger} from '../logger'; - -const logger = getLogger('memory'); - -export const memoryLock = new Mutex(); - -export async function waitForBatchSize(sizeInBytes: number): Promise { - let resolved = false; - const checkHeap = async () => { - const heapTotal = getHeapStatistics().heap_size_limit; - const {heapUsed} = process.memoryUsage(); - const availableHeap = heapTotal - heapUsed; - if (availableHeap >= sizeInBytes && !resolved) { - resolved = true; - if (memoryLock.isLocked()) { - memoryLock.release(); - } - return; - } - if (!memoryLock.isLocked()) { - await memoryLock.acquire(); - } - if (!resolved) { - logger.warn('Out of Memory - waiting for heap to be freed...'); - await checkHeap(); - } - }; - await checkHeap(); -} - -export function formatMBtoBytes(sizeInMB: number): number { - return sizeInMB / 1024 / 1024; -} - -export function formatBytesToMB(sizeInBytes: number): number { - return sizeInBytes * 1024 * 1024; -} diff --git a/packages/node-core/src/utils/blockSizeBuffer.ts b/packages/node-core/src/utils/blockSizeBuffer.ts deleted file mode 100644 index 3c87010a2f..0000000000 --- a/packages/node-core/src/utils/blockSizeBuffer.ts +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import {Queue} from './autoQueue'; - -export class BlockSizeBuffer extends Queue { - constructor(capacity: number) { - super(capacity); - } - - average(): number { - if (this.size === 0) { - throw new Error('No block sizes to average'); - } - - let sum = 0; - for (let i = 0; i < this.size; i++) { - sum += this.items[i]; - } - - if (!this.capacity) { - throw new Error('Capacity is expected to be defined for block size buffer'); - } - return Math.floor(sum / this.capacity); - } -} diff --git a/packages/node-core/src/utils/index.ts b/packages/node-core/src/utils/index.ts index 53fa95d549..5eace77924 100644 --- a/packages/node-core/src/utils/index.ts +++ b/packages/node-core/src/utils/index.ts @@ -5,11 +5,9 @@ export * from './decorators'; export * from './object'; export * from './promise'; export * from './graphql'; -export * from './batch-size'; -export * from './autoQueue'; +export * from './queues'; export * from './project'; export * from './fetchHelpers'; -export * from './blockSizeBuffer'; export * from './configure'; export * from './reindex'; export * from './blocks'; diff --git a/packages/node-core/src/utils/autoQueue.spec.ts b/packages/node-core/src/utils/queues/autoQueue.spec.ts similarity index 100% rename from packages/node-core/src/utils/autoQueue.spec.ts rename to packages/node-core/src/utils/queues/autoQueue.spec.ts diff --git a/packages/node-core/src/utils/autoQueue.ts b/packages/node-core/src/utils/queues/autoQueue.ts similarity index 82% rename from packages/node-core/src/utils/autoQueue.ts rename to packages/node-core/src/utils/queues/autoQueue.ts index 10a8846fdd..cb810df43e 100644 --- a/packages/node-core/src/utils/autoQueue.ts +++ b/packages/node-core/src/utils/queues/autoQueue.ts @@ -2,15 +2,8 @@ // SPDX-License-Identifier: GPL-3.0 import {EventEmitter2} from '@nestjs/event-emitter'; -import {timeout} from './promise'; - -export interface IQueue { - size: number; - capacity: number | undefined; - freeSpace: number | undefined; - - flush(): 
void; -} +import {timeout} from '../promise'; +import {IQueue, Queue} from './queue'; export class TaskFlushedError extends Error { readonly name = 'TaskFlushedError'; @@ -24,69 +17,7 @@ export function isTaskFlushedError(e: any): e is TaskFlushedError { return (e as TaskFlushedError)?.name === 'TaskFlushedError'; } -export class Queue implements IQueue { - protected items: T[] = []; - private _capacity?: number; - - constructor(capacity?: number) { - this._capacity = capacity; - } - - get size(): number { - return this.items.length; - } - - get capacity(): number | undefined { - return this._capacity; - } - - get freeSpace(): number | undefined { - if (!this._capacity) return undefined; - - return this._capacity - this.size; - } - - put(item: T): void { - this.putMany([item]); - } - - putMany(items: T[]): void { - if (this.freeSpace && items.length > this.freeSpace) { - throw new Error('Queue exceeds max size'); - } - this.items.push(...items); - } - - peek(): T | undefined { - return this.items[0]; - } - - take(): T | undefined { - return this.items.shift(); - } - - takeMany(size: number): T[] { - const sizeCapped = Math.min(this.size, size); - - const result = this.items.slice(0, sizeCapped); - this.items = this.items.slice(sizeCapped); - - return result; - } - - takeAll(): T[] { - const result = this.items; - - this.items = []; - return result; - } - - flush(): void { - this.takeAll(); - } -} - -type Task = () => Promise | T; +export type Task = () => Promise | T; type Action = { index: number; @@ -127,7 +58,7 @@ export class AutoQueue implements IQueue { capacity?: number, public concurrency = 1, private taskTimeoutSec = 900, - private name = 'Auto' + protected name = 'Auto' ) { this.queue = new Queue>(capacity); } diff --git a/packages/node-core/src/utils/queues/index.ts b/packages/node-core/src/utils/queues/index.ts new file mode 100644 index 0000000000..043c0f0fac --- /dev/null +++ b/packages/node-core/src/utils/queues/index.ts @@ -0,0 +1,6 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export * from './autoQueue'; +export * from './queue'; +export * from './rampQueue'; diff --git a/packages/node-core/src/utils/queues/queue.ts b/packages/node-core/src/utils/queues/queue.ts new file mode 100644 index 0000000000..7adda4a252 --- /dev/null +++ b/packages/node-core/src/utils/queues/queue.ts @@ -0,0 +1,72 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export interface IQueue { + size: number; + capacity: number | undefined; + freeSpace: number | undefined; + + flush(): void; +} + +export class Queue implements IQueue { + protected items: T[] = []; + private _capacity?: number; + + constructor(capacity?: number) { + this._capacity = capacity; + } + + get size(): number { + return this.items.length; + } + + get capacity(): number | undefined { + return this._capacity; + } + + get freeSpace(): number | undefined { + if (!this._capacity) return undefined; + + return this._capacity - this.size; + } + + put(item: T): void { + this.putMany([item]); + } + + putMany(items: T[]): void { + if (this.freeSpace && items.length > this.freeSpace) { + throw new Error('Queue exceeds max size'); + } + this.items.push(...items); + } + + peek(): T | undefined { + return this.items[0]; + } + + take(): T | undefined { + return this.items.shift(); + } + + takeMany(size: number): T[] { + const sizeCapped = Math.min(this.size, size); + + const result = this.items.slice(0, sizeCapped); + 
this.items = this.items.slice(sizeCapped); + + return result; + } + + takeAll(): T[] { + const result = this.items; + + this.items = []; + return result; + } + + flush(): void { + this.takeAll(); + } +} diff --git a/packages/node-core/src/utils/queues/rampQueue.spec.ts b/packages/node-core/src/utils/queues/rampQueue.spec.ts new file mode 100644 index 0000000000..78a7d4cb71 --- /dev/null +++ b/packages/node-core/src/utils/queues/rampQueue.spec.ts @@ -0,0 +1,51 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {Task} from './autoQueue'; +import {RampQueue} from './rampQueue'; + +const testSizes = [ + 4.448733415947499, 8.956082893010741, 1.0740251087438613, 7.874408027526685, 2.095588330681719, 9.877603058371257, + 0.26588775084013694, 5.3688758024204075, 4.671317302459488, 4.936036470903067, 7.463695662357379, 0.0711211590650418, + 4.841152155689485, 6.589399364540089, 5.33840341477309, 8.083555618831673, 0.9052566506797755, 5.9079018928373035, + 4.846492987181811, 8.245959543695747, 5.460921697188357, 0.7043806224121507, 2.3857434319496518, 2.6918821593173825, + 2.1284178622273875, 4.6586931023608, 5.541801214592789, 7.699660358512865, 1.7609267208636559, 7.374614871844476, + 3.467359287274774, 1.9130382916923239, 4.299989967525457, 8.700538279205558, 9.565834125047408, 1.8466949701526802, + 6.537691936703256, 1.768453891237669, 5.692631134946318, 8.628395051306788, 5.472044652806851, 9.927113870306119, + 4.003094529277136, 0.552330717827545, 3.5144104884478566, 7.449595823096287, 2.8985520335505566, 5.947189837345519, + 3.0711372037168583, 1.6113066655669672, 6.921119496961885, 4.719632718538214, 7.871754171657878, 7.955877161534463, + 3.397497333755941, 9.496938503818358, 1.3986331511241934, 5.622688974863552, 2.8435658149293097, 3.866639909234073, + 6.316575461599183, 3.141113012713883, 2.776692764764137, 5.831976543135431, 4.288044903146373, 6.048401548461621, + 2.6875146658212157, 6.822941828005713, 8.10971754598745, 0.3413062242535725, 8.897781544073045, 6.865489802832028, + 3.894256575593764, 5.862579766647702, 0.6062292779262113, 2.5020990968271106, 0.4564383819215645, 7.437457990684706, + 5.378929658223783, 6.404423211376513, 3.4856467856434903, 1.78756098752324, 5.06180877528712, 5.971995837564927, + 0.3329444767396139, 5.528156896524639, 2.6196190996965063, 6.971005783415343, 9.948474242499683, 3.011190899660412, + 8.133780227538313, 2.4909706549660715, 5.603675099720298, 5.136555674514199, 3.5004619769181144, 6.445875763339708, + 0.9132682345511878, 4.094728057715262, 8.94382906736585, 2.4066587617826785, +]; + +describe('RampQueue', () => { + it('Ramps correctly', async () => { + const queue = new RampQueue((v: number) => v, 10, 100); + + // Initial should always be 1 + expect(queue.concurrency).toBe(1); + + await queue.put(() => Promise.resolve(10)); + + // A single item shouldn't change the concurrenct + expect(queue.concurrency).toBe(1); + + // Fill up to initial capacity + const tasks: Task[] = new Array(99).fill(0).map((_, i) => () => Promise.resolve(testSizes[i])); + await Promise.all(queue.putMany(tasks)); + + // Concurrency should still be one since capacity has only just been reached, from now we should be adjusting capacity + // expect(queue.concurrency).toBe(1); + + const tasks1: Task[] = new Array(100).fill(0).map((_, i) => () => Promise.resolve(testSizes[i])); + await Promise.all(queue.putMany(tasks1)); + + expect(queue.concurrency).toBe(5); + }); +}); diff --git 
a/packages/node-core/src/utils/queues/rampQueue.ts b/packages/node-core/src/utils/queues/rampQueue.ts
new file mode 100644
index 0000000000..0a100cbed5
--- /dev/null
+++ b/packages/node-core/src/utils/queues/rampQueue.ts
@@ -0,0 +1,92 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: GPL-3.0
+
+import {getLogger} from '../../logger';
+import {AutoQueue, Task} from './autoQueue';
+
+const median = (arr: number[]): number => {
+  if (!arr.length) return 0;
+  const s = [...arr].sort((a, b) => a - b);
+  const mid = Math.floor(s.length / 2);
+  return s.length % 2 ? s[mid] : (s[mid - 1] + s[mid]) / 2;
+};
+
+const MAX_SIZES = 1000;
+const MIN_SIZES = 20;
+
+const logger = getLogger('RampQueue');
+
+/**
+ * The ramp queue is an extension of the AutoQueue which dynamically adjusts the concurrency based on the getSize function.
+ * It will start at concurrency 1 and ramp up to the max concurrency, unless there is a large jump above the median size, in which case it will decrease.
+ * */
+export class RampQueue<T> extends AutoQueue<T> {
+  #maxConcurrency: number;
+  #sizes: number[] = [];
+
+  constructor(
+    private getSize: (data: T) => number,
+    concurrency: number,
+    capacity?: number,
+    timeout?: number,
+    name?: string
+  ) {
+    super(capacity, 1, timeout, name);
+    this.#maxConcurrency = concurrency;
+  }
+
+  async put(item: Task<T>): Promise<T> {
+    return this.putMany([item])[0];
+  }
+
+  putMany(tasks: Task<T>[]): Promise<T>[] {
+    return super.putMany(tasks).map((r) =>
+      r.then((d) => {
+        this.adjustConcurrency(d);
+        return d;
+      })
+    );
+  }
+
+  private setConcurrency(newConcurrency: number) {
+    const clamped = Math.max(1, Math.min(Math.floor(newConcurrency), this.#maxConcurrency));
+    if (clamped > this.concurrency) {
+      logger.debug(`${this.name} increased concurrency to ${clamped}`);
+    } else if (clamped < this.concurrency) {
+      logger.debug(`${this.name} decreased concurrency to ${clamped}`);
+    }
+    this.concurrency = clamped;
+  }
+
+  private adjustConcurrency(data: T): void {
+    try {
+      const m = median(this.#sizes);
+      const size = this.getSize(data);
+
+      this.addSize(size);
+
+      // Not enough data to construct a median
+      if (this.#sizes.length < MIN_SIZES) {
+        return;
+      }
+
+      if (size > m * 2) {
+        // Inverse of the size compared to the median.
E.g if a block is 5x as big as the median then the batch size should be 1/5 of the max + const multiplier = 1 / (size / m); + this.setConcurrency(this.#maxConcurrency * multiplier); + } else if (this.#sizes.length % MIN_SIZES === 0) { + // Increase by 10% of max + this.setConcurrency(this.concurrency + Math.floor(this.#maxConcurrency / 10)); + } + } catch (e) { + console.log('Failed to adjust concurrency', e); + } + } + + private addSize(size: number): void { + if (this.#sizes.length >= MAX_SIZES) { + this.#sizes.shift(); + } + this.#sizes.push(size); + } +} diff --git a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts index 91d4178cf8..c3301de374 100644 --- a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts +++ b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts @@ -15,7 +15,7 @@ import { IBlock, IStoreModelProvider, } from '@subql/node-core'; -import { SubstrateDatasource } from '@subql/types'; +import { SubstrateBlock, SubstrateDatasource } from '@subql/types'; import { SubqueryProject } from '../../configure/SubqueryProject'; import { ApiService } from '../api.service'; import { IndexerManager } from '../indexer.manager'; @@ -101,4 +101,13 @@ export class BlockDispatcherService runtimeVersion, ); } + + protected getBlockSize( + block: IBlock, + ): number { + return block.block.events.reduce( + (acc, evt) => acc + evt.encodedLength, + (block.block.block as SubstrateBlock)?.encodedLength ?? 0, + ); + } } diff --git a/packages/node/src/utils/substrate.ts b/packages/node/src/utils/substrate.ts index 882a41d027..0eaa80f5e9 100644 --- a/packages/node/src/utils/substrate.ts +++ b/packages/node/src/utils/substrate.ts @@ -29,7 +29,7 @@ import { SubstrateExtrinsic, BlockHeader, } from '@subql/types'; -import { merge, range } from 'lodash'; +import { merge } from 'lodash'; import { SubqlProjectBlockFilter } from '../configure/SubqueryProject'; import { ApiPromiseConnection } from '../indexer/apiPromise.connection'; import { BlockContent, LightBlockContent } from '../indexer/types'; @@ -288,6 +288,7 @@ export async function getBlockByHeight( ); throw ApiPromiseConnection.handleError(e); }); + // validate block is valid if (block.block.header.hash.toHex() !== blockHash.toHex()) { throw new Error( From 0c969d4fbbb97a3a98f67298091e6a13eb6b7a1e Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Mon, 2 Dec 2024 11:41:46 +1300 Subject: [PATCH 2/9] Use RampQueue with workers --- .../src/indexer/worker/worker.service.ts | 22 ++++++++++++------- packages/node/src/indexer/types.ts | 10 +++++++++ .../node/src/indexer/worker/worker.service.ts | 13 ++++++++++- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/packages/node-core/src/indexer/worker/worker.service.ts b/packages/node-core/src/indexer/worker/worker.service.ts index 1332ee0134..6f6fa7b6cc 100644 --- a/packages/node-core/src/indexer/worker/worker.service.ts +++ b/packages/node-core/src/indexer/worker/worker.service.ts @@ -30,35 +30,41 @@ export abstract class BaseWorkerService< private fetchedBlocks: Record> = {}; private _isIndexing = false; - private queue: AutoQueue; + private queue: AutoQueue>; protected abstract fetchChainBlock(heights: number, extra: E): Promise>; protected abstract toBlockResponse(block: B): R; protected abstract processFetchedBlock(block: IBlock, dataSources: DS[]): Promise; - // protected abstract getBlockSize(block: IBlock): number; + protected abstract getBlockSize(block: 
IBlock): number; constructor( private projectService: IProjectService, private projectUpgradeService: IProjectUpgradeService, nodeConfig: NodeConfig ) { - // this.queue = new RampQueue(this.getBlockSize.bind(this), nodeConfig.batchSize, undefined, nodeConfig.timeout, 'WorkerService'); - this.queue = new AutoQueue(undefined, nodeConfig.batchSize, nodeConfig.timeout, 'Worker Service'); + this.queue = new RampQueue( + this.getBlockSize.bind(this), + nodeConfig.batchSize, + undefined, + nodeConfig.timeout, + 'WorkerService' + ); } async fetchBlock(height: number, extra: E): Promise { try { - return await this.queue.put(async () => { + const block = await this.queue.put(async () => { // If a dynamic ds is created we might be asked to fetch blocks again, use existing result if (!this.fetchedBlocks[height]) { const block = await this.fetchChainBlock(height, extra); this.fetchedBlocks[height] = block; } - const block = this.fetchedBlocks[height]; - // Return info to get the runtime version, this lets the worker thread know - return this.toBlockResponse(block.block); + return this.fetchedBlocks[height]; }); + + // Return info to get the runtime version, this lets the worker thread know + return this.toBlockResponse(block.block); } catch (e: any) { if (!isTaskFlushedError(e)) { logger.error(e, `Failed to fetch block ${height}`); diff --git a/packages/node/src/indexer/types.ts b/packages/node/src/indexer/types.ts index ed803830d8..b13e8bf070 100644 --- a/packages/node/src/indexer/types.ts +++ b/packages/node/src/indexer/types.ts @@ -4,6 +4,7 @@ import { ApiPromise } from '@polkadot/api'; import { ApiDecoration } from '@polkadot/api/types'; import type { HexString } from '@polkadot/util/types'; +import { IBlock } from '@subql/node-core'; import { BlockHeader, LightSubstrateEvent, @@ -32,3 +33,12 @@ export function isFullBlock( ): block is BlockContent { return (block as BlockContent).extrinsics !== undefined; } + +export function getBlockSize( + block: IBlock, +): number { + return block.block.events.reduce( + (acc, evt) => acc + evt.encodedLength, + (block.block.block as SubstrateBlock)?.encodedLength ?? 
0, + ); +} diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index 5ed1d0e69d..6ea497f36e 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -17,7 +17,12 @@ import { ApiService } from '../api.service'; import { SpecVersion } from '../dictionary'; import { IndexerManager } from '../indexer.manager'; import { WorkerRuntimeService } from '../runtime/workerRuntimeService'; -import { BlockContent, isFullBlock, LightBlockContent } from '../types'; +import { + BlockContent, + getBlockSize, + isFullBlock, + LightBlockContent, +} from '../types'; export type FetchBlockResponse = Header & { specVersion?: number }; @@ -68,6 +73,12 @@ export class WorkerService extends BaseWorkerService< }; } + protected getBlockSize( + block: IBlock, + ): number { + return getBlockSize(block); + } + protected async processFetchedBlock( block: IBlock, dataSources: SubstrateDatasource[], From 4a1f2b0380cd90ad7733d67b7980e12e5d59c4bb Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Mon, 2 Dec 2024 14:00:54 +1300 Subject: [PATCH 3/9] Fix tests --- packages/node-core/src/indexer/fetch.service.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 8e0abf229f..507911c40a 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -207,6 +207,14 @@ export abstract class BaseFetchService latestHeight + ); if (this.blockDispatcher.freeSize < scaledBatchSize || startBlockHeight > latestHeight) { if (this.blockDispatcher.freeSize < scaledBatchSize) { logger.debug( @@ -223,7 +231,7 @@ export abstract class BaseFetchService { + console.log('FFFF'); // End height from current dataSource const {endHeight, value: relevantDs} = this.getRelevantDsDetails(startBlockHeight); // Estimated range end height @@ -328,6 +338,7 @@ export abstract class BaseFetchService Date: Mon, 2 Dec 2024 14:37:41 +1300 Subject: [PATCH 4/9] Fix circular imports --- .../node-core/src/db/migration-service/migration.ts | 12 +++++------- packages/node-core/src/db/sync-helper.ts | 6 +++--- packages/node-core/src/indexer/sandbox.ts | 2 +- .../node-core/src/subcommands/forceClean.service.ts | 3 +-- packages/node-core/src/utils/graphql.ts | 6 +++++- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/packages/node-core/src/db/migration-service/migration.ts b/packages/node-core/src/db/migration-service/migration.ts index aa5cdf35cb..469e7cc02f 100644 --- a/packages/node-core/src/db/migration-service/migration.ts +++ b/packages/node-core/src/db/migration-service/migration.ts @@ -23,12 +23,10 @@ import {isEqual, uniq} from 'lodash'; import {NodeConfig} from '../../configure/NodeConfig'; import {HistoricalMode, StoreService} from '../../indexer'; import {getLogger} from '../../logger'; -import {EnumType, getColumnOption, modelsTypeToModelAttributes} from '../../utils'; +import {EnumType, getColumnOption, modelsTypeToModelAttributes, enumNameToHash} from '../../utils'; import {formatAttributes, formatColumnName, modelToTableName} from '../sequelizeUtil'; import * as syncHelper from '../sync-helper'; -type RemovedIndexes = Record; - const logger = getLogger('db-manager'); export class Migration { @@ -358,8 +356,8 @@ export class Migration { // It is difficult for sequelize use replacement, instead we use escape to avoid injection // 
UPDATE: this comment got syntax error with cockroach db, disable it for now. Waiting to be fixed. // See https://github.com/cockroachdb/cockroach/issues/44135 - const enumTypeName = syncHelper.enumNameToHash(e.name); - const enumTypeNameDeprecated = `${this.schemaName}_enum_${syncHelper.enumNameToHash(e.name)}`; + const enumTypeName = enumNameToHash(e.name); + const enumTypeNameDeprecated = `${this.schemaName}_enum_${enumNameToHash(e.name)}`; let type: string | null = null; @@ -397,8 +395,8 @@ export class Migration { } dropEnum(e: GraphQLEnumsType): void { - const enumTypeName = syncHelper.enumNameToHash(e.name); - const enumTypeNameDeprecated = `${this.schemaName}_enum_${syncHelper.enumNameToHash(e.name)}`; + const enumTypeName = enumNameToHash(e.name); + const enumTypeNameDeprecated = `${this.schemaName}_enum_${enumNameToHash(e.name)}`; [enumTypeName, enumTypeNameDeprecated].forEach((typeName) => { if (this.enumTypeMap.has(typeName)) { diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index 4c15a3d816..8cdd15f6a8 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -297,9 +297,9 @@ export function createSchemaTriggerFunction(schema: string): string { $$ LANGUAGE plpgsql;`; } -export function enumNameToHash(enumName: string): string { - return blake2AsHex(enumName).substr(2, 10); -} +// export function enumNameToHash(enumName: string): string { +// return blake2AsHex(enumName).substr(2, 10); +// } export function getExistedIndexesQuery(schema: string): string { return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`; diff --git a/packages/node-core/src/indexer/sandbox.ts b/packages/node-core/src/indexer/sandbox.ts index 2aaddd7332..099d2c8418 100644 --- a/packages/node-core/src/indexer/sandbox.ts +++ b/packages/node-core/src/indexer/sandbox.ts @@ -11,7 +11,7 @@ import {SourceMapConsumer, NullableMappedPosition} from 'source-map'; import {NodeVM, NodeVMOptions, VMError, VMScript} from 'vm2'; import {NodeConfig} from '../configure/NodeConfig'; import {getLogger} from '../logger'; -import {timeout} from '../utils'; +import {timeout} from '../utils/promise'; export const SANDBOX_DEFAULT_BUILTINS = ['assert', 'buffer', 'crypto', 'util', 'path', 'url', 'stream']; diff --git a/packages/node-core/src/subcommands/forceClean.service.ts b/packages/node-core/src/subcommands/forceClean.service.ts index 5e3a1ab8e7..54ad7289dc 100644 --- a/packages/node-core/src/subcommands/forceClean.service.ts +++ b/packages/node-core/src/subcommands/forceClean.service.ts @@ -5,11 +5,10 @@ import {Inject, Injectable} from '@nestjs/common'; import {getAllEntitiesRelations} from '@subql/utils'; import {QueryTypes, Sequelize} from '@subql/x-sequelize'; import {NodeConfig} from '../configure'; -import {enumNameToHash} from '../db'; import {MonitorService} from '../indexer'; import {ISubqueryProject} from '../indexer/types'; import {getLogger} from '../logger'; -import {getEnumDeprecated, getExistingProjectSchema} from '../utils'; +import {getEnumDeprecated, getExistingProjectSchema, enumNameToHash} from '../utils'; const logger = getLogger('Force-clean'); diff --git a/packages/node-core/src/utils/graphql.ts b/packages/node-core/src/utils/graphql.ts index 3e48958f2c..62fd243294 100644 --- a/packages/node-core/src/utils/graphql.ts +++ b/packages/node-core/src/utils/graphql.ts @@ -13,10 +13,14 @@ import { isNull, GraphQLEntityField, GraphQLJsonFieldType, + blake2AsHex, } from '@subql/utils'; import 
{ModelAttributes, ModelAttributeColumnOptions} from '@subql/x-sequelize'; import {isArray, isObject} from 'lodash'; -import {enumNameToHash} from '../db'; + +export function enumNameToHash(enumName: string): string { + return blake2AsHex(enumName).substr(2, 10); +} export interface EnumType { enumValues: string[]; From 4e242431c8212d5f289d119edc9b83e96dc2e71d Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Mon, 2 Dec 2024 15:12:46 +1300 Subject: [PATCH 5/9] Remove irrelevant tests --- .../worker-block-dispatcher.spec.ts | 80 ------------------- .../src/utils/queues/rampQueue.spec.ts | 4 +- 2 files changed, 2 insertions(+), 82 deletions(-) delete mode 100644 packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts deleted file mode 100644 index e0d49ecc10..0000000000 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { IProjectUpgradeService, NodeConfig } from '../../configure'; -import { PoiSyncService } from '../poi'; -import { StoreService } from '../store.service'; -import { StoreCacheService } from '../storeModelProvider'; -import { Header, IProjectService, ISubqueryProject } from '../types'; -import { WorkerBlockDispatcher } from './worker-block-dispatcher'; - -class TestWorkerBlockDispatcher extends WorkerBlockDispatcher { - async fetchBlock(worker: any, height: number): Promise
{ - return Promise.resolve({ - blockHeight: height, - } as Header); - } - - get minimumHeapLimit(): number { - return 150; - } -} -describe('WorkerBlockDispatcher', () => { - let dispatcher: WorkerBlockDispatcher; - - // Mock workers - const mockWorkers = [ - { getMemoryLeft: jest.fn().mockResolvedValue(100), waitForWorkerBatchSize: jest.fn().mockResolvedValue(undefined) }, - { getMemoryLeft: jest.fn().mockResolvedValue(200), waitForWorkerBatchSize: jest.fn().mockResolvedValue(undefined) }, - { getMemoryLeft: jest.fn().mockResolvedValue(300), waitForWorkerBatchSize: jest.fn().mockResolvedValue(undefined) }, - ]; - - beforeEach(() => { - dispatcher = new TestWorkerBlockDispatcher( - { workers: 3 } as unknown as NodeConfig, - null as unknown as EventEmitter2, - null as unknown as IProjectService, - null as unknown as IProjectUpgradeService, - null as unknown as StoreService, - null as unknown as StoreCacheService, - null as unknown as PoiSyncService, - null as unknown as ISubqueryProject, - null as unknown as () => Promise - ); - (dispatcher as any).workers = mockWorkers; - }); - - afterEach(() => { - jest.clearAllMocks(); - }); - - test('getNextWorkerIndex should return the index of the next worker that has memory above the minimum limit', async () => { - const index = await (dispatcher as any).getNextWorkerIndex(); - expect(index).toBe(1); - }); - - test('getNextWorkerIndex should skip workers that have memory below the minimum limit', async () => { - // Make the first worker return memory below the limit - mockWorkers[1].getMemoryLeft.mockResolvedValue(100); - - const index = await (dispatcher as any).getNextWorkerIndex(); - expect(index).toBe(2); - }); - - test('getNextWorkerIndex should wait for memory to be freed if all workers have memory below the minimum limit', async () => { - // Make all workers return memory below the limit - mockWorkers[0].getMemoryLeft.mockResolvedValue(100); - mockWorkers[1].getMemoryLeft.mockResolvedValue(100); - mockWorkers[2].getMemoryLeft.mockResolvedValue(100); - - // Make the first worker free up memory after waiting - mockWorkers[0].waitForWorkerBatchSize.mockImplementationOnce(() => { - mockWorkers[0].getMemoryLeft.mockResolvedValue(200); - return Promise.resolve(); - }); - - const index = await (dispatcher as any).getNextWorkerIndex(); - expect(index).toBe(0); - }); -}); diff --git a/packages/node-core/src/utils/queues/rampQueue.spec.ts b/packages/node-core/src/utils/queues/rampQueue.spec.ts index 78a7d4cb71..e144084a05 100644 --- a/packages/node-core/src/utils/queues/rampQueue.spec.ts +++ b/packages/node-core/src/utils/queues/rampQueue.spec.ts @@ -1,8 +1,8 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import {Task} from './autoQueue'; -import {RampQueue} from './rampQueue'; +import { Task } from './autoQueue'; +import { RampQueue } from './rampQueue'; const testSizes = [ 4.448733415947499, 8.956082893010741, 1.0740251087438613, 7.874408027526685, 2.095588330681719, 9.877603058371257, From ce2d0ddd11991cde5641516453eed4037d0f7bff Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Mon, 2 Dec 2024 15:29:35 +1300 Subject: [PATCH 6/9] Update changelogs --- packages/node-core/CHANGELOG.md | 6 ++++++ packages/node/CHANGELOG.md | 2 ++ 2 files changed, 8 insertions(+) diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index 6a4dd0af24..51fd5c761b 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -5,6 +5,12 @@ The format is based on [Keep a 
diff --git a/packages/node-core/src/utils/queues/rampQueue.spec.ts b/packages/node-core/src/utils/queues/rampQueue.spec.ts
index 78a7d4cb71..e144084a05 100644
--- a/packages/node-core/src/utils/queues/rampQueue.spec.ts
+++ b/packages/node-core/src/utils/queues/rampQueue.spec.ts
@@ -1,8 +1,8 @@
 // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
 // SPDX-License-Identifier: GPL-3.0

-import {Task} from './autoQueue';
-import {RampQueue} from './rampQueue';
+import { Task } from './autoQueue';
+import { RampQueue } from './rampQueue';

 const testSizes = [
   4.448733415947499, 8.956082893010741, 1.0740251087438613, 7.874408027526685, 2.095588330681719, 9.877603058371257,

From ce2d0ddd11991cde5641516453eed4037d0f7bff Mon Sep 17 00:00:00 2001
From: Scott Twiname
Date: Mon, 2 Dec 2024 15:29:35 +1300
Subject: [PATCH 6/9] Update changelogs

---
 packages/node-core/CHANGELOG.md | 6 ++++++
 packages/node/CHANGELOG.md      | 2 ++
 2 files changed, 8 insertions(+)

diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md
index 6a4dd0af24..51fd5c761b 100644
--- a/packages/node-core/CHANGELOG.md
+++ b/packages/node-core/CHANGELOG.md
@@ -5,6 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## [Unreleased]
+### Removed
+- SmartBatchService as it didn't function as intended (#2611)
+
+### Changed
+- Implement new RampQueue to dynamically scale block fetching concurrency, which helps when indexing larger blocks (#2611)
+- Memoize promises to get finalized and best blocks (#2611)
 ## [15.0.3] - 2024-11-26
 ### Fixed
diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md
index f6dc390e5b..58fc9c94d8 100644
--- a/packages/node/CHANGELOG.md
+++ b/packages/node/CHANGELOG.md
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## [Unreleased]
+### Changed
+- Add block size function to match breaking node-core change (#2611)
 ## [5.4.2] - 2024-11-26
 ### Fixed

From a8589126a40e0b6ff05980d1b24bafac9a00e1b3 Mon Sep 17 00:00:00 2001
From: Scott Twiname
Date: Mon, 2 Dec 2024 15:47:08 +1300
Subject: [PATCH 7/9] Clean up

---
 packages/node-core/src/db/sync-helper.ts              |  4 ----
 .../indexer/blockDispatcher/base-block-dispatcher.ts  |  1 -
 packages/node-core/src/indexer/fetch.service.ts       | 12 ------------
 3 files changed, 17 deletions(-)

diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts
index 8cdd15f6a8..c0d4e73acd 100644
--- a/packages/node-core/src/db/sync-helper.ts
+++ b/packages/node-core/src/db/sync-helper.ts
@@ -297,10 +297,6 @@ export function createSchemaTriggerFunction(schema: string): string {
   $$ LANGUAGE plpgsql;`;
 }

-// export function enumNameToHash(enumName: string): string {
-// return blake2AsHex(enumName).substr(2, 10);
-// }
-
 export function getExistedIndexesQuery(schema: string): string {
   return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`;
 }
diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts
index 24825dd3b9..f2b4eb862d 100644
--- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts
+++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts
@@ -82,7 +82,6 @@ export abstract class BaseBlockDispatcher implements IB
   }

   get batchSize(): number {
-    // TODO make this smarter
     return this.nodeConfig.batchSize;
   }

diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts
index 507911c40a..42bb2d52ba 100644
--- a/packages/node-core/src/indexer/fetch.service.ts
+++ b/packages/node-core/src/indexer/fetch.service.ts
@@ -207,14 +207,6 @@ export abstract class BaseFetchService
       latestHeight
-    );
     if (this.blockDispatcher.freeSize < scaledBatchSize || startBlockHeight > latestHeight) {
       if (this.blockDispatcher.freeSize < scaledBatchSize) {
         logger.debug(
@@ -268,7 +260,6 @@ export abstract class BaseFetchService {
-    console.log('FFFF');
     // End height from current dataSource
     const {endHeight, value: relevantDs} = this.getRelevantDsDetails(startBlockHeight);
     // Estimated range end height
@@ -338,7 +328,6 @@ export abstract class BaseFetchService
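The node-core changelog entry above mentions memoizing the promises used to get the finalized and best blocks. That implementation is not shown in this series; a minimal sketch of the idea, with a hypothetical memoizeInFlight helper and a fake fetcher standing in for the real RPC call, might look like this:

```ts
interface Header {
  blockHeight: number;
  blockHash: string;
}

// Caches the in-flight promise so concurrent callers share a single request,
// then clears it once settled so the next call fetches fresh data.
function memoizeInFlight<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (!pending) {
      pending = fn().finally(() => {
        pending = undefined;
      });
    }
    return pending;
  };
}

// Stand-in for an RPC call to get the finalized header
const getFinalizedHeader = memoizeInFlight(
  async (): Promise<Header> => ({blockHeight: 100, blockHash: '0xabc'})
);

// Both calls below resolve from the same underlying request
void Promise.all([getFinalizedHeader(), getFinalizedHeader()]);
```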
Date: Tue, 3 Dec 2024 13:55:03 +1300
Subject: [PATCH 8/9] Address comments

---
 .../src/indexer/blockDispatcher/block-dispatcher.ts | 1 -
 packages/node-core/src/utils/queues/rampQueue.ts    | 6 ++++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
index 5c92cd7d71..151aed8cb7 100644
--- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
+++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
@@ -16,7 +16,6 @@ import {StoreService} from '../store.service';
 import {IStoreModelProvider} from '../storeModelProvider';
 import {IProjectService, ISubqueryProject} from '../types';
 import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';
-// import { RampQueue } from '@subql/node-core/utils/rampQueue';

 const logger = getLogger('BlockDispatcherService');

diff --git a/packages/node-core/src/utils/queues/rampQueue.ts b/packages/node-core/src/utils/queues/rampQueue.ts
index 0a100cbed5..a39a6f3fe7 100644
--- a/packages/node-core/src/utils/queues/rampQueue.ts
+++ b/packages/node-core/src/utils/queues/rampQueue.ts
@@ -23,6 +23,7 @@ const logger = getLogger('RampQueue');
 export class RampQueue extends AutoQueue {
   #maxConcurrency: number;
   #sizes: number[] = [];
+  #totalItems = 0;

   constructor(
     private getSize: (data: T) => number,
@@ -72,9 +73,9 @@ export class RampQueue extends AutoQueue {
     if (size > m * 2) {
       // Inverse of the size compared to the median. E.g. if a block is 5x as big as the median then the batch size should be 1/5 of the max
-      const multiplier = 1 / (size / m);
+      const multiplier = m / size;
       this.setConcurrency(this.#maxConcurrency * multiplier);
-    } else if (this.#sizes.length % MIN_SIZES === 0) {
+    } else if (this.#totalItems % MIN_SIZES === 0) {
       // Increase by 10% of max
       this.setConcurrency(this.concurrency + Math.floor(this.#maxConcurrency / 10));
     }
@@ -87,6 +88,7 @@ export class RampQueue extends AutoQueue {
     if (this.#sizes.length >= MAX_SIZES) {
       this.#sizes.shift();
     }
+    this.#totalItems++;
     this.#sizes.push(size);
   }
 }
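Both changes above are subtle: the multiplier is rewritten as m / size, which is mathematically the same as 1 / (size / m), and the ramp-up check now counts every processed item instead of using #sizes.length, which otherwise stops changing once the size buffer is capped at MAX_SIZES. A standalone sketch of the resulting behaviour, where the MIN_SIZES value and the medianOf helper are assumptions rather than the real constants, could look like:

```ts
const MIN_SIZES = 10; // how often (in processed items) to try ramping back up; assumed value

function medianOf(values: number[]): number {
  const sorted = [...values].sort((a, b) => a - b);
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
}

function nextConcurrency(size: number, sizes: number[], totalItems: number, current: number, max: number): number {
  const m = medianOf(sizes);
  if (size > m * 2) {
    // An item much larger than the median scales concurrency down proportionally,
    // e.g. an item 5x the median drops concurrency to roughly 1/5 of the maximum.
    return Math.max(1, Math.floor(max * (m / size)));
  }
  if (totalItems % MIN_SIZES === 0) {
    // Periodically ramp back up by 10% of the maximum
    return Math.min(max, current + Math.floor(max / 10));
  }
  return current;
}

// nextConcurrency(50, [8, 9, 10, 11, 12], 7, 6, 10) === 2, i.e. 10 * (10 / 50)
```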
From f730b7d0a17a9d9fb3efc5e921d0b42e4e4dda66 Mon Sep 17 00:00:00 2001
From: Scott Twiname
Date: Wed, 4 Dec 2024 10:49:33 +1300
Subject: [PATCH 9/9] Address comments

---
 .../worker-block-dispatcher.ts | 39 +++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts
index 7ffa065f65..9e90e47ba8 100644
--- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts
+++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts
@@ -9,7 +9,7 @@ import {last} from 'lodash';
 import {NodeConfig} from '../../configure';
 import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
 import {IndexerEvent} from '../../events';
-import {IBlock, PoiSyncService} from '../../indexer';
+import {IBlock, PoiSyncService, WorkerStatusResponse} from '../../indexer';
 import {getLogger} from '../../logger';
 import {monitorWrite} from '../../process';
 import {AutoQueue, isTaskFlushedError} from '../../utils';
@@ -18,13 +18,13 @@ import {StoreService} from '../store.service';
 import {IStoreModelProvider} from '../storeModelProvider';
 import {ISubqueryProject, IProjectService, Header} from '../types';
 import {isBlockUnavailableError} from '../worker/utils';
-import {BaseBlockDispatcher} from './base-block-dispatcher';
+import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';

 const logger = getLogger('WorkerBlockDispatcherService');

 type Worker = {
-  processBlock: (height: number) => Promise;
-  getStatus: () => Promise;
+  processBlock: (height: number) => Promise;
+  getStatus: () => Promise;
   getMemoryLeft: () => Promise;
   terminate: () => Promise;
 };
@@ -110,17 +110,13 @@ export abstract class WorkerBlockDispatcher
     // eslint-disable-next-line no-constant-condition
     if (true) {
-      let startIndex = 0;
-      while (startIndex < heights.length) {
-        const workerIdx = await this.getNextWorkerIndex();
-        const batchSize = heights.length - startIndex;
-        await Promise.all(
-          heights
-            .slice(startIndex, startIndex + batchSize)
-            .map((height) => this.enqueueBlock(height as number, workerIdx))
-        );
-        startIndex += batchSize;
-      }
+      /*
+       * Load balancing:
+       * worker1: 1,2,3
+       * worker2: 4,5,6
+       */
+      const workerIdx = await this.getNextWorkerIndex();
+      heights.map((height) => this.enqueueBlock(height as number, workerIdx));
     } else {
       /*
        * Load balancing:
@@ -133,8 +129,7 @@
     this.latestBufferedHeight = latestBufferHeight ?? last(heights as number[]) ?? this.latestBufferedHeight;
   }

-  // eslint-disable-next-line @typescript-eslint/require-await
-  private async enqueueBlock(height: number, workerIdx: number): Promise {
+  private enqueueBlock(height: number, workerIdx: number): void {
     if (this.isShutdown) return;
     const worker = this.workers[workerIdx];
@@ -206,9 +201,13 @@ export abstract class WorkerBlockDispatcher
     });
   }

+  // Finds the minimum toFetchBlocks amongst workers then randomly selects from ones that have a matching minimum
   private async getNextWorkerIndex(): Promise {
-    return Promise.all(this.workers.map((worker) => worker.getMemoryLeft())).then((memoryLeftValues) => {
-      return memoryLeftValues.indexOf(Math.max(...memoryLeftValues));
-    });
+    const statuses = await Promise.all(this.workers.map((worker) => worker.getStatus()));
+    const metric = statuses.map((s) => s.toFetchBlocks);
+    const lowest = statuses.filter((s) => s.toFetchBlocks === Math.min(...metric));
+    const randIndex = Math.floor(Math.random() * lowest.length);
+
+    return lowest[randIndex].threadId - 1;
   }
 }
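For reference, the two load-balancing strategies described in the comments above can be illustrated in isolation. These helpers are purely illustrative and not part of the dispatcher; the patch itself uses the batch-per-worker strategy in the `if (true)` branch, with the worker chosen by the new getNextWorkerIndex:

```ts
// Batch-per-worker: everything enqueued in one call goes to a single worker,
// e.g. worker1: 1,2,3 / worker2: 4,5,6 across successive calls.
function assignBatch(heights: number[], workerIdx: number): Map<number, number[]> {
  return new Map([[workerIdx, [...heights]]]);
}

// Round robin: alternate workers per block, e.g. worker1: 1,3,5 / worker2: 2,4,6.
function assignRoundRobin(heights: number[], workerCount: number): Map<number, number[]> {
  const out = new Map<number, number[]>();
  heights.forEach((height, i) => {
    const idx = i % workerCount;
    out.set(idx, [...(out.get(idx) ?? []), height]);
  });
  return out;
}

// assignBatch([1, 2, 3], 0)        => Map {0 => [1, 2, 3]}
// assignRoundRobin([1, 2, 3, 4], 2) => Map {0 => [1, 3], 1 => [2, 4]}
```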