From 7872ebede63ca2831cc82e3496c22d77e8acd212 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 8 Jul 2019 09:02:17 +0200 Subject: [PATCH 1/9] chandle close and reconnecct --- lib/bindings/MQTTBinding.js | 83 +++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 9 deletions(-) diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index 89a74ac0..b34cb402 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -34,6 +34,7 @@ var iotAgentLib = require('iotagent-node-lib'), op: 'IOTAUL.MQTT.Binding' }, mqttClient, + mqttConn, config = require('../configService'); /** @@ -155,20 +156,81 @@ function start(callback) { options.password = config.getConfig().mqtt.password; } - mqttClient = mqtt.connect('mqtt://' + config.getConfig().mqtt.host + ':' + config.getConfig().mqtt.port, options); - mqttClient.on('error', function(e) { - config.getLogger().fatal('GLOBAL-002: Couldn\'t connect with MQTT broker: %j', e); - callback(e); - }); + var retries, retryTime; - mqttClient.on('message', commonBindings.mqttMessageHandler); + if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retries) { + retries = config.getConfig().amqp.retries; + } else { + retries = constants.MQTT_DEFAULT_RETRIES; + } + if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retrytime) { + retryTime = config.getConfig().mqtt.retryTime; + } else { + retryTime = constants.MQTT_DEFAULT_RETRY_TIME; + } + var isConnecting = false; + var numRetried = 0; + config.getLogger().info(context, 'Starting MQTT binding'); - mqttClient.on('connect', function(ack) { - config.getLogger().info(context, 'MQTT Client connected'); - recreateSubscriptions(callback); + function createConnection(callback) { + config.getLogger().info(context, 'creating connnection'); + if (isConnecting) { + return; + } + isConnecting = true; + mqttClient = mqtt.connect( + 'mqtt://' + config.getConfig().mqtt.host + ':' + config.getConfig().mqtt.port, + options + ); + // TDB: check if error + if (!mqttClient) { + config.getLogger().error(context, 'error'); + if (numRetried <= retries) { + numRetried++; + return setTimeout(createConnection, retryTime * 1000, callback); + } + } + isConnecting = false; + mqttClient.on('error', function(e) { + /*jshint quotmark: double */ + config.getLogger().fatal("GLOBAL-002: Couldn't connect with MQTT broker: %j", e); + /*jshint quotmark: single */ + callback(e); + }); + mqttClient.on('message', commonBindings.mqttMessageHandler); + mqttClient.on('connect', function(ack) { + config.getLogger().info(context, 'MQTT Client connected'); + recreateSubscriptions(callback); + }); + mqttClient.on('close', function() { + // If mqttConn is null, the connection has been closed on purpose + if (mqttConn) { + config.getLogger().error(context, 'reconnecting'); + if (numRetried <= retries) { + numRetried++; + return setTimeout(createConnection, retryTime * 1000); + } + } else { + return; + } + }); + + config.getLogger().info(context, 'connected'); + mqttConn = mqttClient; + if (callback) { + callback(); + } + } + + async.waterfall([createConnection], function(error) { + if (error) { + config.getLogger().debug('MQTT error %j', error); + } + callback(); }); } + /** * Stops the IoT Agent and all the transport plugins. */ @@ -180,6 +242,9 @@ function stop(callback) { mqttClient.end.bind(mqttClient, true) ], function() { config.getLogger().info('MQTT Binding Stopped'); + if (mqttConn) { + mqttConn = null; + } callback(); }); } From ac1810403cf7f431d81e4394b4797470d9f93ab8 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 8 Jul 2019 09:23:10 +0200 Subject: [PATCH 2/9] add mqtt options about retries and retry_time --- lib/bindings/MQTTBinding.js | 2 +- lib/configService.js | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index b34cb402..9989d5ea 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -159,7 +159,7 @@ function start(callback) { var retries, retryTime; if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retries) { - retries = config.getConfig().amqp.retries; + retries = config.getConfig().mqtt.retries; } else { retries = constants.MQTT_DEFAULT_RETRIES; } diff --git a/lib/configService.js b/lib/configService.js index 4f1ab2ac..84bfaad5 100644 --- a/lib/configService.js +++ b/lib/configService.js @@ -44,6 +44,8 @@ function processEnvironmentVariables() { 'IOTA_MQTT_PASSWORD', 'IOTA_MQTT_QOS', 'IOTA_MQTT_RETAIN', + 'IOTA_MQTT_RETRIES', + 'IOTA_MQTT_RETRY_TIME', 'IOTA_AMQP_HOST', 'IOTA_AMQP_PORT', 'IOTA_AMQP_USERNAME', @@ -62,7 +64,9 @@ function processEnvironmentVariables() { 'IOTA_MQTT_USERNAME', 'IOTA_MQTT_PASSWORD', 'IOTA_MQTT_QOS', - 'IOTA_MQTT_RETAIN' + 'IOTA_MQTT_RETAIN', + 'IOTA_MQTT_RETRIES', + 'IOTA_MQTT_RETRY_TIME' ], amqpVariables = [ 'IOTA_AMQP_HOST', @@ -115,6 +119,14 @@ function processEnvironmentVariables() { config.mqtt.retain = process.env.IOTA_MQTT_RETAIN === 'true'; } + if (process.env.IOTA_MQTT_RETRIES) { + config.mqtt.retries = process.env.IOTA_MQTT_RETRIES; + } + + if (process.env.IOTA_MQTT_RETRY_TIME) { + config.mqtt.retryTime = process.env.IOTA_MQTT_RETRY_TIME; + } + if (anyIsSet(amqpVariables)) { config.amqp = {}; } From b4d3204a2ff88164b9d81182855689fe207ff2a9 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 8 Jul 2019 09:33:14 +0200 Subject: [PATCH 3/9] add keepalive mqtt option --- lib/bindings/MQTTBinding.js | 4 +++- lib/configService.js | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index 9989d5ea..8bcc1095 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -155,7 +155,9 @@ function start(callback) { options.username = config.getConfig().mqtt.username; options.password = config.getConfig().mqtt.password; } - + if (config.getConfig().mqtt.keepalive) { + options.keepalive = parseInt(config.getConfig().mqtt.keepalive) || 0; + } var retries, retryTime; if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retries) { diff --git a/lib/configService.js b/lib/configService.js index 84bfaad5..e45ffa10 100644 --- a/lib/configService.js +++ b/lib/configService.js @@ -66,7 +66,8 @@ function processEnvironmentVariables() { 'IOTA_MQTT_QOS', 'IOTA_MQTT_RETAIN', 'IOTA_MQTT_RETRIES', - 'IOTA_MQTT_RETRY_TIME' + 'IOTA_MQTT_RETRY_TIME', + 'IOTA_MQTT_KEEPALIVE' ], amqpVariables = [ 'IOTA_AMQP_HOST', @@ -127,6 +128,10 @@ function processEnvironmentVariables() { config.mqtt.retryTime = process.env.IOTA_MQTT_RETRY_TIME; } + if (process.env.IOTA_MQTT_KEEPALIVE) { + config.mqtt.keepalive = process.env.IOTA_MQTT_KEEPALIVE; + } + if (anyIsSet(amqpVariables)) { config.amqp = {}; } From 83f8c6edf856e565315b93d44a3ece55dc04e9b0 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 8 Jul 2019 09:57:02 +0200 Subject: [PATCH 4/9] set isConnecting --- lib/bindings/MQTTBinding.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index 8bcc1095..39eaa3f2 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -149,8 +149,6 @@ function start(callback) { connectTimeout: 60 * 60 * 1000 }; - config.getLogger().info(context, 'Starting MQTT binding'); - if (config.getConfig().mqtt && config.getConfig().mqtt.username && config.getConfig().mqtt.password) { options.username = config.getConfig().mqtt.username; options.password = config.getConfig().mqtt.password; @@ -175,7 +173,7 @@ function start(callback) { config.getLogger().info(context, 'Starting MQTT binding'); function createConnection(callback) { - config.getLogger().info(context, 'creating connnection'); + config.getLogger().info(context, 'creating connection'); if (isConnecting) { return; } @@ -184,15 +182,15 @@ function start(callback) { 'mqtt://' + config.getConfig().mqtt.host + ':' + config.getConfig().mqtt.port, options ); + isConnecting = false; // TDB: check if error if (!mqttClient) { - config.getLogger().error(context, 'error'); + config.getLogger().error(context, 'error mqttClient not created'); if (numRetried <= retries) { numRetried++; return setTimeout(createConnection, retryTime * 1000, callback); } } - isConnecting = false; mqttClient.on('error', function(e) { /*jshint quotmark: double */ config.getLogger().fatal("GLOBAL-002: Couldn't connect with MQTT broker: %j", e); From a5b877a59ef782e73987983c6dbc6496a9b50cfa Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 8 Jul 2019 11:12:14 +0200 Subject: [PATCH 5/9] fix callback --- lib/bindings/MQTTBinding.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index 39eaa3f2..a3861801 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -69,7 +69,9 @@ function recreateSubscriptions(callback) { } else { iotAgentLib.alarms.release(constants.MQTTB_ALARM); config.getLogger().debug('Successfully subscribed to the following topics:\n%j\n', topics); - callback(null); + if (callback) { + callback(null); + } } }); } @@ -200,7 +202,7 @@ function start(callback) { mqttClient.on('message', commonBindings.mqttMessageHandler); mqttClient.on('connect', function(ack) { config.getLogger().info(context, 'MQTT Client connected'); - recreateSubscriptions(callback); + recreateSubscriptions(); }); mqttClient.on('close', function() { // If mqttConn is null, the connection has been closed on purpose @@ -220,7 +222,7 @@ function start(callback) { if (callback) { callback(); } - } + } // function createConnection async.waterfall([createConnection], function(error) { if (error) { From c86e4f6df8406ba1543f9b3a17a6893491bd04c9 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 8 Jul 2019 12:02:47 +0200 Subject: [PATCH 6/9] add default consts for MQTT retry values --- lib/constants.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/constants.js b/lib/constants.js index 612769a5..45a15ac2 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -47,6 +47,8 @@ module.exports = { COMMAND_STATUS_COMPLETED: 'OK', MQTTB_ALARM: 'MQTTB-ALARM', + MQTT_DEFAULT_RETRIES: 5, + MQTT_DEFAULT_RETRY_TIME: 5, AMQP_DEFAULT_EXCHANGE: 'amq.topic', AMQP_DEFAULT_QUEUE: 'iotaqueue', From 2bc027d293c55bee24bd7efc7e940a595856138f Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 8 Jul 2019 12:20:20 +0200 Subject: [PATCH 7/9] add missed mqtt keepalive env var --- lib/configService.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/configService.js b/lib/configService.js index e45ffa10..e8bf6c70 100644 --- a/lib/configService.js +++ b/lib/configService.js @@ -46,6 +46,7 @@ function processEnvironmentVariables() { 'IOTA_MQTT_RETAIN', 'IOTA_MQTT_RETRIES', 'IOTA_MQTT_RETRY_TIME', + 'IOTA_MQTT_KEEPALIVE', 'IOTA_AMQP_HOST', 'IOTA_AMQP_PORT', 'IOTA_AMQP_USERNAME', From 3bda7c8449634df28f138ed61ea5447498d8cc93 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Wed, 10 Jul 2019 07:44:36 +0200 Subject: [PATCH 8/9] bump to 1.7.6 --- package.json | 2 +- rpm/SPECS/iotaul.spec | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 94d0c705..4625abb2 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "iotagent-ul", "description": "IoT Agent for the Ultrlight 2.0 protocol", - "version": "1.7.50", + "version": "1.7.60", "homepage": "https://github.com/telefonicaid/iotagent-ul", "author": { "name": "Daniel Moran", diff --git a/rpm/SPECS/iotaul.spec b/rpm/SPECS/iotaul.spec index ff9859a1..c0c6cdd7 100644 --- a/rpm/SPECS/iotaul.spec +++ b/rpm/SPECS/iotaul.spec @@ -170,6 +170,10 @@ fi %{_install_dir} %changelog + +* Wed Jul 10 2019 Alvaro Vega Garcia 1.7.60 +- Fix: reconnect when MQTT closes connection (including mqtt retries and keepalive conf options) + * Fri Apr 26 2019 Fermin Galan 1.7.50 * Mon Aug 06 2018 Fermin Galan 1.7.0 From 093c3e95464653117bac1031058044006d733c82 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Wed, 10 Jul 2019 07:56:59 +0200 Subject: [PATCH 9/9] fix log --- lib/bindings/MQTTBinding.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index a3861801..f65844a2 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -207,8 +207,8 @@ function start(callback) { mqttClient.on('close', function() { // If mqttConn is null, the connection has been closed on purpose if (mqttConn) { - config.getLogger().error(context, 'reconnecting'); if (numRetried <= retries) { + config.getLogger().error(context, 'reconnecting...'); numRetried++; return setTimeout(createConnection, retryTime * 1000); }