diff --git a/mongo.js b/mongo.js index aa68c0e1..6d898efc 100644 --- a/mongo.js +++ b/mongo.js @@ -56,6 +56,7 @@ function LiveDbMongo(mongo, options) { if (!options) options = {}; this.mongoPoll = options.mongoPoll || null; + this.ttl = options.ttl || null; // The getVersion() and getOps() methods depend on a collectionname_ops // collection, and that collection should have an index on the operations @@ -166,6 +167,12 @@ LiveDbMongo.prototype._opCollection = function(cName) { collection.ensureIndex({name: 1, v: 1}, true, function(error, name) { if (error) console.warn('Warning: Could not create index for op collection:', error.stack || error); }); + + if (this.ttl) { + collection.ensureIndex({ 'm.d': 1 }, { expireAfterSeconds: this.ttl, background: true }, function(error) { + if (error) console.warn('Could not create ttl index for op collection: ', error.stack || error); + }); + } this.opIndexes[cName] = true; } @@ -182,6 +189,10 @@ LiveDbMongo.prototype.writeOp = function(cName, docName, opData, callback) { var data = shallowClone(opData); data._id = docName + ' v' + opData.v, data.name = docName; + if (this.ttl) { + if (!data.m) data.m = {}; + data.m.d = new Date(); + } this._opCollection(cName).save(data, callback); }; @@ -204,21 +215,75 @@ LiveDbMongo.prototype.getVersion = function(cName, docName, callback) { }); }; +// Exported for testing +LiveDbMongo.prototype._readOps = function(cName, docName, start, end, callback) { + var query = end == null ? {$gte:start} : {$gte:start, $lt:end}; + this._opCollection(cName).find({name:docName, v:query}, {sort:{v:1}}).toArray(callback); +}; + LiveDbMongo.prototype.getOps = function(cName, docName, start, end, callback) { var err; if (err = this._check(cName)) return callback(err); - var query = end == null ? {$gte:start} : {$gte:start, $lt:end}; - this._opCollection(cName).find({name:docName, v:query}, {sort:{v:1}}).toArray(function(err, data) { - if (err) return callback(err); - + var retried = false; + var self = this; + + function retry() { + // First, attempt to get the requested ops from the op collection + self._readOps(cName, docName, start, end, function(err, data) { + if (err) return callback(err); + + // If we didn't get any ops, and we don't know the end of the range requested, + // then we need to get the document version to check if ops were expected. + if (data.length === 0 && end == null) { + // If we already retried (see below for why), + // then ops are missing from the requested range. + if (retried) + return callback('Missing operations'); + + // Get the current snapshot version + self.getVersion(cName, docName, function(err, v) { + if (err) return callback(err); + + // Race condition! If the version returned is greater than the start + // of the requested range (and we got no ops above), an op may have + // been submitted between when we got ops and when we got the version. + // Retry to get the missing ops. + if (v > start) { + retried = true; + retry(); + } else { + done(data); + } + }); + } else { + done(data); + } + }); + } + + function done(data) { + // If we know the end of the requested op range, check that we got the right number. + if (end != null) { + var expectedLength = start >= end ? 0 : end - start; + if (data.length !== expectedLength) + return callback('Missing operations'); + } + + var v = start; for (var i = 0; i < data.length; i++) { + // Check that we got ops in the right order with none missing. + if (data[i].v !== v++) + return callback('Missing operations'); + // Strip out _id in the results delete data[i]._id; delete data[i].name; } + callback(null, data); - }); + } + retry(); }; diff --git a/test.coffee b/test.coffee index f8d4a8c2..c7a8da62 100644 --- a/test.coffee +++ b/test.coffee @@ -44,6 +44,21 @@ describe 'mongo', -> throw Error "Could not find index in ops db - #{JSON.stringify(indexes)}" , 400 + + it 'adds a ttl index for ops if given option', (done) -> clear => + db = liveDbMongo 'mongodb://localhost:27017/test?auto_reconnect', safe: false, ttl: 10 + db.writeOp 'testcollection', 'foo', {v:0, create:{type:'json0'}}, (err) => + setTimeout => + @mongo.collection('testcollection_ops').indexInformation (err, indexes) -> + throw err if err + + # We should find an index with [ 'm.d', 1 ] + for name, idx of indexes + if JSON.stringify(idx) is '[["m.d",1]]' + return done() + + throw Error "Could not find index in ops db - #{JSON.stringify(indexes)}" + , 400 it 'does not allow editing the system collection', (done) -> @db.writeSnapshot 'system', 'test', {type:'json0', v:5, m:{}, data:{x:5}}, (err) => @@ -61,6 +76,100 @@ describe 'mongo', -> assert.equal v, 3 done() + describe 'getOps', -> + it 'errors if ops are missing at the start of the range', (done) -> + @db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) => + throw Error err if err + @db.writeOp 'testcollection', 'test', {v:1, op:{test:2}}, (err) => + throw Error err if err + + readOps = @db._readOps + @db._readOps = (cName, docName, start, end, callback) -> + readOps.call this, cName, docName, start, end, (err, ops) -> + callback err, ops.slice(1) + + @db.getOps 'testcollection', 'test', 0, null, (err, ops) => + @db._readOps = readOps + assert.equal err, 'Missing operations' + done() + + it 'errors if ops are missing in the middle of the range', (done) -> + @db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) => + throw Error err if err + @db.writeOp 'testcollection', 'test', {v:1, op:{test:2}}, (err) => + throw Error err if err + @db.writeOp 'testcollection', 'test', {v:2, op:{test:3}}, (err) => + throw Error err if err + + readOps = @db._readOps + @db._readOps = (cName, docName, start, end, callback) -> + readOps.call this, cName, docName, start, end, (err, ops) -> + callback err, [ops[0], ops[2]] + + @db.getOps 'testcollection', 'test', 0, null, (err, ops) => + @db._readOps = readOps + assert.equal err, 'Missing operations' + done() + + it 'errors if ops are missing when end specified', (done) -> + @db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) => + throw Error err if err + @db.writeOp 'testcollection', 'test', {v:1, op:{test:2}}, (err) => + throw Error err if err + + readOps = @db._readOps + @db._readOps = (cName, docName, start, end, callback) -> + readOps.call this, cName, docName, start, end, (err, ops) -> + callback err, ops.slice(1) + + @db.getOps 'testcollection', 'test', 0, 2, (err, ops) => + @db._readOps = readOps + assert.equal err, 'Missing operations' + done() + + it 'errors if ops are missing when ops missing from range end', (done) -> + @db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) => + throw Error err if err + + @db.getOps 'testcollection', 'test', 0, 5, (err, ops) => + assert.equal err, 'Missing operations' + done() + + it 'errors if there are no ops and snapshot version is larger than range start', (done) -> + @db.writeSnapshot 'testcollection', 'test', {type: 'json0', v: 3, data:{x:5}}, (err) => + throw Error err if err + + @db.getOps 'testcollection', 'test', 0, null, (err, ops) => + assert.equal err, 'Missing operations' + done() + + it 'handles race condition when ops are submitted at the same time as a getOps call', (done) -> + @db.writeSnapshot 'testcollection', 'test', {type: 'json0', v: 3, data:{x:5}}, (err) => + throw Error err if err + + # simulate race condition by writing in getVersion + getVersion = @db.getVersion + @db.getVersion = (cName, docName, callback) -> + @writeOp 'testcollection', 'test', {v:3, op:{test:1}}, (err) => + throw Error err if err + @getVersion = getVersion + @getVersion cName, docName, callback + + @db.getOps 'testcollection', 'test', 3, null, (err, ops) => + throw Error err if err + assert.ok ops + assert.ok ops.length + done() + + it 'should return nothing if there are no ops, and version matches range start', (done) -> + @db.writeSnapshot 'testcollection', 'test', {type: 'json0', v: 3, data:{x:5}}, (err) => + throw Error err if err + + @db.getOps 'testcollection', 'test', 3, null, (err, ops) => + throw Error err if err + assert.ok ops + assert.equal ops.length, 0 + done() describe 'query', -> it 'returns data in the collection', (done) ->