Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an optional ttl index to expire ops #12

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 70 additions & 5 deletions mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ function LiveDbMongo(mongo, options) {
if (!options) options = {};

this.mongoPoll = options.mongoPoll || null;
this.ttl = options.ttl || null;

// The getVersion() and getOps() methods depend on a collectionname_ops
// collection, and that collection should have an index on the operations
Expand Down Expand Up @@ -166,6 +167,12 @@ LiveDbMongo.prototype._opCollection = function(cName) {
collection.ensureIndex({name: 1, v: 1}, true, function(error, name) {
if (error) console.warn('Warning: Could not create index for op collection:', error.stack || error);
});

if (this.ttl) {
collection.ensureIndex({ 'm.d': 1 }, { expireAfterSeconds: this.ttl, background: true }, function(error) {
if (error) console.warn('Could not create ttl index for op collection: ', error.stack || error);
});
}

this.opIndexes[cName] = true;
}
Expand All @@ -182,6 +189,10 @@ LiveDbMongo.prototype.writeOp = function(cName, docName, opData, callback) {
var data = shallowClone(opData);
data._id = docName + ' v' + opData.v,
data.name = docName;
if (this.ttl) {
if (!data.m) data.m = {};
data.m.d = new Date();
}

this._opCollection(cName).save(data, callback);
};
Expand All @@ -204,21 +215,75 @@ LiveDbMongo.prototype.getVersion = function(cName, docName, callback) {
});
};

// Exported for testing
LiveDbMongo.prototype._readOps = function(cName, docName, start, end, callback) {
var query = end == null ? {$gte:start} : {$gte:start, $lt:end};
this._opCollection(cName).find({name:docName, v:query}, {sort:{v:1}}).toArray(callback);
};

LiveDbMongo.prototype.getOps = function(cName, docName, start, end, callback) {
var err; if (err = this._check(cName)) return callback(err);

var query = end == null ? {$gte:start} : {$gte:start, $lt:end};
this._opCollection(cName).find({name:docName, v:query}, {sort:{v:1}}).toArray(function(err, data) {
if (err) return callback(err);

var retried = false;
var self = this;

function retry() {
// First, attempt to get the requested ops from the op collection
self._readOps(cName, docName, start, end, function(err, data) {
if (err) return callback(err);

// If we didn't get any ops, and we don't know the end of the range requested,
// then we need to get the document version to check if ops were expected.
if (data.length === 0 && end == null) {
// If we already retried (see below for why),
// then ops are missing from the requested range.
if (retried)
return callback('Missing operations');

// Get the current snapshot version
self.getVersion(cName, docName, function(err, v) {
if (err) return callback(err);

// Race condition! If the version returned is greater than the start
// of the requested range (and we got no ops above), an op may have
// been submitted between when we got ops and when we got the version.
// Retry to get the missing ops.
if (v > start) {
retried = true;
retry();
} else {
done(data);
}
});
} else {
done(data);
}
});
}

function done(data) {
// If we know the end of the requested op range, check that we got the right number.
if (end != null) {
var expectedLength = start >= end ? 0 : end - start;
if (data.length !== expectedLength)
return callback('Missing operations');
}

var v = start;
for (var i = 0; i < data.length; i++) {
// Check that we got ops in the right order with none missing.
if (data[i].v !== v++)
return callback('Missing operations');

// Strip out _id in the results
delete data[i]._id;
delete data[i].name;
}

callback(null, data);
});
}

retry();
};


Expand Down
109 changes: 109 additions & 0 deletions test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ describe 'mongo', ->

throw Error "Could not find index in ops db - #{JSON.stringify(indexes)}"
, 400

it 'adds a ttl index for ops if given option', (done) -> clear =>
db = liveDbMongo 'mongodb://localhost:27017/test?auto_reconnect', safe: false, ttl: 10
db.writeOp 'testcollection', 'foo', {v:0, create:{type:'json0'}}, (err) =>
setTimeout =>
@mongo.collection('testcollection_ops').indexInformation (err, indexes) ->
throw err if err

# We should find an index with [ 'm.d', 1 ]
for name, idx of indexes
if JSON.stringify(idx) is '[["m.d",1]]'
return done()

throw Error "Could not find index in ops db - #{JSON.stringify(indexes)}"
, 400

it 'does not allow editing the system collection', (done) ->
@db.writeSnapshot 'system', 'test', {type:'json0', v:5, m:{}, data:{x:5}}, (err) =>
Expand All @@ -61,6 +76,100 @@ describe 'mongo', ->
assert.equal v, 3
done()

describe 'getOps', ->
it 'errors if ops are missing at the start of the range', (done) ->
@db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) =>
throw Error err if err
@db.writeOp 'testcollection', 'test', {v:1, op:{test:2}}, (err) =>
throw Error err if err

readOps = @db._readOps
@db._readOps = (cName, docName, start, end, callback) ->
readOps.call this, cName, docName, start, end, (err, ops) ->
callback err, ops.slice(1)

@db.getOps 'testcollection', 'test', 0, null, (err, ops) =>
@db._readOps = readOps
assert.equal err, 'Missing operations'
done()

it 'errors if ops are missing in the middle of the range', (done) ->
@db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) =>
throw Error err if err
@db.writeOp 'testcollection', 'test', {v:1, op:{test:2}}, (err) =>
throw Error err if err
@db.writeOp 'testcollection', 'test', {v:2, op:{test:3}}, (err) =>
throw Error err if err

readOps = @db._readOps
@db._readOps = (cName, docName, start, end, callback) ->
readOps.call this, cName, docName, start, end, (err, ops) ->
callback err, [ops[0], ops[2]]

@db.getOps 'testcollection', 'test', 0, null, (err, ops) =>
@db._readOps = readOps
assert.equal err, 'Missing operations'
done()

it 'errors if ops are missing when end specified', (done) ->
@db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) =>
throw Error err if err
@db.writeOp 'testcollection', 'test', {v:1, op:{test:2}}, (err) =>
throw Error err if err

readOps = @db._readOps
@db._readOps = (cName, docName, start, end, callback) ->
readOps.call this, cName, docName, start, end, (err, ops) ->
callback err, ops.slice(1)

@db.getOps 'testcollection', 'test', 0, 2, (err, ops) =>
@db._readOps = readOps
assert.equal err, 'Missing operations'
done()

it 'errors if ops are missing when ops missing from range end', (done) ->
@db.writeOp 'testcollection', 'test', {v:0, op:{test:1}}, (err) =>
throw Error err if err

@db.getOps 'testcollection', 'test', 0, 5, (err, ops) =>
assert.equal err, 'Missing operations'
done()

it 'errors if there are no ops and snapshot version is larger than range start', (done) ->
@db.writeSnapshot 'testcollection', 'test', {type: 'json0', v: 3, data:{x:5}}, (err) =>
throw Error err if err

@db.getOps 'testcollection', 'test', 0, null, (err, ops) =>
assert.equal err, 'Missing operations'
done()

it 'handles race condition when ops are submitted at the same time as a getOps call', (done) ->
@db.writeSnapshot 'testcollection', 'test', {type: 'json0', v: 3, data:{x:5}}, (err) =>
throw Error err if err

# simulate race condition by writing in getVersion
getVersion = @db.getVersion
@db.getVersion = (cName, docName, callback) ->
@writeOp 'testcollection', 'test', {v:3, op:{test:1}}, (err) =>
throw Error err if err
@getVersion = getVersion
@getVersion cName, docName, callback

@db.getOps 'testcollection', 'test', 3, null, (err, ops) =>
throw Error err if err
assert.ok ops
assert.ok ops.length
done()

it 'should return nothing if there are no ops, and version matches range start', (done) ->
@db.writeSnapshot 'testcollection', 'test', {type: 'json0', v: 3, data:{x:5}}, (err) =>
throw Error err if err

@db.getOps 'testcollection', 'test', 3, null, (err, ops) =>
throw Error err if err
assert.ok ops
assert.equal ops.length, 0
done()

describe 'query', ->
it 'returns data in the collection', (done) ->
Expand Down