From 2a88bdb0f5c116911e97f0dea910a6366b8411fa Mon Sep 17 00:00:00 2001 From: Jaroslav Libak Date: Tue, 11 Aug 2015 14:22:07 +0200 Subject: [PATCH] added PROCESSORS_PARALLELISM parameter to allow controlling how many data processors execute in parallel --- README.md | 1 + service/notification.js | 12 ++++++++++++ service/processors.js | 4 ++-- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 15fcdfa..ae17dac 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ Same storage path used by result upload service must be accessible to result pro Supported environment parameters: - AMQ_USER, AMQ_PASSWORD for authenticating to RabbitMQ. This is temporary until a better way to retrieve credentials by services is available. - PROCESSORS_PATH - path where result processors can be found. If not specified, "processors" directory is used +- PROCESSORS_PARALLELISM - number of data processors that can be executed in parallel. If not specified, or if the value is 0 or not a number, the number of CPU cores is used. - MSGW_PORT - port for metrics gateway. If not specified, 8080 is used - METRICS_BATCH_SIZE - batch size to use when sending data to metrics gateway diff --git a/service/notification.js b/service/notification.js index 6b52a34..975b7ee 100644 --- a/service/notification.js +++ b/service/notification.js @@ -10,6 +10,7 @@ var log4js = require('log4js'); var amqp = require('amqplib'); var VError = require('verror'); var when = require('when'); +var os = require('os'); var logger = log4js.getLogger('notification.js'); @@ -34,6 +35,13 @@ function getAmqCredentials() { }; } +/** + * Returns number of processors that may execute in parallel. + */ +function getParallelism() { + return Number(process.env.PROCESSORS_PARALLELISM) || os.cpus().length; +} + /** * Returns controller that can be used to ack/nack given message. 
* @@ -113,6 +121,10 @@ function initQueues(channel, processorDescs, msgConsumer) { function initChannel(conn, processorDescs, msgConsumer) { // TODO: consider channel per processor, but since Node.js is 1 thread maybe its not needed return conn.createChannel().then(function(ch) { + var parallelism = getParallelism(); + logger.info('Allowing up to ' + parallelism + ' parallel data processor executions'); + ch.prefetch(parallelism, true); + // TODO: handle channel recreation in case of close caused by error function onClose() { channel = null; diff --git a/service/processors.js b/service/processors.js index a75793f..99fba3b 100644 --- a/service/processors.js +++ b/service/processors.js @@ -214,8 +214,8 @@ function executeProcessor(processorDesc, processingMetadata, contentMetadata) { child.once('error', function(err) { logger.debug('Execution of processor failed ', err); }); - child.once('exit', function(code) { - logger.debug('Processor exited with ' + code); + child.once('exit', function() { + logger.debug('Processor exited'); }); child.once('close', function() { readStream.destroy();