Skip to content

Commit

Permalink
Remove scale-batch-size flag that had no functionality (#2275)
Browse files Browse the repository at this point in the history
* Remove scale-batch-size flag that had no functionality

* Update changelog

* Tidy up
  • Loading branch information
stwiname authored Feb 27, 2024
1 parent 908ce4c commit c54816e
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 56 deletions.
2 changes: 2 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Removed
- `scale-batch-size` flag as it had no use (#2275)

## [7.3.0] - 2024-02-23
### Added
Expand Down
5 changes: 0 additions & 5 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ export interface IConfig {
readonly storeCacheUpperLimit: number;
readonly storeGetCacheSize: number;
readonly storeCacheAsync: boolean;
readonly scaleBatchSize?: boolean;
readonly storeFlushInterval: number;
readonly isTest?: boolean;
readonly root?: string;
Expand Down Expand Up @@ -275,10 +274,6 @@ export class NodeConfig<C extends IConfig = IConfig> implements IConfig {
return !!this._config.pgCa;
}

get scaleBatchSize(): boolean {
return !!this._config.scaleBatchSize;
}

get postgresCACert(): string | undefined {
if (!this._config.pgCa) {
return undefined;
Expand Down
17 changes: 2 additions & 15 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
import assert from 'assert';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval, SchedulerRegistry} from '@nestjs/schedule';
import {SchedulerRegistry} from '@nestjs/schedule';
import {DictionaryQueryEntry, BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {range, uniq, without} from 'lodash';
import {NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {getLogger} from '../logger';
import {checkMemoryUsage, cleanedBatchBlocks, delay, transformBypassBlocks, waitForBatchSize} from '../utils';
import {cleanedBatchBlocks, delay, transformBypassBlocks, waitForBatchSize} from '../utils';
import {IBlockDispatcher} from './blockDispatcher';
import {DictionaryService} from './dictionary.service';
import {DynamicDsService} from './dynamic-ds.service';
import {IProjectService} from './types';

const logger = getLogger('FetchService');
const CHECK_MEMORY_INTERVAL = 60000;

export abstract class BaseFetchService<
DS extends BaseDataSource,
Expand All @@ -28,7 +27,6 @@ export abstract class BaseFetchService<
private _latestBestHeight?: number;
private _latestFinalizedHeight?: number;
private isShutdown = false;
private batchSizeScale = 1;
private bypassBlocks: number[] = [];

protected abstract buildDictionaryQueryEntries(dataSources: DS[]): DictionaryQueryEntry[];
Expand Down Expand Up @@ -159,17 +157,6 @@ export abstract class BaseFetchService<
return this.latestFinalizedHeight;
}

@Interval(CHECK_MEMORY_INTERVAL)
checkBatchScale(): void {
if (this.nodeConfig.scaleBatchSize) {
const scale = checkMemoryUsage(this.batchSizeScale, this.nodeConfig);

if (this.batchSizeScale !== scale) {
this.batchSizeScale = scale;
}
}
}

async getFinalizedBlockHead(): Promise<void> {
try {
const currentFinalizedHeight = await this.getFinalizedHeight();
Expand Down
30 changes: 0 additions & 30 deletions packages/node-core/src/utils/batch-size.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,10 @@

import {getHeapStatistics} from 'v8';
import {Mutex} from 'async-mutex';
import {NodeConfig} from '../configure/NodeConfig';
import {getLogger} from '../logger';

const HIGH_THRESHOLD = 0.85;
const LOW_THRESHOLD = 0.6;

const logger = getLogger('memory');

export function checkMemoryUsage(batchSizeScale: number, nodeConfig: NodeConfig): number {
const memoryData = getHeapStatistics();
const ratio = memoryData.used_heap_size / memoryData.heap_size_limit;
if (nodeConfig.profiler) {
logger.info(`Heap Statistics: ${JSON.stringify(memoryData)}`);
logger.info(`Heap Usage: ${ratio}`);
}

let scale = batchSizeScale;

if (ratio > HIGH_THRESHOLD) {
if (scale > 0) {
scale = Math.max(scale - 0.1, 0);
logger.debug(`Heap usage: ${ratio}, decreasing batch size by 10%`);
}
}

if (ratio < LOW_THRESHOLD) {
if (scale < 1) {
scale = Math.min(scale + 0.1, 1);
logger.debug(`Heap usage: ${ratio} increasing batch size by 10%`);
}
}
return scale;
}

export const memoryLock = new Mutex();

export async function waitForBatchSize(sizeInBytes: number): Promise<void> {
Expand Down
6 changes: 0 additions & 6 deletions packages/node-core/src/yargs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,6 @@ export function yargsBuilder<
type: 'number',
default: 100,
},
'scale-batch-size': {
type: 'boolean',
demandOption: false,
describe: 'scale batch size based on memory usage',
default: false,
},
'store-cache-threshold': {
demandOption: false,
describe: 'Store cache will flush data to the database when number of records excess this threshold',
Expand Down

0 comments on commit c54816e

Please sign in to comment.