From dadf3fae081f896643152e36ee3bf20992d72b76 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Mon, 9 Oct 2023 17:13:28 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Fix=20Presence=20`Stream`?= =?UTF-8?q?=20leak?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At the moment, when subscribing a `Presence` multiple times, multiple PubSub `Stream`s are created per-subscription. However, unsubscribing will only destroy the [last created][1], resulting in streams that are never destroyed. This change updates our subscription logic to check if we already have an existing stream. If we already have a stream, then we just re-request presence from other clients (which is needed by our [`DocPresence` logic][2]), and return. Note that we also tweak our unsubscribe logic to eagerly remove the stream after calling destroy (so that subsequent subscribes don't try to reuse a stream that is currently being destroyed). We also remove a check against `seq`, which is currently not covered by tests (and actually never calls the `callback`), so is clearly broken and unneeded anyway. [1]: https://github.com/share/sharedb/blob/62e4ec5d46c0dcb097931e9dae60240aaba6b2b3/lib/agent.js#L829-L830 [2]: https://github.com/share/sharedb/blob/62e4ec5d46c0dcb097931e9dae60240aaba6b2b3/lib/client/presence/remote-doc-presence.js#L108-L110 --- lib/agent.js | 19 +++++++++++++++---- test/client/presence/presence.js | 16 ++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 3c6fa4161..d8c7f3cae 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -807,6 +807,19 @@ Agent.prototype._createPresence = function(request) { Agent.prototype._subscribePresence = function(channel, seq, callback) { var agent = this; + + function requestPresence() { + agent._requestPresence(channel, function(error) { + callback(error, {ch: channel, seq: seq}); + }); + } + + var existingStream = agent.subscribedPresences[channel]; + if (existingStream) { + agent.presenceSubscriptionSeq[channel] = seq; + return requestPresence(); + } + var presenceChannel = this._getPresenceChannel(channel); this.backend.pubsub.subscribe(presenceChannel, function(error, stream) { if (error) return callback(error); @@ -817,17 +830,15 @@ Agent.prototype._subscribePresence = function(channel, seq, callback) { agent.presenceSubscriptionSeq[channel] = seq; agent.subscribedPresences[channel] = stream; agent._subscribeToPresenceStream(channel, stream); - agent._requestPresence(channel, function(error) { - callback(error, {ch: channel, seq: seq}); - }); + requestPresence(); }); }; Agent.prototype._unsubscribePresence = function(channel, seq, callback) { - if (seq < this.presenceSubscriptionSeq[channel]) return; this.presenceSubscriptionSeq[channel] = seq; var stream = this.subscribedPresences[channel]; if (stream) stream.destroy(); + delete this.subscribedPresences[channel]; callback(null, {ch: channel, seq: seq}); }; diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js index 490942765..6405de8a1 100644 --- a/test/client/presence/presence.js +++ b/test/client/presence/presence.js @@ -441,6 +441,22 @@ describe('Presence', function() { ], done); }); + it('does not leak Streams when subscribing the same presence multiple times', function(done) { + var streamsCount; + async.series([ + presence1.subscribe.bind(presence1, {force: true}), + function(next) { + streamsCount = backend.pubsub.streamsCount; + next(); + }, + presence1.subscribe.bind(presence1, {force: true}), + function(next) { + expect(backend.pubsub.streamsCount).to.equal(streamsCount); + next(); + } + ], done); + }); + it('throws an error when trying to create a presence with a non-string ID', function() { expect(function() { presence1.create(123);