From bc8bf582d3337fb6025c876d323e13b6b71d55c3 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 28 Nov 2023 17:02:50 +0100 Subject: [PATCH 1/4] pre-check data size --- OpenFlow/src/amqpwrapper.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/OpenFlow/src/amqpwrapper.ts b/OpenFlow/src/amqpwrapper.ts index 681fe00f..63867e88 100644 --- a/OpenFlow/src/amqpwrapper.ts +++ b/OpenFlow/src/amqpwrapper.ts @@ -430,6 +430,7 @@ export class amqpwrapper extends events.EventEmitter { } async sendWithReplyTo(exchange: string, queue: string, replyTo: string, data: any, expiration: number, correlationId: string, routingkey: string, span: Span, priority: number = 1): Promise { await amqpwrapper.asyncWaitFor(() => this.connected); + if(data) if (this.channel == null || this.conn == null) { throw new Error("Cannot send message, when not connected"); } @@ -447,6 +448,11 @@ export class amqpwrapper extends events.EventEmitter { if (typeof data !== 'string' && !(data instanceof String)) { data = JSON.stringify(data); } + // PRECONDITION_FAILED - message size 155339741 is larger than configured max size 134217728 + if(data.length > 130000000 ) { + Logger.instanse.error("send to queue: " + queue + " exchange: " + exchange + " PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000", span); + throw new Error("PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000") + } Logger.instanse.silly("send to queue: " + queue + " exchange: " + exchange + " with reply to " + replyTo + " correlationId: " + correlationId, span); const options: any = { mandatory: true }; options.replyTo = replyTo; @@ -491,6 +497,10 @@ export class amqpwrapper extends events.EventEmitter { if (typeof data !== 'string' && !(data instanceof String)) { data = JSON.stringify(data); } + if(data.length > 130000000 ) { + Logger.instanse.error("send to queue: " + queue + " exchange: " + exchange + " PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000", span); + throw new Error("PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000") + } if (NoderedUtil.IsNullEmpty(correlationId)) correlationId = NoderedUtil.GetUniqueIdentifier(); if (exchange != "openflow_logs") Logger.instanse.silly("send to queue: " + queue + " exchange: " + exchange, span); const options: any = { mandatory: true }; From 774a90ecb53551c5881ffd1b108a480f47e8e722 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 28 Nov 2023 17:03:13 +0100 Subject: [PATCH 2/4] bump --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 73ba4c6e..96e4c539 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.5", + "version": "1.5.6", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From 8a2460e00ade926a6f02f383eb5f4bb197d89b42 Mon Sep 17 00:00:00 2001 From: skadefro Date: Wed, 29 Nov 2023 16:57:01 +0100 Subject: [PATCH 3/4] fix value type parsing --- OpenFlow/src/Config.ts | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/OpenFlow/src/Config.ts b/OpenFlow/src/Config.ts index 24c0ce6f..23a4b964 100644 --- a/OpenFlow/src/Config.ts +++ b/OpenFlow/src/Config.ts @@ -110,7 +110,7 @@ export class dbConfig extends Base { const key = keys[i]; if(key == "_version") continue; if(key == "disable_db_config") continue; - const value = conf[key]; + let value = conf[key]; try { if(key.startsWith("_")) continue; // if(NoderedUtil.IsNullEmpty(value)) continue; @@ -125,9 +125,20 @@ export class dbConfig extends Base { } } if (Object.prototype.hasOwnProperty.call(Config, key)) { + let _default:any = Config.default_config[key]; // envorinment variable + if(typeof Config[key] === "boolean") { + value = Config.parseBoolean(value); + } else if(typeof Config[key] === "number") { + value = parseInt(value); + } else if(Array.isArray(Config[key])) { + value = Config.parseArray(value); + } else if(typeof Config[key] === "string") { + value = value; + } else { + continue; + } Config[key] = value; - let _default:any = Config.default_config[key]; // envorinment variable if(_default == null) _default = ""; let _env:any = process.env[key]; // db value if(_env != null && _env != "") { @@ -271,6 +282,7 @@ export class Config { agent_HTTPS_PROXY: "", agent_NO_PROXY: "", agent_NPM_REGISTRY: "", + agent_NPM_TOKEN: "", stripe_api_key: "", stripe_api_secret: "", From 297684779c4db67118635420267f0b8e52e6c925 Mon Sep 17 00:00:00 2001 From: skadefro Date: Wed, 29 Nov 2023 16:57:22 +0100 Subject: [PATCH 4/4] Add dynamic updates of rate limiter --- OpenFlow/src/Messages/Message.ts | 12 +++++++++++- OpenFlow/src/WebServer.ts | 8 ++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/OpenFlow/src/Messages/Message.ts b/OpenFlow/src/Messages/Message.ts index 2f85866d..aa43a185 100644 --- a/OpenFlow/src/Messages/Message.ts +++ b/OpenFlow/src/Messages/Message.ts @@ -21,6 +21,7 @@ import { QueueClient } from "../QueueClient"; import { AddWorkitemMessage, AddWorkitemQueueMessage, AddWorkitemsMessage, CustomCommandMessage, DeleteWorkitemMessage, DeleteWorkitemQueueMessage, GetWorkitemQueueMessage, PopWorkitemMessage, UpdateWorkitemMessage, UpdateWorkitemQueueMessage, Workitem, WorkitemQueue } from "@openiap/openflow-api"; import { WebServer } from "../WebServer"; import { iAgent } from "../commoninterfaces"; +import { RateLimiterMemory } from "rate-limiter-flexible"; const pako = require('pako'); const got = require("got"); @@ -287,6 +288,14 @@ export class Message { if (!NoderedUtil.IsNullEmpty(this.command)) { this.command = this.command.toLowerCase(); } let command: string = this.command; try { + if(Config.socket_rate_limit_duration != WebSocketServer.BaseRateLimiter.duration || Config.socket_rate_limit_points != WebSocketServer.BaseRateLimiter.points) { + Logger.instanse.info("Create new socket rate limitter", span, Logger.parsecli(cli)); + WebSocketServer.BaseRateLimiter = new RateLimiterMemory({ + points: Config.socket_rate_limit_points, + duration: Config.socket_rate_limit_duration, + }); + } + if (Config.socket_rate_limit) await WebSocketServer.BaseRateLimiter.consume(cli.id); } catch (error) { // if (error.consumedPoints) { @@ -304,7 +313,8 @@ export class Message { // setTimeout(() => { this.Process(cli); }, 250); // } // return; - return reject(error); + var e = new Error("Rate limit exceeded consumedPoints: " + error.consumedPoints); + return reject(e); } if (!NoderedUtil.IsNullEmpty(this.replyto)) { diff --git a/OpenFlow/src/WebServer.ts b/OpenFlow/src/WebServer.ts index 07b29f09..50485990 100644 --- a/OpenFlow/src/WebServer.ts +++ b/OpenFlow/src/WebServer.ts @@ -39,6 +39,14 @@ const rateLimiter = async (req: express.Request, res: express.Response, next: ex return next(); } try { + if(Config.api_rate_limit_duration != WebServer.BaseRateLimiter.duration || Config.api_rate_limit_points != WebServer.BaseRateLimiter.points) { + Logger.instanse.info("Create new api rate limitter", span); + WebServer.BaseRateLimiter = new RateLimiterMemory({ + points: Config.api_rate_limit_points, + duration: Config.api_rate_limit_duration, + }); + } + Logger.instanse.verbose("Validate for " + req.originalUrl, null); var e = await WebServer.BaseRateLimiter.consume(WebServer.remoteip(req)) Logger.instanse.verbose("consumedPoints: " + e.consumedPoints + " remainingPoints: " + e.remainingPoints, null);