Skip to content

Commit

Permalink
Merge pull request #289 from skadefro/master
Browse files Browse the repository at this point in the history
close 1.5.6
  • Loading branch information
skadefro authored Nov 29, 2023
2 parents d2dbb29 + 2976847 commit f2eb966
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 4 deletions.
16 changes: 14 additions & 2 deletions OpenFlow/src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export class dbConfig extends Base {
const key = keys[i];
if(key == "_version") continue;
if(key == "disable_db_config") continue;
const value = conf[key];
let value = conf[key];
try {
if(key.startsWith("_")) continue;
// if(NoderedUtil.IsNullEmpty(value)) continue;
Expand All @@ -125,9 +125,20 @@ export class dbConfig extends Base {
}
}
if (Object.prototype.hasOwnProperty.call(Config, key)) {
let _default:any = Config.default_config[key]; // envorinment variable
if(typeof Config[key] === "boolean") {
value = Config.parseBoolean(value);
} else if(typeof Config[key] === "number") {
value = parseInt(value);
} else if(Array.isArray(Config[key])) {
value = Config.parseArray(value);
} else if(typeof Config[key] === "string") {
value = value;
} else {
continue;
}
Config[key] = value;

let _default:any = Config.default_config[key]; // envorinment variable
if(_default == null) _default = "";
let _env:any = process.env[key]; // db value
if(_env != null && _env != "") {
Expand Down Expand Up @@ -271,6 +282,7 @@ export class Config {
agent_HTTPS_PROXY: "",
agent_NO_PROXY: "",
agent_NPM_REGISTRY: "",
agent_NPM_TOKEN: "",

stripe_api_key: "",
stripe_api_secret: "",
Expand Down
12 changes: 11 additions & 1 deletion OpenFlow/src/Messages/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { QueueClient } from "../QueueClient";
import { AddWorkitemMessage, AddWorkitemQueueMessage, AddWorkitemsMessage, CustomCommandMessage, DeleteWorkitemMessage, DeleteWorkitemQueueMessage, GetWorkitemQueueMessage, PopWorkitemMessage, UpdateWorkitemMessage, UpdateWorkitemQueueMessage, Workitem, WorkitemQueue } from "@openiap/openflow-api";
import { WebServer } from "../WebServer";
import { iAgent } from "../commoninterfaces";
import { RateLimiterMemory } from "rate-limiter-flexible";
const pako = require('pako');
const got = require("got");

Expand Down Expand Up @@ -287,6 +288,14 @@ export class Message {
if (!NoderedUtil.IsNullEmpty(this.command)) { this.command = this.command.toLowerCase(); }
let command: string = this.command;
try {
if(Config.socket_rate_limit_duration != WebSocketServer.BaseRateLimiter.duration || Config.socket_rate_limit_points != WebSocketServer.BaseRateLimiter.points) {
Logger.instanse.info("Create new socket rate limitter", span, Logger.parsecli(cli));
WebSocketServer.BaseRateLimiter = new RateLimiterMemory({
points: Config.socket_rate_limit_points,
duration: Config.socket_rate_limit_duration,
});
}

if (Config.socket_rate_limit) await WebSocketServer.BaseRateLimiter.consume(cli.id);
} catch (error) {
// if (error.consumedPoints) {
Expand All @@ -304,7 +313,8 @@ export class Message {
// setTimeout(() => { this.Process(cli); }, 250);
// }
// return;
return reject(error);
var e = new Error("Rate limit exceeded consumedPoints: " + error.consumedPoints);
return reject(e);
}

if (!NoderedUtil.IsNullEmpty(this.replyto)) {
Expand Down
8 changes: 8 additions & 0 deletions OpenFlow/src/WebServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ const rateLimiter = async (req: express.Request, res: express.Response, next: ex
return next();
}
try {
if(Config.api_rate_limit_duration != WebServer.BaseRateLimiter.duration || Config.api_rate_limit_points != WebServer.BaseRateLimiter.points) {
Logger.instanse.info("Create new api rate limitter", span);
WebServer.BaseRateLimiter = new RateLimiterMemory({
points: Config.api_rate_limit_points,
duration: Config.api_rate_limit_duration,
});
}

Logger.instanse.verbose("Validate for " + req.originalUrl, null);
var e = await WebServer.BaseRateLimiter.consume(WebServer.remoteip(req))
Logger.instanse.verbose("consumedPoints: " + e.consumedPoints + " remainingPoints: " + e.remainingPoints, null);
Expand Down
10 changes: 10 additions & 0 deletions OpenFlow/src/amqpwrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ export class amqpwrapper extends events.EventEmitter {
}
async sendWithReplyTo(exchange: string, queue: string, replyTo: string, data: any, expiration: number, correlationId: string, routingkey: string, span: Span, priority: number = 1): Promise<void> {
await amqpwrapper.asyncWaitFor(() => this.connected);
if(data)
if (this.channel == null || this.conn == null) {
throw new Error("Cannot send message, when not connected");
}
Expand All @@ -447,6 +448,11 @@ export class amqpwrapper extends events.EventEmitter {
if (typeof data !== 'string' && !(data instanceof String)) {
data = JSON.stringify(data);
}
// PRECONDITION_FAILED - message size 155339741 is larger than configured max size 134217728
if(data.length > 130000000 ) {
Logger.instanse.error("send to queue: " + queue + " exchange: " + exchange + " PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000", span);
throw new Error("PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000")
}
Logger.instanse.silly("send to queue: " + queue + " exchange: " + exchange + " with reply to " + replyTo + " correlationId: " + correlationId, span);
const options: any = { mandatory: true };
options.replyTo = replyTo;
Expand Down Expand Up @@ -491,6 +497,10 @@ export class amqpwrapper extends events.EventEmitter {
if (typeof data !== 'string' && !(data instanceof String)) {
data = JSON.stringify(data);
}
if(data.length > 130000000 ) {
Logger.instanse.error("send to queue: " + queue + " exchange: " + exchange + " PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000", span);
throw new Error("PRECONDITION_FAILED - message size " + data.length + " is larger than configured max size 130000000")
}
if (NoderedUtil.IsNullEmpty(correlationId)) correlationId = NoderedUtil.GetUniqueIdentifier();
if (exchange != "openflow_logs") Logger.instanse.silly("send to queue: " + queue + " exchange: " + exchange, span);
const options: any = { mandatory: true };
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openiap/openflow",
"version": "1.5.5",
"version": "1.5.6",
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
"main": "index.js",
"scripts": {
Expand Down

0 comments on commit f2eb966

Please sign in to comment.