From 63e705b5c705407f26849ea9271c9798e8c1b902 Mon Sep 17 00:00:00 2001 From: prostgles Date: Fri, 8 Dec 2023 10:50:08 +0200 Subject: [PATCH] fix kill query mismatch bug --- lib/DboBuilder/QueryStreamer.ts | 18 +++++++++--------- package-lock.json | 4 ++-- package.json | 2 +- tests/server/package-lock.json | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/DboBuilder/QueryStreamer.ts b/lib/DboBuilder/QueryStreamer.ts index 0d2a1e5e..14b0aef5 100644 --- a/lib/DboBuilder/QueryStreamer.ts +++ b/lib/DboBuilder/QueryStreamer.ts @@ -132,7 +132,7 @@ export class QueryStreamer { this.socketQueries[socketId]![id]!.client = currentClient; try { if(!client){ - await currentClient.connect(); + await currentClient.connect(); } processID = (currentClient as any).processID const queryStream = new QueryStream(query.query, undefined, { batchSize: 1e6, highWaterMark: 1e6, rowMode: "array" }); @@ -167,11 +167,12 @@ export class QueryStreamer { const streamResult = getStreamResult(); streamState = "ended"; emit("ended", streamResult); - // release the client when the stream is finished AND connection is not persisted - if(!query.options?.persistStreamConnection){ - delete this.socketQueries[socketId]?.[id]; - currentClient.end(); + + if(query.options?.persistStreamConnection){ + return; } + delete this.socketQueries[socketId]?.[id]; + currentClient.end(); }); } catch(err){ socketQuery.onError(err); @@ -189,7 +190,7 @@ export class QueryStreamer { if(!queryClient) return; try { const stopFunction = opts?.terminate? "pg_terminate_backend" : "pg_cancel_backend"; - const rows = await this.adminClient.query(`SELECT ${stopFunction}(pid), pid, state, query FROM pg_stat_activity WHERE pid = $1 AND query = $2`, [processID, query.query]); + const rows = await this.adminClient.query(`SELECT ${stopFunction}(pid), pid, state, query FROM pg_stat_activity WHERE pid = $1`, [processID]); cleanup() cb({ processID, info: rows.rows[0] }); } catch (error){ @@ -204,12 +205,11 @@ export class QueryStreamer { socket.removeAllListeners(channel); socket.on(channel, async (_data: { query: string; params: any } | undefined, cb: BasicCallback) => { if(streamState === "started"){ - // TODO return cb(null, "Already started"); } streamState = "started"; try { - /* Query for persisted connection */ + /* Persisted connection query */ if(runCount){ const persistedClient = this.socketQueries[socketId]?.[id]; if(!persistedClient) throw "Persisted query client not found"; @@ -226,7 +226,7 @@ export class QueryStreamer { runCount++; }); - /** If not started in 5 seconds then assume this will never happen */ + /** If not started within 5 seconds then assume it will never happen */ setTimeout(() => { if(streamState) return; cleanup(); diff --git a/package-lock.json b/package-lock.json index 35f9ccf8..51764858 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "prostgles-server", - "version": "4.1.140", + "version": "4.1.141", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "prostgles-server", - "version": "4.1.140", + "version": "4.1.141", "license": "MIT", "dependencies": { "@types/express": "^4.17.13", diff --git a/package.json b/package.json index f6316e23..05d10b38 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "prostgles-server", - "version": "4.1.140", + "version": "4.1.141", "description": "", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/tests/server/package-lock.json b/tests/server/package-lock.json index a9a78144..337e0d21 100644 --- a/tests/server/package-lock.json +++ b/tests/server/package-lock.json @@ -21,7 +21,7 @@ }, "../..": { "name": "prostgles-server", - "version": "4.1.140", + "version": "4.1.141", "license": "MIT", "dependencies": { "@types/express": "^4.17.13",