diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index a638723a..6874ecc7 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -45,132 +45,104 @@ function generateTopics(callback) { const topics = []; config.getLogger().debug(context, 'Generating topics'); + if (config.getConfig().mqtt.sharedSubscriptionsDisabled === true) { + // With leading slashes + topics.push('/+/+/' + constants.MEASURES_SUFIX + '/+'); + topics.push('/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+'); + topics.push('/+/+/' + constants.MEASURES_SUFIX); + topics.push('/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX); + topics.push('/+/+/' + constants.CONFIGURATION_SUFIX + '/' + constants.CONFIGURATION_COMMAND_SUFIX); + topics.push( + '/' + + constants.MQTT_TOPIC_PROTOCOL + + '/+/+/' + + constants.CONFIGURATION_SUFIX + + '/' + + constants.CONFIGURATION_COMMAND_SUFIX + ); + topics.push('/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE); + topics.push('/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE); + + //Without leading slashes + topics.push('+/+/' + constants.MEASURES_SUFIX + '/+'); + topics.push(constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+'); + topics.push('+/+/' + constants.MEASURES_SUFIX); + topics.push(constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX); + topics.push('+/+/' + constants.CONFIGURATION_SUFIX + '/' + constants.CONFIGURATION_COMMAND_SUFIX); + topics.push( + constants.MQTT_TOPIC_PROTOCOL + + '/+/+/' + + constants.CONFIGURATION_SUFIX + + '/' + + constants.CONFIGURATION_COMMAND_SUFIX + ); + topics.push('+/+/' + constants.CONFIGURATION_COMMAND_UPDATE); + topics.push(constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE); + } else { + let shareSubscriptionGroup = constants.MQTT_SHARE_SUBSCRIPTION_GROUP; + if (config.getConfig().mqtt.groupIdSufix !== undefined) { + shareSubscriptionGroup = + shareSubscriptionGroup.slice(0, -1) + + config.getConfig().mqtt.groupIdSufix + + shareSubscriptionGroup.slice(-1); + } + // With leading slashes + topics.push(shareSubscriptionGroup + '/+/+/' + constants.MEASURES_SUFIX + '/+'); + topics.push( + shareSubscriptionGroup + '/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+' + ); + topics.push(shareSubscriptionGroup + '/+/+/' + constants.MEASURES_SUFIX); + topics.push(shareSubscriptionGroup + '/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX); + topics.push( + shareSubscriptionGroup + + '/+/+/' + + constants.CONFIGURATION_SUFIX + + '/' + + constants.CONFIGURATION_COMMAND_SUFIX + ); + topics.push( + shareSubscriptionGroup + + '/' + + constants.MQTT_TOPIC_PROTOCOL + + '/+/+/' + + constants.CONFIGURATION_SUFIX + + '/' + + constants.CONFIGURATION_COMMAND_SUFIX + ); + topics.push(shareSubscriptionGroup + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE); + topics.push( + shareSubscriptionGroup + + '/' + + constants.MQTT_TOPIC_PROTOCOL + + '/+/+/' + + constants.CONFIGURATION_COMMAND_UPDATE + ); - // With leading slashes - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX + '/+' - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - '/' + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.MEASURES_SUFIX + - '/+' - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - '/' + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.MEASURES_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - '/+/+/' + - constants.CONFIGURATION_SUFIX + - '/' + - constants.CONFIGURATION_COMMAND_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - '/' + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.CONFIGURATION_SUFIX + - '/' + - constants.CONFIGURATION_COMMAND_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - '/' + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.CONFIGURATION_COMMAND_UPDATE - ); - - //Without leading slashes - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '+/+/' + constants.MEASURES_SUFIX + '/+' - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.MEASURES_SUFIX + - '/+' - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '+/+/' + constants.MEASURES_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.MEASURES_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - '+/+/' + - constants.CONFIGURATION_SUFIX + - '/' + - constants.CONFIGURATION_COMMAND_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.CONFIGURATION_SUFIX + - '/' + - constants.CONFIGURATION_COMMAND_SUFIX - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '+/+/' + constants.CONFIGURATION_COMMAND_UPDATE - ); - topics.push( - config.getConfig().mqtt.disableSharedSubscriptions - ? '' - : constants.MQTT_SHARE_SUBSCRIPTION_GROUP + - constants.MQTT_TOPIC_PROTOCOL + - '/+/+/' + - constants.CONFIGURATION_COMMAND_UPDATE - ); + //Without leading slashes + topics.push(shareSubscriptionGroup + '+/+/' + constants.MEASURES_SUFIX + '/+'); + topics.push(shareSubscriptionGroup + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+'); + topics.push(shareSubscriptionGroup + '+/+/' + constants.MEASURES_SUFIX); + topics.push(shareSubscriptionGroup + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX); + topics.push( + shareSubscriptionGroup + + '+/+/' + + constants.CONFIGURATION_SUFIX + + '/' + + constants.CONFIGURATION_COMMAND_SUFIX + ); + topics.push( + shareSubscriptionGroup + + constants.MQTT_TOPIC_PROTOCOL + + '/+/+/' + + constants.CONFIGURATION_SUFIX + + '/' + + constants.CONFIGURATION_COMMAND_SUFIX + ); + topics.push(shareSubscriptionGroup + '+/+/' + constants.CONFIGURATION_COMMAND_UPDATE); + topics.push( + shareSubscriptionGroup + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE + ); + } callback(null, topics); } @@ -369,6 +341,7 @@ function start(callback) { if (!mqttConfig) { return config.getLogger().error(context, 'Error MQTT is not configured'); } + if (mqttConfig.disabled) { return config.getLogger().warn(context, 'MQTT is disabled'); } diff --git a/lib/configService.js b/lib/configService.js index 8e40630f..fe9a023c 100644 --- a/lib/configService.js +++ b/lib/configService.js @@ -236,11 +236,9 @@ function processEnvironmentVariables() { config.mqtt.disabled = true; } - if ( - process.env.IOTA_MQTT_DISABLE_SHARED_SUBSCRIPTIONS && - process.env.IOTA_MQTT_DISABLE_SHARED_SUBSCRIPTIONS.trim().toLowerCase() === 'true' - ) { - config.mqtt.sharedSubscriptionsDisabled = true; + if (process.env.IOTA_MQTT_DISABLE_SHARED_SUBSCRIPTIONS) { + config.mqtt.sharedSubscriptionsDisabled = + process.env.IOTA_MQTT_DISABLE_SHARED_SUBSCRIPTIONS.trim().toLowerCase() === 'true'; } if (process.env.IOTA_MQTT_GROUP_ID_SUFIX) { diff --git a/lib/constants.js b/lib/constants.js index 65d2dc9e..8b447ba5 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -23,7 +23,6 @@ * Modified by: Fernando Méndez, Daniel Calvo - ATOS Research & Innovation */ -const config = require('./configService'); module.exports = { MEASURES_SUFIX: 'attrs', CONFIGURATION_SUFIX: 'configuration', @@ -47,10 +46,7 @@ module.exports = { MQTTB_ALARM: 'MQTTB-ALARM', MQTT_DEFAULT_RETRIES: 5, MQTT_DEFAULT_RETRY_TIME: 5, - MQTT_SHARE_SUBSCRIPTION_GROUP: - config.getConfig().mqtt && config.getConfig().mqtt.groupIdSufix - ? '$share/ul' + config.getConfig().mqtt.groupIdSufix + '/' - : '$share/ul/', + MQTT_SHARE_SUBSCRIPTION_GROUP: '$share/ul/', MQTT_TOPIC_PROTOCOL: 'ul', AMQP_DEFAULT_EXCHANGE: 'amq.topic',