diff --git a/etc/rsyslog/rsyslog.conf b/etc/rsyslog/rsyslog.conf new file mode 100644 index 0000000..1e387d5 --- /dev/null +++ b/etc/rsyslog/rsyslog.conf @@ -0,0 +1,7 @@ +module(load="imuxsock") +module(load="imklog") + +$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat + +*.* /var/log/messages +*.* @RSYSLOG_SERVER diff --git a/lightning-rod.js b/lightning-rod.js index 05e61e2..0318cf6 100644 --- a/lightning-rod.js +++ b/lightning-rod.js @@ -118,7 +118,15 @@ manageBoard.Init_Ligthning_Rod(function (check) { //---------------------------------------- var wampUrl = url_wamp+":"+port_wamp+"/ws"; - wampIP = wampUrl.split("//")[1].split(":")[0]; + try{ + wampIP = wampUrl.split("//")[1].split(":")[0]; + + } + catch(err){ + logger.warn('[WAMP] - Error in WAMP url format: '+ err); + process.exit(1); + } + //var wampRealm = nconf.get('config:wamp:realm'); var wampRealm = realm; @@ -152,11 +160,12 @@ manageBoard.Init_Ligthning_Rod(function (check) { logger.info('[WAMP] |--> Realm: '+ wampRealm); logger.info('[WAMP] |--> Session ID: '+ session._id); //logger.debug('[WAMP] |--> Connection details:\n'+ JSON.stringify(details)); - logger.info('[WAMP] |--> socket check timing: '+ wamp_socket_recovery + " - Timing: " + rpc_alive_time + " seconds"); //--WAMP-RECOVERY-SYSTEM-INIT------------------------------------------------------------------------------- if(wamp_socket_recovery == true || wamp_socket_recovery == "true") { + logger.info('[WAMP] |--> socket check timing: '+ wamp_socket_recovery + " - Timing: " + rpc_alive_time + " seconds"); + RECOVERY_SESSION = session; socketNotAlive = false; @@ -303,94 +312,6 @@ manageBoard.Init_Ligthning_Rod(function (check) { }); - /* - - // Check if the tcpkill process was killed after a previous connection recovery through this check we will avoid to start another tcpkill process - var tcpkill_status = running(tcpkill_pid); - logger.warn("[WAMP-RECOVERY] - TCPKILL STATUS: " + tcpkill_status + " - PID: " + tcpkill_pid); - - // at LR startup "tcpkill_pid" is NULL and in this condition "is-running" module return "true" that is a WRONG result! - if (tcpkill_status === false || tcpkill_pid == null){ - - logger.warn("[WAMP-RECOVERY] - TCPKILL - Cleaning WAMP zombie-socket..."); - - var tcpkill_kill_count = 0; - - //e.g.: tcpkill -9 port 8181 - tcpkill = spawn('tcpkill',['-9','port', port_wamp]); - tcpkill_pid = tcpkill.pid; - - tcpkill.stdout.on('data', function (data) { - logger.debug('[WAMP-RECOVERY] ... tcpkill stdout: ' + data); - }); - - tcpkill.stderr.on('data', function (data) { - - logger.debug('[WAMP-RECOVERY] ... tcpkill stderr:\n' + data); - - //it will check if tcpkill is in listening state on the port 8181 - if(data.toString().indexOf("listening") > -1){ - - // LISTENING: to manage the starting of tcpkill (listening on port 8181) - - }else if (data.toString().indexOf("win 0") > -1){ - - // TCPKILL DETECTED WAMP ACTIVITY (WAMP reconnection attempts) - // This is the stage triggered when the WAMP socket was killed by tcpkill and WAMP reconnection process automaticcally started: - // in this phase we need to kill tcpkill to allow WAMP reconnection. - try{ - - logger.debug('[WAMP-RECOVERY] ... killing tcpkill process with PID: ' + tcpkill_pid); - tcpkill.kill('SIGKILL'); //process.kill(tcpkill_pid); - - //double check: It will test after a while if the tcpkill process has been killed, OTHERWISE reboot board. - setTimeout(function(){ - - if ( running(tcpkill_pid)){ - - //manageBoard.execAction(['reboot']); - tcpkill.kill('SIGKILL'); //process.kill(tcpkill_pid); - //wampConnection.close(); - - } - - }, 2000); - - - }catch (e) { - - logger.error('[WAMP-RECOVERY] ... tcpkill killing error: ', e); - - } - - - } - - - }); - - tcpkill.on('close', function (code) { - - logger.debug('[WAMP-RECOVERY] ... tcpkill killed [code: '+code+']'); - logger.info("[WAMP-RECOVERY] - WAMP socket cleaned!"); - - //The previous WAMP socket was KILLED and the automatic WAMP recovery process will start - //so the connection recovery is completed and "online" flag is set again to TRUE - online = true; - socketNotAlive = false; - clearTimeout(expectIotronicResponse); - - }); - - - } - else{ - logger.warn('[WAMP-RECOVERY] ...tcpkill already started!'); - } - - */ - - } diff --git a/modules/board-manager/board-management.js b/modules/board-manager/board-management.js index d2bb333..df272b1 100644 --- a/modules/board-manager/board-management.js +++ b/modules/board-manager/board-management.js @@ -227,11 +227,11 @@ exports.moduleLoaderOnBoot = function() { logger.info("[SYSTEM] - Modules loading:"); var configFile = JSON.parse(fs.readFileSync(SETTINGS, 'utf8')); - var modules = configFile.config["board"]["modules"]; //console.log(modules); - var modules_keys = Object.keys( modules ); //console.log(modules_keys); + + //STARTING ENABLED MANAGERS for (var i = 0; i < modules_keys.length; i++) { (function (i) { @@ -261,11 +261,9 @@ exports.moduleLoaderOnBoot = function() { nodeRedManager.Boot(); break; - - case 'vnets_manager': - logger.info("[SYSTEM] --> " + module_name + ": " + enabled); - break; - + case 'vnets_manager': + logger.info("[SYSTEM] --> " + module_name + ": " + enabled); + break; case 'gpio_manager': logger.info("[SYSTEM] --> " + module_name + ": " + enabled); @@ -285,7 +283,6 @@ exports.moduleLoaderOnBoot = function() { break; - } @@ -294,9 +291,145 @@ exports.moduleLoaderOnBoot = function() { } -}; + //START BOARD-MANAGER CONNECTIONS TESTS PROCEDURES + if(wamp_socket_recovery == true || wamp_socket_recovery == "true") { + + logger.info('[BOOT] - Board Manager connection controller starting...'); + logger.info('[BOOT] --> Crossbar IP: ' + wampIP + " - Port: " + port_wamp); + + // connectionTester: library used to check the reachability of Iotronic-Server/WAMP-Server + var connectionTester = require('connection-tester'); + + conn_alive_timer = 60; //second between check-connection retries + conn_retry_counter = 0; //counter for check-connection retries + + setTimeout(function () { + + var output = connectionTester.test(wampIP, port_wamp, 10000); + var reachable = output.success; + var error_test = output.error; + + if (!reachable) { + + //CONNECTION STATUS: FALSE + logger.warn("[BOARD-CONNECTION-RECOVERY] - INTERNET CONNECTION STATUS: " + reachable + " - ERROR: " + error_test); + + checkInternetWampConnection = setInterval(function () { + + //logger.warn("[BOARD-CONNECTION-RECOVERY] - RETRY..."); + + connectionTester.test(wampIP, port_wamp, 10000, function (err, output) { + + var reachable = output.success; + var error_test = output.error; + + if (!reachable) { + + //CONNECTION STATUS: FALSE + logger.warn("[BOARD-CONNECTION-RECOVERY] - INTERNET CONNECTION STATUS: " + reachable + " - ERROR: " + error_test); + + } else { + + try { + + // Test if IoTronic is connected to the realm + board_session.call("s4t.iotronic.isAlive", [boardCode]).then( + function (response) { + + conn_retry_counter = 0; + clearInterval(checkInternetWampConnection); + + }, + function (err) { + + logger.warn("NO WAMP CONNECTION YET!") + + } + ); + + } catch (err) { + logger.warn('[BOARD-CONNECTION-RECOVERY] - Error calling "s4t.iotronic.isAlive"'); + } + + + } + + }); + + + }, conn_alive_timer * 1000); + + } else { + + checkInternetWampConnection = setInterval(function () { + + conn_retry_counter = conn_retry_counter + 1; + + try { + + // Test if IoTronic is connected to the realm + board_session.call("s4t.iotronic.isAlive", [boardCode]).then( + function (response) { + + conn_retry_counter = 0; + clearInterval(checkInternetWampConnection); + + }, + function (err) { + logger.warn("[CONN-RETRY] - " + conn_retry_counter); + logger.warn("[BOARD-CONNECTION-RECOVERY] - No WAMP connection yet!") + } + ); + + } catch (err) { + + logger.warn("[CONN-RETRY] - " + conn_retry_counter); + logger.warn('[BOARD-CONNECTION-RECOVERY] - Internet connection available BUT wamp session not established!'); + logger.warn("--> WAMP connection error:" + err); + + } + + if (conn_retry_counter >= 5) { + + logger.warn("LR restarting in 5 seconds"); + + restart_time = 5; + + // activate listener on-exit event after LR exit on-update-conf + process.on("exit", function () { + + require("child_process").spawn(process.argv.shift(), process.argv, { + cwd: process.cwd(), + detached: true, + stdio: "inherit" + }); + + }); + + //Restarting LR + setTimeout(function () { + + process.exit(); + + }, restart_time * 1000); + + + } + + + }, conn_alive_timer * 1000); + + } + + + }, 5000); + + } + + +}; // Init() LR function in order to control the correct LR configuration: @@ -347,8 +480,13 @@ exports.Init_Ligthning_Rod = function (callback) { } else { - log4js.addAppender(log4js.appenders.file(logfile)); - logger = log4js.getLogger('main'); //service logging configuration: "main" + try{ + log4js.addAppender(log4js.appenders.file(logfile)); + logger = log4js.getLogger('main'); //service logging configuration: "main" + } + catch (err) { + console.log("Error in log folder creation!") + } LogoLR(); @@ -938,13 +1076,10 @@ exports.execAction = function(args){ case 'restart_lr': try{ - var params = JSON.parse(params); } catch (err) { - - logger.debug("[SYSTEM] --> parsing parameters error: "+JSON.stringify(err)); } @@ -959,9 +1094,6 @@ exports.execAction = function(args){ } - - - // activate listener on-exit event after LR exit on-update-conf logger.debug("[SYSTEM] --> Listener on process 'exit' event activated:"); logger.debug("[SYSTEM] --> Lightning-rod PID: " + process.pid); @@ -1002,15 +1134,12 @@ exports.execAction = function(args){ } - - return d.promise; }; - var managePackage = function (pkg_mng, pkg_mng_cmd, pkg_opts, pkg_name, callback) { var response = { @@ -1141,6 +1270,7 @@ exports.getLRversion = function () { }; + // Login to Crossbar server and to Iotornic exports.IotronicLogin = function (session) { diff --git a/modules/plugins-manager/manage-plugins.js b/modules/plugins-manager/manage-plugins.js index b1fb88b..3d92216 100644 --- a/modules/plugins-manager/manage-plugins.js +++ b/modules/plugins-manager/manage-plugins.js @@ -170,7 +170,8 @@ function pluginStarter(plugin_name, timer, plugin_json_name, skip, plugin_checks - }else{ + } + else{ // the plugin is normally running console.log('[PLUGIN] - PluginChecker - '+ plugin_name + ' with PID: ' + plugins[plugin_name].pid + ' alive: '+ plugins[plugin_name].alive ); @@ -574,17 +575,27 @@ function pyAsyncStarter(plugin_name, plugin_json, plugin_checksum, action, versi }; - // Remove an existing socket - fs.unlink(socketPath, function(){ - // Create the server, give it our callback handler and listen at the path + try{ + // Remove an existing socket + fs.unlink(socketPath, function(){ + // Create the server, give it our callback handler and listen at the path - s_server = net.createServer(handler).listen(socketPath, function(){ - logger.debug('[PLUGIN-SOCKET] - Socket in listening...'); - logger.debug('[PLUGIN-SOCKET] --> socket: '+socketPath); - }) + s_server = net.createServer(handler).listen(socketPath, function(){ + logger.debug('[PLUGIN-SOCKET] - Socket in listening...'); + logger.debug('[PLUGIN-SOCKET] --> socket: '+socketPath); + }) - } - ); + } + ); + } + catch(err){ + + response.result = "ERROR"; + response.message = '(async) Error unlink socket file: ' + err; + logger.error('[PLUGIN] - '+plugin_name + ' - '+response.message); + d.resolve(response); + + } var options = { mode: 'text', @@ -611,43 +622,40 @@ function pyAsyncStarter(plugin_name, plugin_json, plugin_checksum, action, versi //Get the autostart parameter from the schema just uploaded plugin_autostart = pluginsSchemaConf.autostart; - } catch(err){ response.result = "ERROR"; response.message = 'Error parsing plugins.json configuration file: ' + err; logger.error('[PLUGIN] - '+plugin_name + ' - '+response.message); - d.resolve(response); } + if (action == "start") { - if(action == "start") { - - fs.writeFile(schema_outputFilename, plugin_json, function(err) { + fs.writeFile(schema_outputFilename, plugin_json, function (err) { - if(err) { + if (err) { response.result = "ERROR"; - response.message = 'Error opening '+plugin_name+'.json file: ' + err; - logger.error('[PLUGIN] - "'+plugin_name + '" - '+response.message); + response.message = 'Error opening ' + plugin_name + '.json file: ' + err; + logger.error('[PLUGIN] - "' + plugin_name + '" - ' + response.message); d.resolve(response); } else { - logger.info('[PLUGIN] - '+ plugin_name + ' - Plugin JSON schema saved to ' + schema_outputFilename); + logger.info('[PLUGIN] - ' + plugin_name + ' - Plugin JSON schema saved to ' + schema_outputFilename); // Updating the plugins.json file: // - check if the user changed the autostart parameter at this stage - if(plugin_autostart != undefined){ + if (plugin_autostart != undefined) { pluginsConf.plugins[plugin_name].autostart = plugin_autostart; - logger.info('[PLUGIN] - '+ plugin_name + ' - Autostart parameter set by user to ' + plugin_autostart); + logger.info('[PLUGIN] - ' + plugin_name + ' - Autostart parameter set by user to ' + plugin_autostart); } else { - logger.info('[PLUGIN] - '+ plugin_name + ' - Autostart parameter not changed!'); + logger.info('[PLUGIN] - ' + plugin_name + ' - Autostart parameter not changed!'); } @@ -655,12 +663,12 @@ function pyAsyncStarter(plugin_name, plugin_json, plugin_checksum, action, versi pluginsConf.plugins[plugin_name].status = "on"; pluginsConf.plugins[plugin_name].pid = PY_PID; - fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function(err) { + fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function (err) { - if(err) { - logger.error('[PLUGIN] - '+ plugin_name + ' - Error opening '+plugin_name+'.json file: ' + err); + if (err) { + logger.error('[PLUGIN] - ' + plugin_name + ' - Error opening ' + plugin_name + '.json file: ' + err); } else { - logger.info('[PLUGIN] - '+ plugin_name + ' - plugins.json updated -> autostart < ' + pluginsConf.plugins[plugin_name].autostart + ' > - status < '+ pluginsConf.plugins[plugin_name].status + ' > ' + pluginsConf.plugins[plugin_name].pid); + logger.info('[PLUGIN] - ' + plugin_name + ' - plugins.json updated -> autostart < ' + pluginsConf.plugins[plugin_name].autostart + ' > - status < ' + pluginsConf.plugins[plugin_name].status + ' > ' + pluginsConf.plugins[plugin_name].pid); // Start a timer to check every X minutes if the plugin is still alive! exports.pluginKeepAlive(plugin_name, plugin_checksum); @@ -674,19 +682,19 @@ function pyAsyncStarter(plugin_name, plugin_json, plugin_checksum, action, versi }); } - else if(action == "restart"){ + else if (action == "restart") { // - change the plugin status from "off" to "on" and update the PID value pluginsConf.plugins[plugin_name].status = "on"; pluginsConf.plugins[plugin_name].pid = PY_PID; - fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function(err) { + fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function (err) { - if(err) { - logger.error('[PLUGIN] - '+ plugin_name + ' - Error opening '+plugin_name+'.json file: ' + err); + if (err) { + logger.error('[PLUGIN] - ' + plugin_name + ' - Error opening ' + plugin_name + '.json file: ' + err); } else { - logger.info('[PLUGIN] - '+ plugin_name + ' - plugins.json updated -> autostart < ' + pluginsConf.plugins[plugin_name].autostart + ' > - status < '+ pluginsConf.plugins[plugin_name].status + ' > ' + pluginsConf.plugins[plugin_name].pid); + logger.info('[PLUGIN] - ' + plugin_name + ' - plugins.json updated -> autostart < ' + pluginsConf.plugins[plugin_name].autostart + ' > - status < ' + pluginsConf.plugins[plugin_name].status + ' > ' + pluginsConf.plugins[plugin_name].pid); } @@ -708,11 +716,17 @@ function pyAsyncStarter(plugin_name, plugin_json, plugin_checksum, action, versi if (err){ - PLUGIN_LOGGERS[plugin_name].warn("Plugin '"+plugin_name+"' error logs: \n" + JSON.stringify(err, null, "\t")); + try{ + + PLUGIN_LOGGERS[plugin_name].warn("Plugin '"+plugin_name+"' error logs: \n" + JSON.stringify(err, null, "\t")); + } + catch(err){ + logger.warn('[PLUGIN] - '+plugin_name + ' - Plugin logger error: ' + err); + logger.warn("Plugin '"+plugin_name+"' error logs: \n" + JSON.stringify(err, null, "\t")); + } response.result = "ERROR"; response.message = "Error plugin execution: please check plugin logs: \n" + err.traceback; - //console.log(err); pluginsConf.plugins[plugin_name].status = "off"; pluginsConf.plugins[plugin_name].pid = ""; @@ -721,87 +735,92 @@ function pyAsyncStarter(plugin_name, plugin_json, plugin_checksum, action, versi fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function(err) { if(err) { - + clearPluginTimer(plugin_name); response.result = "ERROR"; response.message = 'Error writing plugins.json: '+ err; logger.error('[PLUGIN] - pyshell error in plugin '+plugin_name + ' error: '+response.message); d.resolve(response); } else { - logger.debug("[PLUGIN] --> " + PLUGINS_SETTING + " updated!"); clearPluginTimer(plugin_name); - d.resolve(response); - } }) - } else{ - logger.debug('[PLUGIN-SHELL] - Python shell of "'+plugin_name+'" plugin terminated: {signal: '+ signal+', code: '+code+'}'); + try { - if(signal == null && code == 0){ + logger.debug('[PLUGIN-SHELL] - Python shell of "' + plugin_name + '" plugin terminated: {signal: ' + signal + ', code: ' + code + '}'); - logger.warn("[PLUGIN-SHELL] --> unexpected '"+plugin_name+"' plugin termination!"); + if (signal == null && code == 0) { - pluginsConf.plugins[plugin_name].status = "off"; - pluginsConf.plugins[plugin_name].pid = ""; + logger.warn("[PLUGIN-SHELL] --> unexpected '" + plugin_name + "' plugin termination!"); - // updates the JSON file - fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function(err) { + pluginsConf.plugins[plugin_name].status = "off"; + pluginsConf.plugins[plugin_name].pid = ""; - if(err) { + // updates the JSON file + fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function (err) { - response.result = "ERROR"; - response.message = 'Error writing plugins.json: '+ err; - logger.error('[PLUGIN] - stop plugin '+plugin_name + ' error: '+response.message); - d.resolve(response); + if (err) { - } else { + response.result = "ERROR"; + response.message = 'Error writing plugins.json: ' + err; + logger.error('[PLUGIN] - stop plugin ' + plugin_name + ' error: ' + response.message); + d.resolve(response); - logger.debug("[PLUGIN] --> " + PLUGINS_SETTING + " updated!"); - clearPluginTimer(plugin_name); + } else { - iotronic_plugin_status = "failed"; - session_plugins.call('s4t.iotronic.plugin.updateStatus', [boardCode, plugin_name, version, iotronic_plugin_status]).then( + logger.debug("[PLUGIN] --> " + PLUGINS_SETTING + " updated!"); + clearPluginTimer(plugin_name); - function (rpc_response) { + iotronic_plugin_status = "failed"; + session_plugins.call('s4t.iotronic.plugin.updateStatus', [boardCode, plugin_name, version, iotronic_plugin_status]).then( + function (rpc_response) { - if (rpc_response.result == "ERROR") { + if (rpc_response.result == "ERROR") { - response.result = "ERROR"; - response.message = 'Error notification plugin status: '+ rpc_response.message; - logger.error("[PLUGIN] --> Error notification plugin status for '" + plugin_name + "' plugin: " + rpc_response.message); - d.resolve(response); + response.result = "ERROR"; + response.message = 'Error notification plugin status: ' + rpc_response.message; + logger.error("[PLUGIN] --> Error notification plugin status for '" + plugin_name + "' plugin: " + rpc_response.message); + d.resolve(response); - } - else { + } else { - logger.debug("[PLUGIN] - Iotronic updating status response: " + rpc_response.message); + logger.debug("[PLUGIN] - Iotronic updating status response: " + rpc_response.message); - response.result = "SUCCESS"; - response.message = 'Plugin environment cleaned and Iotronic status updated to ' + iotronic_plugin_status; - logger.info("[PLUGIN] - plugin '"+plugin_name + "': "+response.message); - d.resolve(response); + response.result = "SUCCESS"; + response.message = 'Plugin environment cleaned and Iotronic status updated to ' + iotronic_plugin_status; + logger.info("[PLUGIN] - plugin '" + plugin_name + "': " + response.message); + d.resolve(response); + + } } + ); - } - ); + } - } + }); + + } else { + logger.debug("[PLUGIN-SHELL] --> Python plugin '" + plugin_name + "' terminated!") + } - }); - - }else{ - logger.debug("[PLUGIN-SHELL] --> Python plugin '"+plugin_name+"' terminated!") } + catch(err){ + response.result = "ERROR"; + response.message = 'Error in pyshell.end (closing): ' + err; + logger.error('[PLUGIN] - '+plugin_name + ' - '+response.message); + d.resolve(response); + + } } @@ -882,109 +901,113 @@ function pySyncStarter(plugin_name, version, plugin_json) { }; - // Remove an existing plugin socket - fs.unlink(socketPath, function(){ + try { + // Remove an existing plugin socket + fs.unlink(socketPath, function () { - var plugin_folder = PLUGINS_STORE + plugin_name; - var schema_outputFilename = plugin_folder + "/" + plugin_name + '.json'; + var plugin_folder = PLUGINS_STORE + plugin_name; + var schema_outputFilename = plugin_folder + "/" + plugin_name + '.json'; - // Create the server, give it our callback handler and listen at the path - s_server = net.createServer(handler).listen(socketPath, function() { - logger.debug('[PLUGIN-SOCKET] - Socket in listening...'); - logger.debug('[PLUGIN-SOCKET] --> socket: '+socketPath); + // Create the server, give it our callback handler and listen at the path + s_server = net.createServer(handler).listen(socketPath, function () { + logger.debug('[PLUGIN-SOCKET] - Socket in listening...'); + logger.debug('[PLUGIN-SOCKET] --> socket: ' + socketPath); - // after socket creation we will start the plugin wrapper - var options = { - mode: 'text', - pythonPath: '/usr/bin/python3', - pythonOptions: ['-u'], - scriptPath: __dirname, - args: [plugin_name, version, plugin_json] - }; + // after socket creation we will start the plugin wrapper + var options = { + mode: 'text', + pythonPath: '/usr/bin/python3', + pythonOptions: ['-u'], + scriptPath: __dirname, + args: [plugin_name, version, plugin_json] + }; - pyshell = new PythonShell('./python/sync-wrapper.py', options); - // it will create a python instance like this: - // python -u /opt/stack4things/lightning-rod/modules/plugins-manager/python/sync-wrapper.py py_sync {"name":"S4T"} + pyshell = new PythonShell('./python/sync-wrapper.py', options); + // it will create a python instance like this: + // python -u /opt/stack4things/lightning-rod/modules/plugins-manager/python/sync-wrapper.py py_sync {"name":"S4T"} - logger.debug("[PLUGIN-SHELL] - PID wrapper: "+pyshell.childProcess.pid); + logger.debug("[PLUGIN-SHELL] - PID wrapper: " + pyshell.childProcess.pid); - if(logger.level.levelStr == 'DEBUG') + if (logger.level.levelStr == 'DEBUG') // listening 'print' output - pyshell.on('message', function (message) { - // received a message sent from the Python script (a simple "print" statement) - console.log("[PLUGIN-WRAPPER] - PYTHON: "+message); - }); - + pyshell.on('message', function (message) { + // received a message sent from the Python script (a simple "print" statement) + console.log("[PLUGIN-WRAPPER] - PYTHON: " + message); + }); - // end the input stream and allow the process to exit - pyshell.end(function (err, code, signal) { - if (err){ + // end the input stream and allow the process to exit + pyshell.end(function (err, code, signal) { - response.result = "ERROR"; - response.message = err; - d.resolve(response); - - }else{ - logger.debug('[PLUGIN-SHELL] - Python shell terminated: {signal: '+ signal+', code: '+code+'}'); - } - - }); + if (err) { + response.result = "ERROR"; + response.message = err; + d.resolve(response); - //update parameters and plugins.json conf file - fs.writeFile(schema_outputFilename, plugin_json, function(err) { + } else { + logger.debug('[PLUGIN-SHELL] - Python shell terminated: {signal: ' + signal + ', code: ' + code + '}'); + } - if(err) { + }); - logger.error('[PLUGIN] --> Error parsing '+plugin_name+'.json file: ' + err); - } else { + //update parameters and plugins.json conf file + fs.writeFile(schema_outputFilename, plugin_json, function (err) { - logger.debug('[PLUGIN] --> Plugin JSON schema saved to ' + schema_outputFilename); + if (err) { - try{ + logger.error('[PLUGIN] --> Error parsing ' + plugin_name + '.json file: ' + err); - //Reading the plugin configuration file - var pluginsConf = JSON.parse(fs.readFileSync(PLUGINS_SETTING, 'utf8')); + } else { - // - change the plugin status from "off" to "on" and update the PID value - pluginsConf.plugins[plugin_name].status = "on"; - pluginsConf.plugins[plugin_name].pid = pyshell.childProcess.pid; + logger.debug('[PLUGIN] --> Plugin JSON schema saved to ' + schema_outputFilename); - //updates the JSON file - fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function(err) { + try { - if(err) { - logger.error('[PLUGIN] --> Error writing plugins.json file: ' + err); - } else { - logger.debug("[PLUGIN] --> JSON file plugins.json updated -> " + plugin_name + ': status < '+ pluginsConf.plugins[plugin_name].status + ' > ' + pluginsConf.plugins[plugin_name].pid); - } + //Reading the plugin configuration file + var pluginsConf = JSON.parse(fs.readFileSync(PLUGINS_SETTING, 'utf8')); - }); + // - change the plugin status from "off" to "on" and update the PID value + pluginsConf.plugins[plugin_name].status = "on"; + pluginsConf.plugins[plugin_name].pid = pyshell.childProcess.pid; - } - catch(err){ - logger.error('Error updating JSON file plugins.json: '+ JSON.stringify(err)); - } + //updates the JSON file + fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function (err) { + if (err) { + logger.error('[PLUGIN] --> Error writing plugins.json file: ' + err); + } else { + logger.debug("[PLUGIN] --> JSON file plugins.json updated -> " + plugin_name + ': status < ' + pluginsConf.plugins[plugin_name].status + ' > ' + pluginsConf.plugins[plugin_name].pid); + } + }); - } + } catch (err) { + logger.error('Error updating JSON file plugins.json: ' + JSON.stringify(err)); + } - }); + } - }) + }); - } + }) - ); + } + ); + } + catch(err){ + response.result = "ERROR"; + response.message = '(sync) Error unlink socket file: ' + err; + logger.error('[PLUGIN] - '+plugin_name + ' - '+response.message); + d.resolve(response); + } return d.promise; @@ -1433,7 +1456,7 @@ exports.pluginsBootLoader = function (){ if(enabled_num > 0) { - logger.info('[PLUGIN] |- Restarting enabled plugins on the device: '); + logger.info('[PLUGIN] - Restarting enabled plugins on the device: '); for (var i = 0; i < enabled_num; i++) { @@ -1478,7 +1501,8 @@ exports.pluginsBootLoader = function (){ } - }else{ + } + else{ logger.info('[PLUGIN] - No enabled plugins to be restarted!'); } @@ -1528,7 +1552,7 @@ exports.pluginsLoader = function (){ if(plugin_num > 0) { - var enabledPlugins = { "plugins":{}}; + var enabledPlugins = { "plugins":{} }; for (var i = 0; i < plugin_num; i++) { @@ -1556,7 +1580,7 @@ exports.pluginsLoader = function (){ if(enabled_num > 0) { - logger.info('[PLUGIN] |- Restarting enabled plugins on the device: '); + logger.info('[PLUGIN] - Restarting enabled plugins on the device: '); //console.log(enabledPlugins); for (var i = 0; i < enabled_num; i++) { @@ -1569,7 +1593,6 @@ exports.pluginsLoader = function (){ var plugin_type = enabledPlugins.plugins[plugin_name].type; var plugin_version = enabledPlugins.plugins[plugin_name].version; - if(plugin_type == "nodejs") var ext = '.js'; else if(plugin_type == "python") @@ -2095,13 +2118,13 @@ exports.kill = function (args){ fs.writeFile(PLUGINS_SETTING, JSON.stringify(pluginsConf, null, 4), function(err) { if(err) { - + clearPluginTimer(plugin_name); response.result = "ERROR"; response.message = 'Error writing plugins.json: '+ err; logger.error('[PLUGIN] - stop plugin '+plugin_name + ' error: '+response.message); d.resolve(response); - - } else { + } + else { logger.debug("[PLUGIN] --> " + PLUGINS_SETTING + " updated!"); clearPluginTimer(plugin_name); response.result = "SUCCESS"; @@ -2395,7 +2418,6 @@ exports.restartPlugin = function(args){ var pluginsConf = JSON.parse(fs.readFileSync(PLUGINS_SETTING, 'utf8')); var plugin_type = pluginsConf.plugins[plugin_name].type; - } catch(err){ response.result = "ERROR"; @@ -2450,9 +2472,10 @@ exports.restartPlugin = function(args){ ); - }else { + } + else { - console.log(response) + console.log(response); response.result = "ERROR"; response.message = "Error restarting plugin '" + plugin_name + "' during killing procedure... please retry."; @@ -2488,8 +2511,6 @@ exports.restartPlugin = function(args){ }; - - // RPC to manage the removal of a plugin from the device: it is called by Iotronic via RPC exports.getPluginLogs = function(args){ @@ -2580,6 +2601,8 @@ exports.Boot = function (){ exports.pluginsBootLoader(); + logger.warn( '[PLUGIN] - Plugins will start without checksum check!'); + checkIotronicWampConnection = setInterval(function(){ logger.warn("[PLUGIN-CONNECTION-RECOVERY] - RETRY..."); @@ -2680,9 +2703,15 @@ exports.Boot = function (){ } catch(err){ logger.warn('[PLUGIN-CONNECTION-RECOVERY] - Internet connection available BUT wamp session not established!'); - if(PLUGIN_MODULE_LOADED == false) + logger.warn("WAMP connection error:" + err); + if(PLUGIN_MODULE_LOADED == false){ + exports.pluginsBootLoader(); + logger.warn( '[PLUGIN] - Plugin will start without checksum check!'); + + } + } @@ -2696,5 +2725,4 @@ exports.Boot = function (){ }, 5000); - }; \ No newline at end of file diff --git a/modules/plugins-manager/python/plugin_apis.py b/modules/plugins-manager/python/plugin_apis.py index 307541f..7c5f46a 100644 --- a/modules/plugins-manager/python/plugin_apis.py +++ b/modules/plugins-manager/python/plugin_apis.py @@ -1,14 +1,22 @@ import logging import json import os +import sys SETTINGS_PATH = os.environ['IOTRONIC_HOME']+"/settings.json" -def getLogger(plugin_name): - logging.basicConfig(filename='/var/log/iotronic/plugins/'+plugin_name+'.log', format='%(asctime)s - %(levelname)s - %(message)s', level=logging.DEBUG) - #logging.debug('TEXT') - #logging.info('TEXT') - #logging.warning('TEXT) +def getLogger(plugin_name, console=None): + + # logging.root.handlers = [] + lr_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + logging.basicConfig(filename='/var/log/iotronic/plugins/'+plugin_name+'.log', level=logging.DEBUG) + + # set up logging to console + if (console != None) and (console == True): + cl = logging.StreamHandler(sys.stdout) + cl.setLevel(logging.DEBUG) + logging.getLogger("").addHandler(cl) + return logging def getExtraInfo(): @@ -26,4 +34,3 @@ def getExtraInfo(): print("Error parsing settings.json: " + str(err)) return extra - diff --git a/modules/services-manager/manage-services.js b/modules/services-manager/manage-services.js index 1c07f93..c0f0840 100644 --- a/modules/services-manager/manage-services.js +++ b/modules/services-manager/manage-services.js @@ -10,7 +10,7 @@ //## //# Unless required by applicable law or agreed to in writing, software //# distributed under the License is distributed on an "AS IS" BASIS, -//# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.stderr of process //# See the License for the specific language governing permissions and //# limitations under the License. //## @@ -50,32 +50,9 @@ exports.enableService = function(args){ var i = findValue(servicesProcess, serviceName, 'key'); - /* - if(servicesProcess[i] != undefined){ - console.log("--------------------------------------------"); - console.log(servicesProcess[i].process.pid, db_tunnel_pid); - console.log("--------------------------------------------"); - } - else{ - console.log("--------------------------------------------"); - console.log(servicesProcess[i], db_tunnel_pid); - console.log("--------------------------------------------"); - } - - - if(servicesProcess[i] != undefined) - var service_pid = servicesProcess[i].process.pid; - else - var service_pid = db_tunnel_pid; - */ - if(running(db_tunnel_pid)){ // Call when Restore tunnel API is called and after injection LR conf is called - /* - servicesProcess[i].restore = true; - console.log("onRESTORE " + servicesProcess[i].restore); - */ //kill tunnel process process.kill(db_tunnel_pid); @@ -128,16 +105,6 @@ exports.enableService = function(args){ }); - - /* - response.result = "SUCCESS"; - //response.pid = newTunnel.process.pid; - response.message = "Service '"+ serviceName +"' successfully restored (after reconnection) on port " + publicPort; - logger.info('[SERVICE] --> ' + response.message); - d.resolve(response); - */ - - }else{ response.result = "WARNING"; @@ -147,8 +114,6 @@ exports.enableService = function(args){ } - - } else{ @@ -335,28 +300,7 @@ function createTunnel(serviceName, localPort, publicPort, callback) { logger.error('[SERVICE] - onError - '+newTunnel.key+' stderr of process ' + newTunnel.process.pid + ': '+ data); }); newTunnel.process.on('close', function(code){ - logger.debug('[SERVICE] - onClose - '+newTunnel.key+' child process ' + newTunnel.process.pid + ' exited with code '+ code); - - /* - //clean data structure - var i = findValue(servicesProcess, serviceName, 'key'); - console.log("onKILL " + i + " " + servicesProcess[i].restore); - - if(servicesProcess[i].restore == false){ - //exports.enableService([serviceName, localPort, publicPort, "false", null]); - //servicesProcess[i].restore = false; - servicesProcess.splice(i,1); - //servicesProcess[i].process.pid = - createTunnel(serviceName, localPort, publicPort, function (newTunnel) { - - logger.info("[SERVICE] --> Service '"+ serviceName +"' successfully exposed on port " + publicPort); - - }); - } - */ - - }); callback(newTunnel); diff --git a/package.json b/package.json index 6f1b002..1d9b79d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mdslab/iotronic-lightning-rod", - "version": "2.3.3", + "version": "2.3.6", "description": "Implementation of the Lightning-rod, the Stack4Things node-side probe (this version works with the standalone version of IoTronic) http://stack4things.unime.it/", "main": "lightning-rod.js", "directories": { diff --git a/utils/pluginExec/create_plugin_env b/utils/pluginExec/create_plugin_env new file mode 100755 index 0000000..0108c78 --- /dev/null +++ b/utils/pluginExec/create_plugin_env @@ -0,0 +1,16 @@ +#!/bin/ash + +echo "PluginExec Environment Creation" + +PLUGIN_ENV="pluginExec" +mkdir -p $PLUGIN_ENV/plugins +mkdir -p $PLUGIN_ENV/node_modules + +ln -s /usr/lib/node_modules/@mdslab/iotronic-lightning-rod/node_modules/python-shell/ $PLUGIN_ENV/node_modules/ + +echo " - folder "$PLUGIN_ENV" created." + +cp -R $NODE_PATH/@mdslab/iotronic-lightning-rod/utils/pluginExec/lib/ $PLUGIN_ENV/ +cp $NODE_PATH/@mdslab/iotronic-lightning-rod/utils/pluginExec/exec_plg $PLUGIN_ENV/ + +echo " - environment created." diff --git a/utils/pluginExec/exec_plg b/utils/pluginExec/exec_plg new file mode 100755 index 0000000..fd316c5 --- /dev/null +++ b/utils/pluginExec/exec_plg @@ -0,0 +1,341 @@ +#!/usr/bin/env node + +//############################################################################################ +//## +//# Copyright (C) 2019 Nicola Peditto +//## +//# Licensed under the Apache License, Version 2.0 (the "License"); +//# you may not use this file except in compliance with the License. +//# You may obtain a copy of the License at +//## +//# http://www.apache.org/licenses/LICENSE-2.0 +//## +//# Unless required by applicable law or agreed to in writing, software +//# distributed under the License is distributed on an "AS IS" BASIS, +//# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//# See the License for the specific language governing permissions and +//# limitations under the License. +//## +//############################################################################################ + +// Version 1.0.0 +// ./exec_plg + +var fs = require("fs"); +//const {PythonShell} = require('python-shell'); +var PythonShell = require('python-shell'); +var net = require('net'); + + + +// Inputs +var args = process.argv; + +if(args.length < 5){ + + if(args[3] == "list"){ + + var plugin_env = args[2]; + + if(plugin_env == "local") + var plugin_location = "./plugins"; + else if (plugin_env == "injected") + var plugin_location = "/var/lib/iotronic/plugins"; + else{ + console.log("Wrong plugin environment selected: allowed 'local' or 'injected'"); + process.exit(); + } + + fs.readdir(plugin_location, (err, files) => { + + if ( (files.length == 0) || (files.length == 1) ){ + + console.log("No plugins available!"); + + process.exit(); + + } + else{ + files.forEach(file => { + if (file != "plugins.json") + console.log(file); + }); + } + + }); + + + + } + else{ + + if(args.length != 2) + console.log("Wrong arguments!"); + + console.log("Usage:\n\t ./exec_plg "); + console.log("\n\t ./exec_plg list \n"); + + process.exit(); + + } + + + +} +else{ + + var plugin_env = args[2]; + + if(plugin_env == "local") + var plugin_location = "./plugins"; + else if (plugin_env == "injected") + var plugin_location = "/var/lib/iotronic/plugins"; + else{ + console.log("Wrong plugin environment selected: allowed 'local' or 'injected'"); + process.exit(); + } + + var plugin_type = args[3]; + var plugin_name = args[4]; + + var PLUGINS_FILE_LIST = fs.readdirSync(plugin_location); + + + if(PLUGINS_FILE_LIST.includes(args[4])){ + var plugin_version = "DEV"; + var plugin_json_name = plugin_location + "/" + plugin_name + "/" + plugin_name + ".json"; + var plugin_json = fs.readFileSync(plugin_json_name); + + console.log("Plugin:\n\t" + plugin_name); + console.log("Plugin Type:\n\t" + plugin_type); + console.log("Plugin input parameters:\n\t" + JSON.stringify(JSON.parse(plugin_json))); + + var s_server = null; + var socketPath = '/tmp/plugin-'+plugin_name; + + var response = { + message: '', + result: '' + }; + + if(plugin_type == "async"){ + + // Callback for socket + var handler = function(socket){ + + // Listen for data from client + socket.on('data',function(bytes){ + + var data = bytes.toString(); // Decode byte string + var data_parsed = JSON.parse(data); // Parse JSON response + + if(data_parsed.result == "ERROR"){ + + response.result = "ERROR"; + response.message = data_parsed.payload; + console.log('[PLUGIN] - Error in '+plugin_name + ':\n'+JSON.stringify(response.message, null, "\t")); + + }else{ + + response.result = "SUCCESS"; + response.message = data_parsed.payload; + console.log('[PLUGIN] - '+plugin_name + ': '+ JSON.stringify(response.message, null, "\t")); + + } + + }); + + // On client close + socket.on('end', function() { + console.log('[PLUGIN-SOCKET] - Socket disconnected'); + s_server.close(function(){ + console.log('[PLUGIN-SOCKET] - Server socket closed'); + }); + + }); + + + }; + + + // Remove an existing socket + fs.unlink(socketPath, function(){ + // Create the server, give it our callback handler and listen at the path + + s_server = net.createServer(handler).listen(socketPath, function(){ + console.log('[PLUGIN-SOCKET] - Socket in listening...'); + console.log('[PLUGIN-SOCKET] --> socket: '+socketPath); + }) + + }); + + + var options = { + mode: 'text', + pythonPath: '/usr/bin/python3', + pythonOptions: ['-u'], + //scriptPath: __dirname, + args: [plugin_name, plugin_version, plugin_json, plugin_location] + }; + + var pyshell = new PythonShell('./lib/async-wrapper.py', options); + + var PY_PID = pyshell.childProcess.pid; + console.log("[PLUGIN-SHELL] - PID wrapper: "+ PY_PID); + + + pyshell.on('message', function (message) { + // received a message sent from the Python script (a simple "print" statement) + console.log("[PLUGIN-WRAPPER] - PYTHON: "+message); + }); + + // end the input stream and allow the process to exit + pyshell.end(function (err, code, signal) { + + if (err){ + + response.result = "ERROR"; + response.message = "Error plugin execution: please check plugin logs: " + err.traceback; + + console.log(response) + + } + else{ + + console.log('[PLUGIN-SHELL] - Python shell of "'+plugin_name+'" plugin terminated: {signal: '+ signal+', code: '+code+'}'); + + if(signal == null && code == 0){ + console.log("[PLUGIN-SHELL] --> unexpected '"+plugin_name+"' plugin termination!"); + }else{ + console.log("[PLUGIN-SHELL] --> python plugin '"+plugin_name+"' terminated!") + } + + } + + + + }); + + } + else if(plugin_type == "sync"){ + + // Callback for socket + var handler = function(socket){ + + // Listen for data from client + socket.on('data', function(bytes){ + + var data = bytes.toString(); // Decode byte string + var data_parsed = JSON.parse(data); // Parse JSON response + + if(data_parsed.result == "ERROR"){ + + response.result = "ERROR"; + response.message = "Error in plugin execution: " + data_parsed.payload; + console.log('[PLUGIN] - Error in '+plugin_name + ':\n'+JSON.stringify(response.message, null, "\t")); + + }else{ + + try{ + response.result = "SUCCESS"; + response.message = data_parsed.payload; + console.log('[PLUGIN] - '+plugin_name + ': '+ JSON.stringify(response.message, null, "\t")); + } + catch(err){ + response.result = "ERROR"; + response.message = JSON.stringify(err); + console.log('Error parsing '+plugin_name + ' plugin response: '+ response.message); + } + + } + + }); + + // On client close + socket.on('end', function() { + + console.log('[PLUGIN-SOCKET] - Socket disconnected'); + + s_server.close(function(){ + + console.log('[PLUGIN-SOCKET] - Server socket closed'); + + }); + + }); + + }; + + // Remove an existing plugin socket + fs.unlink(socketPath, function(){ + + + // Create the server, give it our callback handler and listen at the path + s_server = net.createServer(handler).listen(socketPath, function() { + console.log('[PLUGIN-SOCKET] - Socket in listening...'); + console.log('[PLUGIN-SOCKET] --> socket: '+socketPath); + + // after socket creation we will start the plugin wrapper + var options = { + mode: 'text', + pythonPath: '/usr/bin/python3', + pythonOptions: ['-u'], + //scriptPath: __dirname, + args: [plugin_name, plugin_version, plugin_json, plugin_location] + }; + + var pyshell = new PythonShell('./lib/sync-wrapper.py', options); + // it will create a python instance like this: + // python -u /opt/stack4things/lightning-rod/modules/plugins-manager/python/sync-wrapper.py py_sync {"name":"S4T"} + + console.log("[PLUGIN-SHELL] - PID wrapper: "+pyshell.childProcess.pid); + + // listening 'print' output + pyshell.on('message', function (message) { + // received a message sent from the Python script (a simple "print" statement) + console.log("[PLUGIN-WRAPPER] - PYTHON: "+message); + }); + + + // end the input stream and allow the process to exit + pyshell.end(function (err, code, signal) { + + if (err){ + + response.result = "ERROR"; + response.message = err; + console.log(response); + + }else{ + console.log('[PLUGIN-SHELL] - Python shell terminated: {signal: '+ signal+', code: '+code+'}'); + } + + }); + + + + }) + + + } + + ); + + } + else{ + console.log("WRONG plugin type: '" + plugin_type +"' - Types allowed: async | sync") + } + } + else{ + console.log("This plugin does not exist!"); + process.exit(); + } + + + + + + + +} + diff --git a/utils/pluginExec/lib/async-wrapper.py b/utils/pluginExec/lib/async-wrapper.py new file mode 100644 index 0000000..26fce0e --- /dev/null +++ b/utils/pluginExec/lib/async-wrapper.py @@ -0,0 +1,133 @@ +############################################################################################# +### +## Copyright (C) 2018 Nicola Peditto +### +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +### +## http://www.apache.org/licenses/LICENSE-2.0 +### +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +### +############################################################################################# + +import sys +import imp +import socket +from datetime import datetime +import time +import os +import json +import threading + +# Inputs +plugin_name = sys.argv[1] +plugin_version = sys.argv[2] +plugin_params = sys.argv[3] +plugin_path = sys.argv[4] + + +# Globals +socket_path = "/tmp/plugin-"+plugin_name +plugin_path = plugin_path+"/"+plugin_name+"/"+plugin_name+".py" +plugin = imp.load_source("plugin", plugin_path) + + +if sys.version_info[0] < 3: + import Queue + q_result = Queue.Queue() +else: + import queue + q_result = queue.Queue() + + +# Thread to run user's plugin logic +class Plugin(threading.Thread): + + def __init__(self, params, q_result, api): + + threading.Thread.__init__(self) + self.setName(plugin_name) + self.name = plugin_name + self.params = json.loads(params) + self.q_result = q_result + self.api = api + + + def run(self): + + try: + + print("Plugin Thread starting...") + print("--> PARAMS:" + str(self.params)) + + result = "Plugin " +plugin_name+" is running!" + + response = { + "message": str(result), + "result": "SUCCESS" + } + + self.q_result.put(json.dumps(response)) + + plugin.main(self.name, self.params, api) + + + except Exception as err: + + logging = api.getLogger(plugin_name, console=True) + logging.exception("Exception occurred in plugin") + + response = { + "message": str(err), + "result": "ERROR" + } + + self.q_result.put(json.dumps(response)) + + + + + +# WRAPPER MAIN +if __name__ == '__main__': + + api = imp.load_source("api", os.environ['LIGHTNINGROD_HOME'] + "/modules/plugins-manager/python/plugin_apis.py") + + logging = api.getLogger(plugin_name, console=True) + logging.info(plugin_name + " started with these parameters: " + str(plugin_params) ) + + worker = Plugin( + plugin_params, + q_result, + api + ) + + # 1. thread plugin starting + worker.start() + + # 2. waiting for plugin result injected in the queue + while q_result.empty(): + pass + + # 3. Get data from plugin queue and parsing + data = q_result.get() + data_parsed = json.loads(data) + + # 4. Create package for Plugin Manager + msg = json.dumps({ + "plugin": plugin_name, + "payload": str(data_parsed["message"]), + "result": str(data_parsed["result"]) + }) + + # 5. Connect to the unix local socket to send the plugin package + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(socket_path) + client.send(msg.encode('utf-8')) + client.close() diff --git a/utils/pluginExec/lib/sync-wrapper.py b/utils/pluginExec/lib/sync-wrapper.py new file mode 100644 index 0000000..0913708 --- /dev/null +++ b/utils/pluginExec/lib/sync-wrapper.py @@ -0,0 +1,130 @@ +############################################################################################# +### +## Copyright (C) 2018 Nicola Peditto +### +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +### +## http://www.apache.org/licenses/LICENSE-2.0 +### +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +### +############################################################################################# + +import sys +import imp +import socket +from datetime import datetime +import time +import os +import threading +import json + + +# Inputs +plugin_name = sys.argv[1] +plugin_version = sys.argv[2] +plugin_params = sys.argv[3] +plugin_path = sys.argv[4] + + + +# Globals +socket_path = '/tmp/plugin-'+plugin_name +plugin_path = plugin_path+"/"+plugin_name+"/"+plugin_name+".py" +plugin = imp.load_source("plugin", plugin_path) + + +if sys.version_info[0] < 3: + import Queue + q_result = Queue.Queue() +else: + import queue + q_result = queue.Queue() + + +# Thread to run user's plugin logic +class Plugin(threading.Thread): + + def __init__(self, params, q_result, api): + threading.Thread.__init__(self) + self.setName(plugin_name) + self.name = plugin_name + self.params = json.loads(params) + self.q_result = q_result + self.api = api + + def run(self): + + try: + + print("Plugin Thread starting...") + print("--> PARAMS:" + str(self.params)) + + result = plugin.main(self.name, self.params, api) + + response = { + "message": str(result), + "result": "SUCCESS" + } + + self.q_result.put(json.dumps(response)) + + + except Exception as err: + + logging = api.getLogger(plugin_name, console=True) + logging.exception("Exception occurred in plugin") + + response = { + "message": str(err), + "result": "ERROR" + } + + self.q_result.put(json.dumps(response)) + + +# WRAPPER MAIN +if __name__ == '__main__': + + api = imp.load_source("api", os.environ['LIGHTNINGROD_HOME'] + "/modules/plugins-manager/python/plugin_apis.py") + + logging = api.getLogger(plugin_name, console=True) + logging.info(plugin_name + " started with these parameters: " + str(plugin_params) ) + + worker = Plugin( + plugin_params, + q_result, + api + ) + + # 1. thread plugin starting + worker.start() + + # 2. waiting for plugin result injected in the queue + while q_result.empty(): + pass + + # 3. Get data from plugin queue and parsing + data = q_result.get() + data_parsed = json.loads(data) + + # 4. Create package for Plugin Manager + + msg = json.dumps({ + "plugin": plugin_name, + "payload": str(data_parsed["message"]), + "result": str(data_parsed["result"]) + }) + + + # 5. Connect to the unix local socket to send the plugin package + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(socket_path) + client.send(msg.encode('utf-8')) + client.close()