From 7a438b72695f8e310b0685a50ce876034161764b Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sun, 27 Apr 2014 18:47:45 +0200 Subject: [PATCH] Fixed missing message in RedisAscoltatore. Sending a message on topic 'aa' was not forwarded with subscribers of 'aa/#'. This was due to some missing topic-rewriting business which have been added. --- lib/behave_like_an_ascoltatore.js | 7 +++++++ lib/redis_ascoltatore.js | 23 +++++++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/lib/behave_like_an_ascoltatore.js b/lib/behave_like_an_ascoltatore.js index 8e319d2..410eac3 100644 --- a/lib/behave_like_an_ascoltatore.js +++ b/lib/behave_like_an_ascoltatore.js @@ -92,6 +92,13 @@ module.exports = function() { }); }); + it("should support multi-level wildcard at the end of a topic with no separator", function(done) { + var that = this; + that.instance.sub("hello/*", wrap(done), function() { + that.instance.pub("hello"); + }); + }); + it("should support single-level wildcard at start of topic", function(done) { var that = this; that.instance.sub("+/hello", wrap(done), function() { diff --git a/lib/redis_ascoltatore.js b/lib/redis_ascoltatore.js index 26ca892..5036331 100644 --- a/lib/redis_ascoltatore.js +++ b/lib/redis_ascoltatore.js @@ -130,9 +130,13 @@ RedisAscoltatore.prototype._startSub = function() { // we need to skip out this callback, so we do not // break the client when an exception occurs var ascoltatore = that._ascoltatores[sub]; + topic = topic.replace(new RegExp('/$'), ''); + topic = that._recvTopic(topic); if (ascoltatore) { - ascoltatore.publish(that._recvTopic(topic), message); + ascoltatore.publish(topic, message); + } else { + console.log("no ascoltatore for subscription: " + sub); } }); }; @@ -170,12 +174,17 @@ RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) var subTopic = this._subTopic(topic); + if (!subTopic.match(new RegExp('[/*]$'))) { + subTopic += '/'; + } + if (this._containsWildcard(topic)) { this._sub.psubscribe(subTopic, newDone); } else { this._sub.subscribe(subTopic, newDone); } + debug("redis subscription topic is " + subTopic); this._subs_counter.add(subTopic); var ascoltatore = this._ascoltatores[subTopic]; @@ -198,7 +207,13 @@ RedisAscoltatore.prototype.publish = function publish(topic, message, done) { message = false; // so we can convert it to JSON } - this._client.publish(this._pubTopic(topic), message, function() { + topic = this._pubTopic(topic); + + if (!topic.match(new RegExp('/$'))) { + topic += '/'; + } + + this._client.publish(topic, message, function() { debug("new message published to " + topic); util.defer(done); }); @@ -210,6 +225,10 @@ RedisAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, d var isWildcard = this._containsWildcard(topic), subTopic = this._subTopic(topic); + if (!subTopic.match(new RegExp('[/*]$'))) { + subTopic += '/'; + } + this._subs_counter.remove(subTopic); var ascoltatore = this._ascoltatores[subTopic];