Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/op metadata support #586

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
10 changes: 10 additions & 0 deletions docs/middleware/op-submission.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ backend.use('submit', (context, next) => {
if (!userCanChangeDoc(userId, id)) {
return next(new Error('Unauthorized'))
}

// add custom metadata to the op
Object.assign(context.op.m, context.agent.custom);
// Explicitly specify which metadata fields to be included when storing the op
context.opMetadataProjection = { userId: true };

next()
})
```
Expand All @@ -61,6 +67,10 @@ backend.use('apply', (context, next) => {
if (userId !== ownerId) {
return next(new Error('Unauthorized'))
}

// Add op metadata to snapshot before snapshot is stored
Object.assign(context.snapshot.m, context.op.m);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay have discussed with @ericyhwang and we think:

  • we should go back to having a feature flag (sorry about the U-turn!)
  • if the feature flag is enabled, we send all metadata across pubsub
  • if the feature flag is enabled, we also apply the metadata projection in the op middleware, which should handle the cases I asked for on your tests (reiterated here for clarity)

Cases we need to handle:

  • Op submission (the case you're already testing): ops broadcast over pubsub to other subscribers
  • Op fetching: have a stale document that calls doc.fetch() — this will fetch ops through backend._getSanitizedOps()
  • Ops sent back when submitting: have a stale document that calls doc.submitOp() — this retrieves ops through another mechanism in SubmitRequest.submit()


next()
})
```
Expand Down
1 change: 1 addition & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ Agent.prototype._sendOp = function(collection, id, op) {
if ('op' in op) message.op = op.op;
if (op.create) message.create = op.create;
if (op.del) message.del = true;
if (op.m) message.m = op.m;

this.send(message);
};
Expand Down
12 changes: 5 additions & 7 deletions lib/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,7 @@ Doc.prototype._otApply = function(op, source) {
);
}

// NB: If we need to add another argument to this event, we should consider
// the fact that the 'op' event has op.src as its 3rd argument
this.emit('before op batch', op.op, source);
this.emit('before op batch', op.op, source, op.src, {op: op});
maxfortun marked this conversation as resolved.
Show resolved Hide resolved

// Iteratively apply multi-component remote operations and rollback ops
// (source === false) for the default JSON0 OT type. It could use
Expand Down Expand Up @@ -637,24 +635,24 @@ Doc.prototype._otApply = function(op, source) {
this._setData(this.type.apply(this.data, componentOp.op));
this.emit('op', componentOp.op, source, op.src);
}
this.emit('op batch', op.op, source);
this.emit('op batch', op.op, source, op.src, {op: op});
// Pop whatever was submitted since we started applying this op
this._popApplyStack(stackLength);
return;
}

// The 'before op' event enables clients to pull any necessary data out of
// the snapshot before it gets changed
this.emit('before op', op.op, source, op.src);
this.emit('before op', op.op, source, op.src, {op: op});
// Apply the operation to the local data, mutating it in place
this._setData(this.type.apply(this.data, op.op));
// Emit an 'op' event once the local data includes the changes from the
// op. For locally submitted ops, this will be synchronously with
// submission and before the server or other clients have received the op.
// For ops from other clients, this will be after the op has been
// committed to the database and published
this.emit('op', op.op, source, op.src);
this.emit('op batch', op.op, source);
this.emit('op', op.op, source, op.src, {op: op});
this.emit('op batch', op.op, source, op.src, {op: op});
return;
}

Expand Down
36 changes: 35 additions & 1 deletion lib/submit-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ SubmitRequest.prototype.commit = function(callback) {
var op = request.op;
op.c = request.collection;
op.d = request.id;
op.m = undefined;
op.m = request._metadataProjection();
// Needed for agent to detect if it can ignore sending the op back to
// the client that submitted it in subscriptions
if (request.collection !== request.index) op.i = request.index;
Expand All @@ -185,6 +185,38 @@ SubmitRequest.prototype.commit = function(callback) {
});
};

SubmitRequest.prototype._metadataProjection = function() {
var request = this;

// Default behavior
if (!request.opMetadataProjection) {
return undefined;
}

// Granular projection
if (typeof request.opMetadataProjection === 'object') {
return request._granularMetadataProjection();
}

// Full projection
return request.op.m;
};

// Specify top level fields to beincluded in a granular projection
SubmitRequest.prototype._granularMetadataProjection = function() {
var request = this;
var metadataProjection = {};
for (var key in request.opMetadataProjection) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should at least add a code comment here that we only support a single level of projection.

Would be extra nice if we updated the docs to include this flag and notes on its use.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment, and updated docs with an example.

var doProject = request.opMetadataProjection[key];
if (doProject ) {
metadataProjection[key] = request.op.m[key];
}
}

return metadataProjection;
};


SubmitRequest.prototype.retry = function(callback) {
this.retries++;
if (this.maxRetries != null && this.retries > this.maxRetries) {
Expand Down Expand Up @@ -233,6 +265,8 @@ SubmitRequest.prototype._addSnapshotMeta = function() {
meta.ctime = this.start;
} else if (this.op.del) {
this.op.m.data = this.snapshot.data;
// break circular dependency if snapshot data already contains metadata
delete this.op.m.data._m;
maxfortun marked this conversation as resolved.
Show resolved Hide resolved
}
meta.mtime = this.start;
};
Expand Down
186 changes: 186 additions & 0 deletions test/client/submit.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ var numberType = require('./number-type');
types.register(deserializedType.type);
types.register(deserializedType.type2);
types.register(numberType.type);
var util = require('../util');
var errorHandler = util.errorHandler;

module.exports = function() {
describe('client submit', function() {
Expand Down Expand Up @@ -1210,5 +1212,189 @@ module.exports = function() {
});
});
});

describe('metadata projection', function() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've flexed metadata on ops that were broadcast over pubsub, but ops can also be fetched when submitting an op from a stale document (that isn't subscribed, and is behind the server); can we test that the ops we get in that case also have their metadata correctly set?

This would need us to tweak SubmitRequest.submit().

Also, can we add a test case for an unsubscribed client calling fetch after another remote client has changed the doc: this is where you'll need the backend._getSanitizedOps() to handle metadata projection for us.

it('passed metadata to connect', function(done) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this test has anything to do with this PR?

var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

var connection = this.backend.connect(undefined, metadata);
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
done();
});
});

it('passed metadata to submit', function(done) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: I'm not sure this test has anything to do with this PR?

var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request) {
expect(request.agent.custom).eql(metadata);
done();
});

var connection = this.backend.connect(undefined, metadata);
var doc = null;
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
doc = connection.get('dogs', 'fido');
doc.create({name: 'fido'}, function() {
doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(done));
});
});
});

it('received local op without metadata', function(done) {
var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request, next) {
expect(request.agent.custom).eql(metadata);
Object.assign(request.op.m, request.agent.custom);
request.opMetadataProjection = {username: true};
next();
});

var connection = this.backend.connect(undefined, metadata);
var doc = null;
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
doc = connection.get('dogs', 'fido');
doc.create({name: 'fido'}, function() {
doc.on('op', function(op, source, src, context) {
if (src) {
return;
}
expect(context.op.m).equal(undefined);
done();
});
doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(function() {}));
});
});
});

it('concurrent changes', function(done) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please simplify this test case to just have two clients? I'm not sure there's a vast amount of benefit in having so many clients; it just makes the test harder to read. Would be nice to get rid of the for loops.

this.backend.use('connect', function(request, next) {
expect(request.req).to.have.property('username');
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request, next) {
expect(request.agent.custom).to.have.property('username');
Object.assign(request.op.m, request.agent.custom);
request.opMetadataProjection = {username: true};
next();
});

this.backend.use('apply', function(request, next) {
Object.assign(request.snapshot.m, request.op.m);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unneeded?

expect(request.op.m).to.have.property('username');
next();
});

this.backend.use('commit', function(request, next) {
expect(request.op.m).to.have.property('username');
next();
});

this.backend.use('afterWrite', function(request, next) {
expect(request.op.m).to.have.property('username');
next();
});

var subscriberCount = 10;
var subscriberOpCount = 10;

var metadatas = [];
for (var i = 0; i < subscriberCount; i++) {
metadatas[i] = {username: 'user-'+i};
}

var ops = [];
for (var i = 0; i < subscriberCount; i++) {
ops[i] = [];
for (var j = 0; j < subscriberOpCount; j++) {
ops[i].push({p: ['tricks '+i+' '+j], oi: 1});
}
}

var docs = [];

function submitOps() {
for (var j = 0; j < subscriberOpCount; j++) {
for (var i = 0; i < subscriberCount; i++) {
var doc = docs[i];
doc.submitOp([ops[i][j]], {source: 'src-'+i}, errorHandler(doneAfter));
}
}
}

function validateAndDone() {
var firstDoc = docs[0];
// validate that all documents across connections are in sync
for (var i = 1; i < subscriberCount; i++) {
var doc = docs[i];
expect(doc.data).eql(firstDoc.data);
}
done();
};

var submitOpsAfter = util.callAfter(subscriberCount - 1, submitOps);
var doneAfter = util.callAfter((subscriberCount * subscriberCount * subscriberOpCount) - 1, validateAndDone);

function getDoc(callback) {
var thisDoc = this;
thisDoc.fetch(function() {
if (!thisDoc.data) {
return thisDoc.create({}, function() {
thisDoc.subscribe(callback);
});
}
thisDoc.subscribe(callback);
});
}

for (var i = 0; i < subscriberCount; i++) {
var metadata = metadatas[i];

var connection = this.backend.connect(undefined, Object.assign({}, metadata));
connection.__test_metadata = Object.assign({}, metadata);
connection.__test_id = i;

connection.on('connected', function() {
var thisConnection = this;

expect(thisConnection.agent.custom).eql(thisConnection.__test_metadata);

thisConnection.doc = docs[thisConnection.__test_id] = thisConnection.get('dogs', 'fido');

thisConnection.doc.on('op', function(op, source, src, context) {
if (!src || !context) { // If I am the source there is no metadata to check
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we send metadata back to the original client? The server might add context the sender wants (and which remote clients will get).

What's your use-case? Does it work without the original op submitter getting their metadata?

We could add the meta to the op acknowledgement.

return doneAfter();
}
var id = op[0].p[0].split(' ')[1];
expect(context.op.m).eql(metadatas[id]);
doneAfter();
});

getDoc.bind(thisConnection.doc)(submitOpsAfter);
});
}
});
});
});
};