From 5831da64af50d13d78d24bc2b69caeca511a5be5 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Thu, 12 Oct 2023 15:40:20 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Remove=20`presence.subscri?= =?UTF-8?q?be()`=20`force`=20option?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We previously only needed this to re-request presence as part of the `DocPresence` catchup flow. This change adds an internal method to actively request remote presence for a given channel, and uses that instead of abusing `subscribe()`. This lets us remove the `options` object from `subscribe()`. --- lib/agent.js | 18 ++++++++++-------- lib/client/connection.js | 4 ++++ lib/client/presence/presence.js | 21 ++++++++++----------- lib/client/presence/remote-doc-presence.js | 4 +--- test/client/presence/doc-presence.js | 14 ++++---------- test/client/presence/presence.js | 15 +++++++++------ 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 1ebf1799a..5c74dab2f 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -453,6 +453,8 @@ Agent.prototype._handleMessage = function(request, callback) { return this._subscribePresence(request.ch, request.seq, callback); case ACTIONS.presenceUnsubscribe: return this._unsubscribePresence(request.ch, request.seq, callback); + case ACTIONS.presenceRequest: + return this._requestPresence(request.ch, callback); case ACTIONS.pingPong: return this._pingPong(callback); default: @@ -807,19 +809,17 @@ Agent.prototype._createPresence = function(request) { }; }; -Agent.prototype._subscribePresence = function(channel, seq, callback) { +Agent.prototype._subscribePresence = function(channel, seq, cb) { var agent = this; - function requestPresence() { - agent._requestPresence(channel, function(error) { - callback(error, {ch: channel, seq: seq}); - }); + function callback(error) { + cb(error, {ch: channel, seq: seq}); } var existingStream = agent.subscribedPresences[channel]; if (existingStream) { agent.presenceSubscriptionSeq[channel] = seq; - return requestPresence(); + return callback(); } var presenceChannel = this._getPresenceChannel(channel); @@ -827,12 +827,14 @@ Agent.prototype._subscribePresence = function(channel, seq, callback) { if (error) return callback(error); if (seq < agent.presenceSubscriptionSeq[channel]) { stream.destroy(); - return callback(null, {ch: channel, seq: seq}); + return callback(); } agent.presenceSubscriptionSeq[channel] = seq; agent.subscribedPresences[channel] = stream; agent._subscribeToPresenceStream(channel, stream); - requestPresence(); + agent._requestPresence(channel, function(error) { + callback(error); + }); }); }; diff --git a/lib/client/connection.js b/lib/client/connection.js index 0c3ad28f1..896bd03e2 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -809,6 +809,10 @@ Connection.prototype._addPresence = function(presence) { }); }; +Connection.prototype._requestRemotePresence = function(channel) { + this.send({a: ACTIONS.presenceRequest, ch: channel}); +}; + Connection.prototype._handlePresenceSubscribe = function(error, message) { var presence = util.dig(this._presences, message.ch); if (presence) presence._handleSubscribe(error, message.seq); diff --git a/lib/client/presence/presence.js b/lib/client/presence/presence.js index 3f0bd92dc..03d16b92e 100644 --- a/lib/client/presence/presence.js +++ b/lib/client/presence/presence.js @@ -28,12 +28,12 @@ function Presence(connection, channel) { } emitter.mixin(Presence); -Presence.prototype.subscribe = function(options, callback) { - this._sendSubscriptionAction(true, options, callback); +Presence.prototype.subscribe = function(callback) { + this._sendSubscriptionAction(true, callback); }; -Presence.prototype.unsubscribe = function(options, callback) { - this._sendSubscriptionAction(false, options, callback); +Presence.prototype.unsubscribe = function(callback) { + this._sendSubscriptionAction(false, callback); }; Presence.prototype.create = function(id) { @@ -83,14 +83,9 @@ Presence.prototype.destroy = function(callback) { }); }; -Presence.prototype._sendSubscriptionAction = function(wantSubscribe, options, callback) { - if (typeof options === 'function') { - callback = options; - options = {}; - } - options = options || {}; +Presence.prototype._sendSubscriptionAction = function(wantSubscribe, callback) { wantSubscribe = !!wantSubscribe; - if (!options._force && wantSubscribe === this.wantSubscribe) { + if (wantSubscribe === this.wantSubscribe) { if (!callback) return; if (wantSubscribe === this.subscribed) return util.nextTick(callback); if (Object.keys(this._subscriptionCallbacksBySeq).length) { @@ -106,6 +101,10 @@ Presence.prototype._sendSubscriptionAction = function(wantSubscribe, options, ca } }; +Presence.prototype._requestRemotePresence = function() { + this.connection._requestRemotePresence(this.channel); +}; + Presence.prototype._handleSubscribe = function(error, seq) { if (this.wantSubscribe) this.subscribed = true; var callback = this._subscriptionCallback(seq); diff --git a/lib/client/presence/remote-doc-presence.js b/lib/client/presence/remote-doc-presence.js index ae08062a4..0f3b7f74c 100644 --- a/lib/client/presence/remote-doc-presence.js +++ b/lib/client/presence/remote-doc-presence.js @@ -105,9 +105,7 @@ RemoteDocPresence.prototype._catchUpStalePresence = function() { if (!this._opCache) { this._startCachingOps(); this._doc.fetch(); - // We're already subscribed, but we send another subscribe message - // to force presence updates from other clients - this.presence.subscribe({_force: true}); + this.presence._requestRemotePresence(); return false; } diff --git a/test/client/presence/doc-presence.js b/test/client/presence/doc-presence.js index 1011ac74b..24425aa8a 100644 --- a/test/client/presence/doc-presence.js +++ b/test/client/presence/doc-presence.js @@ -645,20 +645,14 @@ describe('DocPresence', function() { presence1.subscribe.bind(presence1), presence2.subscribe.bind(presence2), function(next) { - connection1._setState('disconnected'); - next(); - }, - function(next) { - localPresence2.submit({index: 0}, errorHandler(done)); - // We've not _actually_ disconnected the connection, so this - // event will still fire. - presence1.once('receive', function() { + connection1.once('closed', function() { next(); }); + connection1.close(); }, + localPresence2.submit.bind(localPresence2, {index: 0}), function(next) { - connection1._setState('connecting'); - connection1._setState('connected'); + backend.connect(connection1); presence1.once('receive', function(id, presence) { expect(presence).to.eql({index: 0}); next(); diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js index 6405de8a1..40d4f4847 100644 --- a/test/client/presence/presence.js +++ b/test/client/presence/presence.js @@ -344,9 +344,11 @@ describe('Presence', function() { presence1.subscribe.bind(presence1), presence2.subscribe.bind(presence2), function(next) { - connection2._setState('disconnected'); + connection2.once('closed', function() { + next(); + }); + connection2.close(); expect(connection2.canSend).to.be.false; - next(); }, localPresence1.submit.bind(localPresence1, {index: 1}), function(next) { @@ -355,8 +357,7 @@ describe('Presence', function() { expect(presence).to.eql({index: 1}); next(); }); - connection2._setState('connecting'); - connection2._setState('connected'); + backend.connect(connection2); } ], done); }); @@ -444,12 +445,14 @@ describe('Presence', function() { it('does not leak Streams when subscribing the same presence multiple times', function(done) { var streamsCount; async.series([ - presence1.subscribe.bind(presence1, {force: true}), + presence1.subscribe.bind(presence1), function(next) { streamsCount = backend.pubsub.streamsCount; + // Trick it into sending a duplicate request + presence1.wantSubscribe = false; next(); }, - presence1.subscribe.bind(presence1, {force: true}), + presence1.subscribe.bind(presence1), function(next) { expect(backend.pubsub.streamsCount).to.equal(streamsCount); next();