diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index e69de29b..342c77db 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -0,0 +1 @@ +- allow receive command notifications from CB (iotagent-node-lib#1455 \ No newline at end of file diff --git a/lib/bindings/HTTPBindings.js b/lib/bindings/HTTPBindings.js index 115df2a8..84ea188c 100644 --- a/lib/bindings/HTTPBindings.js +++ b/lib/bindings/HTTPBindings.js @@ -274,7 +274,7 @@ function handleIncomingMeasure(req, res, next) { * @param {Object} attribute Attribute in NGSI format. * @return {Function} Command execution function ready to be called with async.series. */ -function generateCommandExecution(apiKey, device, attribute) { +function generateCommandExecution(apiKey, device, group, attribute) { const cmdName = attribute.name; const cmdAttributes = attribute.value; context = fillService(context, device); @@ -362,7 +362,7 @@ function generateCommandExecution(apiKey, device, attribute) { function commandHandler(device, attributes, callback) { context = fillService(context, device); utils.getEffectiveApiKey(device.service, device.subservice, device, function (error, apiKey) { - async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device)), function (error) { + async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device, null)), function (error) { if (error) { // prettier-ignore config.getLogger().error(context, @@ -526,13 +526,15 @@ function stop(callback) { } } -function sendPushNotifications(device, values, callback) { - async.series(values.map(generateCommandExecution.bind(null, null, device)), function (error) { +function sendPushNotifications(device, group, values, callback) { + const executions = _.flatten(values.map(generateCommandExecution.bind(null, group.apikey, device, group))); + + async.series(executions, function (error) { callback(error); }); } -function storePollNotifications(device, values, callback) { +function storePollNotifications(device, group, values, callback) { function addPollNotification(item, innerCallback) { iotAgentLib.addCommand(device.service, device.subservice, device.id, item, innerCallback); } @@ -541,11 +543,52 @@ function storePollNotifications(device, values, callback) { } function notificationHandler(device, values, callback) { - if (device.endpoint) { - sendPushNotifications(device, values, callback); - } else { - storePollNotifications(device, values, callback); + config.getLogger().debug(context, 'values for command %j and device %j', values, device); + + function invokeWithConfiguration(apiKey, callback) { + let group = {}; + iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function ( + error, + foundGroup + ) { + if (!error) { + group = foundGroup; + } + var cmdValue = { type: 'command' }; + for (let val of values) { + if (val.name === 'cmd') { + cmdValue.name = val.value; + } else if (val.name === 'params') { + cmdValue.value = val.value; + } else { + // other fields like status, info, onDelivered, OnError + cmdValue[val.name] = val.value; + } + } + var cmdValues = [cmdValue]; + config.getLogger().debug(context, 'cmdValues %j', cmdValues); + iotAgentLib.executeUpdateSideEffects( + device, + device.id, + device.type, + device.service, + device.subservice, + cmdValues, + function () { + if (device.endpoint || group.endpoint) { + sendPushNotifications(device, group, cmdValues, callback); + } else { + storePollNotifications(device, group, cmdValues, callback); + } + } + ); + }); } + + async.waterfall( + [apply(utils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration], + callback + ); } exports.start = start; diff --git a/lib/iotagent-ul.js b/lib/iotagent-ul.js index baa363cd..baae9e36 100644 --- a/lib/iotagent-ul.js +++ b/lib/iotagent-ul.js @@ -165,10 +165,28 @@ function deviceUpdatingHandler(device, callback) { * @param {Array} values Values recieved in the notification. */ function notificationHandler(device, values, callback) { - transportSelector.applyFunctionFromBinding( - [device, values], - 'notificationHandler', - device.transport || config.getConfig().defaultTransport, + function invokeWithConfiguration(apiKey, callback) { + let group = {}; + iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function ( + error, + foundGroup + ) { + if (!error) { + group = foundGroup; + } + transportSelector.applyFunctionFromBinding( + [device, values], + 'notificationHandler', + device.transport || + (group && group.transport ? group.transport : undefined) || + config.getConfig().defaultTransport, + callback + ); + }); + } + + async.waterfall( + [apply(iotaUtils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration], callback ); } diff --git a/test/unit/ngsiv2/HTTP_commands_test.js b/test/unit/ngsiv2/HTTP_commands_test.js index a5f5736f..d48ed540 100644 --- a/test/unit/ngsiv2/HTTP_commands_test.js +++ b/test/unit/ngsiv2/HTTP_commands_test.js @@ -240,6 +240,44 @@ describe('HTTP: Commands', function () { }); }); }); + + describe('When a command arrive to the Agent for a device with the HTTP protocol throught a CB notification', function () { + const commandOptions = { + url: 'http://localhost:' + config.iota.server.port + '/notify', + method: 'POST', + json: utils.readExampleFile('./test/unit/ngsiv2/contextRequests/notifyCommand.json'), + headers: { + 'fiware-service': 'smartgondor', + 'fiware-servicepath': '/gardens' + } + }; + + beforeEach(function () { + contextBrokerMock + .matchHeader('fiware-service', 'smartgondor') + .matchHeader('fiware-servicepath', '/gardens') + .post( + '/v2/entities?options=upsert', + utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus1.json') + ) + .reply(204); + + mockedClientServer = nock('http://localhost:9876') + .post('/command', function (body) { + return body === 'MQTT_2@PING|data=22'; + }) + .reply(200, 'MQTT_2@PING|data=22'); + }); + + it('should return a 200 OK without errors', function (done) { + request(commandOptions, function (error, response, body) { + should.not.exist(error); + response.statusCode.should.equal(200); + contextBrokerMock.done(); + done(); + }); + }); + }); }); describe('HTTP: Commands with expressions', function () { @@ -435,3 +473,100 @@ describe('HTTP: Commands with expressions 2', function () { }); }); }); + +describe('HTTP: Commands with expressions', function () { + beforeEach(function (done) { + const provisionOptions = { + url: 'http://localhost:' + config.iota.server.port + '/iot/devices', + method: 'POST', + json: utils.readExampleFile('./test/deviceProvisioning/provisionCommand7.json'), + headers: { + 'fiware-service': 'smartgondor', + 'fiware-servicepath': '/gardens' + } + }; + + config.logLevel = 'INFO'; + + nock.cleanAll(); + + contextBrokerMock = nock('http://192.168.1.1:1026') + .matchHeader('fiware-service', 'smartgondor') + .matchHeader('fiware-servicepath', '/gardens') + .post('/v2/registrations') + .reply(201, null, { Location: '/v2/registrations/6319a7f5254b05844116584d' }); + + iotagentMqtt.start(config, function () { + request(provisionOptions, function (error, response, body) { + done(); + }); + }); + }); + + afterEach(function (done) { + nock.cleanAll(); + async.series([iotAgentLib.clearAll, iotagentMqtt.stop], done); + }); + + describe('When a command arrive to the Agent for a device with the HTTP protocol', function () { + const commandOptions = { + url: 'http://localhost:' + config.iota.server.port + '/v2/op/update', + method: 'POST', + json: utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateCommand1.json'), + headers: { + 'fiware-service': 'smartgondor', + 'fiware-servicepath': '/gardens' + } + }; + + beforeEach(function () { + contextBrokerMock + .matchHeader('fiware-service', 'smartgondor') + .matchHeader('fiware-servicepath', '/gardens') + .post( + '/v2/entities?options=upsert', + utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus1.json') + ) + .reply(204); + + contextBrokerMock + .matchHeader('fiware-service', 'smartgondor') + .matchHeader('fiware-servicepath', '/gardens') + .post( + '/v2/entities?options=upsert', + utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus6.json') + ) + .reply(204); + + mockedClientServer = nock('http://localhost:9876') + .post('/command', function (body) { + return body === 'MQTT_2@PING|MQTT_2AnMQTTDevice'; + }) + .reply(200, 'MQTT_2@PING|data=22'); + }); + + it('should return a 204 OK without errors', function (done) { + request(commandOptions, function (error, response, body) { + should.not.exist(error); + response.statusCode.should.equal(204); + done(); + }); + }); + it('should update the status in the Context Broker', function (done) { + request(commandOptions, function (error, response, body) { + setTimeout(function () { + contextBrokerMock.done(); + done(); + }, 100); + }); + }); + it('should publish the command information in the MQTT topic', function (done) { + request(commandOptions, function (error, response, body) { + setTimeout(function () { + mockedClientServer.done(); + done(); + }, 100); + }); + }); + }); +}); diff --git a/test/unit/ngsiv2/contextRequests/notifyCommand.json b/test/unit/ngsiv2/contextRequests/notifyCommand.json new file mode 100644 index 00000000..690617fe --- /dev/null +++ b/test/unit/ngsiv2/contextRequests/notifyCommand.json @@ -0,0 +1,64 @@ +{ + "subscriptionId": "60b0cedd497e8b681d40b58e", + "data": [{ + "id": "123456abcdefg", + "type": "cmd1Execution", + "targetEntityId": { + "type": "Text", + "value": "Second MQTT Device", + "metadata": {} + }, + "targetEntityType": { + "type": "Text", + "value": "AnMQTTDevice", + "metadata": {} + }, + "execTs": { + "type": "DateTime", + "value": "2020-05-27T00:00:00.000Z", + "metadata": {} + }, + "cmd": { + "type": "Text", + "value": "PING", + "metadata": {} + }, + "params": { + "type": "Text", + "value": { "data": "22" }, + "metadata": {} + }, + "status": { + "type": "Text", + "value": "FORWARDED", + "metadata": {} + }, + "info": { + "type": "Text", + "value": null, + "metadata": {} + }, + "onDelivered": { + "type": "Request" + }, + "onOk": { + "type": "Request" + }, + "onError": { + "type": "Request" + }, + "onInfo": { + "type": "Request" + }, + "cmdExecution": { + "type": "value", + "value": true, + "metadata": {} + }, + "dateExpiration": { + "type": "DateTime", + "value": "2030-05-27T20:00:00.000Z", + "metadata": {} + } + }] +}