Add update and delete functionality #8

Open
wants to merge 8 commits into base: master
71 changes: 58 additions & 13 deletions src/index.js
@@ -7,50 +7,95 @@ module.exports = {
      // default config
      {
        batchSize: 1,
        insertOptions: { w: 1 },
        operationType: 'insert'
Collaborator:
Here I have the feeling this breaks the API for some users.
Maybe we should keep insertOptions for now, and use it in operations.insert?
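
A minimal sketch of the backward-compatible variant suggested above, keeping the old insertOptions default and threading it through the new insert operation (insertMany accepts a write-concern options object as its second argument):

```js
// Sketch only: retain insertOptions so existing callers are unaffected,
// while still defaulting the new operationType to 'insert'.
const config = Object.assign(
  {
    batchSize: 1,
    insertOptions: { w: 1 }, // kept for backward compatibility
    operationType: 'insert'
  },
  options
);

const operations = {
  insert: async () => collection.insertMany(records, config.insertOptions)
  // update / delete / invalid as defined in this PR
};
```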

      },
      // overrided options
      options,
      // override config
      options
    );

    // those variables can't be initialized without Promises, so we wait first drain
    // These variables can't be initialized without Promises, so we wait for the first drain
    let dbConnection;
    let collection;
    let records = [];

    // this function is usefull to insert records and reset the records array
    const insert = async () => {
      await collection.insert(records, config.insertOptions);
      records = [];
    // Supported operations
    const operations = {
      insert: async () => {
        return collection.insertMany(records);
      },
      update: async () => {
        // make sure we have some index so we can select the correct record to update
        if (typeof config.indexName === 'undefined') {
          return Promise.reject(Error('Operation type was update, but no index was provided.'));
        }

        // update each record in tandem
        return Promise.all(records.map(doc =>
          collection.updateMany({ [config.indexName]: doc[config.indexName] }, { $set: doc })));
      },
      delete: async () => {
        // make sure we have some index so we can select the correct record to delete
        if (typeof config.indexName === 'undefined') {
          return Promise.reject(Error('Operation type was delete, but no index was provided.'));
        }

        // delete each record in tandem
        return Promise.all(records.map(doc =>
          collection.deleteMany({ [config.indexName]: doc[config.indexName] })));
      },
      invalid: () => { return Promise.reject(Error(`Invalid operation type: ${config.operationType}`)); }
    };

    // Utility for writing to the db with the correct operation
    const writeToDB = async () => {
Collaborator:
I think we could cache this function's result before calling it.
Here you call this function every time we need to write to the db (line 84).

Collaborator:
Plus, the error about an invalid operation will be triggered before the stream is used.

Collaborator:
I mean which operation to choose (which callback).
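
A sketch of the caching idea from this thread: select the operation callback once, up front, so the lookup doesn't happen on every batch and an invalid operationType fails before the stream is used (the naming is illustrative):

```js
// Resolve the operation once instead of re-checking config.operationType
// on every write; unknown types fall back to the invalid() rejection.
const selectedOperation =
  Object.keys(operations).includes(config.operationType)
    ? operations[config.operationType]
    : operations.invalid;

const writeToDB = async () => selectedOperation();
```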

      if (Object.keys(operations).includes(config.operationType)) {
        try {
          return operations[config.operationType]();
        } catch (err) {
          return operations.invalid();
        }
      } else {
        // TODO: add some kind of error message for invalid operations
        return operations.invalid();
      }
    };

    // stream
    const writable = new Writable({
      objectMode: true,
      write: async (record, encoding, next) => {
        // try/catch for initialization
        try {
          // connection
          if (!dbConnection) dbConnection = await MongoClient.connect(config.dbURL);
          if (!collection) collection = await dbConnection.collection(config.collection);
        } catch (err) {
          if (dbConnection) await dbConnection.close();
          writable.emit('Error on db init', err);
        }

        // try/catch for write operations
        try {
          // add to batch records
          records.push(record);

          // insert and reset batch recors
          if (records.length >= config.batchSize) await insert();
          // write and reset batch records
          if (records.length >= config.batchSize) {
            await writeToDB();
            records = [];
          }

          // next stream
          next();
        } catch (error) {
          if (dbConnection) await dbConnection.close();
          writable.emit('error', error);
          writable.emit('Error on data write', error);
        }
      }
    });

    writable.on('finish', async () => {
      try {
        if (records.length > 0) await insert();
        if (records.length > 0) await writeToDB();
Collaborator:
I'd feel more comfortable if you reset records here too.
That's why I put it into insert in the first place :)
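
A sketch of the finish handler with that reset applied; the catch branch is an assumption, since the rest of the handler is not shown in this hunk:

```js
writable.on('finish', async () => {
  try {
    if (records.length > 0) {
      await writeToDB();
      records = []; // reset here too, mirroring the write path
    }
    if (dbConnection) await dbConnection.close();
    writable.emit('close');
  } catch (error) {
    if (dbConnection) await dbConnection.close();
    writable.emit('error', error);
  }
});
```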

        if (dbConnection) await dbConnection.close();

        writable.emit('close');
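
For reference, a minimal end-to-end sketch of how the new operation types would be used, mirroring the spec below; the package import and file path are placeholders:

```js
const fs = require('fs');
const JSONStream = require('JSONStream');
const { streamToMongoDB } = require('stream-to-mongo-db'); // placeholder import

// Each element of the JSON array is matched on the 'secret' field
// (indexName) and its fields are applied to the match via $set.
fs.createReadStream('updates.json')
  .pipe(JSONStream.parse('*'))
  .pipe(streamToMongoDB({
    dbURL: 'mongodb://localhost:27017/streamToMongoDB',
    collection: 'test',
    batchSize: 16,
    operationType: 'update', // default is 'insert'; 'delete' also supported
    indexName: 'secret'      // required for update and delete
  }));
```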
182 changes: 162 additions & 20 deletions src/spec/index_spec.js
@@ -1,15 +1,18 @@
/* global beforeAll, beforeEach, afterAll, expect, it, describe */
/* global beforeEach, afterAll, expect, it, describe */
import MongoDB from 'mongodb';
import fs from 'fs';
import path from 'path';
import JSONStream from 'JSONStream';
import StreamToMongoDB from '../index';

const DATA_FILE_LOCATION = path.resolve('src/spec/support/data.json');
const INSERT_DATA_FILE_LOCATION = path.resolve('src/spec/support/insertData.json');
const UPDATE_DATA_FILE_LOCATION = path.resolve('src/spec/support/updateData.json');
const UPDATE_DATA_FILE_VALUE = 1337;

const testDB = 'streamToMongoDB';
const config = { dbURL: `mongodb://localhost:27017/${testDB}`, collection: 'test' };
const testConfig = { dbURL: `mongodb://localhost:27017/${testDB}`, collection: 'test' };

const expectedNumberOfRecords = require('./support/data.json').length;
const expectedNumberOfRecords = require('./support/insertData.json').length;

describe('.streamToMongoDB', () => {
  beforeEach(async (done) => {
@@ -22,58 +25,197 @@ describe('.streamToMongoDB', () => {
    done();
  });

  // insert basic
  describe('with no given options', () => {
    it('it uses the default config to stream the expected number of documents to MongoDB', async (done) => {
      runStreamTest(config, done);
    it('uses the default config to insert the expected number of documents to MongoDB', async (done) => {
      runInsertStream(testConfig, done);
    });
  });

  // update basic
  describe('with no given options for an update', () => {
    it('uses the default config to update the records in the db', async (done) => {
      runUpdateStream(testConfig, done);
    });
  });

  // inserts with options
  describe('inserts with given options', () => {
    describe('with batchSize same as the number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        testConfig.batchSize = expectedNumberOfRecords;
        runInsertStream(testConfig, done);
      });
    });

    describe('with batchSize less than number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        testConfig.batchSize = expectedNumberOfRecords - 3;
        runInsertStream(testConfig, done);
      });
    });

    describe('with batchSize more than the number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        testConfig.batchSize = expectedNumberOfRecords * 100;
        runInsertStream(testConfig, done);
      });
    });
  });

  describe('with given options', () => {
  // updates with options
  describe('updates with given options', () => {
    describe('with batchSize same as the number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        config.batchSize = expectedNumberOfRecords;
        runStreamTest(config, done);
        testConfig.batchSize = expectedNumberOfRecords;
        runUpdateStream(testConfig, done);
      });
    });

    describe('with batchSize less than number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        config.batchSize = expectedNumberOfRecords - 3;
        runStreamTest(config, done);
        testConfig.batchSize = expectedNumberOfRecords - 3;
        runUpdateStream(testConfig, done);
      });
    });

    describe('with batchSize more than the number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        config.batchSize = expectedNumberOfRecords * 100;
        runStreamTest(config, done);
        testConfig.batchSize = expectedNumberOfRecords * 100;
        runUpdateStream(testConfig, done);
      });
    });
  });

  // deletes with options
  describe('deletes with given options', () => {
    describe('with batchSize same as the number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        testConfig.batchSize = expectedNumberOfRecords;
        runDeleteStream(testConfig, done);
      });
    });

    describe('with batchSize less than number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        testConfig.batchSize = expectedNumberOfRecords - 3;
        runDeleteStream(testConfig, done);
      });
    });

    describe('with batchSize more than the number of documents to be streamed', () => {
      it('it streams the expected number of documents to MongoDB', (done) => {
        testConfig.batchSize = expectedNumberOfRecords * 100;
        runDeleteStream(testConfig, done);
      });
    });
  });
});

const connect = () => MongoDB.MongoClient.connect(config.dbURL);
const connect = () => MongoDB.MongoClient.connect(testConfig.dbURL);

const runStreamTest = (options, done) => {
  fs.createReadStream(DATA_FILE_LOCATION)
const runInsertStream = (config, done) => {
  fs.createReadStream(INSERT_DATA_FILE_LOCATION)
    .pipe(JSONStream.parse('*'))
    .pipe(StreamToMongoDB.streamToMongoDB(options))
    .pipe(StreamToMongoDB.streamToMongoDB(config))
    .on('error', (err) => {
      done();
      done.fail(err);
    })
    .on('close', () => {
      ensureAllDocumentsInserted(done);
      ensureAllDocumentsInserted(config, done);
    });
};

const ensureAllDocumentsInserted = async (done) => {
const ensureAllDocumentsInserted = async (config, done) => {
  const db = await connect();
  const count = await db.collection(config.collection).count();
  await db.close();
  expect(count).toEqual(expectedNumberOfRecords);
  done();
};

const runUpdateStream = (config, done) => {
  fs.createReadStream(INSERT_DATA_FILE_LOCATION)
    .pipe(JSONStream.parse('*'))
    .pipe(StreamToMongoDB.streamToMongoDB(config))
    .on('error', (err) => {
      done.fail(err);
    })
    .on('close', () => {
      updateAllDocuments(config, done);
    });
};

const updateAllDocuments = (config, done) => {
  // update every document to have the same total
  const options = Object.assign(
    {},
    config,
    {
      operationType: 'update',
      indexName: 'secret'
    }
  );
  fs.createReadStream(UPDATE_DATA_FILE_LOCATION)
    .pipe(JSONStream.parse('*'))
    .pipe(StreamToMongoDB.streamToMongoDB(options))
    .on('error', (err) => {
      done.fail(err);
    })
    .on('close', () => {
      ensureAllDocumentsUpdated(config, done);
    });
};

const ensureAllDocumentsUpdated = async (config, done) => {
  const db = await connect();
  const data = await db.collection(config.collection).find({}).toArray();
  data.forEach((d) => { expect(d.total).toEqual(UPDATE_DATA_FILE_VALUE); });
  await db.close();
  done();
};

const runDeleteStream = (config, done) => {
  fs.createReadStream(INSERT_DATA_FILE_LOCATION)
    .pipe(JSONStream.parse('*'))
    .pipe(StreamToMongoDB.streamToMongoDB(config))
    .on('error', (err) => {
      done.fail(err);
    })
    .on('close', () => {
      deleteAllDocuments(config, done);
    });
};

const deleteAllDocuments = (config, done) => {
  // delete every document that matches on the shared index
  const options = Object.assign(
    {},
    config,
    {
      operationType: 'delete',
      indexName: 'secret'
    }
  );
  fs.createReadStream(UPDATE_DATA_FILE_LOCATION)
    .pipe(JSONStream.parse('*'))
    .pipe(StreamToMongoDB.streamToMongoDB(options))
    .on('error', (err) => {
      done.fail(err);
    })
    .on('close', () => {
      ensureAllDocumentsDeleted(config, done);
    });
};

const ensureAllDocumentsDeleted = async (config, done) => {
  const db = await connect();
  const count = await db.collection(config.collection).count();
  expect(count).toEqual(0);
  await db.close();
  done();
};

const clearDB = async () => {
  const dbConnection = await connect();
  await dbConnection.dropDatabase();
File renamed without changes.