From ee7fd6c3802396637b51fa4778d0f281ce1b8f61 Mon Sep 17 00:00:00 2001 From: Fabrice Ongenae Date: Sun, 23 Apr 2017 14:56:50 +0200 Subject: [PATCH] handle multi-responses with socketio on different channels --- lib/engine_socketio.js | 64 ++++++++++++++++++++++---------- test/scripts/hello_socketio.json | 1 + test/targets/simple_socketio.js | 6 ++- 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/lib/engine_socketio.js b/lib/engine_socketio.js index 0db9246..3974f89 100644 --- a/lib/engine_socketio.js +++ b/lib/engine_socketio.js @@ -44,7 +44,7 @@ function markEndTime(ee, context, startedAt) { } function isResponseRequired(spec) { - return (spec.emit && spec.emit.response && spec.emit.response.channel); + return (spec.emit && spec.emit.response && (spec.emit.response.channel || (_.isArray(spec.emit.response) && !_.isEmpty(spec.emit.response)))); } function isAcknowledgeRequired(spec) { @@ -193,32 +193,58 @@ SocketIoEngine.prototype.step = function (requestSpec, ee) { }; if (isResponseRequired(requestSpec)) { - let response = { - channel: template(requestSpec.emit.response.channel, context), - data: template(requestSpec.emit.response.data, context), - capture: template(requestSpec.emit.response.capture, context), - match: template(requestSpec.emit.response.match, context) - }; - // Listen for the socket.io response on the specified channel - let done = false; - socketio.on(response.channel, function receive(data) { - done = true; - processResponse(ee, data, response, context, function(err) { - if (!err) { - markEndTime(ee, context, startedAt); - } - // Stop listening on the response channel - socketio.off(response.channel); - return endCallback(err, context); + let requiredResponses = requestSpec.emit.response; + if (!_.isArray(requiredResponses)) { + requiredResponses = [requestSpec.emit.response]; + } + + // Group required responses by channel in order to keep open channels with multiple messages + const channelsResponses = _.groupBy(requiredResponses, 'channel'); + + let requiredResponsesCount = 0; + let currentResponsesCount = 0; + let requiredResponsesCountsByChannel = {}; + _.forEach(channelsResponses, function(channelResponses, channel) { + requiredResponsesCountsByChannel[channel] = 0; + // Expect that messages should be sent in the same order + const responses = []; + _.forEach(channelResponses, function(channelResponse, idx) { + requiredResponsesCountsByChannel[channel]++; + requiredResponsesCount++; + responses.push({ + channel: template(channelResponse.channel, context), + data: template(channelResponse.data, context), + capture: template(channelResponse.capture, context), + match: template(channelResponse.match, context) + }); + }); + + let index = -1; + // Listen for the socket.io response on the specified channel + socketio.on(channel, function receive(data) { + index++; + currentResponsesCount++; + processResponse(ee, data, responses[index], context, function(err) { + if (!err) { + markEndTime(ee, context, startedAt); + } + // Stop listening on the response channel only if all messages were received + if (requiredResponsesCountsByChannel[channel] - 1 === index) { + socketio.off(channel); + } + return callback(err, context); + }); }); }); + // Send the data on the specified socket.io channel socketio.emit(outgoing.channel, outgoing.data); // If we don't get a response within the timeout, fire an error let waitTime = self.config.timeout || 10; waitTime *= 1000; setTimeout(function responseTimeout() { - if (!done) { + // Check if all messages have been received + if (requiredResponsesCount !== currentResponsesCount) { let err = 'response timeout'; ee.emit('error', err); return callback(err, context); diff --git a/test/scripts/hello_socketio.json b/test/scripts/hello_socketio.json index 0b41f85..844f925 100644 --- a/test/scripts/hello_socketio.json +++ b/test/scripts/hello_socketio.json @@ -28,6 +28,7 @@ {"emit": { "channel": "echo", "data": "ping", "acknowledge": {"match": {"value": "pong"}} }}, {"think": 1}, {"emit": { "channel": "echo", "data": "ping", "acknowledge": {"match": {"json": "$.1.answer", "value": 42}} }}, + {"emit": { "channel": "echo", "data": {"key": "{{ rand }}", "sendAck": true}, "response": [{ "channel": "ack", "match": {"json": "$.key", "value": "{{ rand }}" } }, { "channel": "echoed", "match": {"json": "$.key", "value": "{{ rand }}" } }]}}, {"think": 1} ] } diff --git a/test/targets/simple_socketio.js b/test/targets/simple_socketio.js index 9570046..510d6e0 100644 --- a/test/targets/simple_socketio.js +++ b/test/targets/simple_socketio.js @@ -58,7 +58,11 @@ function createServer() { ws.on('echo', function incoming(message, cb) { MESSAGE_COUNT++; if (message === 'ping') { - cb("pong", {answer: 42}); + cb('pong', {answer: 42}); + } + if (message.sendAck === true) { + console.log('Socket.io sending message ack: ', message); + ws.emit('ack', message); } debug('Socket.io echoing message: %s', message); ws.emit('echoed', message);