From 0c0f8144e6051b55c5d2af5aad6421e2e1af89d2 Mon Sep 17 00:00:00 2001 From: Fabrice Ongenae Date: Sun, 23 Apr 2017 14:56:50 +0200 Subject: [PATCH] handle multi-responses with socketio on different channels --- lib/engine_socketio.js | 64 ++++++++++++++++++++++---------- test/scripts/hello_socketio.json | 2 + test/targets/simple_socketio.js | 6 ++- 3 files changed, 52 insertions(+), 20 deletions(-) diff --git a/lib/engine_socketio.js b/lib/engine_socketio.js index 059736d..9a6c974 100644 --- a/lib/engine_socketio.js +++ b/lib/engine_socketio.js @@ -59,7 +59,7 @@ function markEndTime(ee, context, startedAt) { } function isResponseRequired(spec) { - return (spec.emit && spec.emit.response && spec.emit.response.channel); + return (spec.emit && spec.emit.response && (spec.emit.response.channel || (_.isArray(spec.emit.response) && !_.isEmpty(spec.emit.response)))); } function processResponse(ee, data, response, context, callback) { @@ -162,32 +162,58 @@ SocketIoEngine.prototype.step = function (requestSpec, ee) { }; if (isResponseRequired(requestSpec)) { - let response = { - channel: template(requestSpec.emit.response.channel, context), - data: template(requestSpec.emit.response.data, context), - capture: template(requestSpec.emit.response.capture, context), - match: template(requestSpec.emit.response.match, context) - }; - // Listen for the socket.io response on the specified channel - let done = false; - socketio.on(response.channel, function receive(data) { - done = true; - processResponse(ee, data, response, context, function(err) { - if (!err) { - markEndTime(ee, context, startedAt); - } - // Stop listening on the response channel - socketio.off(response.channel); - return callback(err, context); + let requiredResponses = requestSpec.emit.response; + if (!_.isArray(requiredResponses)) { + requiredResponses = [requestSpec.emit.response]; + } + + // Group required responses by channel in order to keep open channels with multiple messages + const channelsResponses = _.groupBy(requiredResponses, 'channel'); + + let requiredResponsesCount = 0; + let currentResponsesCount = 0; + let requiredResponsesCountsByChannel = {}; + _.forEach(channelsResponses, function(channelResponses, channel) { + requiredResponsesCountsByChannel[channel] = 0; + // Expect that messages should be sent in the same order + const responses = []; + _.forEach(channelResponses, function(channelResponse, idx) { + requiredResponsesCountsByChannel[channel]++; + requiredResponsesCount++; + responses.push({ + channel: template(channelResponse.channel, context), + data: template(channelResponse.data, context), + capture: template(channelResponse.capture, context), + match: template(channelResponse.match, context) + }); + }); + + let index = -1; + // Listen for the socket.io response on the specified channel + socketio.on(channel, function receive(data) { + index++; + currentResponsesCount++; + processResponse(ee, data, responses[index], context, function(err) { + if (!err) { + markEndTime(ee, context, startedAt); + } + // Stop listening on the response channel only if all messages were received + if (requiredResponsesCountsByChannel[channel] - 1 === index) { + socketio.off(channel); + } + return callback(err, context); + }); }); }); + // Send the data on the specified socket.io channel socketio.emit(outgoing.channel, outgoing.data); // If we don't get a response within the timeout, fire an error let waitTime = self.config.timeout || 10; waitTime *= 1000; setTimeout(function responseTimeout() { - if (!done) { + // Check if all messages have been received + if (requiredResponsesCount !== currentResponsesCount) { let err = 'response timeout'; ee.emit('error', err); return callback(err, context); diff --git a/test/scripts/hello_socketio.json b/test/scripts/hello_socketio.json index 4897b68..51f3a04 100644 --- a/test/scripts/hello_socketio.json +++ b/test/scripts/hello_socketio.json @@ -24,6 +24,8 @@ {"emit": { "channel": "echo", "data": {"key": "{{ $randomString(10) }}"}, "response": { "channel": "echoed", "capture": {"json": "$.key", "as": "rand" } }}}, {"think": 1}, {"emit": { "channel": "echo", "data": {"key": "{{ rand }}"}, "response": { "channel": "echoed", "match": {"json": "$.key", "value": "{{ rand }}" } }}}, + {"think": 1}, + {"emit": { "channel": "echo", "data": {"key": "{{ rand }}", "sendAck": true}, "response": [{ "channel": "ack", "match": {"json": "$.key", "value": "{{ rand }}" } }, { "channel": "echoed", "match": {"json": "$.key", "value": "{{ rand }}" } }]}}, {"think": 1} ] } diff --git a/test/targets/simple_socketio.js b/test/targets/simple_socketio.js index 5b31001..1b0524e 100644 --- a/test/targets/simple_socketio.js +++ b/test/targets/simple_socketio.js @@ -46,7 +46,11 @@ io.on('connection', function connection(ws) { ws.on('echo', function incoming(message) { MESSAGE_COUNT++; - console.log('Socket.io echoing message: %s', message); + if (message.sendAck === true) { + console.log('Socket.io sending message ack: ', message); + ws.emit('ack', message); + } + console.log('Socket.io echoing message: ', message); ws.emit('echoed', message); }); });