Skip to content

Commit

Permalink
Fix conn pool workers (#2154)
Browse files Browse the repository at this point in the history
* Only consider connected endpoints in workers when selecting endpoint

* Update changelog

* Update changelog
  • Loading branch information
stwiname authored Nov 10, 2023
1 parent ea0bc3b commit 777d144
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
3 changes: 3 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- When using store get methods the `save` function would be missing

### Fixed
- Workers selecting apis for endpoints that aren't connected (#2154)

## [6.3.0] - 2023-11-06
### Added
- Add `dictionaryQuerySize` to nodeConfig, so the block range in dictionary can be configurable. (#2139)
Expand Down
8 changes: 6 additions & 2 deletions packages/node-core/src/indexer/connectionPool.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import {isMainThread} from 'node:worker_threads';
import {OnApplicationShutdown, Injectable} from '@nestjs/common';
import {Interval} from '@nestjs/schedule';
Expand Down Expand Up @@ -74,10 +75,11 @@ export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, an
}

private async updateNextConnectedApiIndex(): Promise<void> {
this.cachedEndpoint = await this.poolStateManager.getNextConnectedEndpoint();
if (this.allApi[this.cachedEndpoint as string] === null) {
const newEndpoint = await this.poolStateManager.getNextConnectedEndpoint(Object.keys(this.allApi));
if (!!newEndpoint && this.allApi[newEndpoint] === null) {
return this.updateNextConnectedApiIndex();
}
this.cachedEndpoint = newEndpoint;
}

get api(): T {
Expand All @@ -90,6 +92,8 @@ export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, an
}
const api = this.allApi[endpoint];

assert(api, `Api for endpoint ${endpoint} not found`);

const wrappedApi = new Proxy(api, {
get: (target, prop, receiver) => {
if (prop === 'fetchBlocks') {
Expand Down
21 changes: 16 additions & 5 deletions packages/node-core/src/indexer/connectionPoolState.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import {OnApplicationShutdown} from '@nestjs/common';
import {Interval} from '@nestjs/schedule';
import chalk from 'chalk';
import {ApiErrorType} from '../api.connection.error';
import {IApiConnectionSpecific} from '../api.service';
Expand All @@ -26,11 +27,12 @@ export interface ConnectionPoolItem<T> {
timeoutId?: NodeJS.Timeout;
}

const logger = getLogger('connection-pool-state');
const logger = getLogger('ConnectionPoolState');

export interface IConnectionPoolStateManager<T extends IApiConnectionSpecific<any, any, any>> {
addToConnections(endpoint: string, primary: boolean, initFailed: boolean): Promise<void>;
getNextConnectedEndpoint(): Promise<string | undefined>;
// Connected endpoints allows reducing the endpoints to ones connected in the worker
getNextConnectedEndpoint(connectedEndpoints?: string[]): Promise<string | undefined>;
// Async to be compatible with workers
getFieldValue<K extends keyof ConnectionPoolItem<T>>(endpoint: string, field: K): Promise<ConnectionPoolItem<T>[K]>;
// Async to be compatible with workers
Expand Down Expand Up @@ -71,20 +73,29 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
}
}

@Interval(15000)
logConnectionStatus() {
logger.debug(JSON.stringify(this.pool, null, 2));
}

//eslint-disable-next-line @typescript-eslint/require-await
async getNextConnectedEndpoint(): Promise<string | undefined> {
async getNextConnectedEndpoint(connectedEndpoints?: string[]): Promise<string | undefined> {
const primaryendpoint = this.getPrimaryEndpoint();
if (primaryendpoint !== undefined) {
return primaryendpoint;
}

const endpoints = Object.keys(this.pool).filter(
const availableEndpoints = Object.keys(this.pool).filter(
(endpoint) => !connectedEndpoints || connectedEndpoints.includes(endpoint)
);

const endpoints = availableEndpoints.filter(
(endpoint) => !this.pool[endpoint].backoffDelay && this.pool[endpoint].connected && !this.pool[endpoint].failed
);

if (endpoints.length === 0) {
// If all endpoints are suspended, try to find a rate-limited one
const rateLimitedEndpoints = Object.keys(this.pool).filter(
const rateLimitedEndpoints = availableEndpoints.filter(
(endpoint) => this.pool[endpoint].backoffDelay && this.pool[endpoint].rateLimited
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from '../connectionPoolState.manager';

export type HostConnectionPoolState<T> = {
hostGetNextConnectedEndpoint: () => Promise<string | undefined>;
hostGetNextConnectedEndpoint: (connectedEndpoints?: string[]) => Promise<string | undefined>;
hostAddToConnections: (endpoint: string, primary: boolean) => Promise<void>;
hostGetFieldFromConnectionPoolItem: <K extends keyof ConnectionPoolItem<T>>(
endpoint: string,
Expand Down Expand Up @@ -68,8 +68,8 @@ export class WorkerConnectionPoolStateManager<T extends IApiConnectionSpecific>
}
}

async getNextConnectedEndpoint(): Promise<string | undefined> {
return this.host.hostGetNextConnectedEndpoint();
async getNextConnectedEndpoint(connectedEndpoints?: string[]): Promise<string | undefined> {
return this.host.hostGetNextConnectedEndpoint(connectedEndpoints);
}

async addToConnections(endpoint: string, primary = false): Promise<void> {
Expand Down

0 comments on commit 777d144

Please sign in to comment.