Skip to content

Commit

Permalink
handle multi-responses with socketio on different channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabrice Ongenae committed Apr 24, 2017
1 parent e77ee2d commit 0c0f814
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 20 deletions.
64 changes: 45 additions & 19 deletions lib/engine_socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ function markEndTime(ee, context, startedAt) {
}

function isResponseRequired(spec) {
return (spec.emit && spec.emit.response && spec.emit.response.channel);
return (spec.emit && spec.emit.response && (spec.emit.response.channel || (_.isArray(spec.emit.response) && !_.isEmpty(spec.emit.response))));
}

function processResponse(ee, data, response, context, callback) {
Expand Down Expand Up @@ -162,32 +162,58 @@ SocketIoEngine.prototype.step = function (requestSpec, ee) {
};

if (isResponseRequired(requestSpec)) {
let response = {
channel: template(requestSpec.emit.response.channel, context),
data: template(requestSpec.emit.response.data, context),
capture: template(requestSpec.emit.response.capture, context),
match: template(requestSpec.emit.response.match, context)
};
// Listen for the socket.io response on the specified channel
let done = false;
socketio.on(response.channel, function receive(data) {
done = true;
processResponse(ee, data, response, context, function(err) {
if (!err) {
markEndTime(ee, context, startedAt);
}
// Stop listening on the response channel
socketio.off(response.channel);
return callback(err, context);
let requiredResponses = requestSpec.emit.response;
if (!_.isArray(requiredResponses)) {
requiredResponses = [requestSpec.emit.response];
}

// Group required responses by channel in order to keep open channels with multiple messages
const channelsResponses = _.groupBy(requiredResponses, 'channel');

let requiredResponsesCount = 0;
let currentResponsesCount = 0;
let requiredResponsesCountsByChannel = {};
_.forEach(channelsResponses, function(channelResponses, channel) {
requiredResponsesCountsByChannel[channel] = 0;
// Expect that messages should be sent in the same order
const responses = [];
_.forEach(channelResponses, function(channelResponse, idx) {
requiredResponsesCountsByChannel[channel]++;
requiredResponsesCount++;
responses.push({
channel: template(channelResponse.channel, context),
data: template(channelResponse.data, context),
capture: template(channelResponse.capture, context),
match: template(channelResponse.match, context)
});
});

let index = -1;
// Listen for the socket.io response on the specified channel
socketio.on(channel, function receive(data) {
index++;
currentResponsesCount++;
processResponse(ee, data, responses[index], context, function(err) {
if (!err) {
markEndTime(ee, context, startedAt);
}
// Stop listening on the response channel only if all messages were received
if (requiredResponsesCountsByChannel[channel] - 1 === index) {
socketio.off(channel);
}
return callback(err, context);
});
});
});

// Send the data on the specified socket.io channel
socketio.emit(outgoing.channel, outgoing.data);
// If we don't get a response within the timeout, fire an error
let waitTime = self.config.timeout || 10;
waitTime *= 1000;
setTimeout(function responseTimeout() {
if (!done) {
// Check if all messages have been received
if (requiredResponsesCount !== currentResponsesCount) {
let err = 'response timeout';
ee.emit('error', err);
return callback(err, context);
Expand Down
2 changes: 2 additions & 0 deletions test/scripts/hello_socketio.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
{"emit": { "channel": "echo", "data": {"key": "{{ $randomString(10) }}"}, "response": { "channel": "echoed", "capture": {"json": "$.key", "as": "rand" } }}},
{"think": 1},
{"emit": { "channel": "echo", "data": {"key": "{{ rand }}"}, "response": { "channel": "echoed", "match": {"json": "$.key", "value": "{{ rand }}" } }}},
{"think": 1},
{"emit": { "channel": "echo", "data": {"key": "{{ rand }}", "sendAck": true}, "response": [{ "channel": "ack", "match": {"json": "$.key", "value": "{{ rand }}" } }, { "channel": "echoed", "match": {"json": "$.key", "value": "{{ rand }}" } }]}},
{"think": 1}
]
}
Expand Down
6 changes: 5 additions & 1 deletion test/targets/simple_socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ io.on('connection', function connection(ws) {

ws.on('echo', function incoming(message) {
MESSAGE_COUNT++;
console.log('Socket.io echoing message: %s', message);
if (message.sendAck === true) {
console.log('Socket.io sending message ack: ', message);
ws.emit('ack', message);
}
console.log('Socket.io echoing message: ', message);
ws.emit('echoed', message);
});
});
Expand Down

0 comments on commit 0c0f814

Please sign in to comment.