Skip to content

Commit

Permalink
handle multi-responses with socketio on different channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabrice Ongenae authored and fabong committed May 22, 2018
1 parent 4a366e1 commit ee7fd6c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
64 changes: 45 additions & 19 deletions lib/engine_socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function markEndTime(ee, context, startedAt) {
}

function isResponseRequired(spec) {
return (spec.emit && spec.emit.response && spec.emit.response.channel);
return (spec.emit && spec.emit.response && (spec.emit.response.channel || (_.isArray(spec.emit.response) && !_.isEmpty(spec.emit.response))));
}

function isAcknowledgeRequired(spec) {
Expand Down Expand Up @@ -193,32 +193,58 @@ SocketIoEngine.prototype.step = function (requestSpec, ee) {
};

if (isResponseRequired(requestSpec)) {
let response = {
channel: template(requestSpec.emit.response.channel, context),
data: template(requestSpec.emit.response.data, context),
capture: template(requestSpec.emit.response.capture, context),
match: template(requestSpec.emit.response.match, context)
};
// Listen for the socket.io response on the specified channel
let done = false;
socketio.on(response.channel, function receive(data) {
done = true;
processResponse(ee, data, response, context, function(err) {
if (!err) {
markEndTime(ee, context, startedAt);
}
// Stop listening on the response channel
socketio.off(response.channel);
return endCallback(err, context);
let requiredResponses = requestSpec.emit.response;
if (!_.isArray(requiredResponses)) {
requiredResponses = [requestSpec.emit.response];
}

// Group required responses by channel in order to keep open channels with multiple messages
const channelsResponses = _.groupBy(requiredResponses, 'channel');

let requiredResponsesCount = 0;
let currentResponsesCount = 0;
let requiredResponsesCountsByChannel = {};
_.forEach(channelsResponses, function(channelResponses, channel) {
requiredResponsesCountsByChannel[channel] = 0;
// Expect that messages should be sent in the same order
const responses = [];
_.forEach(channelResponses, function(channelResponse, idx) {
requiredResponsesCountsByChannel[channel]++;
requiredResponsesCount++;
responses.push({
channel: template(channelResponse.channel, context),
data: template(channelResponse.data, context),
capture: template(channelResponse.capture, context),
match: template(channelResponse.match, context)
});
});

let index = -1;
// Listen for the socket.io response on the specified channel
socketio.on(channel, function receive(data) {
index++;
currentResponsesCount++;
processResponse(ee, data, responses[index], context, function(err) {
if (!err) {
markEndTime(ee, context, startedAt);
}
// Stop listening on the response channel only if all messages were received
if (requiredResponsesCountsByChannel[channel] - 1 === index) {
socketio.off(channel);
}
return callback(err, context);
});
});
});

// Send the data on the specified socket.io channel
socketio.emit(outgoing.channel, outgoing.data);
// If we don't get a response within the timeout, fire an error
let waitTime = self.config.timeout || 10;
waitTime *= 1000;
setTimeout(function responseTimeout() {
if (!done) {
// Check if all messages have been received
if (requiredResponsesCount !== currentResponsesCount) {
let err = 'response timeout';
ee.emit('error', err);
return callback(err, context);
Expand Down
1 change: 1 addition & 0 deletions test/scripts/hello_socketio.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
{"emit": { "channel": "echo", "data": "ping", "acknowledge": {"match": {"value": "pong"}} }},
{"think": 1},
{"emit": { "channel": "echo", "data": "ping", "acknowledge": {"match": {"json": "$.1.answer", "value": 42}} }},
{"emit": { "channel": "echo", "data": {"key": "{{ rand }}", "sendAck": true}, "response": [{ "channel": "ack", "match": {"json": "$.key", "value": "{{ rand }}" } }, { "channel": "echoed", "match": {"json": "$.key", "value": "{{ rand }}" } }]}},
{"think": 1}
]
}
Expand Down
6 changes: 5 additions & 1 deletion test/targets/simple_socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ function createServer() {
ws.on('echo', function incoming(message, cb) {
MESSAGE_COUNT++;
if (message === 'ping') {
cb("pong", {answer: 42});
cb('pong', {answer: 42});
}
if (message.sendAck === true) {
console.log('Socket.io sending message ack: ', message);
ws.emit('ack', message);
}
debug('Socket.io echoing message: %s', message);
ws.emit('echoed', message);
Expand Down

0 comments on commit ee7fd6c

Please sign in to comment.