Skip to content

Commit

Permalink
⚡️ Cache latest op version when broadcasting presence
Browse files Browse the repository at this point in the history
At the moment, when sending a presence update to other subscribers, we
[call `transformPresenceToLatestVersion()`][1] for every presence update
which internally [calls `getOps()`][2] for every presence update.

Calls to `getOps()` can be expensive, and rapid presence updates may
cause undue load on the server, even when the `Doc` has not been
updated.

This change tries to mitigate this by subscribing to a pubsub stream for
any `Doc`s that an `Agent` tries to broadcast presence on. We keep an
in-memory cache of the latest snapshot version sent over this stream,
which lets us quickly check if a presence broadcast is already current
without needing to query the database at all.

To avoid leaking streams, the `Agent` will internally handle its stream
subscription state by:

 - subscribing whenever a non-`null` presence update is broadcast
 - unsubscribing whenever a `null` presence update is broadcast

This means that rapid changes in presence being `null` or not can still
result in database calls, but even in this case they should be less bad
than before, because we only perform a snapshot fetch instead of ops.

[1]: https://github.com/share/sharedb/blob/297ce5dc66563a5955311793a475768d73ac8b87/lib/agent.js#L804
[2]: https://github.com/share/sharedb/blob/297ce5dc66563a5955311793a475768d73ac8b87/lib/backend.js#L919
  • Loading branch information
alecgibson committed Aug 27, 2024
1 parent 297ce5d commit 87ef11d
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 23 deletions.
118 changes: 95 additions & 23 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ function Agent(backend, stream) {
// request if the client disconnects ungracefully. This is a
// map of channel -> id -> request
this.presenceRequests = Object.create(null);
// Keep track of the latest known Doc version, so that we can avoid fetching
// ops to transform presence if not needed
this.latestDocVersionStreams = Object.create(null);
this.latestDocVersions = Object.create(null);

// We need to track this manually to make sure we don't reply to messages
// after the stream was closed.
Expand Down Expand Up @@ -108,24 +112,21 @@ Agent.prototype._cleanup = function() {
emitter.destroy();
}
this.subscribedQueries = Object.create(null);

for (var collection in this.latestDocVersionStreams) {
var streams = this.latestDocVersionStreams[collection];
for (var id in streams) streams[id].destroy();
}
this.latestDocVersionStreams = Object.create(null);
};

/**
* Passes operation data received on stream to the agent stream via
* _sendOp()
*/
Agent.prototype._subscribeToStream = function(collection, id, stream) {
if (this.closed) return stream.destroy();

var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = Object.create(null));

// If already subscribed to this document, destroy the previously subscribed stream
var previous = streams[id];
if (previous) previous.destroy();
streams[id] = stream;

var agent = this;
stream.on('data', function(data) {
this._subscribeMapToStream(this.subscribedDocs, collection, id, stream, function(data) {
if (data.error) {
// Log then silently ignore errors in a subscription stream, since these
// may not be the client's fault, and they were not the result of a
Expand All @@ -135,13 +136,26 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
}
agent._onOp(collection, id, data);
});
};

Agent.prototype._subscribeMapToStream = function(map, collection, id, stream, dataHandler) {
if (this.closed) return stream.destroy();

var streams = map[collection] || (map[collection] = Object.create(null));

// If already subscribed to this document, destroy the previously subscribed stream
var previous = streams[id];
if (previous) previous.destroy();
streams[id] = stream;

stream.on('data', dataHandler);
stream.on('end', function() {
// The op stream is done sending, so release its reference
var streams = agent.subscribedDocs[collection];
var streams = map[collection];
if (!streams || streams[id] !== stream) return;
delete streams[id];
if (util.hasKeys(streams)) return;
delete agent.subscribedDocs[collection];
delete map[collection];
});
};

Expand Down Expand Up @@ -794,25 +808,83 @@ Agent.prototype._broadcastPresence = function(presence, callback) {
collection: presence.c
};
var start = Date.now();
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, this, context, function(error) {

var subscriptionUpdater = presence.p === null ?
this._unsubscribeDocVersion.bind(this) :
this._subscribeDocVersion.bind(this);

subscriptionUpdater(presence.c, presence.d, function(error) {
if (error) return callback(error);
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
var previousRequest = requests[presence.id];
if (!previousRequest || previousRequest.pv < presence.pv) {
presenceRequests[presence.ch][presence.id] = presence;
}
backend.transformPresenceToLatestVersion(agent, presence, function(error, presence) {
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, agent, context, function(error) {
if (error) return callback(error);
var channel = agent._getPresenceChannel(presence.ch);
agent.backend.pubsub.publish([channel], presence, function(error) {
if (error) return callback(error);
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
var previousRequest = requests[presence.id];
if (!previousRequest || previousRequest.pv < presence.pv) {
presenceRequests[presence.ch][presence.id] = presence;
}

var transformer = function(agent, presence, callback) {
callback(null, presence);
};

var latestDocVersion = util.dig(agent.latestDocVersions, presence.c, presence.d);
var presenceIsUpToDate = presence.v === latestDocVersion;
if (!presenceIsUpToDate) {
// null presence can't be transformed, so skip the database call and just
// set the version to the latest known Doc version
if (presence.p === null) {
transformer = function(agent, presence, callback) {
presence.v = latestDocVersion;
callback(null, presence);
};
} else {
transformer = backend.transformPresenceToLatestVersion.bind(backend);
}
}

transformer(agent, presence, function(error, presence) {
if (error) return callback(error);
var channel = agent._getPresenceChannel(presence.ch);
agent.backend.pubsub.publish([channel], presence, function(error) {
if (error) return callback(error);
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
callback(null, presence);
});
});
});
});
};

Agent.prototype._subscribeDocVersion = function(collection, id, callback) {
if (!collection || !id) return callback();

var latestDocVersions = this.latestDocVersions;
var isSubscribed = util.dig(latestDocVersions, collection, id) !== undefined;
if (isSubscribed) return callback();

var agent = this;
this.backend.subscribe(this, collection, id, null, function(error, stream, snapshot) {
if (error) return callback(error);

util.digOrCreate(latestDocVersions, collection, id, function() {
return snapshot.v;
});

agent._subscribeMapToStream(agent.latestDocVersionStreams, collection, id, stream, function(op) {
// op.v behind snapshot.v by 1
latestDocVersions[collection][id] = op.v + 1;
});

callback();
});
};

Agent.prototype._unsubscribeDocVersion = function(collection, id, callback) {
var stream = util.dig(this.latestDocVersionStreams, collection, id);
if (stream) stream.destroy();
util.nextTick(callback);
};

Agent.prototype._createPresence = function(request) {
return {
a: ACTIONS.presence,
Expand Down
38 changes: 38 additions & 0 deletions test/client/presence/doc-presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var types = require('../../../lib/types');
var presenceTestType = require('./presence-test-type');
var errorHandler = require('../../util').errorHandler;
var PresencePauser = require('./presence-pauser');
var sinon = require('sinon');
types.register(presenceTestType.type);

describe('DocPresence', function() {
Expand Down Expand Up @@ -297,6 +298,43 @@ describe('DocPresence', function() {
], done);
});

it('does not call getOps() when presence is already up-to-date', function(done) {
var localPresence1 = presence1.create('presence-1');

async.series([
doc1.fetch.bind(doc1), // Ensure up-to-date
function(next) {
sinon.spy(Backend.prototype, 'getOps');
next();
},
localPresence1.submit.bind(localPresence1, {index: 1}),
function(next) {
expect(Backend.prototype.getOps).not.to.have.been.called;
next();
}
], done);
});

it('does not call getOps() for old presence when it is null', function(done) {
var localPresence1 = presence1.create('presence-1');

async.series([
doc1.unsubscribe.bind(doc1),
doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}),
function(next) {
expect(doc1.version).to.eql(1);
expect(doc2.version).to.eql(2);

sinon.spy(Backend.prototype, 'getOps');
localPresence1.submit(null, function(error) {
if (error) return next(error);
expect(Backend.prototype.getOps).not.to.have.been.called;
next();
});
}
], done);
});

// This test case attempts to force us into a tight race condition corner case:
// 1. doc1 sends presence, as well as submits an op
// 2. doc2 receives the op first, followed by the presence, which is now out-of-date
Expand Down

0 comments on commit 87ef11d

Please sign in to comment.