From 5c9994dc715df64e2231ef304d1763ae8f42f365 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:16:33 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Cache=20latest=20op=20vers?= =?UTF-8?q?ion=20when=20broadcasting=20presence?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At the moment, when sending a presence update to other subscribers, we [call `transformPresenceToLatestVersion()`][1] for every presence update which internally [calls `getOps()`][2] for every presence update. Calls to `getOps()` can be expensive, and rapid presence updates may cause undue load on the server, even when the `Doc` has not been updated. This change tries to mitigate this by subscribing to a pubsub stream for any `Doc`s that an `Agent` tries to broadcast presence on. We keep an in-memory cache of the latest snapshot version sent over this stream, which lets us quickly check if a presence broadcast is already current without needing to query the database at all. To avoid leaking streams, the `Agent` will internally handle its stream subscription state by: - subscribing whenever a non-`null` presence update is broadcast - unsubscribing whenever a `null` presence update is broadcast This means that rapid changes in presence being `null` or not can still result in database calls, but even in this case they should be less bad than before, because we only perform a snapshot fetch instead of ops. [1]: https://github.com/share/sharedb/blob/297ce5dc66563a5955311793a475768d73ac8b87/lib/agent.js#L804 [2]: https://github.com/share/sharedb/blob/297ce5dc66563a5955311793a475768d73ac8b87/lib/backend.js#L919 --- lib/agent.js | 112 +++++++++++++++++++++------ test/client/presence/doc-presence.js | 38 +++++++++ 2 files changed, 127 insertions(+), 23 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index f55f1f9f4..82a603067 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -48,6 +48,10 @@ function Agent(backend, stream) { // request if the client disconnects ungracefully. This is a // map of channel -> id -> request this.presenceRequests = Object.create(null); + // Keep track of the latest known Doc version, so that we can avoid fetching + // ops to transform presence if not needed + this.latestDocVersionStreams = Object.create(null); + this.latestDocVersions = Object.create(null); // We need to track this manually to make sure we don't reply to messages // after the stream was closed. @@ -115,17 +119,8 @@ Agent.prototype._cleanup = function() { * _sendOp() */ Agent.prototype._subscribeToStream = function(collection, id, stream) { - if (this.closed) return stream.destroy(); - - var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = Object.create(null)); - - // If already subscribed to this document, destroy the previously subscribed stream - var previous = streams[id]; - if (previous) previous.destroy(); - streams[id] = stream; - var agent = this; - stream.on('data', function(data) { + this._subscribeMapToStream(this.subscribedDocs, collection, id, stream, function(data) { if (data.error) { // Log then silently ignore errors in a subscription stream, since these // may not be the client's fault, and they were not the result of a @@ -135,13 +130,26 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { } agent._onOp(collection, id, data); }); +}; + +Agent.prototype._subscribeMapToStream = function(map, collection, id, stream, dataHandler) { + if (this.closed) return stream.destroy(); + + var streams = map[collection] || (map[collection] = Object.create(null)); + + // If already subscribed to this document, destroy the previously subscribed stream + var previous = streams[id]; + if (previous) previous.destroy(); + streams[id] = stream; + + stream.on('data', dataHandler); stream.on('end', function() { // The op stream is done sending, so release its reference - var streams = agent.subscribedDocs[collection]; + var streams = map[collection]; if (!streams || streams[id] !== stream) return; delete streams[id]; if (util.hasKeys(streams)) return; - delete agent.subscribedDocs[collection]; + delete map[collection]; }); }; @@ -794,25 +802,83 @@ Agent.prototype._broadcastPresence = function(presence, callback) { collection: presence.c }; var start = Date.now(); - backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, this, context, function(error) { + + var subscriptionUpdater = presence.p === null ? + this._unsubscribeDocVersion.bind(this) : + this._subscribeDocVersion.bind(this); + + subscriptionUpdater(presence.c, presence.d, function(error) { if (error) return callback(error); - var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null)); - var previousRequest = requests[presence.id]; - if (!previousRequest || previousRequest.pv < presence.pv) { - presenceRequests[presence.ch][presence.id] = presence; - } - backend.transformPresenceToLatestVersion(agent, presence, function(error, presence) { + backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, agent, context, function(error) { if (error) return callback(error); - var channel = agent._getPresenceChannel(presence.ch); - agent.backend.pubsub.publish([channel], presence, function(error) { - if (error) return callback(error); - backend.emit('timing', 'presence.broadcast', Date.now() - start, context); + var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null)); + var previousRequest = requests[presence.id]; + if (!previousRequest || previousRequest.pv < presence.pv) { + presenceRequests[presence.ch][presence.id] = presence; + } + + var transformer = function(agent, presence, callback) { callback(null, presence); + }; + + var latestDocVersion = util.dig(agent.latestDocVersions, presence.c, presence.d); + var presenceIsUpToDate = presence.v === latestDocVersion; + if (!presenceIsUpToDate) { + // null presence can't be transformed, so skip the database call and just + // set the version to the latest known Doc version + if (presence.p === null) { + transformer = function(agent, presence, callback) { + presence.v = latestDocVersion; + callback(null, presence); + }; + } else { + transformer = backend.transformPresenceToLatestVersion.bind(backend); + } + } + + transformer(agent, presence, function(error, presence) { + if (error) return callback(error); + var channel = agent._getPresenceChannel(presence.ch); + agent.backend.pubsub.publish([channel], presence, function(error) { + if (error) return callback(error); + backend.emit('timing', 'presence.broadcast', Date.now() - start, context); + callback(null, presence); + }); }); }); }); }; +Agent.prototype._subscribeDocVersion = function(collection, id, callback) { + if (!collection || !id) return callback(); + + var latestDocVersions = this.latestDocVersions; + var isSubscribed = util.dig(latestDocVersions, collection, id) !== undefined; + if (isSubscribed) return callback(); + + var agent = this; + this.backend.subscribe(this, collection, id, null, function(error, stream, snapshot) { + if (error) return callback(error); + + util.digOrCreate(latestDocVersions, collection, id, function() { + return snapshot.v; + }); + + agent._subscribeMapToStream(agent.latestDocVersionStreams, collection, id, stream, function(op) { + // op.v behind snapshot.v by 1 + latestDocVersions[collection][id] = op.v + 1; + }); + + callback(); + }); +}; + +Agent.prototype._unsubscribeDocVersion = function(collection, id, callback) { + var stream = util.dig(this.latestDocVersionStreams, collection, id); + if (stream) stream.destroy(); + util.nextTick(callback); +}; + Agent.prototype._createPresence = function(request) { return { a: ACTIONS.presence, diff --git a/test/client/presence/doc-presence.js b/test/client/presence/doc-presence.js index 24425aa8a..b30865194 100644 --- a/test/client/presence/doc-presence.js +++ b/test/client/presence/doc-presence.js @@ -5,6 +5,7 @@ var types = require('../../../lib/types'); var presenceTestType = require('./presence-test-type'); var errorHandler = require('../../util').errorHandler; var PresencePauser = require('./presence-pauser'); +var sinon = require('sinon'); types.register(presenceTestType.type); describe('DocPresence', function() { @@ -297,6 +298,43 @@ describe('DocPresence', function() { ], done); }); + it('does not call getOps() when presence is already up-to-date', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + doc1.fetch.bind(doc1), // Ensure up-to-date + function(next) { + sinon.spy(Backend.prototype, 'getOps'); + next(); + }, + localPresence1.submit.bind(localPresence1, {index: 1}), + function(next) { + expect(Backend.prototype.getOps).not.to.have.been.called; + next(); + } + ], done); + }); + + it('does not call getOps() for old presence when it is null', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + expect(doc1.version).to.eql(1); + expect(doc2.version).to.eql(2); + + sinon.spy(Backend.prototype, 'getOps'); + localPresence1.submit(null, function(error) { + if (error) return next(error); + expect(Backend.prototype.getOps).not.to.have.been.called; + next(); + }); + } + ], done); + }); + // This test case attempts to force us into a tight race condition corner case: // 1. doc1 sends presence, as well as submits an op // 2. doc2 receives the op first, followed by the presence, which is now out-of-date