diff --git a/.github/workflows/acceptance.yml b/.github/workflows/acceptance.yml index 115846460a..4b753d3f9b 100644 --- a/.github/workflows/acceptance.yml +++ b/.github/workflows/acceptance.yml @@ -47,6 +47,7 @@ jobs: uses: ./.github/workflows/acceptance-workflow.yml with: testfilter: ratelimiter + test_ws_server: true tokencreate: name: Token Create diff --git a/package.json b/package.json index c29f940d1c..0876d6aeb4 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "acceptancetest:api_batch2": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-batch-2' --exit", "acceptancetest:api_batch3": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-batch-3' --exit", "acceptancetest:erc20": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@erc20' --exit", - "acceptancetest:ratelimiter": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@ratelimiter' --exit", + "acceptancetest:ratelimiter": "ts-mocha packages/ws-server/tests/acceptance/index.spec.ts -g '@web-socket-ratelimiter' --exit && ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@ratelimiter' --exit", "acceptancetest:tokencreate": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@tokencreate' --exit", "acceptancetest:tokenmanagement": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@tokenmanagement' --exit", "acceptancetest:htsprecompilev1": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@htsprecompilev1' --exit", diff --git a/packages/server/tests/localAcceptance.env b/packages/server/tests/localAcceptance.env index 7e08f63b7b..546464ccea 100644 --- a/packages/server/tests/localAcceptance.env +++ b/packages/server/tests/localAcceptance.env @@ -22,3 +22,4 @@ BATCH_REQUESTS_ENABLED=true TEST_GAS_PRICE_DEVIATION=0.2 WS_NEW_HEADS_ENABLED=false INITIAL_BALANCE='5000000000' +LIMIT_DURATION=90000 diff --git a/packages/ws-server/src/controllers/index.ts b/packages/ws-server/src/controllers/index.ts index f89d9837f9..a0f20f31c3 100644 --- a/packages/ws-server/src/controllers/index.ts +++ b/packages/ws-server/src/controllers/index.ts @@ -27,7 +27,11 @@ import { handleEthSubsribe, handleEthUnsubscribe } from './eth_subscribe'; import { MirrorNodeClient } from '@hashgraph/json-rpc-relay/dist/lib/clients'; import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse'; import { resolveParams, validateJsonRpcRequest, verifySupportedMethod } from '../utils/utils'; -import { InvalidRequest, MethodNotFound } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError'; +import { + InvalidRequest, + MethodNotFound, + IPRateLimitExceeded, +} from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError'; /** * Handles sending requests to a Relay by calling a specified method with given parameters. @@ -122,6 +126,11 @@ export const getRequestResult = async ( return jsonResp(request.id || null, new MethodNotFound(request.method), undefined); } + // verify rate limit for method method based on IP + if (limiter.shouldRateLimitOnMethod(ctx.ip, request.method, ctx.websocket.requestId)) { + return jsonResp(null, new IPRateLimitExceeded(request.method), undefined); + } + // Validate request's params try { const methodValidations = Validator.METHODS[method]; diff --git a/packages/ws-server/src/metrics/connectionLimiter.ts b/packages/ws-server/src/metrics/connectionLimiter.ts index 5bc7c541af..21c4f64e18 100644 --- a/packages/ws-server/src/metrics/connectionLimiter.ts +++ b/packages/ws-server/src/metrics/connectionLimiter.ts @@ -22,6 +22,9 @@ import { Logger } from 'pino'; import { WS_CONSTANTS } from '../utils/constants'; import { Gauge, Registry, Counter } from 'prom-client'; import { WebSocketError } from '@hashgraph/json-rpc-relay'; +import RateLimit from '@hashgraph/json-rpc-server/dist/rateLimit'; +import constants from '@hashgraph/json-rpc-relay/dist/lib/constants'; +import { methodConfiguration } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/methodConfiguration'; type IpCounter = { [key: string]: number; @@ -39,6 +42,7 @@ export default class ConnectionLimiter { private connectionLimitCounter: Counter; private inactivityTTLCounter: Counter; private register: Registry; + private rateLimit: RateLimit; constructor(logger: Logger, register: Registry) { this.logger = logger; @@ -82,6 +86,11 @@ export default class ConnectionLimiter { help: WS_CONSTANTS.connLimiter.inactivityTTLLimitMetric.help, registers: [register], }); + + const rateLimitDuration = process.env.LIMIT_DURATION + ? parseInt(process.env.LIMIT_DURATION) + : constants.DEFAULT_RATE_LIMIT.DURATION; + this.rateLimit = new RateLimit(logger.child({ name: 'ip-rate-limit' }), register, rateLimitDuration); } public incrementCounters(ctx) { @@ -199,4 +208,13 @@ export default class ConnectionLimiter { this.startInactivityTTLTimer(websocket); } + + public shouldRateLimitOnMethod(ip, methodName, requestId) { + // subcription limits are already covered in this.validateSubscriptionLimit() + if (methodName === WS_CONSTANTS.METHODS.ETH_SUBSCRIBE || methodName === WS_CONSTANTS.METHODS.ETH_UNSUBSCRIBE) + return false; + + const methodTotalLimit = methodConfiguration[methodName].total; + return this.rateLimit.shouldRateLimit(ip, methodName, methodTotalLimit, requestId); + } } diff --git a/packages/ws-server/src/webSocketServer.ts b/packages/ws-server/src/webSocketServer.ts index ed70f7c48c..aab145c036 100644 --- a/packages/ws-server/src/webSocketServer.ts +++ b/packages/ws-server/src/webSocketServer.ts @@ -34,6 +34,7 @@ dotenv.config({ path: path.resolve(__dirname, '../../../.env') }); import KoaJsonRpc from '@hashgraph/json-rpc-server/dist/koaJsonRpc'; import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse'; import { type Relay, RelayImpl, predefined, JsonRpcError } from '@hashgraph/json-rpc-relay'; +import { IPRateLimitExceeded } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError'; import { sendToClient, handleConnectionClose, getBatchRequestsMaxSize, getWsBatchRequestsEnabled } from './utils/utils'; const mainLogger = pino({ @@ -66,10 +67,12 @@ app.ws.use(async (ctx) => { const startTime = process.hrtime(); ctx.websocket.id = relay.subs()?.generateId(); + ctx.websocket.requestId = uuid(); + ctx.websocket.limiter = limiter; ctx.websocket.wsMetricRegistry = wsMetricRegistry; const connectionIdPrefix = formatIdMessage('Connection ID', ctx.websocket.id); - const requestIdPrefix = formatIdMessage('Request ID', uuid()); + const requestIdPrefix = formatIdMessage('Request ID', ctx.websocket.requestId); logger.info( `${connectionIdPrefix} ${requestIdPrefix} New connection established. Current active connections: ${ctx.app.server._connections}`, ); @@ -116,6 +119,13 @@ app.ws.use(async (ctx) => { if (Array.isArray(request)) { logger.trace(`${connectionIdPrefix} ${requestIdPrefix}: Receive batch request=${JSON.stringify(request)}`); + // Increment metrics for batch_requests + wsMetricRegistry.getCounter('methodsCounter').labels(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME).inc(); + wsMetricRegistry + .getCounter('methodsCounterByIp') + .labels(ctx.request.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME) + .inc(); + // send error if batch request feature is not enabled if (!getWsBatchRequestsEnabled()) { const batchRequestDisabledError = predefined.WS_BATCH_REQUESTS_DISABLED; @@ -135,7 +145,13 @@ app.ws.use(async (ctx) => { return; } - // TODO: verify rate limit + // verify rate limit for batch_request method based on IP + if (limiter.shouldRateLimitOnMethod(ctx.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME, ctx.websocket.requestId)) { + ctx.websocket.send( + JSON.stringify([jsonResp(null, new IPRateLimitExceeded(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME), undefined)]), + ); + return; + } // process requests const requestPromises = request.map((item: any) => { diff --git a/packages/ws-server/tests/acceptance/batchRequest.spec.ts b/packages/ws-server/tests/acceptance/batchRequest.spec.ts index 542ce1ad16..f1b3f09610 100644 --- a/packages/ws-server/tests/acceptance/batchRequest.spec.ts +++ b/packages/ws-server/tests/acceptance/batchRequest.spec.ts @@ -25,6 +25,7 @@ import { WsTestConstant, WsTestHelper } from '../helper'; import { predefined } from '@hashgraph/json-rpc-relay/src'; describe('@web-socket-batch-1 Batch Requests', async function () { + const METHOD_NAME = 'batch_request'; let ethersWsProvider: WebSocketProvider; let batchRequests: any = []; @@ -80,10 +81,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () { it(`Should submit batch requests to WS server using Standard Web Socket and retrieve batch responses`, async () => { // call batch request - const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket( - WsTestConstant.BATCH_REQUEST_METHOD_NAME, - batchRequests, - ); + const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests); // individually process each request let promises: any = []; @@ -97,10 +95,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () { it('Should submit batch requests to WS server and get batchRequestDisabledError if WS_BATCH_REQUESTS_DISABLED=false ', async () => { process.env.WS_BATCH_REQUESTS_ENABLED = 'false'; - const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket( - WsTestConstant.BATCH_REQUEST_METHOD_NAME, - batchRequests, - ); + const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests); const expectedError = predefined.WS_BATCH_REQUESTS_DISABLED; delete expectedError.data; @@ -111,10 +106,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () { it('Should submit batch requests to WS server and get batchRequestAmountMaxExceed if requests size exceeds WS_BATCH_REQUESTS_MAX_SIZE', async () => { process.env.WS_BATCH_REQUESTS_MAX_SIZE = '1'; - const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket( - WsTestConstant.BATCH_REQUEST_METHOD_NAME, - batchRequests, - ); + const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests); const expectedError = predefined.BATCH_REQUESTS_AMOUNT_MAX_EXCEEDED( batchRequests.length, diff --git a/packages/ws-server/tests/acceptance/rateLimiter.spec.ts b/packages/ws-server/tests/acceptance/rateLimiter.spec.ts new file mode 100644 index 0000000000..361fea9fce --- /dev/null +++ b/packages/ws-server/tests/acceptance/rateLimiter.spec.ts @@ -0,0 +1,113 @@ +/*- + * + * Hedera JSON RPC Relay + * + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// external resources +import { expect } from 'chai'; +import { WsTestHelper } from '../helper'; +import relayConstants from '@hashgraph/json-rpc-relay/src/lib/constants'; +import { AliasAccount } from '@hashgraph/json-rpc-server/tests/types/AliasAccount'; +import { IPRateLimitExceeded } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError'; + +describe('@web-socket-ratelimiter Rate Limit Tests', async function () { + const rateLimitTier1 = Number(process.env.TIER_1_RATE_LIMIT || relayConstants.DEFAULT_RATE_LIMIT.TIER_1); + const rateLimitTier2 = Number(process.env.TIER_2_RATE_LIMIT || relayConstants.DEFAULT_RATE_LIMIT.TIER_2); + const limitDuration = Number(process.env.LIMIT_DURATION) || relayConstants.DEFAULT_RATE_LIMIT.DURATION; + + const batchRequests = [ + { + id: 1, + jsonrpc: '2.0', + method: 'eth_chainId', + params: [], + }, + { + id: 1, + jsonrpc: '2.0', + method: 'eth_blockNumber', + params: [], + }, + ]; + + after(async () => { + // expect all the connections to the WS server to be closed after all + expect(global.socketServer._connections).to.eq(0); + }); + + it(`Should submit single requests to WS server and receive IPRateLimitExceeded error until rate limit is reached`, async () => { + const SINGLE_REQUEST_METHOD_NAME = 'eth_gasPrice'; + for (let i = 0; i < rateLimitTier2; i++) { + await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, []); + } + + // exceed rate limit + const response = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, []); + const ipRateLimitError = new IPRateLimitExceeded(SINGLE_REQUEST_METHOD_NAME); + expect(response.error.code).to.deep.eq(ipRateLimitError.code); + expect(response.error.message).to.deep.eq(ipRateLimitError.message); + + // wait until rate limit is reset + await new Promise((r) => setTimeout(r, limitDuration)); + }); + + it(`Should submit batch requests to WS server and receive IPRateLimitExceeded error until rate limit is reached`, async () => { + const BATCH_REQUEST_METHOD_NAME = 'batch_request'; + + // call batch request multitime to reach limit + for (let i = 0; i < rateLimitTier1; i++) { + await WsTestHelper.sendRequestToStandardWebSocket(BATCH_REQUEST_METHOD_NAME, batchRequests); + } + + // exceed rate limit + const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(BATCH_REQUEST_METHOD_NAME, batchRequests); + const ipRateLimitError = new IPRateLimitExceeded(BATCH_REQUEST_METHOD_NAME); + + expect(batchResponses[0].error.code).to.deep.eq(ipRateLimitError.code); + expect(batchResponses[0].error.message).to.deep.eq(ipRateLimitError.message); + + // wait until rate limit is reset + await new Promise((r) => setTimeout(r, limitDuration)); + }); + + it(`Should reset limit for requests`, async () => { + const SINGLE_REQUEST_METHOD_NAME = 'eth_getBalance'; + const account: AliasAccount = global.accounts[0]; + + for (let i = 0; i < rateLimitTier2; i++) { + await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [account.address, 'latest']); + } + // exceed rate limit + const rateLimitResponse = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [ + account.address, + 'latest', + ]); + const ipRateLimitError = new IPRateLimitExceeded(SINGLE_REQUEST_METHOD_NAME); + expect(rateLimitResponse.error.code).to.deep.eq(ipRateLimitError.code); + expect(rateLimitResponse.error.message).to.deep.eq(ipRateLimitError.message); + + // wait until rate limit is reset + await new Promise((r) => setTimeout(r, limitDuration)); + const response = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [ + account.address, + 'latest', + ]); + const expectedResult = await global.relay.call(SINGLE_REQUEST_METHOD_NAME, [account.address, 'latest']); + expect(response.result).to.eq(expectedResult); + }); +}); diff --git a/packages/ws-server/tests/helper/index.ts b/packages/ws-server/tests/helper/index.ts index 0148511115..42395a6c40 100644 --- a/packages/ws-server/tests/helper/index.ts +++ b/packages/ws-server/tests/helper/index.ts @@ -38,12 +38,13 @@ export class WsTestHelper { } } - static async sendRequestToStandardWebSocket(method: string, params: any, ms?: number | undefined) { + static async sendRequestToStandardWebSocket(method: string, params: any[], ms?: number | undefined) { + const BATCH_REQUEST_METHOD_NAME = 'batch_request'; const webSocket = new WebSocket(WsTestConstant.WS_RELAY_URL); let response: any; - if (method === WsTestConstant.BATCH_REQUEST_METHOD_NAME) { + if (method === BATCH_REQUEST_METHOD_NAME) { webSocket.on('open', () => { webSocket.send(JSON.stringify(params)); }); @@ -94,5 +95,4 @@ export class WsTestConstant { static STANDARD_WEB_SOCKET = 'Standard Web Socket'; static ETHERS_WS_PROVIDER = 'Ethers Web Socket Provider'; static WS_RELAY_URL = process.env.WS_RELAY_URL || `ws://127.0.0.1:8546`; - static BATCH_REQUEST_METHOD_NAME: 'batch_request'; }