Skip to content

Commit

Permalink
ZongJi constructor accepts node-mysql Connection/Pool as arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
numtel committed May 14, 2017
1 parent c275ed4 commit 018ddec
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 14 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ For a complete implementation see [`example.js`](example.js)...

## ZongJi Class

The `ZongJi` constructor accepts one argument: an object containg MySQL connection details in the same format as used by `node-mysql`.
The `ZongJi` constructor accepts one argument of either:

* An object containing MySQL connection details in the same format as used by `node-mysql`
* Or, a `node-mysql` `Connection` or `Pool` object that will be used for querying column information.

If a `Connection` or `Pool` object is passed to the constructor, it will not be destroyed/ended by Zongji's `stop()` method.

Each instance includes the following methods:

Expand Down
46 changes: 35 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,47 @@
var mysql = require('mysql');
var Connection = require('mysql/lib/Connection');
var Pool = require('mysql/lib/Pool');

var util = require('util');
var EventEmitter = require('events').EventEmitter;
var generateBinlog = require('./lib/sequence/binlog');

var alternateDsn = [
{ type: Connection, config: function(obj) { return obj.config; } },
{ type: Pool, config: function(obj) { return obj.config.connectionConfig; } }
];

function ZongJi(dsn, options) {
this.set(options);

EventEmitter.call(this);

// to send table info query
var ctrlDsn = cloneObjectSimple(dsn);
ctrlDsn.database = 'information_schema';
this.ctrlConnection = mysql.createConnection(ctrlDsn);
this.ctrlConnection.on('error', this._emitError.bind(this));
this.ctrlConnection.on('unhandledError', this._emitError.bind(this));
var binlogDsn;

this.ctrlConnection.connect();
// one connection to send table info query
// Check first argument against possible connection objects
for(var i = 0; i < alternateDsn.length; i++) {
if(dsn instanceof alternateDsn[i].type) {
this.ctrlConnection = dsn;
this.ctrlConnectionOwner = false;
binlogDsn = cloneObjectSimple(alternateDsn[i].config(dsn));
}
}

if(!binlogDsn) {
// assuming that the object passed is the connection settings
var ctrlDsn = cloneObjectSimple(dsn);
this.ctrlConnection = mysql.createConnection(ctrlDsn);
this.ctrlConnection.on('error', this._emitError.bind(this));
this.ctrlConnection.on('unhandledError', this._emitError.bind(this));
this.ctrlConnection.connect();
this.ctrlConnectionOwner = true;

binlogDsn = dsn;
}
this.ctrlCallbacks = [];

this.connection = mysql.createConnection(dsn);
this.connection = mysql.createConnection(binlogDsn);
this.connection.on('error', this._emitError.bind(this));
this.connection.on('unhandledError', this._emitError.bind(this));

Expand Down Expand Up @@ -163,7 +186,7 @@ ZongJi.prototype._executeCtrlCallbacks = function() {
var tableInfoQueryTemplate = 'SELECT ' +
'COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME, ' +
'COLUMN_COMMENT, COLUMN_TYPE ' +
"FROM columns " + "WHERE table_schema='%s' AND table_name='%s'";
"FROM information_schema.columns " + "WHERE table_schema='%s' AND table_name='%s'";

ZongJi.prototype._fetchTableInfo = function(tableMapEvent, next) {
var self = this;
Expand Down Expand Up @@ -252,8 +275,9 @@ ZongJi.prototype.stop = function(){
self.connection.destroy();
self.ctrlConnection.query(
'KILL ' + self.connection.threadId,
function(error, reuslts){
self.ctrlConnection.destroy();
function(error, results){
if(self.ctrlConnectionOwner)
self.ctrlConnection.destroy();
}
);
};
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "zongji",
"version": "0.4.4",
"version": "0.4.5",
"description": "A mysql binlog listener running on Node.js",
"main": "index.js",
"directories": {
Expand Down Expand Up @@ -31,6 +31,6 @@
},
"dependencies": {
"iconv-lite": "^0.4.13",
"mysql": "~2.12.0"
"mysql": "~2.13.0"
}
}
56 changes: 56 additions & 0 deletions test/events.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
var mysql = require('mysql');
var settings = require('./settings/mysql');
var connector = require('./helpers/connector');
var querySequence = require('./helpers/querySequence');
Expand Down Expand Up @@ -92,6 +93,61 @@ module.exports = {

});
},
testPassedConnectionObj: function(test){
var testTable = 'conn_obj_test';
var connObjs = [
{ create: mysql.createConnection, end: function(obj) { obj.destroy(); } },
{ create: mysql.createPool, end: function(obj) { obj.end(); } }
];
querySequence(conn.db, [
'DROP TABLE IF EXISTS ' + conn.escId(testTable),
'CREATE TABLE ' + conn.escId(testTable) + ' (col INT UNSIGNED)',
'INSERT INTO ' + conn.escId(testTable) + ' (col) VALUES (10)',
], function(error, results){
if(error) console.error(error);
// Start second ZongJi instance
connObjs.forEach(function(connObj, index) {
var ctrlConn = connObj.create(settings.connection);
var zongji = new ZongJi(ctrlConn);
var events = [];

zongji.on('binlog', function(event) {
events.push(event);
});

zongji.start({
startAtEnd: true,
serverId: 12 + index, // Second instance must not use same server ID
includeEvents: ['tablemap', 'writerows']
});

connObj.zongji = zongji;
connObj.events = events;
});

// Give enough time to initialize
setTimeout(function(){
querySequence(conn.db, [
'INSERT INTO ' + conn.escId(testTable) + ' (col) VALUES (10)',
], function(error, results){
if(error) console.error(error);
// Should only have 2 events since ZongJi start
connObjs.forEach(function(connObj, index) {
expectEvents(test, connObj.events, [
{ /* do not bother testing anything on first event */ },
{ rows: [ { col: 10 } ] }
], 1, function(){
connObj.zongji.stop();
// When passing connection object, connection doesn't end on stop
connObj.end(connObj.zongji.ctrlConnection);
if(index === connObjs.length - 1) test.done();
});
});
});
}, 200);

});
},
testWriteUpdateDelete: function(test){
var testTable = 'events_test';
querySequence(conn.db, [
Expand Down

0 comments on commit 018ddec

Please sign in to comment.