From 502b728fea02efe2609b0acd7a91e81a28fd5a2c Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 21 Jun 2024 11:08:23 +0200 Subject: [PATCH] fix: properly call send callbacks during upgrade --- lib/socket.ts | 5 +++-- lib/transports/polling.ts | 3 ++- lib/transports/websocket.ts | 3 ++- lib/transports/webtransport.ts | 3 ++- test/server.js | 23 +++++++++++++++++++++++ 5 files changed, 32 insertions(+), 5 deletions(-) diff --git a/lib/socket.ts b/lib/socket.ts index 0febe90e..1dfa460b 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -221,18 +221,21 @@ export class Socket extends EventEmitter { */ private setTransport(transport) { const onError = this.onError.bind(this); + const onReady = () => this.flush(); const onPacket = this.onPacket.bind(this); const onDrain = this.onDrain.bind(this); const onClose = this.onClose.bind(this, "transport close"); this.transport = transport; this.transport.once("error", onError); + this.transport.on("ready", onReady); this.transport.on("packet", onPacket); this.transport.on("drain", onDrain); this.transport.once("close", onClose); this.cleanupFn.push(function () { transport.removeListener("error", onError); + transport.removeListener("ready", onReady); transport.removeListener("packet", onPacket); transport.removeListener("drain", onDrain); transport.removeListener("close", onClose); @@ -245,8 +248,6 @@ export class Socket extends EventEmitter { * @private */ private onDrain() { - this.flush(); - if (this.sentCallbackFn.length > 0) { debug("executing batch send callback"); const seqFn = this.sentCallbackFn.shift(); diff --git a/lib/transports/polling.ts b/lib/transports/polling.ts index 598315b7..09af20fb 100644 --- a/lib/transports/polling.ts +++ b/lib/transports/polling.ts @@ -96,7 +96,7 @@ export class Polling extends Transport { req.on("close", onClose); this.writable = true; - this.emit("drain"); + this.emit("ready"); // if we're still writable but had a pending close, trigger an empty send if (this.writable && this.shouldClose) { @@ -258,6 +258,7 @@ export class Polling extends Transport { debug('writing "%s"', data); this.doWrite(data, options, () => { this.req.cleanup(); + this.emit("drain"); }); } diff --git a/lib/transports/websocket.ts b/lib/transports/websocket.ts index e4cc2623..788094f4 100644 --- a/lib/transports/websocket.ts +++ b/lib/transports/websocket.ts @@ -103,8 +103,9 @@ export class WebSocket extends Transport { if (err) { this.onError("write error", err.stack); } else { - this.writable = true; this.emit("drain"); + this.writable = true; + this.emit("ready"); } }; diff --git a/lib/transports/webtransport.ts b/lib/transports/webtransport.ts index 663a0f66..07852282 100644 --- a/lib/transports/webtransport.ts +++ b/lib/transports/webtransport.ts @@ -56,8 +56,9 @@ export class WebTransport extends Transport { debug("error while writing: %s", e.message); } - this.writable = true; this.emit("drain"); + this.writable = true; + this.emit("ready"); } doClose(fn) { diff --git a/test/server.js b/test/server.js index c7c914bb..9f5ea33a 100644 --- a/test/server.js +++ b/test/server.js @@ -2701,6 +2701,29 @@ describe("server", () => { }); }); + it("should execute when message sent during polling upgrade window", (done) => { + const engine = listen((port) => { + const socket = new ClientSocket(`ws://localhost:${port}`, { + transports: ["polling", "websocket"], + }); + + const partialDone = createPartialDone(() => { + engine.httpServer.close(); + socket.close(); + done(); + }, 2); + + engine.on("connection", (conn) => { + conn.on("upgrading", () => { + conn.send("a", partialDone); + }); + }); + socket.on("open", () => { + socket.on("message", partialDone); + }); + }); + }); + it("should execute when message sent (websocket)", (done) => { const engine = listen({ allowUpgrades: false }, (port) => { const socket = new ClientSocket(`ws://localhost:${port}`, {