diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index d6964f56d8..b98269a06c 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- When configuring multiple endpoints, poor network conditions may lead to block crawling delays. (#2572) + ## [14.1.7] - 2024-10-30 ### Changed - Bump `@subql/common` dependency diff --git a/packages/node-core/src/indexer/connectionPool.service.spec.ts b/packages/node-core/src/indexer/connectionPool.service.spec.ts index adfd6a1b64..be89dccbf8 100644 --- a/packages/node-core/src/indexer/connectionPool.service.spec.ts +++ b/packages/node-core/src/indexer/connectionPool.service.spec.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import {delay} from '@subql/common'; -import {ApiErrorType, ConnectionPoolStateManager, IApiConnectionSpecific, NodeConfig} from '..'; +import {ApiErrorType, ConnectionPoolStateManager, getLogger, IApiConnectionSpecific, NodeConfig} from '..'; import {ConnectionPoolService} from './connectionPool.service'; async function waitFor(conditionFn: () => boolean, timeout = 50000, interval = 100): Promise { @@ -127,4 +127,30 @@ describe('ConnectionPoolService', () => { expect(handleApiDisconnectsSpy).toHaveBeenCalledTimes(1); }, 15000); }); + + describe('Rate limit endpoint delay', () => { + it('call delay', async () => { + const logger = getLogger('connection-pool'); + const consoleSpy = jest.spyOn(logger, 'info'); + + await connectionPoolService.addToConnections(mockApiConnection, TEST_URL); + await connectionPoolService.addToConnections(mockApiConnection, `${TEST_URL}/2`); + await connectionPoolService.handleApiError(TEST_URL, { + name: 'timeout', + errorType: ApiErrorType.Timeout, + message: 'timeout error', + }); + await connectionPoolService.handleApiError(`${TEST_URL}/2`, { + name: 'DefaultError', + errorType: ApiErrorType.Default, + message: 'Default error', + }); + await (connectionPoolService as any).flushResultCache(); + + await connectionPoolService.api.fetchBlocks([34365]); + + expect(consoleSpy).toHaveBeenCalledWith('throtling on ratelimited endpoint 10s'); + consoleSpy.mockRestore(); + }, 30000); + }); }); diff --git a/packages/node-core/src/indexer/connectionPool.service.ts b/packages/node-core/src/indexer/connectionPool.service.ts index dacde9be3f..d9eebae592 100644 --- a/packages/node-core/src/indexer/connectionPool.service.ts +++ b/packages/node-core/src/indexer/connectionPool.service.ts @@ -104,9 +104,8 @@ export class ConnectionPoolService; @@ -12,62 +13,52 @@ describe('ConnectionPoolStateManager', function () { connectionPoolStateManager = new ConnectionPoolStateManager(); }); + afterEach(async function () { + await connectionPoolStateManager.onApplicationShutdown(); + }); + it('chooses primary endpoint first', async function () { - (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT1] = { - primary: true, - performanceScore: 100, - failureCount: 0, - endpoint: '', - backoffDelay: 0, - rateLimited: false, - failed: false, - connected: true, - lastRequestTime: 0, - }; - - (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] = { - primary: false, - performanceScore: 100, - failureCount: 0, - endpoint: '', - backoffDelay: 0, - rateLimited: false, - failed: false, - connected: true, - lastRequestTime: 0, - }; + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, true); + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false); expect(await connectionPoolStateManager.getNextConnectedEndpoint()).toEqual(EXAMPLE_ENDPOINT1); }); it('does not choose primary endpoint if failed', async function () { - (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT1] = { - primary: true, - performanceScore: 100, - failureCount: 0, - endpoint: '', - backoffDelay: 0, - rateLimited: false, - failed: false, - connected: false, - lastRequestTime: 0, - }; - - (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] = { - primary: false, - performanceScore: 100, - failureCount: 0, - endpoint: '', - backoffDelay: 0, - rateLimited: false, - failed: false, - connected: true, - lastRequestTime: 0, - }; + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, true); + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false); + + await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default); expect(await connectionPoolStateManager.getNextConnectedEndpoint()).toEqual(EXAMPLE_ENDPOINT2); }); + it('All endpoints backoff; select a rateLimited endpoint. reason: ApiErrorType.Timeout', async function () { + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, false); + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false); + + await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default); + await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT2, ApiErrorType.Timeout); + + const nextEndpoint = await connectionPoolStateManager.getNextConnectedEndpoint(); + const endpointInfo = (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] as ConnectionPoolItem; + expect(nextEndpoint).toEqual(EXAMPLE_ENDPOINT2); + expect(endpointInfo.rateLimited).toBe(true); + }); + + it('All endpoints backoff; select a rateLimited endpoint. reason: ApiErrorType.RateLimit', async function () { + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, false); + await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false); + + await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default); + await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT2, ApiErrorType.RateLimit); + + const nextEndpoint = await connectionPoolStateManager.getNextConnectedEndpoint(); + const endpointInfo = (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] as ConnectionPoolItem; + expect(nextEndpoint).toEqual(EXAMPLE_ENDPOINT2); + expect(endpointInfo.rateLimited).toBe(true); + }); + it('can calculate performance score for response time of zero', function () { const score = (connectionPoolStateManager as any).calculatePerformanceScore(0, 0); expect(score).not.toBeNaN(); diff --git a/packages/node-core/src/indexer/connectionPoolState.manager.ts b/packages/node-core/src/indexer/connectionPoolState.manager.ts index 8ff1a62885..dd170eb068 100644 --- a/packages/node-core/src/indexer/connectionPoolState.manager.ts +++ b/packages/node-core/src/indexer/connectionPoolState.manager.ts @@ -10,7 +10,8 @@ import {getLogger} from '../logger'; import {exitWithError} from '../process'; import {errorTypeToScoreAdjustment} from './connectionPool.service'; -const RETRY_DELAY = 60 * 1000; +const RETRY_DELAY = 10 * 1000; +const MAX_RETRY_DELAY = 32 * RETRY_DELAY; const MAX_FAILURES = 5; const RESPONSE_TIME_WEIGHT = 0.7; const FAILURE_WEIGHT = 0.3; @@ -191,13 +192,13 @@ export class ConnectionPoolStateManager { + async setRecoverTimeout(endpoint: string, delay: number): Promise { // Make sure there is no existing timeout await this.clearTimeout(endpoint); this.pool[endpoint].timeoutId = setTimeout(() => { this.pool[endpoint].backoffDelay = 0; // Reset backoff delay only if there are no consecutive errors - this.pool[endpoint].rateLimited = false; + // this.pool[endpoint].rateLimited = false; // Do not reset rateLimited status this.pool[endpoint].failed = false; this.pool[endpoint].timeoutId = undefined; // Clear the timeout ID @@ -247,35 +248,41 @@ export class ConnectionPoolStateManager): number { + // Exponential backoff using failure count, Start with RETRY_DELAY and double on each failure, MAX_RETRY_DELAY is the maximum delay + return Math.min(RETRY_DELAY * Math.pow(2, poolItem.failureCount - 1), MAX_RETRY_DELAY); } private calculatePerformanceScore(responseTime: number, failureCount: number): number {