Skip to content

Commit

Permalink
'v0.1.12'
Browse files Browse the repository at this point in the history
  • Loading branch information
cwjohan committed Oct 30, 2014
1 parent d678b96 commit 03770fb
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 21 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ to detect when the length is too much, then use the `'drain'` event to resume se
...
ack()

If multiple queues are being consumed, they are consumed with highest priority given to the queues consumed first (i.e., in the order in which the consume statements are executed).

Note that ack(true) may be used to indicate that no further data is expected from the given work queue.
This is useful, for example, in testing, when a clean exit from a test case is desired.

Expand Down Expand Up @@ -316,6 +318,8 @@ to detect when the length is too much, then use the `'drain'` event to resume se
ack();
});

If multiple queues are being consumed, they are consumed with highest priority given to the queues consumed first (i.e., in the order in which the consume statements are executed).

Note that ack(true) may be used to indicate that no further data is expected from the given work queue.
This is useful, for example, in testing, when a clean exit from a test case is desired.

Expand Down Expand Up @@ -467,6 +471,10 @@ to permit some rudimentary control of backpressure. Documented 'drain' event.
**v0.1.11**: Added shutdownSoon function to QueueMgr and WorkQueueBroker. Improved README.md and demos. Made test suite
use unique queue names to prevent interference from demos.

**v0.1.12**: Modified WorkQueueMgr to preserve the order of
queue names used when calling popAny. ECMA-262 does not specify enumeration order, so an array should be used. Reference:
https://code.google.com/p/v8/issues/detail?id=164

##Note:

Part of this work is derived from node-simple-redis-queue v0.9.3 by James Smith and
Expand Down
29 changes: 18 additions & 11 deletions lib/workQueueBroker.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ WorkQueueBroker = (function(_super) {

function WorkQueueBroker(configFilePath) {
this.queues = {};
this.consuming = {};
this.consumingCB = {};
this.consumingNames = [];
this.qmgr = new QueueMgr(configFilePath);
return this;
}
Expand Down Expand Up @@ -100,14 +101,17 @@ WorkQueueBroker = (function(_super) {
};

WorkQueueBroker.prototype.consume = function(queueName, onData) {
this.consuming[queueName] = onData;
if (!this.consumingCB[queueName]) {
this.consumingNames.push(queueName);
}
this.consumingCB[queueName] = onData;
process.nextTick(this.monitor_.bind(this));
return this;
};

WorkQueueBroker.prototype.ack_ = function(queueName, cancel) {
if (cancel) {
delete this.consuming[queueName];
this.destroyQueue(queueName);
} else {
this.monitor_();
}
Expand All @@ -117,20 +121,23 @@ WorkQueueBroker = (function(_super) {
WorkQueueBroker.prototype.monitor_ = function() {
var args,
_this = this;
args = Object.keys(this.consuming);
args.push(function(queueName, data) {
if (_this.consuming[queueName]) {
return _this.consuming[queueName](data, _this.ack_.bind(_this, queueName));
(args = this.consumingNames.slice()).push(function(queueName, data) {
if (_this.consumingCB[queueName]) {
return _this.consumingCB[queueName](data, _this.ack_.bind(_this, queueName));
}
});
return this.qmgr.popAny.apply(this.qmgr, args);
};

WorkQueueBroker.prototype.destroyQueue = function(queueName) {
if (this.isValidQueueName(queueName)) {
delete this.consuming[queueName];
delete this.queues[queueName];
}
this.consumingNames = this.consumingNames.reduce(function(acc, x) {
if (x !== queueName) {
acc.push(x);
}
return acc;
}, []);
delete this.consumingCB[queueName];
delete this.queues[queueName];
return this;
};

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-redis-queue",
"version": "0.1.11",
"version": "0.1.12",
"description": "A simple, lightweight queue using Redis lpush and brpop",
"main": "lib/index.js",
"repository": {
Expand Down
22 changes: 13 additions & 9 deletions src/workQueueBroker.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class WorkQueueBrokerError extends Error
class WorkQueueBroker extends events.EventEmitter
constructor: (configFilePath) ->
@queues = {}
@consuming = {}
@consumingCB = {}
@consumingNames = []
@qmgr = new QueueMgr(configFilePath)
return this

Expand Down Expand Up @@ -56,27 +57,30 @@ class WorkQueueBroker extends events.EventEmitter
return this

consume: (queueName, onData) ->
@consuming[queueName] = onData
@consumingNames.push queueName unless @consumingCB[queueName]
@consumingCB[queueName] = onData
process.nextTick @monitor_.bind(this)
return this

ack_: (queueName, cancel) ->
if cancel
delete @consuming[queueName]
@destroyQueue queueName
else
@monitor_()
return this

monitor_: () ->
args = Object.keys(@consuming)
args.push (queueName, data) =>
@consuming[queueName] data, @ack_.bind(this, queueName) if @consuming[queueName]
(args = @consumingNames.slice()).push (queueName, data) =>
@consumingCB[queueName] data, @ack_.bind(this, queueName) if @consumingCB[queueName]
@qmgr.popAny.apply @qmgr, args

destroyQueue: (queueName) ->
if @isValidQueueName queueName
delete @consuming[queueName]
delete @queues[queueName]
@consumingNames = @consumingNames.reduce (acc,x) ->
acc.push x unless x is queueName
return acc
, []
delete @consumingCB[queueName]
delete @queues[queueName]
return this

disconnect: ->
Expand Down

0 comments on commit 03770fb

Please sign in to comment.