Skip to content

Commit

Permalink
♻️ Remove presence.subscribe() force option
Browse files Browse the repository at this point in the history
We previously only needed this to re-request presence as part of the
`DocPresence` catchup flow.

This change adds an internal method to actively request remote presence
for a given channel, and uses that instead of abusing `subscribe()`.
This lets us remove the `options` object from `subscribe()`.
  • Loading branch information
alecgibson committed Oct 12, 2023
1 parent 6fd6bec commit 5831da6
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 38 deletions.
18 changes: 10 additions & 8 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._subscribePresence(request.ch, request.seq, callback);
case ACTIONS.presenceUnsubscribe:
return this._unsubscribePresence(request.ch, request.seq, callback);
case ACTIONS.presenceRequest:
return this._requestPresence(request.ch, callback);
case ACTIONS.pingPong:
return this._pingPong(callback);
default:
Expand Down Expand Up @@ -807,32 +809,32 @@ Agent.prototype._createPresence = function(request) {
};
};

Agent.prototype._subscribePresence = function(channel, seq, callback) {
Agent.prototype._subscribePresence = function(channel, seq, cb) {
var agent = this;

function requestPresence() {
agent._requestPresence(channel, function(error) {
callback(error, {ch: channel, seq: seq});
});
function callback(error) {
cb(error, {ch: channel, seq: seq});
}

var existingStream = agent.subscribedPresences[channel];
if (existingStream) {
agent.presenceSubscriptionSeq[channel] = seq;
return requestPresence();
return callback();
}

var presenceChannel = this._getPresenceChannel(channel);
this.backend.pubsub.subscribe(presenceChannel, function(error, stream) {
if (error) return callback(error);
if (seq < agent.presenceSubscriptionSeq[channel]) {
stream.destroy();
return callback(null, {ch: channel, seq: seq});
return callback();
}
agent.presenceSubscriptionSeq[channel] = seq;
agent.subscribedPresences[channel] = stream;
agent._subscribeToPresenceStream(channel, stream);
requestPresence();
agent._requestPresence(channel, function(error) {
callback(error);
});
});
};

Expand Down
4 changes: 4 additions & 0 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,10 @@ Connection.prototype._addPresence = function(presence) {
});
};

Connection.prototype._requestRemotePresence = function(channel) {
this.send({a: ACTIONS.presenceRequest, ch: channel});
};

Connection.prototype._handlePresenceSubscribe = function(error, message) {
var presence = util.dig(this._presences, message.ch);
if (presence) presence._handleSubscribe(error, message.seq);
Expand Down
21 changes: 10 additions & 11 deletions lib/client/presence/presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ function Presence(connection, channel) {
}
emitter.mixin(Presence);

Presence.prototype.subscribe = function(options, callback) {
this._sendSubscriptionAction(true, options, callback);
Presence.prototype.subscribe = function(callback) {
this._sendSubscriptionAction(true, callback);
};

Presence.prototype.unsubscribe = function(options, callback) {
this._sendSubscriptionAction(false, options, callback);
Presence.prototype.unsubscribe = function(callback) {
this._sendSubscriptionAction(false, callback);
};

Presence.prototype.create = function(id) {
Expand Down Expand Up @@ -83,14 +83,9 @@ Presence.prototype.destroy = function(callback) {
});
};

Presence.prototype._sendSubscriptionAction = function(wantSubscribe, options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};
Presence.prototype._sendSubscriptionAction = function(wantSubscribe, callback) {
wantSubscribe = !!wantSubscribe;
if (!options._force && wantSubscribe === this.wantSubscribe) {
if (wantSubscribe === this.wantSubscribe) {
if (!callback) return;
if (wantSubscribe === this.subscribed) return util.nextTick(callback);
if (Object.keys(this._subscriptionCallbacksBySeq).length) {
Expand All @@ -106,6 +101,10 @@ Presence.prototype._sendSubscriptionAction = function(wantSubscribe, options, ca
}
};

Presence.prototype._requestRemotePresence = function() {
this.connection._requestRemotePresence(this.channel);
};

Presence.prototype._handleSubscribe = function(error, seq) {
if (this.wantSubscribe) this.subscribed = true;
var callback = this._subscriptionCallback(seq);
Expand Down
4 changes: 1 addition & 3 deletions lib/client/presence/remote-doc-presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ RemoteDocPresence.prototype._catchUpStalePresence = function() {
if (!this._opCache) {
this._startCachingOps();
this._doc.fetch();
// We're already subscribed, but we send another subscribe message
// to force presence updates from other clients
this.presence.subscribe({_force: true});
this.presence._requestRemotePresence();
return false;
}

Expand Down
14 changes: 4 additions & 10 deletions test/client/presence/doc-presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -645,20 +645,14 @@ describe('DocPresence', function() {
presence1.subscribe.bind(presence1),
presence2.subscribe.bind(presence2),
function(next) {
connection1._setState('disconnected');
next();
},
function(next) {
localPresence2.submit({index: 0}, errorHandler(done));
// We've not _actually_ disconnected the connection, so this
// event will still fire.
presence1.once('receive', function() {
connection1.once('closed', function() {
next();
});
connection1.close();
},
localPresence2.submit.bind(localPresence2, {index: 0}),
function(next) {
connection1._setState('connecting');
connection1._setState('connected');
backend.connect(connection1);
presence1.once('receive', function(id, presence) {
expect(presence).to.eql({index: 0});
next();
Expand Down
15 changes: 9 additions & 6 deletions test/client/presence/presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,11 @@ describe('Presence', function() {
presence1.subscribe.bind(presence1),
presence2.subscribe.bind(presence2),
function(next) {
connection2._setState('disconnected');
connection2.once('closed', function() {
next();
});
connection2.close();
expect(connection2.canSend).to.be.false;
next();
},
localPresence1.submit.bind(localPresence1, {index: 1}),
function(next) {
Expand All @@ -355,8 +357,7 @@ describe('Presence', function() {
expect(presence).to.eql({index: 1});
next();
});
connection2._setState('connecting');
connection2._setState('connected');
backend.connect(connection2);
}
], done);
});
Expand Down Expand Up @@ -444,12 +445,14 @@ describe('Presence', function() {
it('does not leak Streams when subscribing the same presence multiple times', function(done) {
var streamsCount;
async.series([
presence1.subscribe.bind(presence1, {force: true}),
presence1.subscribe.bind(presence1),
function(next) {
streamsCount = backend.pubsub.streamsCount;
// Trick it into sending a duplicate request
presence1.wantSubscribe = false;
next();
},
presence1.subscribe.bind(presence1, {force: true}),
presence1.subscribe.bind(presence1),
function(next) {
expect(backend.pubsub.streamsCount).to.equal(streamsCount);
next();
Expand Down

0 comments on commit 5831da6

Please sign in to comment.