Skip to content

Commit

Permalink
'v0.2.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
cwjohan committed Nov 28, 2014
1 parent 9794e72 commit 2601a08
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 91 deletions.
3 changes: 3 additions & 0 deletions CHANGE_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,6 @@ send operations now emit an 'error' event if there are multiple outstanding pop
outstanding. This signals when a hang likely would happen. If you get one of these errors, then use
`connect2` rather than `connect`.

**v0.2.4**: Cleaned up some sloppiness around attaching to and closing of multiple connections.
Added `clearAll` function to WorkQueueMgr. Modified provider03 demo program to use clearAll.

24 changes: 16 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ The `client2` property is used internally for push/send, while the `client` prop
is used for pop/consume. Use this feature to avoid hangs on push/send when multiple pop/consume operations
may be outstanding.

####attach(client)
####attach(client, client2)

This is an alternative to calling `connect`. It attaches to the given Redis client connection.
This is an alternative to calling `connect`. It attaches to the given Redis client connection or connections.
The `client2` parameter is optional and may be omitted.

####push(queueName, data)

Expand Down Expand Up @@ -88,8 +89,8 @@ Once data becomes available, it calls the callback with two parameters: a queue

####clear(queueNames..., onClearCB)

Removes the data from the queues specified by the given queue names. Calls the given callback when the
operation is complete.
Removes the data from the queues specified by the given queue names. Calls the given callback
function once all the given queues have been cleared.

####disconnect()

Expand Down Expand Up @@ -179,18 +180,25 @@ The channel's `client2` property is used internally for push/send, while the cha
is used for pop/consume. Use this feature to avoid hangs on push/send when multiple pop/consume operations
may be outstanding.

####attach(client)
####attach(client, client2)

This is an alternative to calling `connect`. It attaches to a given Redis client connection.
This is an alternative to calling `connect`. It attaches to a given Redis client connection or connections.
The `client2` parameter is optional and may be omitted.

####createQueue(queueName)

Returns a WorkQueue instance for the given queue name.

####clear(queueNames..., onClearCB)

Removes the data from one or more queues specified by the given queue names. Calls the
given callback when the operation is complete.
Removes the data from one or more queues specified by the given queue names. Calls the given callback
function once all the given queues have been cleared.

####clearAll(onClearCB)

Removes data from all the queues that have been created by this WorkQueueMgr instance
and not subsequently destroyed. Calls the given callback function once all the queues
have been cleared.

####disconnect()

Expand Down
11 changes: 9 additions & 2 deletions demo/HOW_TO_RUN_DEMOS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
node-redis-queue-demos
======================

Examples of how to use node-redis-queue.

##Running the demos -- preliminary steps

1. Open two Git Bash console windows.
Expand Down Expand Up @@ -64,6 +69,8 @@ This demo is almost the same as demo 02 but uses WorkQueueMgr rather than Channe
1. In the second console window, run `node provider04.js stop`, which will put a stop command in the queue. Shortly
thereafter, the worker04 process will stop.

Try the above again using `node worker04 3` in step 1. Observe that the worker will process three input requests in parallel and
that the results may become available in a different order than the input requests.
To check out the arity feature, try the above again using `node worker04.js 3` in step 1. Observe that the worker will process three input requests in parallel and that the results may become available in a different order than the input requests.

To check out the timeout feature, try the above yet again using `node worker04.js 1 1`. Observe the timeout messages emitted
about once every second.

22 changes: 3 additions & 19 deletions demo/lib/provider03.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Use this app in conjunction with worker03.js. See the worker03 source code
for more details.
*/

var WorkQueueMgr, clear, clearWorkQueues, createWorkQueues, expectedItemsQ1, expectedItemsQ2, initEventHandlers, itemCntQ1, itemCntQ2, mgr, queue1, queue1Name, queue2, queue2Name, sendData, sendStop, shutDown, stop, timesToRepeat;
var WorkQueueMgr, clear, createWorkQueues, expectedItemsQ1, expectedItemsQ2, initEventHandlers, itemCntQ1, itemCntQ2, mgr, queue1, queue1Name, queue2, queue2Name, sendData, sendStop, shutDown, stop, timesToRepeat;

queue1 = null;

Expand Down Expand Up @@ -58,7 +58,8 @@ mgr.connect(function() {
sendStop();
return shutDown();
} else if (clear) {
return clearWorkQueues(function() {
return mgr.clearAll(function() {
console.log('Cleared "' + queue1.queueName + '" and "' + queue2.queueName + '"');
return shutDown();
});
} else {
Expand All @@ -83,23 +84,6 @@ createWorkQueues = function() {
queue2 = mgr.createQueue(queue2Name);
};

clearWorkQueues = function(done) {
var queuesToClear;
queuesToClear = 2;
queue1.clear(function() {
console.log('Cleared "' + queue1.queueName + '"');
if (!--queuesToClear) {
return done();
}
});
return queue2.clear(function() {
console.log('Cleared "' + queue2.queueName + '"');
if (!--queuesToClear) {
return done();
}
});
};

sendData = function() {
var item, _i, _j, _len, _len1;
while (timesToRepeat--) {
Expand Down
12 changes: 2 additions & 10 deletions demo/src/provider03.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ mgr.connect () ->
sendStop()
shutDown()
else if clear
clearWorkQueues ->
mgr.clearAll ->
console.log 'Cleared "' + queue1.queueName + '" and "' + queue2.queueName + '"'
shutDown()
else
sendData()
Expand All @@ -70,15 +71,6 @@ createWorkQueues = ->
queue2 = mgr.createQueue queue2Name
return

clearWorkQueues = (done) ->
queuesToClear = 2
queue1.clear () ->
console.log 'Cleared "' + queue1.queueName + '"'
done() unless --queuesToClear
queue2.clear () ->
console.log 'Cleared "' + queue2.queueName + '"'
done() unless --queuesToClear

sendData = ->
while timesToRepeat--
for item in expectedItemsQ1
Expand Down
53 changes: 33 additions & 20 deletions lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,47 +31,54 @@ Channel = (function(_super) {
}

Channel.prototype.connect = function(onReady) {
this.client2 = this.client = this.configurator_.getClient(this.config_);
return this.attach(this.client, onReady);
this.readyCntDn_ = 1;
this.client = this.configurator_.getClient(this.config_);
this.client.once('ready', this.onReady_.bind(this, onReady));
return this.attach(this.client);
};

Channel.prototype.connect2 = function(onReady) {
this.readyCntDn_ = 2;
this.client = this.configurator_.getClient(this.config_);
this.client.once('ready', this.onReady_.bind(this, onReady));
this.client2 = this.configurator_.getClient(this.config_);
this.readyCnt_ = 0;
this.attach(this.client, this.onReady2_.bind(this, onReady));
return this.attach(this.client, this.onReady2_.bind(this, onReady));
this.client2.once('ready', this.onReady_.bind(this, onReady));
return this.attach(this.client, this.client2);
};

Channel.prototype.onReady2_ = function(onReady) {
if (++this.readyCnt_ === 2 && onReady && typeof onReady === 'function') {
Channel.prototype.onReady_ = function(onReady) {
if (--this.readyCntDn_ === 0 && onReady && typeof onReady === 'function') {
return onReady();
}
};

Channel.prototype.attach = function(client, onReady) {
Channel.prototype.attach = function(client, client2) {
var _this = this;
this.client = client;
if (!(this.client instanceof Object)) {
throw new ChannelError('No client supplied');
this.client2 = client2;
if (!this.client2) {
this.client2 = this.client;
}
this.client.on('ready', function() {
_this.ready = true;
if (onReady && typeof onReady === 'function') {
onReady();
}
return _this.emit('ready');
});
this.client.on('error', function(err) {
return _this.emit('error', err);
});
this.client.on('end', function() {
_this.ready = false;
return _this.emit('end');
});
this.client.on('drain', function() {
return _this.emit('drain');
});
if (this.client2) {
this.client2.on('error', function(err) {
return _this.emit('error', err);
});
this.client2.on('end', function() {
return _this.emit('end');
});
this.client2.on('drain', function() {
return _this.emit('drain');
});
}
return this;
};

Expand Down Expand Up @@ -168,19 +175,25 @@ Channel = (function(_super) {

Channel.prototype.disconnect = function() {
this.client.quit();
if (this.client !== this.client2) {
this.client2.quit();
}
return this;
};

Channel.prototype.end = function() {
this.client.end();
if (this.client !== this.client2) {
this.client2.end();
}
return this;
};

Channel.prototype.shutdownSoon = function(delay) {
var _this = this;
process.nextTick(function() {
if (_this.client.offline_queue.length === 0) {
return _this.client.end();
if (_this.client2.offline_queue.length === 0) {
return _this.end();
} else {
return setTimeout(function() {
return _this.shutdownSoon(delay);
Expand Down
17 changes: 11 additions & 6 deletions lib/workQueueMgr.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ WorkQueueMgr = (function(_super) {
return this;
};

WorkQueueMgr.prototype.attach = function(client) {
this.client = client;
this.channel.attach(this.client);
WorkQueueMgr.prototype.attach = function() {
var clients, _ref;
clients = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
this.clients = clients;
(_ref = this.channel).attach.apply(_ref, this.clients);
return this.initEmitters_();
};

Expand All @@ -90,9 +92,6 @@ WorkQueueMgr = (function(_super) {
this.channel.on('timeout', function(keys, cancel) {
return _this.emit('timeout', keys, cancel);
});
this.channel.on('ready', function() {
return _this.emit('ready');
});
this.channel.on('error', function(err) {
return _this.emit('error', err);
});
Expand Down Expand Up @@ -180,6 +179,12 @@ WorkQueueMgr = (function(_super) {
return this;
};

WorkQueueMgr.prototype.clearAll = function(onClear) {
var args;
(args = Object.keys(this.queues)).push(onClear);
return this.channel.clear.apply(this.channel, args);
};

WorkQueueMgr.prototype.destroyQueue_ = function(queueName) {
this.ensureValidQueueName_(queueName);
if (this.consumingCB[queueName]) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-redis-queue",
"version": "0.2.3",
"version": "0.2.4",
"description": "A simple, lightweight queue using Redis lpush and brpop",
"main": "lib/index.js",
"repository": {
Expand Down
44 changes: 25 additions & 19 deletions src/channel.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,37 @@ class Channel extends events.EventEmitter
@outstanding = 0

connect: (onReady) ->
@client2 = @client = @configurator_.getClient(@config_)
@attach @client, onReady
@readyCntDn_ = 1
@client = @configurator_.getClient @config_
@client.once 'ready', @onReady_.bind this,onReady
@attach @client

connect2: (onReady) ->
@client = @configurator_.getClient(@config_)
@client2 = @configurator_.getClient(@config_)
@readyCnt_ = 0
@attach @client, @onReady2_.bind(this, onReady)
@attach @client, @onReady2_.bind(this, onReady)
@readyCntDn_ = 2
@client = @configurator_.getClient @config_
@client.once 'ready', @onReady_.bind this, onReady
@client2 = @configurator_.getClient @config_
@client2.once 'ready', @onReady_.bind this, onReady
@attach @client, @client2

onReady2_: (onReady) ->
onReady() if ++@readyCnt_ is 2 and onReady and typeof onReady is 'function'
onReady_: (onReady) ->
onReady() if --@readyCntDn_ is 0 and onReady and typeof onReady is 'function'

attach: (@client, onReady) ->
unless @client instanceof Object
throw new ChannelError 'No client supplied'
@client.on 'ready', =>
@ready = true
onReady() if onReady and typeof onReady is 'function'
@emit 'ready'
attach: (@client, @client2) ->
@client2 = @client unless @client2
@client.on 'error', (err) =>
@emit 'error', err
@client.on 'end', =>
@ready = false
@emit 'end'
@client.on 'drain', =>
@emit 'drain'
if @client2
@client2.on 'error', (err) =>
@emit 'error', err
@client2.on 'end', =>
@emit 'end'
@client2.on 'drain', =>
@emit 'drain'
return this

push: (queueName, data) ->
Expand Down Expand Up @@ -100,16 +104,18 @@ class Channel extends events.EventEmitter

disconnect: ->
@client.quit()
@client2.quit() unless @client is @client2
return this

end: ->
@client.end()
@client2.end() unless @client is @client2
return this

shutdownSoon: (delay) ->
process.nextTick =>
if @client.offline_queue.length is 0
@client.end()
if @client2.offline_queue.length is 0
@end()
else
setTimeout =>
@shutdownSoon delay
Expand Down
Loading

0 comments on commit 2601a08

Please sign in to comment.