
Fix/multi endpint delay #2582

Merged: 9 commits, Nov 12, 2024
Changes from 5 commits
3 changes: 3 additions & 0 deletions packages/node-core/CHANGELOG.md
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Block fetching delays when multiple endpoints are configured and network conditions are poor (#2572)

## [14.1.7] - 2024-10-30
### Changed
- Bump `@subql/common` dependency
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import {delay} from '@subql/common';
import {ApiErrorType, ConnectionPoolStateManager, IApiConnectionSpecific, NodeConfig} from '..';
import {ApiErrorType, ConnectionPoolStateManager, getLogger, IApiConnectionSpecific, NodeConfig} from '..';
import {ConnectionPoolService} from './connectionPool.service';

async function waitFor(conditionFn: () => boolean, timeout = 50000, interval = 100): Promise<void> {
7 changes: 0 additions & 7 deletions packages/node-core/src/indexer/connectionPool.service.ts
@@ -102,13 +102,6 @@ export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, an
if (prop === 'fetchBlocks') {
return async (heights: number[], ...args: any): Promise<any> => {
try {
// Check if the endpoint is rate-limited
Collaborator: This is still necessary or there will be no throttling.

if (await this.poolStateManager.getFieldValue(endpoint, 'rateLimited')) {
logger.info('throtling on ratelimited endpoint');
const backoffDelay = await this.poolStateManager.getFieldValue(endpoint, 'backoffDelay');
await delay(backoffDelay / 1000);
}

const start = Date.now();
const result = await target.fetchBlocks(heights, ...args);
const end = Date.now();
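
For context, a minimal sketch (not part of this diff) of how the throttling the reviewer refers to could be reinstated in the fetchBlocks proxy on top of the new pool-level backoff. It assumes delay() takes seconds and that getFieldValue accepts the rateLimitDelay field this PR adds to pool items:

if (await this.poolStateManager.getFieldValue(endpoint, 'rateLimited')) {
  // Wait out the endpoint's rate-limit window instead of hitting it immediately.
  const rateLimitDelay = await this.poolStateManager.getFieldValue(endpoint, 'rateLimitDelay');
  await delay(rateLimitDelay / 1000);
}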
113 changes: 66 additions & 47 deletions packages/node-core/src/indexer/connectionPoolState.manager.spec.ts
@@ -1,7 +1,8 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {ConnectionPoolStateManager} from './connectionPoolState.manager';
import {ApiErrorType} from '../api.connection.error';
import {ConnectionPoolItem, ConnectionPoolStateManager} from './connectionPoolState.manager';

describe('ConnectionPoolStateManager', function () {
let connectionPoolStateManager: ConnectionPoolStateManager<any>;
@@ -12,62 +13,54 @@ describe('ConnectionPoolStateManager', function () {
connectionPoolStateManager = new ConnectionPoolStateManager();
});

afterEach(async function () {
await connectionPoolStateManager.onApplicationShutdown();
});

it('chooses primary endpoint first', async function () {
(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT1] = {
primary: true,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: true,
lastRequestTime: 0,
};

(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] = {
primary: false,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: true,
lastRequestTime: 0,
};
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, true);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

expect(await connectionPoolStateManager.getNextConnectedEndpoint()).toEqual(EXAMPLE_ENDPOINT1);
});

it('does not choose primary endpoint if failed', async function () {
(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT1] = {
primary: true,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: false,
lastRequestTime: 0,
};

(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] = {
primary: false,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: true,
lastRequestTime: 0,
};
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, true);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default);

expect(await connectionPoolStateManager.getNextConnectedEndpoint()).toEqual(EXAMPLE_ENDPOINT2);
});

it('All endpoints backoff; select a rateLimited endpoint. reason: ApiErrorType.Timeout', async function () {
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, false);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default);
await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT2, ApiErrorType.Timeout);

const nextEndpoint = await connectionPoolStateManager.getNextConnectedEndpoint();
const endpointInfo = (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] as ConnectionPoolItem<any>;
expect(nextEndpoint).toEqual(EXAMPLE_ENDPOINT2);
expect(endpointInfo.rateLimited).toBe(true);
expect((connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2].rateLimitDelay).toBe(20 * 1000);
});

it('All endpoints backoff; select a rateLimited endpoint. reason: ApiErrorType.RateLimit', async function () {
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, false);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default);
await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT2, ApiErrorType.RateLimit);

const nextEndpoint = await connectionPoolStateManager.getNextConnectedEndpoint();
const endpointInfo = (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] as ConnectionPoolItem<any>;
expect(nextEndpoint).toEqual(EXAMPLE_ENDPOINT2);
expect(endpointInfo.rateLimited).toBe(true);
expect((connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2].rateLimitDelay).toBe(20 * 1000);
});

it('can calculate performance score for response time of zero', function () {
const score = (connectionPoolStateManager as any).calculatePerformanceScore(0, 0);
expect(score).not.toBeNaN();
@@ -78,4 +71,30 @@
const score2 = (connectionPoolStateManager as any).calculatePerformanceScore(2, 0);
expect(score1).toBeGreaterThan(score2);
});

it('backoff delay rule', function () {
const delay1 = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 0});
expect(delay1).toBe(0);

const delay2 = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 1});
expect(delay2).toBe(10000);

const delay3 = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 2});
expect(delay3).toBe(20000);

const delay4 = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 3});
expect(delay4).toBe(40000);

const delay5 = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 4});
expect(delay5).toBe(80000);

const delay6 = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 5});
expect(delay6).toBe(160000);

const delay7 = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 6});
expect(delay7).toBe(320000);

const delayMax = (connectionPoolStateManager as any).calculateNextDelay({failureCount: 7});
expect(delayMax).toBe(320000);
});
});
56 changes: 35 additions & 21 deletions packages/node-core/src/indexer/connectionPoolState.manager.ts
@@ -10,10 +10,10 @@ import {getLogger} from '../logger';
import {exitWithError} from '../process';
import {errorTypeToScoreAdjustment} from './connectionPool.service';

const RETRY_DELAY = 60 * 1000;
const MAX_FAILURES = 5;
const RESPONSE_TIME_WEIGHT = 0.7;
const FAILURE_WEIGHT = 0.3;
const RATE_LIMIT_DELAY = 20 * 1000;

export interface ConnectionPoolItem<T> {
endpoint: string;
@@ -191,7 +191,7 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
}

//eslint-disable-next-line @typescript-eslint/require-await
async setTimeout(endpoint: string, delay: number): Promise<void> {
async setRecoverTimeout(endpoint: string, delay: number): Promise<void> {
// Make sure there is no existing timeout
await this.clearTimeout(endpoint);

@@ -247,35 +247,49 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
switch (errorType) {
case ApiErrorType.Connection: {
if (this.pool[endpoint].connected) {
//handleApiDisconnects was already called if this is false
//this.handleApiDisconnects(endpoint);
// A disconnected endpoint is not used to serve requests; handleApiDisconnects() will be called to handle it.
this.pool[endpoint].connected = false;
}
return;
}
case ApiErrorType.Timeout:
case ApiErrorType.RateLimit: {
// A "rateLimited" endpoint can still be selected when no other endpoints are available, so avoid setting a large delay.
this.pool[endpoint].rateLimited = true;
break;
}
case ApiErrorType.Default: {
const nextDelay = RETRY_DELAY * Math.pow(2, this.pool[endpoint].failureCount - 1); // Exponential backoff using failure count // Start with RETRY_DELAY and double on each failure
this.pool[endpoint].backoffDelay = nextDelay;

if (ApiErrorType.Timeout || ApiErrorType.RateLimit) {
this.pool[endpoint].rateLimited = true;
} else {
this.pool[endpoint].failed = true;
}

await this.setTimeout(endpoint, nextDelay);

logger.warn(
`Endpoint ${this.pool[endpoint].endpoint} experienced an error (${errorType}). Suspending for ${
nextDelay / 1000
}s.`
);
return;
// A "failed" endpoint is not used to serve requests.
this.pool[endpoint].failed = true;
break;
}
default: {
throw new Error(`Unknown error type ${errorType}`);
}
}

const nextDelay = this.calculateNextDelay(this.pool[endpoint]);
this.pool[endpoint].backoffDelay = nextDelay;
await this.setRecoverTimeout(endpoint, nextDelay);

logger.warn(
`Endpoint ${this.pool[endpoint].endpoint} experienced an error (${errorType}). Suspending for ${
nextDelay / 1000
}s.`
);
}
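
For readers skimming the switch above, the status mapping it introduces can be summarized as a standalone sketch (illustration only, not part of the diff; assumes ApiErrorType from '../api.connection.error'):

type EndpointStatus = {connected: boolean; rateLimited: boolean; failed: boolean};

function applyApiError(status: EndpointStatus, errorType: ApiErrorType): EndpointStatus {
  switch (errorType) {
    case ApiErrorType.Connection:
      // Disconnection is handled by handleApiDisconnects; no backoff is scheduled here.
      return {...status, connected: false};
    case ApiErrorType.Timeout:
    case ApiErrorType.RateLimit:
      // Still selectable as a last resort when every endpoint is degraded.
      return {...status, rateLimited: true};
    case ApiErrorType.Default:
      // Not used to serve requests until the recover timeout fires.
      return {...status, failed: true};
    default:
      throw new Error(`Unknown error type ${errorType}`);
  }
}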

private calculateNextDelay(poolItem: ConnectionPoolItem<T>): number {
const delayRule = [10, 20, 40, 80, 160, 320];
let delayTime = 0;
if (poolItem.failureCount < 1) {
delayTime = 0;
} else if (poolItem.failureCount >= 1 && poolItem.failureCount <= delayRule.length) {
delayTime = delayRule[poolItem.failureCount - 1];
} else {
delayTime = delayRule[delayRule.length - 1];
}
return delayTime * 1000;
}

Collaborator: This can use the old logic, just change RETRY_DELAY to 10*1000.
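
That suggestion could look roughly like the following sketch (not part of the diff), keeping the old exponential form with a 10 s base while preserving the 320 s cap the new tests expect:

private calculateNextDelay(poolItem: ConnectionPoolItem<T>): number {
  if (poolItem.failureCount < 1) {
    return 0;
  }
  const RETRY_DELAY = 10 * 1000; // base delay, doubled on each consecutive failure
  const MAX_RETRY_DELAY = 320 * 1000; // same ceiling as the table-based rule above
  return Math.min(RETRY_DELAY * Math.pow(2, poolItem.failureCount - 1), MAX_RETRY_DELAY);
}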

private calculatePerformanceScore(responseTime: number, failureCount: number): number {