Skip to content
This repository has been archived by the owner on Jul 2, 2020. It is now read-only.

Commit

Permalink
added support for LOG_LEVEL parameter and passing of log level to dat…
Browse files Browse the repository at this point in the history
…a processors
  • Loading branch information
jaroslavl1 committed Sep 15, 2015
1 parent 899dca6 commit 5a108c8
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Supported environment parameters:
- PROCESSORS_PATH - path where result processors can be found. If not specified, "processors" directory is used
- PROCESSORS_PARALLELISM - number of data processors that can be executed in parallel. If not present, number of CPU cores is used.
- METRICS_BATCH_SIZE - batch size to use when sending data to metrics gateway
- LOG_LEVEL configures log level. TRACE, DEBUG, INFO, WARN, ERROR, FATAL are valid values.

## Building

Expand Down Expand Up @@ -96,5 +97,4 @@ Unless at least one processor is available the process will exit immediately. No
- related to the fact we don't store processor execution state/result
- currently there is no way for processor to tell the service version of the produced content. All result processors must thus produce data of the same version. Metrics gateway service may support multiple data format versions (i.e v1, v2 on its REST). If case of change we have to update code of all result processors.
- if needed this could be solved by processor descriptor saying what data it produces on STDOUT and for whom
- log level is not passed to data processor, it may result in unnecessary messages being sent to STDERR then being filtered out
- no support for chaining multiple processors after each other. Celery supports this. We could use more streams than STDOUT and processor descriptor could specify where the stream output should go (i.e metrics-gateway or other processor).
3 changes: 3 additions & 0 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
var log4js = require('log4js');
// replaces console.log function with log4j
log4js.replaceConsole();
if (process.env.LOG_LEVEL) {
log4js.setGlobalLogLevel(process.env.LOG_LEVEL);
}
var logger = log4js.getLogger('server.js');

var grace = require('grace');
Expand Down
7 changes: 6 additions & 1 deletion service/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ function getProcessorNames(processorDescs) {
*/
function msgConsumer(msg, ackControl) {
var contentStr = msg.content.toString();
logger.debug(" [x] Received '%s'", contentStr);
if (logger.isLevelEnabled(log4js.levels.TRACE)) {
logger.trace(" [x] Received '%s'", contentStr);
} else if (logger.isLevelEnabled(log4js.levels.DEBUG)) {
// with debug level log less
logger.debug(" [x] Received '%s...'", contentStr.substr(0, 50));
}
// process file
var processingMetadata = msg.properties.headers; // path, accessToken, tenantId
var contentMetadata = JSON.parse(contentStr);
Expand Down
4 changes: 3 additions & 1 deletion service/metrics-gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ function getSendUri() {
* @param callback callback that will be notified upon success/error
*/
function send(processingMetadata, data, callback) {
if (logger.level.level <= log4js.levels.DEBUG.level) {
if (logger.isLevelEnabled(log4js.levels.TRACE)) {
logger.debug('Sending to /mgs/rest/v1/gateway/event: ' + JSON.stringify(data));
} else if (logger.isLevelEnabled(log4js.levels.DEBUG)) {
logger.debug('Sending to /mgs/rest/v1/gateway/event: ' + JSON.stringify(data).substr(0, 50) + '...');
}

var body = JSON.stringify(Array.isArray(data) ? data : [data]);
Expand Down
18 changes: 14 additions & 4 deletions service/processors.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ var processorsMap = {};
var childLogRegExp = new RegExp('([A-Z]+):([^:]+):(.*)');

/**
* Map of supported data processor log levels. Corresponds to Python logger log levels.
* Map of Python log levels used by child processor logger to log4js log levels.
*/
var childLogLevelMap = {'DEBUG': 'debug', 'INFO': 'info', 'WARNING': 'warn', 'ERROR': 'error', 'CRITICAL': 'fatal'};
var childLogLevelMap = {'DEBUG': 'DEBUG', 'INFO': 'INFO', 'WARNING': 'WARN', 'ERROR': 'ERROR', 'CRITICAL': 'FATAL'};

/**
* Map of log4js level strings to Python log levels used by child processor logger.
*/
var log4jsToChildLogLevelMap = {'ALL': 'DEBUG', 'TRACE': 'DEBUG', 'DEBUG': 'DEBUG', 'INFO': 'INFO', 'WARN': 'WARNING',
'ERROR': 'ERROR', 'FATAL': 'CRITICAL', 'MARK': 'CRITICAL', 'OFF': 'CRITICAL'};

/**
* Verifies processor at given path with given descriptor. Performs verification of the descriptor and test execution of
Expand Down Expand Up @@ -185,16 +191,20 @@ function onLogFromChild(processorDesc, str) {
* has to use specialized argument parsing libraries which may be buggy or work differently. Its much easier to access
* process environment variables. Environment variables will be prefixed with P_. String case is preserved.
*
* @param processorDesc
* @param contentMetadata
*/
function getEnvParams(contentMetadata) {
function getEnvParams(processorDesc, contentMetadata) {
var envParams = util._extend({}, process.env);
var keys = Object.keys(contentMetadata);
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
// we convert key to uppercase since in Windows env variable keys are not case sensitive, while in Linux they are
envParams['P_' + key.toUpperCase()] = contentMetadata[key];
}
// translate log level
var log4jsLevel = processorDesc.logger.level.levelStr;
envParams['P_LOG_LEVEL'] = log4jsToChildLogLevelMap[log4jsLevel];
return envParams;
}

Expand Down Expand Up @@ -232,7 +242,7 @@ function exec(command, options) {
function executeProcessor(processorDesc, processingMetadata, contentMetadata) {
logger.debug('Executing processor ' + processorDesc.name);
var readStream = fs.createReadStream(processingMetadata.path);
var envParams = getEnvParams(contentMetadata);
var envParams = getEnvParams(processorDesc, contentMetadata);
var child = exec(processorDesc.command, {cwd: processorDesc.path, env: envParams});
child.once('error', function(err) {
logger.debug('Execution of processor failed ', err);
Expand Down
22 changes: 17 additions & 5 deletions tests/system/dev-processors/dummy/processor.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
'use strict';

console.error('Some sample logging');
var logLevelsMap = {
'DEBUG': 1, 'INFO': 2, 'WARNING': 3, 'ERROR': 4, 'CRITICAL': 5
};

log('DEBUG', 'processor.js', 'Some sample logging');

var paramCount = 0;
console.error('Process parameters:');
log('DEBUG', 'processor.js', 'Process parameters:');
var keys = Object.keys(process.env);
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
if (key.lastIndexOf('P_', 0) === 0) {
var value = process.env[key];
paramCount++;
console.error(key.substr(2) + ' = ' + value);
log('DEBUG', 'processor.js', key.substr(2) + ' = ' + value);
}
}

console.error('Process stdin:');
log('DEBUG', 'processor.js', 'Process stdin:');
process.stdin.setEncoding('utf8');
process.stdin.on('data', function(str) {
console.error(str);
log('DEBUG', 'processor.js', str);
});
process.stdin.on('end', function() {
process.exit(0);
Expand Down Expand Up @@ -46,3 +50,11 @@ for (var i = 0; i < 50; i++) {
}));
}
console.log(']');

function log(level, location, message) {
var logLevel = logLevelsMap[level];
var configuredLogLevel = logLevelsMap[process.env.P_LOG_LEVEL || 'DEBUG'];
if (logLevel >= configuredLogLevel) {
console.error(level + ':' + location + ':' + message);
}
}

0 comments on commit 5a108c8

Please sign in to comment.