Skip to content

Commit

Permalink
feat: improved metrics and added rate limit for the batch request fea…
Browse files Browse the repository at this point in the history
…ture in ws server (#2474) (#2487)

* feat: added shouldRateLimitOnMethod() to connectionLimiter

Signed-off-by: Logan Nguyen <[email protected]>

* feat: applied shouldRateLimitOnMethod() on batch requests

Signed-off-by: Logan Nguyen <[email protected]>

* feat: applied shouldRateLimitOnMethod() on single requests

Signed-off-by: Logan Nguyen <[email protected]>

* feat: added methodsCounter metric for batch_requests

Signed-off-by: Logan Nguyen <[email protected]>

* fix: skipped rateLimit for eth_subscribe and eth_unsubscribe

Signed-off-by: Logan Nguyen <[email protected]>

* test: added acceptancetest for rate limiter in the WS

Signed-off-by: Logan Nguyen <[email protected]>

* test: enabled test_ws_server in ratelimiter CI

Signed-off-by: Logan Nguyen <[email protected]>

---------

Signed-off-by: Logan Nguyen <[email protected]>
  • Loading branch information
quiet-node authored May 16, 2024
1 parent 1ca66cf commit 94bf628
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 19 deletions.
1 change: 1 addition & 0 deletions .github/workflows/acceptance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ jobs:
uses: ./.github/workflows/acceptance-workflow.yml
with:
testfilter: ratelimiter
test_ws_server: true

tokencreate:
name: Token Create
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"acceptancetest:api_batch2": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-batch-2' --exit",
"acceptancetest:api_batch3": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-batch-3' --exit",
"acceptancetest:erc20": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@erc20' --exit",
"acceptancetest:ratelimiter": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@ratelimiter' --exit",
"acceptancetest:ratelimiter": "ts-mocha packages/ws-server/tests/acceptance/index.spec.ts -g '@web-socket-ratelimiter' --exit && ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@ratelimiter' --exit",
"acceptancetest:tokencreate": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@tokencreate' --exit",
"acceptancetest:tokenmanagement": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@tokenmanagement' --exit",
"acceptancetest:htsprecompilev1": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@htsprecompilev1' --exit",
Expand Down
1 change: 1 addition & 0 deletions packages/server/tests/localAcceptance.env
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ BATCH_REQUESTS_ENABLED=true
TEST_GAS_PRICE_DEVIATION=0.2
WS_NEW_HEADS_ENABLED=false
INITIAL_BALANCE='5000000000'
LIMIT_DURATION=90000
11 changes: 10 additions & 1 deletion packages/ws-server/src/controllers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import { handleEthSubsribe, handleEthUnsubscribe } from './eth_subscribe';
import { MirrorNodeClient } from '@hashgraph/json-rpc-relay/dist/lib/clients';
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
import { resolveParams, validateJsonRpcRequest, verifySupportedMethod } from '../utils/utils';
import { InvalidRequest, MethodNotFound } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';
import {
InvalidRequest,
MethodNotFound,
IPRateLimitExceeded,
} from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';

/**
* Handles sending requests to a Relay by calling a specified method with given parameters.
Expand Down Expand Up @@ -122,6 +126,11 @@ export const getRequestResult = async (
return jsonResp(request.id || null, new MethodNotFound(request.method), undefined);
}

// verify rate limit for method method based on IP
if (limiter.shouldRateLimitOnMethod(ctx.ip, request.method, ctx.websocket.requestId)) {
return jsonResp(null, new IPRateLimitExceeded(request.method), undefined);
}

// Validate request's params
try {
const methodValidations = Validator.METHODS[method];
Expand Down
18 changes: 18 additions & 0 deletions packages/ws-server/src/metrics/connectionLimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import { Logger } from 'pino';
import { WS_CONSTANTS } from '../utils/constants';
import { Gauge, Registry, Counter } from 'prom-client';
import { WebSocketError } from '@hashgraph/json-rpc-relay';
import RateLimit from '@hashgraph/json-rpc-server/dist/rateLimit';
import constants from '@hashgraph/json-rpc-relay/dist/lib/constants';
import { methodConfiguration } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/methodConfiguration';

type IpCounter = {
[key: string]: number;
Expand All @@ -39,6 +42,7 @@ export default class ConnectionLimiter {
private connectionLimitCounter: Counter;
private inactivityTTLCounter: Counter;
private register: Registry;
private rateLimit: RateLimit;

constructor(logger: Logger, register: Registry) {
this.logger = logger;
Expand Down Expand Up @@ -82,6 +86,11 @@ export default class ConnectionLimiter {
help: WS_CONSTANTS.connLimiter.inactivityTTLLimitMetric.help,
registers: [register],
});

const rateLimitDuration = process.env.LIMIT_DURATION
? parseInt(process.env.LIMIT_DURATION)
: constants.DEFAULT_RATE_LIMIT.DURATION;
this.rateLimit = new RateLimit(logger.child({ name: 'ip-rate-limit' }), register, rateLimitDuration);
}

public incrementCounters(ctx) {
Expand Down Expand Up @@ -199,4 +208,13 @@ export default class ConnectionLimiter {

this.startInactivityTTLTimer(websocket);
}

public shouldRateLimitOnMethod(ip, methodName, requestId) {
// subcription limits are already covered in this.validateSubscriptionLimit()
if (methodName === WS_CONSTANTS.METHODS.ETH_SUBSCRIBE || methodName === WS_CONSTANTS.METHODS.ETH_UNSUBSCRIBE)
return false;

const methodTotalLimit = methodConfiguration[methodName].total;
return this.rateLimit.shouldRateLimit(ip, methodName, methodTotalLimit, requestId);
}
}
20 changes: 18 additions & 2 deletions packages/ws-server/src/webSocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dotenv.config({ path: path.resolve(__dirname, '../../../.env') });
import KoaJsonRpc from '@hashgraph/json-rpc-server/dist/koaJsonRpc';
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
import { type Relay, RelayImpl, predefined, JsonRpcError } from '@hashgraph/json-rpc-relay';
import { IPRateLimitExceeded } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';
import { sendToClient, handleConnectionClose, getBatchRequestsMaxSize, getWsBatchRequestsEnabled } from './utils/utils';

const mainLogger = pino({
Expand Down Expand Up @@ -66,10 +67,12 @@ app.ws.use(async (ctx) => {
const startTime = process.hrtime();

ctx.websocket.id = relay.subs()?.generateId();
ctx.websocket.requestId = uuid();

ctx.websocket.limiter = limiter;
ctx.websocket.wsMetricRegistry = wsMetricRegistry;
const connectionIdPrefix = formatIdMessage('Connection ID', ctx.websocket.id);
const requestIdPrefix = formatIdMessage('Request ID', uuid());
const requestIdPrefix = formatIdMessage('Request ID', ctx.websocket.requestId);
logger.info(
`${connectionIdPrefix} ${requestIdPrefix} New connection established. Current active connections: ${ctx.app.server._connections}`,
);
Expand Down Expand Up @@ -116,6 +119,13 @@ app.ws.use(async (ctx) => {
if (Array.isArray(request)) {
logger.trace(`${connectionIdPrefix} ${requestIdPrefix}: Receive batch request=${JSON.stringify(request)}`);

// Increment metrics for batch_requests
wsMetricRegistry.getCounter('methodsCounter').labels(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME).inc();
wsMetricRegistry
.getCounter('methodsCounterByIp')
.labels(ctx.request.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME)
.inc();

// send error if batch request feature is not enabled
if (!getWsBatchRequestsEnabled()) {
const batchRequestDisabledError = predefined.WS_BATCH_REQUESTS_DISABLED;
Expand All @@ -135,7 +145,13 @@ app.ws.use(async (ctx) => {
return;
}

// TODO: verify rate limit
// verify rate limit for batch_request method based on IP
if (limiter.shouldRateLimitOnMethod(ctx.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME, ctx.websocket.requestId)) {
ctx.websocket.send(
JSON.stringify([jsonResp(null, new IPRateLimitExceeded(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME), undefined)]),
);
return;
}

// process requests
const requestPromises = request.map((item: any) => {
Expand Down
16 changes: 4 additions & 12 deletions packages/ws-server/tests/acceptance/batchRequest.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { WsTestConstant, WsTestHelper } from '../helper';
import { predefined } from '@hashgraph/json-rpc-relay/src';

describe('@web-socket-batch-1 Batch Requests', async function () {
const METHOD_NAME = 'batch_request';
let ethersWsProvider: WebSocketProvider;
let batchRequests: any = [];

Expand Down Expand Up @@ -80,10 +81,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () {

it(`Should submit batch requests to WS server using Standard Web Socket and retrieve batch responses`, async () => {
// call batch request
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(
WsTestConstant.BATCH_REQUEST_METHOD_NAME,
batchRequests,
);
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests);

// individually process each request
let promises: any = [];
Expand All @@ -97,10 +95,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () {

it('Should submit batch requests to WS server and get batchRequestDisabledError if WS_BATCH_REQUESTS_DISABLED=false ', async () => {
process.env.WS_BATCH_REQUESTS_ENABLED = 'false';
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(
WsTestConstant.BATCH_REQUEST_METHOD_NAME,
batchRequests,
);
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests);

const expectedError = predefined.WS_BATCH_REQUESTS_DISABLED;
delete expectedError.data;
Expand All @@ -111,10 +106,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () {
it('Should submit batch requests to WS server and get batchRequestAmountMaxExceed if requests size exceeds WS_BATCH_REQUESTS_MAX_SIZE', async () => {
process.env.WS_BATCH_REQUESTS_MAX_SIZE = '1';

const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(
WsTestConstant.BATCH_REQUEST_METHOD_NAME,
batchRequests,
);
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests);

const expectedError = predefined.BATCH_REQUESTS_AMOUNT_MAX_EXCEEDED(
batchRequests.length,
Expand Down
113 changes: 113 additions & 0 deletions packages/ws-server/tests/acceptance/rateLimiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*-
*
* Hedera JSON RPC Relay
*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// external resources
import { expect } from 'chai';
import { WsTestHelper } from '../helper';
import relayConstants from '@hashgraph/json-rpc-relay/src/lib/constants';
import { AliasAccount } from '@hashgraph/json-rpc-server/tests/types/AliasAccount';
import { IPRateLimitExceeded } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';

describe('@web-socket-ratelimiter Rate Limit Tests', async function () {
const rateLimitTier1 = Number(process.env.TIER_1_RATE_LIMIT || relayConstants.DEFAULT_RATE_LIMIT.TIER_1);
const rateLimitTier2 = Number(process.env.TIER_2_RATE_LIMIT || relayConstants.DEFAULT_RATE_LIMIT.TIER_2);
const limitDuration = Number(process.env.LIMIT_DURATION) || relayConstants.DEFAULT_RATE_LIMIT.DURATION;

const batchRequests = [
{
id: 1,
jsonrpc: '2.0',
method: 'eth_chainId',
params: [],
},
{
id: 1,
jsonrpc: '2.0',
method: 'eth_blockNumber',
params: [],
},
];

after(async () => {
// expect all the connections to the WS server to be closed after all
expect(global.socketServer._connections).to.eq(0);
});

it(`Should submit single requests to WS server and receive IPRateLimitExceeded error until rate limit is reached`, async () => {
const SINGLE_REQUEST_METHOD_NAME = 'eth_gasPrice';
for (let i = 0; i < rateLimitTier2; i++) {
await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, []);
}

// exceed rate limit
const response = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, []);
const ipRateLimitError = new IPRateLimitExceeded(SINGLE_REQUEST_METHOD_NAME);
expect(response.error.code).to.deep.eq(ipRateLimitError.code);
expect(response.error.message).to.deep.eq(ipRateLimitError.message);

// wait until rate limit is reset
await new Promise((r) => setTimeout(r, limitDuration));
});

it(`Should submit batch requests to WS server and receive IPRateLimitExceeded error until rate limit is reached`, async () => {
const BATCH_REQUEST_METHOD_NAME = 'batch_request';

// call batch request multitime to reach limit
for (let i = 0; i < rateLimitTier1; i++) {
await WsTestHelper.sendRequestToStandardWebSocket(BATCH_REQUEST_METHOD_NAME, batchRequests);
}

// exceed rate limit
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(BATCH_REQUEST_METHOD_NAME, batchRequests);
const ipRateLimitError = new IPRateLimitExceeded(BATCH_REQUEST_METHOD_NAME);

expect(batchResponses[0].error.code).to.deep.eq(ipRateLimitError.code);
expect(batchResponses[0].error.message).to.deep.eq(ipRateLimitError.message);

// wait until rate limit is reset
await new Promise((r) => setTimeout(r, limitDuration));
});

it(`Should reset limit for requests`, async () => {
const SINGLE_REQUEST_METHOD_NAME = 'eth_getBalance';
const account: AliasAccount = global.accounts[0];

for (let i = 0; i < rateLimitTier2; i++) {
await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [account.address, 'latest']);
}
// exceed rate limit
const rateLimitResponse = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [
account.address,
'latest',
]);
const ipRateLimitError = new IPRateLimitExceeded(SINGLE_REQUEST_METHOD_NAME);
expect(rateLimitResponse.error.code).to.deep.eq(ipRateLimitError.code);
expect(rateLimitResponse.error.message).to.deep.eq(ipRateLimitError.message);

// wait until rate limit is reset
await new Promise((r) => setTimeout(r, limitDuration));
const response = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [
account.address,
'latest',
]);
const expectedResult = await global.relay.call(SINGLE_REQUEST_METHOD_NAME, [account.address, 'latest']);
expect(response.result).to.eq(expectedResult);
});
});
6 changes: 3 additions & 3 deletions packages/ws-server/tests/helper/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ export class WsTestHelper {
}
}

static async sendRequestToStandardWebSocket(method: string, params: any, ms?: number | undefined) {
static async sendRequestToStandardWebSocket(method: string, params: any[], ms?: number | undefined) {
const BATCH_REQUEST_METHOD_NAME = 'batch_request';
const webSocket = new WebSocket(WsTestConstant.WS_RELAY_URL);

let response: any;

if (method === WsTestConstant.BATCH_REQUEST_METHOD_NAME) {
if (method === BATCH_REQUEST_METHOD_NAME) {
webSocket.on('open', () => {
webSocket.send(JSON.stringify(params));
});
Expand Down Expand Up @@ -94,5 +95,4 @@ export class WsTestConstant {
static STANDARD_WEB_SOCKET = 'Standard Web Socket';
static ETHERS_WS_PROVIDER = 'Ethers Web Socket Provider';
static WS_RELAY_URL = process.env.WS_RELAY_URL || `ws://127.0.0.1:8546`;
static BATCH_REQUEST_METHOD_NAME: 'batch_request';
}

0 comments on commit 94bf628

Please sign in to comment.