Skip to content
This repository has been archived by the owner on Jul 2, 2020. It is now read-only.

Commit

Permalink
added PROCESSORS_PARALLELISM parameter to allow controlling how many …
Browse files Browse the repository at this point in the history
…data processors execute in parallel
  • Loading branch information
jaroslavl1 committed Aug 11, 2015
1 parent 6a0ee03 commit 2a88bdb
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Same storage path used by result upload service must be accessible to result pro
Supported environment parameters:
- AMQ_USER, AMQ_PASSWORD for authenticating to RabbitMQ. This is temporary until a better way to retrieve credentials by services is available.
- PROCESSORS_PATH - path where result processors can be found. If not specified, "processors" directory is used
- PROCESSORS_PARALLELISM - number of data processors that can be executed in parallel. If not present, number of CPU cores is used.
- MSGW_PORT - port for metrics gateway. If not specified, 8080 is used
- METRICS_BATCH_SIZE - batch size to use when sending data to metrics gateway

Expand Down
12 changes: 12 additions & 0 deletions service/notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var log4js = require('log4js');
var amqp = require('amqplib');
var VError = require('verror');
var when = require('when');
var os = require('os');

var logger = log4js.getLogger('notification.js');

Expand All @@ -34,6 +35,13 @@ function getAmqCredentials() {
};
}

/**
* Returns number of processors that may execute in parallel.
*/
function getParallelism() {
return Number(process.env.PROCESSORS_PARALLELISM) || os.cpus().length;
}

/**
* Returns controller that can be used to ack/nack given message.
*
Expand Down Expand Up @@ -113,6 +121,10 @@ function initQueues(channel, processorDescs, msgConsumer) {
function initChannel(conn, processorDescs, msgConsumer) {
// TODO: consider channel per processor, but since Node.js is 1 thread maybe its not needed
return conn.createChannel().then(function(ch) {
var parallelism = getParallelism();
logger.info('Allowing up to ' + parallelism + ' parallel data processor executions');
ch.prefetch(parallelism, true);

// TODO: handle channel recreation in case of close caused by error
function onClose() {
channel = null;
Expand Down
4 changes: 2 additions & 2 deletions service/processors.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ function executeProcessor(processorDesc, processingMetadata, contentMetadata) {
child.once('error', function(err) {
logger.debug('Execution of processor failed ', err);
});
child.once('exit', function(code) {
logger.debug('Processor exited with ' + code);
child.once('exit', function() {
logger.debug('Processor exited');
});
child.once('close', function() {
readStream.destroy();
Expand Down

0 comments on commit 2a88bdb

Please sign in to comment.