diff --git a/CHANGE_LOG.md b/CHANGE_LOG.md new file mode 100644 index 0000000..6c07b17 --- /dev/null +++ b/CHANGE_LOG.md @@ -0,0 +1,61 @@ +##Change Log + +**v0.0.0**: Initial version. + +**v0.0.1-3**: Changes to README.md. Implementation of jasmine-node tests. + +**v0.1.2**: Refactored to implement new QueueMgr and WorkQueueBroker interfaces; Implementation of connection strategies. + +**v0.1.3**: Further refactoring to implement new QueueMgr and WorkQueueBroker interfaces; +Merged v0.1.3 from flex branch into master. + +**v0.1.4**: Fix for issue #1 - Where to find redis-queue-config file is too restrictive - Now uses +QUEUE_CONFIG_FILE environment variable to find the config file; +An alternative is to specify the config file path in the queueMgr or workQueueBroker constructor. +Changed testing from Jasmine to Mocha; Implemented Mocha tests; +Introduced interleaved tests. + +**v0.1.5**: Various comment and README.md changes; +Corrected error in provision of Redis Cloud hostname. + +**v0.1.6**: Compiled to capture latest mod to .coffee source. + +**v0.1.7**: Fix for issue #4 - Stall occurs when one of two work queues on same connection becomes empty. + +**v0.1.8**: Fix for REDISCLOUD_URL having no auth field; Created change log in README.md. + +**v0.1.9**: Added usage examples to README.md for WorkQueueBroker. Added commandQueueLength function +to permit some rudimentary control of backpressure. Documented 'drain' event. + +**v0.1.10**: Changed `grunt test` to use mocha rather than jasmine-node. Improved usage documentation. + +**v0.1.11**: Added shutdownSoon function to QueueMgr and WorkQueueBroker. Improved README.md and demos. Made test suite +use unique queue names to prevent interference from demos. + +**v0.1.12**: Modified WorkQueueMgr to preserve the order of +queue names used when calling popAny. ECMA-262 5.1 (15.2.3.14 Object.keys and 12.6.4 The for-in Statement) does not +specify enumeration order, so an array should be used. 
Also, see: https://code.google.com/p/v8/issues/detail?id=164 + +**v0.1.13**: Modified connStrategyBlueMixRedisCloud to use a configured redis version. Added config info to README.md. + +**v0.1.14**: Added 'clean' task to Gruntfile. Fixed some potential problems found by jshint. Tidied Gruntfile. +Replaced some URLs in the demo source that were no longer working (404 not found). + +**v0.1.15**: Send now checks that queue has not been destroyed. +Added 'compile-test' task to Gruntfile. Fixed +incorrect calls to isValidQueueName. Added tests for WorkQueue +exceptions. Grunt now uses grunt-mocha-test plugin for better +reporting. + +**v0.1.16**: Reverted WorkQueue behaviour back to previous version since v0.1.15 change was too restrictive. +Added destroy function to WorkQueue. Updated README.md with info about the new destroy function. Also, added +some architecture notes. + +**v0.1.17**: Added ability to schedule parallel jobs in the consume function via an optional arity parameter. +Added @outstanding to queueMgr class. worker04 example uses a second WorkQueueBroker instance when arity is +greater than 1 to send result back to provider04. + +**v0.2.0**: Renamed QueueMgr class to Channel. Renamed WorkQueueBroker class to WorkQueueMgr. Updated test +and demo code to use the new class names, which have been adopted to correspond better to what they represent. +The README.md file has been split up into several separate files to improve readability. + diff --git a/COFFEESCRIPT_USAGE_EXAMPLES.md b/COFFEESCRIPT_USAGE_EXAMPLES.md new file mode 100644 index 0000000..58db4f7 --- /dev/null +++ b/COFFEESCRIPT_USAGE_EXAMPLES.md @@ -0,0 +1,159 @@ +###Channel Coffeescript Usage Example + +1. Ensure you have a Redis server installed and running. For example, once installed, you can run it locally by + + redis-server & + +1. Require `node-redis-queue` Channel + + Channel = require('node-redis-queue').Channel + +1. 
Create a Channel instance and connect to Redis + + channel = new Channel() + channel.connect -> + console.log 'ready' + myMainLogic() + + Alternatively, you can provide an existing Redis connection (i.e., a redis client instance) + + channel.attach redisConn + +1. Optionally, handle error events + + channel.on 'error', (error) -> + console.log 'Stopping due to: ' + error + process.exit() + +1. Optionally, handle lost connection events + + channel.on 'end', -> + console.log 'Connection lost' + +1. Optionally, clear previous data from the queue, providing a callback + to handle the data. + + channel.clear queueName, -> + console.log 'cleared' + doImportantStuff() + +1. Optionally, push data to your queue + + channel.push queueName, myData + +1. Optionally, pop data off your queue + + channel.pop queueName, (myData) -> + console.log 'data = ' + myData + + or, alternatively, pop off any of multiple queues, in which case the callback also receives the name of the queue that supplied the data + + channel.popAny queueName1, queueName2, (qName, myData) -> + console.log 'data = ' + myData + ' from ' + qName + + Once popping data from a queue, avoid pushing data to the same queue from the same connection, since + a hang could result. This appears to be a Redis limitation when using blocking reads. + +1. When done, quit the Channel instance + + channel.disconnect() + + or, alternatively, if consuming data from the queue, end the connection + + channel.end() + + or, if there may be a number of redis commands queued, + + channel.shutdownSoon() + +###WorkQueueMgr Coffeescript Usage Example + +1. Ensure you have a Redis server installed and running. For example, once installed, you can run it locally by + + redis-server & + +1. Require `node-redis-queue` WorkQueueMgr + + WorkQueueMgr = require('node-redis-queue').WorkQueueMgr + +1. Create a WorkQueueMgr instance and connect to Redis + + mgr = new WorkQueueMgr() + mgr.connect -> + console.log 'ready' + myMainLogic() + + Alternatively, you can provide an existing Redis connection (i.e., a redis client instance) + + mgr.attach redisConn + +1. 
Optionally, handle error events + + mgr.on 'error', (error) -> + console.log 'Stopping due to: ' + error + process.exit() + +1. Optionally, handle lost connection events + + mgr.on 'end', -> + console.log 'Connection lost' + +1. Create a work queue instance + + queue = mgr.createQueue queueName + +1. Optionally, clear previous data from the queue, providing a callback + to handle the data. + + queue.clear -> + console.log 'cleared' + doImportantStuff() + +1. Optionally, send data to your queue + + queue.send myData + +1. Optionally, consume data from your queue and call ack when ready to consume another data item + + queue.consume (myData, ack) -> + console.log 'data = ' + myData + ... + ack() + + or, alternatively, + + queue.consume (myData, ack) -> + console.log 'data = ' + myData + ... + ack() + , arity + + where arity is an integer indicating the number of async callbacks to schedule in parallel. See demo 04 for example usage. + + If multiple queues are being consumed, they are consumed with highest priority given to the queues consumed first + (i.e., in the order in which the consume statements are executed). + + Note that ack(true) may be used to indicate that no further data is expected from the given work queue. + This is useful, for example, in testing, when a clean exit from a test case is desired. + + Once consuming from a queue, avoid sending data to the same queue from the same connection + (i.e., the same mgr instance), since a hang could result. This appears to be a Redis limitation when using + blocking reads. You can test `mgr.channel.outstanding` for zero to determine if it is OK to send on the same connection. + +1. Optionally, destroy a work queue if it no longer is needed. Assign null to the queue variable to free up memory. + + queue.destroy() + queue = null + +1. 
When done, quit the WorkQueueMgr instance + + mgr.disconnect() + + or, alternatively, if consuming data from the queue, end the connection + + mgr.end() + + or, if there may be a number of redis commands queued, + + mgr.shutdownSoon() + diff --git a/DEVELOPER_INFO.md b/DEVELOPER_INFO.md new file mode 100644 index 0000000..543ddf5 --- /dev/null +++ b/DEVELOPER_INFO.md @@ -0,0 +1,30 @@ +##Running the test suite + +Use either `grunt test` or `npm test` to run the suite of tests using Mocha. The test cases reside in the `test` directory. + +##Running grunt for development tasks + +`grunt` runs coffeelint and then coffee. + +`grunt coffeelint` runs coffeelint on all the .coffee source code in the src directory. + +`grunt coffee` runs coffee on all the .coffee source code in the src directory, converting it to .js code in the +corresponding lib directory. + +`grunt jshint` runs jshint on all the .js code except one file in the demo/lib/helpers directory. Note that jshint may +have a lot of complaints about the generated .js code, but it is useful for spotting potential problems. + +`grunt clean` runs a script that removes vim backup files (i.e., files ending with '~') and .js files in the test directory. + +`grunt test` runs the suite of tests using Mocha. It looks for .coffee files in the test directory. + +`grunt bump` bumps the patch number up in the package.json file. + +`grunt git-tag` commits the latest staged code changes and tags the code with the version obtained from the package.json file. + +`grunt release` runs coffee on all the .coffee source code in the src directory, converting it to .js code, and +then runs the git-tag task to commit the latest staged code changes and tag the code with the version obtained from the +package.json file. + +`grunt compile-test` runs coffee on the test .coffee code. This is only for debugging of test cases when you need to see the generated JavaScript code. 
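Taken together, the tasks above suggest a Gruntfile shaped roughly like the following sketch. The plugin names (grunt-coffeelint, grunt-contrib-coffee, grunt-mocha-test) and file globs here are assumptions for illustration; consult the repo's actual Gruntfile for the real options.

```javascript
// Hypothetical sketch of how the tasks described above could be wired up.
// Plugin names and globs are assumptions; the repo's real Gruntfile may differ.
module.exports = function(grunt) {
  grunt.initConfig({
    coffeelint: {
      app: ['src/*.coffee']          // `grunt coffeelint` lints the src .coffee code
    },
    coffee: {
      compile: {
        expand: true,
        cwd: 'src',
        src: ['*.coffee'],
        dest: 'lib',                 // `grunt coffee` compiles src -> lib
        ext: '.js'
      }
    },
    mochaTest: {
      test: {
        options: { reporter: 'spec' },
        src: ['test/*.coffee']       // `grunt test` looks for .coffee files in test
      }
    }
  });

  grunt.loadNpmTasks('grunt-coffeelint');
  grunt.loadNpmTasks('grunt-contrib-coffee');
  grunt.loadNpmTasks('grunt-mocha-test');

  // `grunt` runs coffeelint and then coffee, as described above.
  grunt.registerTask('default', ['coffeelint', 'coffee']);
  grunt.registerTask('test', ['mochaTest']);
};
```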
+ diff --git a/JAVASCRIPT_USAGE_EXAMPLES.md b/JAVASCRIPT_USAGE_EXAMPLES.md new file mode 100644 index 0000000..ce4d48c --- /dev/null +++ b/JAVASCRIPT_USAGE_EXAMPLES.md @@ -0,0 +1,166 @@ +###Channel JavaScript Usage Example + +1. Ensure you have a Redis server installed and running. For example, once installed, you can run it locally by + + redis-server & + +1. Require `node-redis-queue` Channel + + var Channel = require('node-redis-queue').Channel; + + +1. Create a Channel instance and connect to Redis + + var channel = new Channel(); + channel.connect(function() { + console.log('ready'); + myMainLogic(); + }); + + Alternatively, you can provide an existing Redis connection (i.e., a redis client instance) + + channel.attach(redisConn); + +1. Optionally, handle error events + + channel.on('error', function(error) { + console.log('Stopping due to: ' + error); + process.exit(); + }); + +1. Optionally, handle lost connection events + + channel.on('end', function() { + console.log('Connection lost'); + }); + +1. Optionally, clear previous data from the queue, providing a callback to handle the data. + + channel.clear(queueName, function() { + console.log('cleared'); + doImportantStuff(); + }); + +1. Optionally, push data to your queue + + channel.push(queueName, myData); + +1. Optionally, pop data off your queue, providing a callback to + handle the data + + channel.pop(queueName, function(myData) { + console.log('data = ' + myData); + }); + + or, alternatively, pop off any of multiple queues, in which case the callback also receives the name of the queue that supplied the data + + channel.popAny(queueName1, queueName2, function(qName, myData) { + console.log('data = ' + myData + ' from ' + qName); + }); + + Once popping data from a queue, avoid pushing data to the same queue from the same connection, since + a hang could result. This appears to be a Redis limitation when using blocking reads. + +1. When done, quit the Channel instance + + channel.disconnect(); + + or, alternatively, if consuming data from the queue, end the connection + + channel.end(); + + or, if there may be a number of redis commands queued, + + channel.shutdownSoon(); + +###WorkQueueMgr JavaScript Usage Example + +1. Ensure you have a Redis server installed and running. 
For example, once installed, you can run it locally by + + redis-server & + +1. Require `node-redis-queue` WorkQueueMgr + + var WorkQueueMgr = require('node-redis-queue').WorkQueueMgr; + +1. Create a WorkQueueMgr instance and connect to Redis + + var mgr = new WorkQueueMgr(); + mgr.connect(function () { + console.log('ready'); + myMainLogic(); + }); + + Alternatively, you can provide an existing Redis connection (i.e., a redis client instance) + + mgr.attach(redisConn); + +1. Optionally, handle error events + + mgr.on('error', function(error) { + console.log('Stopping due to: ' + error); + process.exit(); + }); + +1. Optionally, handle lost connection events + + mgr.on('end', function() { + console.log('Connection lost'); + }); + +1. Create a work queue instance + + var queue = mgr.createQueue(queueName); + +1. Optionally, clear previous data from the queue, providing a callback + to handle the data. + + queue.clear(function() { + console.log('cleared'); + doImportantStuff(); + }); + +1. Optionally, send data to your queue + + queue.send(myData); + +1. Optionally, consume data from your queue and call ack when ready to consume another data item + + queue.consume(function(myData, ack) { + console.log('data = ' + myData); + ... + ack(); + }); + + or, alternatively, + + queue.consume(function(myData, ack) { + console.log('data = ' + myData); + ... + ack(); + }, arity); + + where arity is an integer indicating the number of async callbacks to schedule in parallel. See demo 04 for example usage. + + If multiple queues are being consumed, they are consumed with highest priority given to the queues consumed first (i.e., in the order in which the consume statements are executed). + + Note that ack(true) may be used to indicate that no further data is expected from the given work queue. + This is useful, for example, in testing, when a clean exit from a test case is desired. 
+ + Once consuming from a queue, avoid sending data to the same queue from the same connection (i.e., the same mgr instance), + since a hang could result. This appears to be a Redis limitation when using blocking reads. You can test + `mgr.channel.outstanding` for zero to determine if it is OK to send on the same connection. + +1. Optionally, destroy a work queue if it is no longer needed. Assign null to the queue variable to free up memory. + + queue.destroy(); + queue = null; + +1. When done, quit the WorkQueueMgr instance + + mgr.disconnect(); + + or, alternatively, if consuming data from the queue, end the connection + + mgr.end(); + + or, if there may be a number of redis commands queued, + + mgr.shutdownSoon(); + diff --git a/demo/HOW_TO_RUN_DEMOS.md b/demo/HOW_TO_RUN_DEMOS.md new file mode 100644 index 0000000..94c75f8 --- /dev/null +++ b/demo/HOW_TO_RUN_DEMOS.md @@ -0,0 +1,69 @@ +##Running the demos -- preliminary steps + +1. Open two Git Bash console windows. +1. In each window run `cd demo/lib` and then `export NODE_PATH=../../..`. +1. If redis-server is not already running, open an additional console window and run `redis-server` or `redis-server &` to start the Redis server in the background. The demos assume default login, no password. + +##Running demo 01 -- Channel example + +This demo shows how to send a series of URLs to a consumer process that computes an SHA1 value for each URL. + +1. In the first console window, run `node worker01.js`. It will wait for some data to appear in the queue. +1. In the second console window, run `node provider01.js`, which will place four URLs in the queue. Shortly + thereafter, the worker01 process will pick up the four URLs and display them, fetch a page body for each, and compute an SHA1 value for each. +1. Repeat step 2 a few times. +1. In the second console window, run `node provider01.js stop`, which will put a stop command in the queue. Shortly + thereafter, the worker01 process will stop. 
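The queue protocol demo 01 relies on is simple enough to sketch without Redis. Below, a plain JavaScript array stands in for the Redis list that Channel's push/pop would operate on, and the 'stop' string plays the role of provider01's stop command. The function names are illustrative only; the real demo talks to a live redis-server.

```javascript
// In-memory sketch of demo 01's provider/worker protocol.
// A plain array stands in for the Redis list; data is JSON-serialized,
// as Channel.push does before calling lpush.
var queue = [];

// provider01-style: enqueue a batch of URLs
function provide(urls) {
  urls.forEach(function(url) {
    queue.push(JSON.stringify(url));
  });
}

// provider01-style stop: enqueue the 'stop' command
function provideStop() {
  queue.push(JSON.stringify('stop'));
}

// worker01-style: dequeue items until the stop command arrives,
// invoking the callback once per URL; returns the number processed
function work(onUrl) {
  var processed = 0;
  while (queue.length > 0) {
    var data = JSON.parse(queue.shift());
    if (data === 'stop') {
      break; // demo convention: the 'stop' message ends the worker
    }
    onUrl(data);
    processed++;
  }
  return processed;
}

provide(['http://example.com/a', 'http://example.com/b']);
provideStop();
var seen = [];
var count = work(function(url) { seen.push(url); });
console.log(count); // 2
```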
+ +##Running demo 02 -- Channel example + +This demo shows how to send a series of URLs to a consumer process that computes an SHA1 value for each URL and returns the SHA1 result to the provider process. + +1. In the first console window, run `node worker02.js`. It will wait for some data to appear in the queue. +1. In the second console window, run `node provider02.js 01`, which will place four URLs in the queue. Shortly + thereafter, the worker02 process will pick up the four URLs, display them, fetch a page body for each, compute an SHA1 value for each, and then return the SHA1 result to the provider02 instance, which will display the result. +1. Repeat step 2 a few times. +1. In the second console window, run `node provider02.js stop`, which will put a stop command in the queue. Shortly + thereafter, the worker02 process will stop. + +##Running demo 03 -- WorkQueueMgr example + +This demo shows how a worker can service multiple queues using WorkQueueMgr. The provider process, by default, sends three strings to one queue and three strings to the other. + +1. In the first console window, run `node worker03.js`. It will wait for some data to appear in the queue. +1. In the second console window, run `node provider03.js`, which will place three strings in each queue. Shortly + thereafter, the worker03 process will pick up the six strings from their respective queues and display them. +1. Repeat step 2 a few times. +1. In the second console window, run `node provider03.js stop`, which will put a stop command in the queue. Shortly + thereafter, the worker03 process will stop. + +Note that, when running worker03, one may optionally use a 'mem' parameter to monitor memory usage. For example: + +`node worker03.js mem | grep '>>>'` + +When monitoring memory usage, run `node provider03.js 400` repeatedly, say as many as 50 times, to pump a sufficient amount of data to worker03 to detect any leaks. 
Sample memory usage output: + + >>>current = 3118200, max = 0 + >>>current = 3248152, max = 0 + >>>current = 3265896, max = 3265896 + >>>current = 3214184, max = 3265896 + >>>current = 3469112, max = 3469112 + >>>current = 3474064, max = 3474064 + >>>current = 3466856, max = 3474064 + >>>current = 3471904, max = 3474064 + >>>current = 3470080, max = 3474064 + +##Running demo 04 -- WorkQueueMgr example + +This demo is almost the same as demo 02 but uses WorkQueueMgr rather than Channel. It shows how to send a series of URLs to a consumer process that computes an SHA1 value for each URL and returns the SHA1 result to the provider process. + +1. In the first console window, run `node worker04.js`. It will wait for some data to appear in the queue. +1. In the second console window, run `node provider04.js 01`, which will place four URLs in the queue. Shortly + thereafter, the worker04 process will pick up the four URLs, display them, fetch a page body for each, compute an SHA1 value for each, and then return the SHA1 result to the provider04 instance, which will display the result. +1. Repeat step 2 a few times. +1. In the second console window, run `node provider04.js stop`, which will put a stop command in the queue. Shortly + thereafter, the worker04 process will stop. + +Try the above again using `node worker04 3` in step 1. Observe that the worker will process three input requests in parallel and +that the results may become available in a different order than the input requests. 
+ diff --git a/lib/channel.js b/lib/channel.js new file mode 100644 index 0000000..4faddad --- /dev/null +++ b/lib/channel.js @@ -0,0 +1,152 @@ +'use strict'; + +var Channel, ChannelError, events, + __hasProp = {}.hasOwnProperty, + __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }, + __slice = [].slice; + +events = require('events'); + +ChannelError = (function(_super) { + + __extends(ChannelError, _super); + + function ChannelError() { + return ChannelError.__super__.constructor.apply(this, arguments); + } + + return ChannelError; + +})(Error); + +Channel = (function(_super) { + + __extends(Channel, _super); + + function Channel(configFilePath) { + configFilePath = process.env.QUEUE_CONFIG_FILE || configFilePath || '../redis-queue-config.json'; + this.configurator = require('./redisQueueConfig'); + this.config = this.configurator.getConfig(configFilePath); + this.outstanding = 0; + } + + Channel.prototype.connect = function(onReady) { + this.client = this.configurator.getClient(this.config); + return this.attach(this.client, onReady); + }; + + Channel.prototype.attach = function(client, onReady) { + var _this = this; + this.client = client; + if (!(this.client instanceof Object)) { + throw new ChannelError('No client supplied'); + } + this.client.on('ready', function() { + _this.ready = true; + if (onReady && typeof onReady === 'function') { + onReady(); + } + return _this.emit('ready'); + }); + this.client.on('error', function(err) { + return _this.emit('error', err); + }); + this.client.on('end', function() { + _this.ready = false; + return _this.emit('end'); + }); + this.client.on('drain', function() { + return _this.emit('drain'); + }); + return this; + }; + + Channel.prototype.push = function(key, data) { + 
this.client.lpush(key, JSON.stringify(data)); + return this; + }; + + Channel.prototype.pop = function(key, onData) { + var _this = this; + ++this.outstanding; + this.client.brpop(key, 0, function(err, replies) { + --_this.outstanding; + if (err != null) { + return _this.emit('error', err); + } else { + if ((replies != null) && replies instanceof Array && replies.length === 2) { + if (onData) { + return onData(JSON.parse(replies[1])); + } + } else { + if (replies != null) { + return _this.emit('error', new ChannelError('Replies not Array of two elements')); + } + } + } + }); + return this; + }; + + Channel.prototype.popAny = function() { + var keys, onData, _i, _ref, + _this = this; + keys = 2 <= arguments.length ? __slice.call(arguments, 0, _i = arguments.length - 1) : (_i = 0, []), onData = arguments[_i++]; + ++this.outstanding; + (_ref = this.client).brpop.apply(_ref, __slice.call(keys).concat([0], [function(err, replies) { + --_this.outstanding; + if (err != null) { + return _this.emit('error', err); + } else { + if ((replies != null) && replies instanceof Array && replies.length === 2) { + if (onData) { + return onData(replies[0], JSON.parse(replies[1])); + } + } else { + if (replies != null) { + return _this.emit('error', new ChannelError('Replies not Array of two elements')); + } + } + } + }])); + return this; + }; + + Channel.prototype.clear = function() { + var keysToClear, onClear, _i, _ref; + keysToClear = 2 <= arguments.length ? 
__slice.call(arguments, 0, _i = arguments.length - 1) : (_i = 0, []), onClear = arguments[_i++]; + return (_ref = this.client).del.apply(_ref, __slice.call(keysToClear).concat([onClear])); + }; + + Channel.prototype.disconnect = function() { + this.client.quit(); + return true; + }; + + Channel.prototype.end = function() { + this.client.end(); + return true; + }; + + Channel.prototype.shutdownSoon = function(delay) { + var _this = this; + return process.nextTick(function() { + if (_this.client.offline_queue.length === 0) { + return _this.client.end(); + } else { + return setTimeout(function() { + return _this.shutdownSoon(delay); + }, delay || 500); + } + }); + }; + + Channel.prototype.commandQueueLength = function() { + return this.client.command_queue.length; + }; + + return Channel; + +})(events.EventEmitter); + +exports.channel = Channel; diff --git a/lib/workQueueMgr.js b/lib/workQueueMgr.js new file mode 100644 index 0000000..0f2d561 --- /dev/null +++ b/lib/workQueueMgr.js @@ -0,0 +1,202 @@ +'use strict'; + +var Channel, WorkQueue, WorkQueueMgr, WorkQueueMgrError, events, + __hasProp = {}.hasOwnProperty, + __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }; + +events = require('events'); + +Channel = require('..').Channel; + +WorkQueue = (function() { + + function WorkQueue(queueName, send_, consume_, clear_, destroy_) { + this.queueName = queueName; + this.send_ = send_; + this.consume_ = consume_; + this.clear_ = clear_; + this.destroy_ = destroy_; + return this; + } + + WorkQueue.prototype.send = function(data) { + this.send_(data); + return this; + }; + + WorkQueue.prototype.consume = function(onData, arity) { + this.consume_(onData, arity); + return this; + }; + + WorkQueue.prototype.clear = function(onClearComplete) { + 
this.clear_(onClearComplete); + return this; + }; + + WorkQueue.prototype.destroy = function() { + this.destroy_(); + return this; + }; + + return WorkQueue; + +})(); + +WorkQueueMgrError = (function(_super) { + + __extends(WorkQueueMgrError, _super); + + function WorkQueueMgrError() { + return WorkQueueMgrError.__super__.constructor.apply(this, arguments); + } + + return WorkQueueMgrError; + +})(Error); + +WorkQueueMgr = (function(_super) { + + __extends(WorkQueueMgr, _super); + + function WorkQueueMgr(configFilePath) { + this.queues = {}; + this.consumingCB = {}; + this.consumingNames = []; + this.channel = new Channel(configFilePath); + } + + WorkQueueMgr.prototype.connect = function(onReady) { + this.channel.connect(onReady); + this.initEmitters_(); + return this; + }; + + WorkQueueMgr.prototype.attach = function(client) { + this.client = client; + this.channel.attach(this.client); + return this.initEmitters_(); + }; + + WorkQueueMgr.prototype.initEmitters_ = function() { + var _this = this; + this.channel.on('ready', function() { + return _this.emit('ready'); + }); + this.channel.on('error', function(err) { + return _this.emit('error', err); + }); + this.channel.on('timeout', function() { + return _this.emit('timeout'); + }); + return this.channel.on('end', function() { + return _this.emit('end'); + }); + }; + + WorkQueueMgr.prototype.createQueue = function(queueName, options) { + return this.queues[queueName] = new WorkQueue(queueName, this.send.bind(this, queueName), this.consume.bind(this, queueName), this.channel.clear.bind(this.channel, queueName), this.destroyQueue.bind(this, queueName)); + }; + + WorkQueueMgr.prototype.send = function(queueName, data) { + this.ensureValidQueueName(queueName); + this.channel.push(queueName, data); + return this; + }; + + WorkQueueMgr.prototype.consume = function(queueName, onData, arity) { + var _this = this; + if (arity == null) { + arity = 1; + } + this.ensureValidQueueName(queueName); + if 
(!this.consumingCB[queueName]) { + this.consumingNames.push(queueName); + } + this.consumingCB[queueName] = onData; + process.nextTick(function() { + var _results; + _results = []; + while (arity--) { + _results.push(_this.monitor_()); + } + return _results; + }); + return this; + }; + + WorkQueueMgr.prototype.ack_ = function(queueName, cancel) { + if (cancel) { + this.stopConsuming_(queueName); + } else { + this.monitor_(); + } + return this; + }; + + WorkQueueMgr.prototype.monitor_ = function() { + var args, + _this = this; + (args = this.consumingNames.slice()).push(function(queueName, data) { + if (_this.consumingCB[queueName]) { + return _this.consumingCB[queueName](data, _this.ack_.bind(_this, queueName)); + } + }); + return this.channel.popAny.apply(this.channel, args); + }; + + WorkQueueMgr.prototype.stopConsuming_ = function(queueName) { + this.consumingNames = this.consumingNames.reduce(function(acc, x) { + if (x !== queueName) { + acc.push(x); + } + return acc; + }, []); + delete this.consumingCB[queueName]; + return this; + }; + + WorkQueueMgr.prototype.destroyQueue = function(queueName) { + this.ensureValidQueueName(queueName); + if (this.consumingCB[queueName]) { + this.stopConsuming_(queueName); + } + delete this.queues[queueName]; + return this; + }; + + WorkQueueMgr.prototype.disconnect = function() { + this.channel.disconnect(); + return true; + }; + + WorkQueueMgr.prototype.end = function() { + this.channel.end(); + return true; + }; + + WorkQueueMgr.prototype.shutdownSoon = function() { + return this.channel.shutdownSoon(); + }; + + WorkQueueMgr.prototype.isValidQueueName = function(queueName) { + return this.queues[queueName] != null; + }; + + WorkQueueMgr.prototype.ensureValidQueueName = function(queueName) { + if (!this.queues[queueName]) { + throw new WorkQueueMgrError('Unknown queue "' + queueName + '"'); + } + }; + + WorkQueueMgr.prototype.commandQueueLength = function() { + return this.channel.commandQueueLength(); + }; + + return 
WorkQueueMgr; + +})(events.EventEmitter); + +exports.queue = WorkQueue; + +exports.mgr = WorkQueueMgr; diff --git a/src/workQueueMgr.coffee b/src/workQueueMgr.coffee new file mode 100644 index 0000000..da74269 --- /dev/null +++ b/src/workQueueMgr.coffee @@ -0,0 +1,124 @@ +'use strict' +events = require 'events' + +Channel = require('..').Channel + +class WorkQueue + constructor: (@queueName, @send_, @consume_, @clear_, @destroy_) -> + return this + + send: (data) -> + @send_ data + return this + + consume: (onData, arity) -> + @consume_ onData, arity + return this + + clear: (onClearComplete) -> + @clear_ onClearComplete + return this + + destroy: -> + @destroy_() + return this + +class WorkQueueMgrError extends Error + +class WorkQueueMgr extends events.EventEmitter + constructor: (configFilePath) -> + @queues = {} + @consumingCB = {} + @consumingNames = [] + @channel = new Channel(configFilePath) + + connect: (onReady) -> + @channel.connect onReady + @initEmitters_() + return this + + attach: (@client) -> + @channel.attach @client + @initEmitters_() + + initEmitters_: -> + @channel.on 'ready', => + @emit 'ready' + @channel.on 'error', (err) => + @emit 'error', err + @channel.on 'timeout', => + @emit 'timeout' + @channel.on 'end', => + @emit 'end' + + createQueue: (queueName, options) -> + return @queues[queueName] = new WorkQueue queueName, + @send.bind(this, queueName), + @consume.bind(this, queueName), + @channel.clear.bind(@channel, queueName), + @destroyQueue.bind(this, queueName) + + send: (queueName, data) -> + @ensureValidQueueName queueName + @channel.push queueName, data + return this + + consume: (queueName, onData, arity = 1) -> + @ensureValidQueueName queueName + @consumingNames.push queueName unless @consumingCB[queueName] + @consumingCB[queueName] = onData + process.nextTick => + @monitor_() while arity-- + return this + + ack_: (queueName, cancel) -> + if cancel + @stopConsuming_ queueName + else + @monitor_() + return this + + monitor_: () -> + 
(args = @consumingNames.slice()).push (queueName, data) => + @consumingCB[queueName] data, @ack_.bind(this, queueName) if @consumingCB[queueName] + @channel.popAny.apply @channel, args + + stopConsuming_: (queueName) -> + @consumingNames = @consumingNames.reduce (acc,x) -> + acc.push x unless x is queueName + return acc + , [] + delete @consumingCB[queueName] + return this + + destroyQueue: (queueName) -> + @ensureValidQueueName queueName + @stopConsuming_ queueName if @consumingCB[queueName] + delete @queues[queueName] + return this + + disconnect: -> + @channel.disconnect() + return true + + end: -> + @channel.end() + return true + + shutdownSoon: -> + @channel.shutdownSoon() + + isValidQueueName: (queueName) -> + return @queues[queueName]? + + ensureValidQueueName: (queueName) -> + unless @queues[queueName] + throw (new WorkQueueMgrError 'Unknown queue "' + queueName + '"') + return + + commandQueueLength: -> + @channel.commandQueueLength() + +exports.queue = WorkQueue +exports.mgr = WorkQueueMgr +