Skip to content

Commit

Permalink
fix socket memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
skadefro committed Nov 15, 2022
1 parent 277f58b commit fac3044
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 80 deletions.
1 change: 1 addition & 0 deletions OpenFlow/src/DBHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ export class DBHelper {
}
public async UserRoleUpdate(userrole: Base | TokenUser, watch: boolean, span: Span) {
if (NoderedUtil.IsNullUndefinded(userrole)) return;
if (!this._doClear(watch, span)) return;
if (userrole._type == "user") {
Logger.instanse.debug("Remove user from cache : " + userrole._id, span);
let u: User = userrole as any;
Expand Down
36 changes: 20 additions & 16 deletions OpenFlow/src/WebSocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ export class WebSocketServer {
const payload = Crypt.decryptToken(cli.jwt);
const clockTimestamp = Math.floor(Date.now() / 1000);
if ((payload.exp - clockTimestamp) < 60) {
Logger.instanse.debug("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " expires in less than 1 minute, send new jwt to client", span);
Logger.instanse.debug("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " expires in less than 1 minute, send new jwt to client", span);
const tuser: TokenUser = await Message.DoSignin(cli, null, span);
if (tuser != null) {
span?.addEvent("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " expires in less than 1 minute, send new jwt to client");
span?.addEvent("Token for " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " expires in less than 1 minute, send new jwt to client");
const l: SigninMessage = new SigninMessage();
cli.jwt = Crypt.createToken(tuser, Config.shorttoken_expires_in);
l.jwt = cli.jwt;
Expand All @@ -219,7 +219,7 @@ export class WebSocketServer {
}
} catch (error) {
try {
Logger.instanse.debug(cli.id + "/" + cli.user?.name + "/" + cli.clientagent + " ERROR: " + (error.message || error), span);
Logger.instanse.debug(cli.id + "/" + cli.user?.name + "/" + cli.clientagent + "/" + cli.remoteip + " ERROR: " + (error.message || error), span);
if (cli != null) cli.Close(span);
} catch (error) {
}
Expand All @@ -229,11 +229,11 @@ export class WebSocketServer {
const seconds = (now.getTime() - cli.created.getTime()) / 1000;
if (seconds >= Config.client_signin_timeout) {
if (cli.user != null) {
span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection");
Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection", span);
span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection");
Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection", span);
} else {
span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection");
Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + " did not signin in after " + seconds + " seconds, close connection", span);
span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection");
Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " did not signin in after " + seconds + " seconds, close connection", span);
}
cli.Close(span);
}
Expand All @@ -247,26 +247,30 @@ export class WebSocketServer {
cli.lastheartbeatsec = seconds.toString();
if (seconds >= Config.client_heartbeat_timeout) {
if (cli.user != null) {
span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " timeout, close down");
Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + " timeout, close down", span);
span?.addEvent("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down");
Logger.instanse.debug("client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down", span);
} else {
span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + " timeout, close down");
Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + " timeout, close down", span);
span?.addEvent("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down");
Logger.instanse.debug("client not signed/" + cli.id + "/" + cli.clientagent + "/" + cli.remoteip + " timeout, close down", span);
}
cli.Close(span);
}
cli.ping(span);
if (!cli.connected() && cli.queuecount() == 0) {
if (cli.user != null) {
Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent, span);
span?.addEvent("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent);
Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip, span);
span?.addEvent("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent + "/" + cli.remoteip);
} else {
Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.clientagent, span);
span?.addEvent("removing disconnected client " + cli.id + "/" + cli.clientagent);
Logger.instanse.debug("removing disconnected client " + cli.id + "/" + cli.clientagent + "/" + cli.remoteip, span);
span?.addEvent("removing disconnected client " + cli.id + "/" + cli.clientagent + "/" + cli.remoteip);
}
try {
cli.Close(span)
WebSocketServer._clients.splice(i, 1);
if (cli._socketObject == null || cli._socketObject.readyState === cli._socketObject.CLOSED) {
WebSocketServer._clients.splice(i, 1);
} else {
Logger.instanse.silly("Not ready to remove client yet " + cli.id + "/" + cli.clientagent + "/" + cli.remoteip, span);
}
} catch (error) {
Logger.instanse.error(error, span);
}
Expand Down
48 changes: 21 additions & 27 deletions OpenFlow/src/WebSocketServerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ export class WebSocketServerClient {
public _queuescurrentstr: string = "0";
public _exchanges: amqpexchange[] = [];
public devnull: boolean = false;
private _message: any = null;
private _error: any = null;
private _close: any = null;
private _amqpdisconnected: any = null;
private _dbdisconnected: any = null;
private _dbconnected: any = null;
Expand All @@ -84,10 +87,13 @@ export class WebSocketServerClient {
const span: Span = Logger.otel.startSpanExpress("WebSocketServerClient.Initialize", req);
try {
this._socketObject = socketObject;
this._socketObject.on("open", this.open.bind(this));
this._socketObject.on("message", this.message.bind(this)); // e: MessageEvent
this._socketObject.on("error", this.error.bind(this));
this._socketObject.on("close", this.close.bind(this));

this._message = this.message.bind(this);
this._error = this.error.bind(this);
this._close = this.close.bind(this);
this._socketObject.on("message", this._message);
this._socketObject.on("error", this._error);
this._socketObject.on("close", this._close);
this._dbdisconnected = this.dbdisconnected.bind(this);
this._dbconnected = this.dbconnected.bind(this);
this._amqpdisconnected = this.amqpdisconnected.bind(this);
Expand Down Expand Up @@ -167,13 +173,10 @@ export class WebSocketServerClient {
this._exchanges = [];
this.CloseConsumers(null);
}
private open(e: Event): void {
Logger.instanse.debug("Connection opened " + e + " " + this.id, null);
}
private close(e: CloseEvent): void {
Logger.instanse.debug("Connection closed " + e + " " + this.id + "/" + this.clientagent, null, Logger.parsecli(this));
this.init_complete = false;
this.Close(null);
// this.Close(null);
}
private error(e: Event): void {
Logger.instanse.error(e, null, Logger.parsecli(this));
Expand All @@ -187,9 +190,6 @@ export class WebSocketServerClient {
if (this._socketObject.readyState === this._socketObject.OPEN || this._socketObject.readyState === this._socketObject.CONNECTING) {
return true;
}
if (this._socketObject.readyState === this._socketObject.CLOSED) {
delete this._socketObject;
}
return false;
}
public ping(parent: Span): void {
Expand Down Expand Up @@ -221,7 +221,7 @@ export class WebSocketServerClient {
Logger.otel.endSpan(span);
}
}
private _message(message: string): void {
private message(message: string): void {
try {
Logger.instanse.silly("WebSocket message received " + message, null, Logger.parsecli(this));
let msg: SocketMessage = SocketMessage.fromjson(message);
Expand All @@ -236,15 +236,11 @@ export class WebSocketServerClient {
if ((msg.index + 1) >= msg.count) this.ProcessQueue(null);
} catch (error) {
Logger.instanse.error(error, null, Logger.parsecli(this));
}
}
private async message(message: string): Promise<void> {
try {
this._message(message);
} catch (error) {
Logger.instanse.error(error, null, Logger.parsecli(this));
const errormessage: Message = new Message(); errormessage.command = "error"; errormessage.data = (error.message ? error.message : error);
this._socketObject.send(JSON.stringify(errormessage));
try {
const errormessage: Message = new Message(); errormessage.command = "error"; errormessage.data = (error.message ? error.message : error);
this._socketObject.send(JSON.stringify(errormessage));
} catch (error) {
}
}
}
public async CloseConsumers(parent: Span): Promise<void> {
Expand All @@ -269,14 +265,12 @@ export class WebSocketServerClient {
if (this._dbdisconnected != null) Config.db.removeListener("disconnected", this._dbdisconnected);
if (this._dbconnected != null) Config.db.removeListener("connected", this._dbconnected);

if (this._message != null) Config.db.removeListener("disconnected", this._message);
if (this._error != null) Config.db.removeListener("disconnected", this._error);
if (this._close != null) Config.db.removeListener("disconnected", this._close);

await this.CloseConsumers(span);
// await this.CloseStreams();
if (!NoderedUtil.IsNullUndefinded(this._socketObject)) {
try {
this._socketObject.removeAllListeners();
} catch (error) {
Logger.instanse.error(error, span, Logger.parsecli(this));
}
try {
this._socketObject.close();
} catch (error) {
Expand Down
18 changes: 9 additions & 9 deletions OpenFlow/src/public/app.css
Original file line number Diff line number Diff line change
Expand Up @@ -546,33 +546,33 @@ textarea{

@media (min-width: 1201px) {
.container,
.container-lg,
.container-md,
.container-sm,
.container-xl {
.container-lg,
.container-md,
.container-sm,
.container-xl {
max-width: 95%;
padding-top: 2%;
}
}
@media (min-width: 993px) {
.container-lg,
.container-md,
.container-sm {
.container-md,
.container-sm {
max-width: 95%;
padding-top: 2%;
}
}
@media (min-width: 769px) {
.container,
.container-md,
.container-sm {
.container-md,
.container-sm {
max-width: 95%;
padding-top: 2%;
}
}
@media (min-width: 577px) {
.container,
.container-sm {
.container-sm {
max-width: 95%;
padding-top: 2%;
}
Expand Down
4 changes: 2 additions & 2 deletions OpenFlowNodeRED/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"jsonwebtoken": "^8.5.1",
"morgan": "^1.10.0",
"node-red": "^3.0.2",
"pako": "^2.0.4",
"pako": "^2.1.0",
"passport-saml": "^3.2.1",
"passport-saml-metadata": "^2.6.0",
"pm2": "^5.2.2",
Expand All @@ -54,6 +54,6 @@
},
"devDependencies": {
"@types/compression": "^1.7.2",
"@types/node": "^18.11.5"
"@types/node": "^18.11.9"
}
}
48 changes: 23 additions & 25 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@
},
"dependencies": {
"@fortawesome/fontawesome-free": "^5.15.3",
"@kubernetes/client-node": "0.17.0",
"@kubernetes/client-node": "0.17.1",
"@openiap/openflow-api": "^2.1.6",
"@opentelemetry/api-metrics": "^0.32.0",
"@opentelemetry/exporter-metrics-otlp-grpc": "^0.32.0",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.32.0",
"@opentelemetry/sdk-metrics-base": "^0.31.0",
"@opentelemetry/sdk-node": "^0.32.0",
"@skadefro/cache-manager-mongodb": "^0.3.2",
"@types/amqplib": "^0.8.2",
"@types/node": "^18.7.13",
"amqplib": "^0.10.2",
"@types/amqplib": "^0.10.0",
"@types/node": "^18.11.9",
"amqplib": "^0.10.3",
"bcryptjs": "^2.4.3",
"body-parser": "^1.20.0",
"body-parser": "^1.20.1",
"cache-manager": "^4.1.0",
"cache-manager-ioredis": "^2.1.0",
"cache-require-paths": "^0.3.0",
Expand All @@ -54,56 +54,54 @@
"cookie-parser": "^1.4.6",
"cookie-session": "^2.0.0",
"dockerode": "^3.3.4",
"dotenv": "^16.0.1",
"envfile": "^6.17.0",
"envfile": "^6.18.0",
"express": "^4.18.1",
"flash": "^1.1.0",
"got": "^11.8.5",
"halfmoon": "^1.1.1",
"jose": "^2.0.6",
"json-stable-stringify": "^1.0.1",
"json-stringify-safe": "^5.0.1",
"jsondiffpatch": "^0.4.1",
"jsonpath-plus": "^7.0.0",
"jsonpath-plus": "^7.2.0",
"jsonwebtoken": "^8.5.1",
"mimetype": "0.0.8",
"mongodb": "^4.9.0",
"mongodb": "^4.11.0",
"multer": "^1.4.2",
"multer-gridfs-storage": "^5.0.2",
"nodemailer": "^6.7.8",
"nodemailer": "^6.8.0",
"nyc": "^15.1.0",
"oauth2-server": "^3.1.1",
"oidc-provider": "^6.31.0",
"pako": "^2.0.4",
"pako": "^2.1.0",
"passport": "^0.5.3",
"passport-google-oauth20": "^2.0.0",
"passport-local": "^1.0.0",
"passport-openidconnect": "^0.1.1",
"passport-saml": "^3.2.1",
"passport-saml-metadata": "^2.6.0",
"pm2": "^5.2.0",
"rate-limiter-flexible": "^2.3.8",
"pm2": "^5.2.2",
"rate-limiter-flexible": "^2.4.1",
"readline-sync": "^1.4.10",
"request": "^2.88.2",
"saml20": "^0.1.14",
"samlp": "^6.0.2",
"web-push": "^3.5.0",
"ws": "^8.8.1"
"ws": "^8.11.0"
},
"devDependencies": {
"@testdeck/mocha": "0.2.0",
"@testdeck/mocha": "0.3.2",
"@types/angular": "^1.8.4",
"@types/angular-route": "^1.7.2",
"@types/express": "^4.17.13",
"@types/mocha": "9.1.1",
"@types/passport": "^1.0.10",
"@types/express": "^4.17.14",
"@types/mocha": "10.0.0",
"@types/passport": "^1.0.11",
"angular": "^1.8.2",
"angular-chart.js": "^1.1.1",
"angular-route": "^1.8.2",
"angular-sanitize": "^1.8.2",
"browserify": "^17.0.0",
"browserify-css": "^0.15.0",
"chart.js": "^2.9.4",
"dotenv": "^16.0.3",
"exorcist": "^2.0.0",
"formBuilder": "^3.8.3",
"gulp": "^4.0.2",
Expand All @@ -112,16 +110,16 @@
"gulp-shell": "^0.8.0",
"gulp-typescript": "^6.0.0-alpha.1",
"merge-stream": "^2.0.0",
"mocha": "^10.0.0",
"mocha": "^10.1.0",
"popper.js": "^1.16.1",
"sass": "^1.54.5",
"sass": "^1.56.1",
"shepherd.js": "^10.0.1",
"tinyify": "^3.1.0",
"tinyify": "^4.0.0",
"ts-mockito": "^2.6.1",
"ts-node": "^10.9.1",
"tsify": "^5.0.4",
"typescript": "^4.8.2",
"typescript": "^4.8.4",
"watchify": "^4.0.0",
"wtfnode": "^0.9.1"
}
}
}
2 changes: 1 addition & 1 deletion test/loadtest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ import { Logger } from '../OpenFlow/src/Logger';
}

@timeout(6000000)
// @test
@test
async 'crud connection load test'() {
await this.createandconnect(0);
var Promises: Promise<any>[] = [];
Expand Down

0 comments on commit fac3044

Please sign in to comment.