Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Dec 3, 2024
1 parent 33221f7 commit f730b7d
Showing 1 changed file with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {last} from 'lodash';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {IBlock, PoiSyncService} from '../../indexer';
import {IBlock, PoiSyncService, WorkerStatusResponse} from '../../indexer';
import {getLogger} from '../../logger';
import {monitorWrite} from '../../process';
import {AutoQueue, isTaskFlushedError} from '../../utils';
Expand All @@ -18,13 +18,13 @@ import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {ISubqueryProject, IProjectService, Header} from '../types';
import {isBlockUnavailableError} from '../worker/utils';
import {BaseBlockDispatcher} from './base-block-dispatcher';
import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';

const logger = getLogger('WorkerBlockDispatcherService');

type Worker = {
processBlock: (height: number) => Promise<any>;
getStatus: () => Promise<any>;
processBlock: (height: number) => Promise<ProcessBlockResponse>;
getStatus: () => Promise<WorkerStatusResponse>;
getMemoryLeft: () => Promise<number>;
terminate: () => Promise<number>;
};
Expand Down Expand Up @@ -110,17 +110,13 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>

// eslint-disable-next-line no-constant-condition
if (true) {
let startIndex = 0;
while (startIndex < heights.length) {
const workerIdx = await this.getNextWorkerIndex();
const batchSize = heights.length - startIndex;
await Promise.all(
heights
.slice(startIndex, startIndex + batchSize)
.map((height) => this.enqueueBlock(height as number, workerIdx))
);
startIndex += batchSize;
}
/*
* Load balancing:
* worker1: 1,2,3
* worker2: 4,5,6
*/
const workerIdx = await this.getNextWorkerIndex();
heights.map((height) => this.enqueueBlock(height as number, workerIdx));
} else {
/*
* Load balancing:
Expand All @@ -133,8 +129,7 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
this.latestBufferedHeight = latestBufferHeight ?? last(heights as number[]) ?? this.latestBufferedHeight;
}

// eslint-disable-next-line @typescript-eslint/require-await
private async enqueueBlock(height: number, workerIdx: number): Promise<void> {
private enqueueBlock(height: number, workerIdx: number): void {
if (this.isShutdown) return;
const worker = this.workers[workerIdx];

Expand Down Expand Up @@ -206,9 +201,13 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
});
}

// Finds the minimum toFetchBlocks amongst workers then randomly selects from onese that have a matching minimum
private async getNextWorkerIndex(): Promise<number> {
return Promise.all(this.workers.map((worker) => worker.getMemoryLeft())).then((memoryLeftValues) => {
return memoryLeftValues.indexOf(Math.max(...memoryLeftValues));
});
const statuses = await Promise.all(this.workers.map((worker) => worker.getStatus()));
const metric = statuses.map((s) => s.toFetchBlocks);
const lowest = statuses.filter((s) => s.toFetchBlocks === Math.min(...metric));
const randIndex = Math.floor(Math.random() * lowest.length);

return lowest[randIndex].threadId - 1;
}
}

0 comments on commit f730b7d

Please sign in to comment.