Skip to content

Commit

Permalink
⚡️ Fix Presence Stream leak
Browse files Browse the repository at this point in the history
At the moment, when subscribing a `Presence` multiple times, multiple
PubSub `Stream`s are created per-subscription.

However, unsubscribing will only destroy the [last created][1],
resulting in streams that are never destroyed.

This change updates our subscription logic to check if we already have
an existing stream. If we already have a stream, then we just
re-request presence from other clients (which is needed by our
[`DocPresence` logic][2]), and return.

Note that we also tweak our unsubscribe logic to eagerly remove the
stream after calling destroy (so that subsequent subscribes don't try to
reuse a stream that is currently being destroyed).

We also remove a check against `seq`, which is currently not covered by
tests (and actually never calls the `callback`), so is clearly broken
and unneeded anyway.

[1]: https://github.com/share/sharedb/blob/62e4ec5d46c0dcb097931e9dae60240aaba6b2b3/lib/agent.js#L829-L830
[2]: https://github.com/share/sharedb/blob/62e4ec5d46c0dcb097931e9dae60240aaba6b2b3/lib/client/presence/remote-doc-presence.js#L108-L110
  • Loading branch information
alecgibson committed Oct 10, 2023
1 parent 8b0e5b8 commit 0270d21
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
19 changes: 15 additions & 4 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,19 @@ Agent.prototype._createPresence = function(request) {

Agent.prototype._subscribePresence = function(channel, seq, callback) {
var agent = this;

function requestPresence() {
agent._requestPresence(channel, function(error) {
callback(error, {ch: channel, seq: seq});
});
}

var existingStream = agent.subscribedPresences[channel];
if (existingStream) {
agent.presenceSubscriptionSeq[channel] = seq;
return requestPresence();
}

var presenceChannel = this._getPresenceChannel(channel);
this.backend.pubsub.subscribe(presenceChannel, function(error, stream) {
if (error) return callback(error);
Expand All @@ -819,17 +832,15 @@ Agent.prototype._subscribePresence = function(channel, seq, callback) {
agent.presenceSubscriptionSeq[channel] = seq;
agent.subscribedPresences[channel] = stream;
agent._subscribeToPresenceStream(channel, stream);
agent._requestPresence(channel, function(error) {
callback(error, {ch: channel, seq: seq});
});
requestPresence();
});
};

Agent.prototype._unsubscribePresence = function(channel, seq, callback) {
if (seq < this.presenceSubscriptionSeq[channel]) return;
this.presenceSubscriptionSeq[channel] = seq;
var stream = this.subscribedPresences[channel];
if (stream) stream.destroy();
delete this.subscribedPresences[channel];
callback(null, {ch: channel, seq: seq});
};

Expand Down
16 changes: 16 additions & 0 deletions test/client/presence/presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,22 @@ describe('Presence', function() {
], done);
});

it('does not leak Streams when subscribing the same presence multiple times', function(done) {
var streamsCount;
async.series([
presence1.subscribe.bind(presence1, {force: true}),
function(next) {
streamsCount = backend.pubsub.streamsCount;
next();
},
presence1.subscribe.bind(presence1, {force: true}),
function(next) {
expect(backend.pubsub.streamsCount).to.equal(streamsCount);
next();
}
], done);
});

it('throws an error when trying to create a presence with a non-string ID', function() {
expect(function() {
presence1.create(123);
Expand Down

0 comments on commit 0270d21

Please sign in to comment.