From 69c708916fcf0dd33b2f943e51f7bb429081e22e Mon Sep 17 00:00:00 2001 From: Allan Zimmermann Date: Mon, 14 Nov 2022 17:42:33 +0100 Subject: [PATCH 1/2] bump --- OpenFlowNodeRED/package.json | 2 +- VERSION | 2 +- package.json | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/OpenFlowNodeRED/package.json b/OpenFlowNodeRED/package.json index c0558b52..b6cf736c 100644 --- a/OpenFlowNodeRED/package.json +++ b/OpenFlowNodeRED/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/nodered", - "version": "1.4.31", + "version": "1.4.32", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { diff --git a/VERSION b/VERSION index dc37b34b..7e0d42a4 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.4.31 \ No newline at end of file +1.4.32 \ No newline at end of file diff --git a/package.json b/package.json index d7177e0a..38fc6b18 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.4.31", + "version": "1.4.32", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { @@ -124,4 +124,4 @@ "watchify": "^4.0.0", "wtfnode": "^0.9.1" } -} +} \ No newline at end of file From fac304449723307fd71a2cfa0d211b014486f1c2 Mon Sep 17 00:00:00 2001 From: Allan Zimmermann Date: Tue, 15 Nov 2022 15:54:41 +0100 Subject: [PATCH 2/2] fix socket memory leak --- OpenFlow/src/DBHelper.ts | 1 + OpenFlow/src/WebSocketServer.ts | 36 +++++++++++--------- OpenFlow/src/WebSocketServerClient.ts | 48 ++++++++++++--------------- OpenFlow/src/public/app.css | 18 +++++----- OpenFlowNodeRED/package.json | 4 +-- package.json | 48 +++++++++++++-------------- test/loadtest.test.ts | 2 +- 7 files changed, 77 insertions(+), 80 deletions(-) diff --git a/OpenFlow/src/DBHelper.ts b/OpenFlow/src/DBHelper.ts index a1b2adcc..c085893f 100644 --- a/OpenFlow/src/DBHelper.ts +++ b/OpenFlow/src/DBHelper.ts @@ -642,6 +642,7 @@ export class DBHelper { } public async UserRoleUpdate(userrole: Base | TokenUser, watch: boolean, span: Span) { if (NoderedUtil.IsNullUndefinded(userrole)) return; + if (!this._doClear(watch, span)) return; if (userrole._type == "user") { Logger.instanse.debug("Remove user from cache : " + userrole._id, span); let u: User = userrole as any; diff --git a/OpenFlow/src/WebSocketServer.ts b/OpenFlow/src/WebSocketServer.ts index fda89738..cbbfea80 100644 --- a/OpenFlow/src/WebSocketServer.ts +++ b/OpenFlow/src/WebSocketServer.ts @@ -202,10 +202,10 @@ export class WebSocketServer { const payload = Crypt.decryptToken(cli.jwt); const clockTimestamp = Math.floor(Date.now() / 1000); if ((payload.exp - clockTimestamp) < 60) { - Logger.instanse.debug("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " expires in less than 1 minute, send new jwt to client", span); + Logger.instanse.debug("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " expires in less than 1 minute, send new jwt to client", span); const tuser: TokenUser = await Message.DoSignin(cli, null, span); if (tuser != null) { - span?.addEvent("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " expires in less than 1 minute, send new jwt to client"); + span?.addEvent("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " expires in less than 1 minute, send new jwt to client"); const l: SigninMessage = new SigninMessage(); cli.jwt = Crypt.createToken(tuser, Config.shorttoken_expires_in); l.jwt = cli.jwt; @@ -219,7 +219,7 @@ export class WebSocketServer { } } catch (error) { try { - Logger.instanse.debug(cli.id + "/" + cli.user?.name + "/" + cli.clientagent + " ERROR: " + (error.message || error), span); + Logger.instanse.debug(cli.id + "/" + cli.user?.name + "/" + cli.clientagent + "/" + cli.remoteip + " ERROR: " + (error.message || error), span); if (cli != null) cli.Close(span); } catch (error) { } @@ -229,11 +229,11 @@ export class WebSocketServer { const seconds = (now.getTime() - cli.created.getTime()) / 1000; if (seconds >= Config.client_signin_timeout) { if (cli.user != null) { - span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection"); - Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection", span); + span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection"); + Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection", span); } else { - span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection"); - Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection", span); + span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection"); + Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection", span); } cli.Close(span); } @@ -247,26 +247,30 @@ export class WebSocketServer { cli.lastheartbeatsec = seconds.toString(); if (seconds >= Config.client_heartbeat_timeout) { if (cli.user != null) { - span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " timeout, close down"); - Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " timeout, close down", span); + span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down"); + Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down", span); } else { - span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + " timeout, close down"); - Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + " timeout, close down", span); + span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down"); + Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down", span); } cli.Close(span); } cli.ping(span); if (!cli.connected() && cli.queuecount() == 0) { if (cli.user != null) { - Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent, span); - span?.addEvent("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent); + Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip, span); + span?.addEvent("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip); } else { - Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.clientagent, span); - span?.addEvent("removing disconnected client " + cli.id + "/" + cli.clientagent); + Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.clientagent + "/" + cli.remoteip, span); + span?.addEvent("removing disconnected client " + cli.id + "/" + cli.clientagent + "/" + cli.remoteip); } try { cli.Close(span) - WebSocketServer._clients.splice(i, 1); + if (cli._socketObject == null || cli._socketObject.readyState === cli._socketObject.CLOSED) { + WebSocketServer._clients.splice(i, 1); + } else { + Logger.instanse.silly("Not ready to remove client yet " + cli.id + "/" + cli.clientagent + "/" + cli.remoteip, span); + } } catch (error) { Logger.instanse.error(error, span); } diff --git a/OpenFlow/src/WebSocketServerClient.ts b/OpenFlow/src/WebSocketServerClient.ts index b094ff82..45f9a202 100644 --- a/OpenFlow/src/WebSocketServerClient.ts +++ b/OpenFlow/src/WebSocketServerClient.ts @@ -67,6 +67,9 @@ export class WebSocketServerClient { public _queuescurrentstr: string = "0"; public _exchanges: amqpexchange[] = []; public devnull: boolean = false; + private _message: any = null; + private _error: any = null; + private _close: any = null; private _amqpdisconnected: any = null; private _dbdisconnected: any = null; private _dbconnected: any = null; @@ -84,10 +87,13 @@ export class WebSocketServerClient { const span: Span = Logger.otel.startSpanExpress("WebSocketServerClient.Initialize", req); try { this._socketObject = socketObject; - this._socketObject.on("open", this.open.bind(this)); - this._socketObject.on("message", this.message.bind(this)); // e: MessageEvent - this._socketObject.on("error", this.error.bind(this)); - this._socketObject.on("close", this.close.bind(this)); + + this._message = this.message.bind(this); + this._error = this.error.bind(this); + this._close = this.close.bind(this); + this._socketObject.on("message", this._message); + this._socketObject.on("error", this._error); + this._socketObject.on("close", this._close); this._dbdisconnected = this.dbdisconnected.bind(this); this._dbconnected = this.dbconnected.bind(this); this._amqpdisconnected = this.amqpdisconnected.bind(this); @@ -167,13 +173,10 @@ export class WebSocketServerClient { this._exchanges = []; this.CloseConsumers(null); } - private open(e: Event): void { - Logger.instanse.debug("Connection opened " + e + " " + this.id, null); - } private close(e: CloseEvent): void { Logger.instanse.debug("Connection closed " + e + " " + this.id + "/" + this.clientagent, null, Logger.parsecli(this)); this.init_complete = false; - this.Close(null); + // this.Close(null); } private error(e: Event): void { Logger.instanse.error(e, null, Logger.parsecli(this)); @@ -187,9 +190,6 @@ export class WebSocketServerClient { if (this._socketObject.readyState === this._socketObject.OPEN || this._socketObject.readyState === this._socketObject.CONNECTING) { return true; } - if (this._socketObject.readyState === this._socketObject.CLOSED) { - delete this._socketObject; - } return false; } public ping(parent: Span): void { @@ -221,7 +221,7 @@ export class WebSocketServerClient { Logger.otel.endSpan(span); } } - private _message(message: string): void { + private message(message: string): void { try { Logger.instanse.silly("WebSocket message received " + message, null, Logger.parsecli(this)); let msg: SocketMessage = SocketMessage.fromjson(message); @@ -236,15 +236,11 @@ export class WebSocketServerClient { if ((msg.index + 1) >= msg.count) this.ProcessQueue(null); } catch (error) { Logger.instanse.error(error, null, Logger.parsecli(this)); - } - } - private async message(message: string): Promise { - try { - this._message(message); - } catch (error) { - Logger.instanse.error(error, null, Logger.parsecli(this)); - const errormessage: Message = new Message(); errormessage.command = "error"; errormessage.data = (error.message ? error.message : error); - this._socketObject.send(JSON.stringify(errormessage)); + try { + const errormessage: Message = new Message(); errormessage.command = "error"; errormessage.data = (error.message ? error.message : error); + this._socketObject.send(JSON.stringify(errormessage)); + } catch (error) { + } } } public async CloseConsumers(parent: Span): Promise { @@ -269,14 +265,12 @@ export class WebSocketServerClient { if (this._dbdisconnected != null) Config.db.removeListener("disconnected", this._dbdisconnected); if (this._dbconnected != null) Config.db.removeListener("connected", this._dbconnected); + if (this._message != null) Config.db.removeListener("disconnected", this._message); + if (this._error != null) Config.db.removeListener("disconnected", this._error); + if (this._close != null) Config.db.removeListener("disconnected", this._close); + await this.CloseConsumers(span); - // await this.CloseStreams(); if (!NoderedUtil.IsNullUndefinded(this._socketObject)) { - try { - this._socketObject.removeAllListeners(); - } catch (error) { - Logger.instanse.error(error, span, Logger.parsecli(this)); - } try { this._socketObject.close(); } catch (error) { diff --git a/OpenFlow/src/public/app.css b/OpenFlow/src/public/app.css index ab46ad5a..df5f1e78 100644 --- a/OpenFlow/src/public/app.css +++ b/OpenFlow/src/public/app.css @@ -546,33 +546,33 @@ textarea{ @media (min-width: 1201px) { .container, -.container-lg, -.container-md, -.container-sm, -.container-xl { + .container-lg, + .container-md, + .container-sm, + .container-xl { max-width: 95%; padding-top: 2%; } } @media (min-width: 993px) { .container-lg, -.container-md, -.container-sm { + .container-md, + .container-sm { max-width: 95%; padding-top: 2%; } } @media (min-width: 769px) { .container, -.container-md, -.container-sm { + .container-md, + .container-sm { max-width: 95%; padding-top: 2%; } } @media (min-width: 577px) { .container, -.container-sm { + .container-sm { max-width: 95%; padding-top: 2%; } diff --git a/OpenFlowNodeRED/package.json b/OpenFlowNodeRED/package.json index b6cf736c..b65de1ef 100644 --- a/OpenFlowNodeRED/package.json +++ b/OpenFlowNodeRED/package.json @@ -43,7 +43,7 @@ "jsonwebtoken": "^8.5.1", "morgan": "^1.10.0", "node-red": "^3.0.2", - "pako": "^2.0.4", + "pako": "^2.1.0", "passport-saml": "^3.2.1", "passport-saml-metadata": "^2.6.0", "pm2": "^5.2.2", @@ -54,6 +54,6 @@ }, "devDependencies": { "@types/compression": "^1.7.2", - "@types/node": "^18.11.5" + "@types/node": "^18.11.9" } } \ No newline at end of file diff --git a/package.json b/package.json index 38fc6b18..8ffdd191 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ }, "dependencies": { "@fortawesome/fontawesome-free": "^5.15.3", - "@kubernetes/client-node": "0.17.0", + "@kubernetes/client-node": "0.17.1", "@openiap/openflow-api": "^2.1.6", "@opentelemetry/api-metrics": "^0.32.0", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.32.0", @@ -41,11 +41,11 @@ "@opentelemetry/sdk-metrics-base": "^0.31.0", "@opentelemetry/sdk-node": "^0.32.0", "@skadefro/cache-manager-mongodb": "^0.3.2", - "@types/amqplib": "^0.8.2", - "@types/node": "^18.7.13", - "amqplib": "^0.10.2", + "@types/amqplib": "^0.10.0", + "@types/node": "^18.11.9", + "amqplib": "^0.10.3", "bcryptjs": "^2.4.3", - "body-parser": "^1.20.0", + "body-parser": "^1.20.1", "cache-manager": "^4.1.0", "cache-manager-ioredis": "^2.1.0", "cache-require-paths": "^0.3.0", @@ -54,49 +54,46 @@ "cookie-parser": "^1.4.6", "cookie-session": "^2.0.0", "dockerode": "^3.3.4", - "dotenv": "^16.0.1", - "envfile": "^6.17.0", + "envfile": "^6.18.0", "express": "^4.18.1", "flash": "^1.1.0", "got": "^11.8.5", "halfmoon": "^1.1.1", "jose": "^2.0.6", - "json-stable-stringify": "^1.0.1", - "json-stringify-safe": "^5.0.1", "jsondiffpatch": "^0.4.1", - "jsonpath-plus": "^7.0.0", + "jsonpath-plus": "^7.2.0", "jsonwebtoken": "^8.5.1", "mimetype": "0.0.8", - "mongodb": "^4.9.0", + "mongodb": "^4.11.0", "multer": "^1.4.2", "multer-gridfs-storage": "^5.0.2", - "nodemailer": "^6.7.8", + "nodemailer": "^6.8.0", "nyc": "^15.1.0", "oauth2-server": "^3.1.1", "oidc-provider": "^6.31.0", - "pako": "^2.0.4", + "pako": "^2.1.0", "passport": "^0.5.3", "passport-google-oauth20": "^2.0.0", "passport-local": "^1.0.0", "passport-openidconnect": "^0.1.1", "passport-saml": "^3.2.1", "passport-saml-metadata": "^2.6.0", - "pm2": "^5.2.0", - "rate-limiter-flexible": "^2.3.8", + "pm2": "^5.2.2", + "rate-limiter-flexible": "^2.4.1", "readline-sync": "^1.4.10", "request": "^2.88.2", "saml20": "^0.1.14", "samlp": "^6.0.2", "web-push": "^3.5.0", - "ws": "^8.8.1" + "ws": "^8.11.0" }, "devDependencies": { - "@testdeck/mocha": "0.2.0", + "@testdeck/mocha": "0.3.2", "@types/angular": "^1.8.4", "@types/angular-route": "^1.7.2", - "@types/express": "^4.17.13", - "@types/mocha": "9.1.1", - "@types/passport": "^1.0.10", + "@types/express": "^4.17.14", + "@types/mocha": "10.0.0", + "@types/passport": "^1.0.11", "angular": "^1.8.2", "angular-chart.js": "^1.1.1", "angular-route": "^1.8.2", @@ -104,6 +101,7 @@ "browserify": "^17.0.0", "browserify-css": "^0.15.0", "chart.js": "^2.9.4", + "dotenv": "^16.0.3", "exorcist": "^2.0.0", "formBuilder": "^3.8.3", "gulp": "^4.0.2", @@ -112,16 +110,16 @@ "gulp-shell": "^0.8.0", "gulp-typescript": "^6.0.0-alpha.1", "merge-stream": "^2.0.0", - "mocha": "^10.0.0", + "mocha": "^10.1.0", "popper.js": "^1.16.1", - "sass": "^1.54.5", + "sass": "^1.56.1", "shepherd.js": "^10.0.1", - "tinyify": "^3.1.0", + "tinyify": "^4.0.0", "ts-mockito": "^2.6.1", "ts-node": "^10.9.1", "tsify": "^5.0.4", - "typescript": "^4.8.2", + "typescript": "^4.8.4", "watchify": "^4.0.0", "wtfnode": "^0.9.1" } -} \ No newline at end of file +} diff --git a/test/loadtest.test.ts b/test/loadtest.test.ts index 42f86006..9a5b2806 100644 --- a/test/loadtest.test.ts +++ b/test/loadtest.test.ts @@ -79,7 +79,7 @@ import { Logger } from '../OpenFlow/src/Logger'; } @timeout(6000000) - // @test + @test async 'crud connection load test'() { await this.createandconnect(0); var Promises: Promise[] = [];