Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Added publishing options in both redis and mongodb.
Browse files Browse the repository at this point in the history
Updated test harness.
RedisAscoltatore now sends JSONs.
  • Loading branch information
mcollina committed May 2, 2014
1 parent 8fe025d commit cb3d15d
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 12 deletions.
7 changes: 4 additions & 3 deletions lib/mongo_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,15 @@ var MongoAscoltatore = function(opts) {
*/
MongoAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);

MongoAscoltatore.prototype.publish = function(topic, message, done) {
MongoAscoltatore.prototype.publish = function(topic, message, options, done) {
this._raiseIfClosed();
message = message || "";
done = done || function() {};

var messageObj = {
value: message,
topic: topic
topic: topic,
options: options
};

debug('publishing message: ' + message + ' on ' + topic + '...');
Expand Down Expand Up @@ -221,7 +222,7 @@ MongoAscoltatore.prototype._more = function(latest) {
value = doc.value;
}

that._ascoltatore.publish(doc.topic, value);
that._ascoltatore.publish(doc.topic, value, doc.options);
}
}));
};
Expand Down
24 changes: 20 additions & 4 deletions lib/redis_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ RedisAscoltatore.prototype._startSub = function() {
that._updateReady("_ready_sub");
});

handler = function(sub, topic, message) {
handler = function(sub, topic, payload) {
debug("new message received for topic " + topic);
util.defer(function() {
// we need to skip out this callback, so we do not
Expand All @@ -133,8 +133,14 @@ RedisAscoltatore.prototype._startSub = function() {
topic = topic.replace(new RegExp('/$'), '');
topic = that._recvTopic(topic);

payload = JSON.parse(payload);

if (payload.binary) {
payload.message = new Buffer(payload.message, 'hex');
}

if (ascoltatore) {
ascoltatore.publish(topic, message);
ascoltatore.publish(topic, payload.message, payload.options);
} else {
debug("no ascoltatore for subscription: " + sub);
}
Expand Down Expand Up @@ -200,20 +206,30 @@ RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
ascoltatore.subscribe(topic, callback);
};

RedisAscoltatore.prototype.publish = function publish(topic, message, done) {
RedisAscoltatore.prototype.publish = function publish(topic, message, options, done) {
this._raiseIfClosed();

if (message === undefined || message === null) {
message = false; // so we can convert it to JSON
}

var payload = {
message: message,
options: options
};

if (Buffer.isBuffer(message)) {
payload.binary = true;
payload.message = message.toString('hex');
}

topic = this._pubTopic(topic);

if (!topic.match(new RegExp('/$'))) {
topic += '/';
}

this._client.publish(topic, message, function() {
this._client.publish(topic, JSON.stringify(payload), function() {
debug("new message published to " + topic);
util.defer(done);
});
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@
"David Halls <[email protected]>"
],
"devDependencies": {
"mocha": "~1.12.0",
"chai": "~1.7.0",
"mocha": "~1.18.2",
"chai": "~1.9.1",
"sinon": "~1.7.3",
"sinon-chai": "~2.4.0",
"optimist": "~0.6.0",
"async_bench": "~0.1.0",
"dox-foundation": "0.5.4",
"mosca": "git://github.com/mcollina/mosca.git",
"jshint": "~2.4.4",
"istanbul": "~0.1.40",
"coveralls": "~2.0.16",
"istanbul": "~0.2.8",
"coveralls": "~2.10.0",
"pre-commit": "0.0.4"
},
"dependencies": {
Expand Down
12 changes: 12 additions & 0 deletions test/mongo_ascoltatore_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,16 @@ describeAscoltatore("mongo", function() {
});
});
});

it("should publish with options", function(done) {
var that = this;
that.instance.subscribe("hello/*", function(topic, value, options) {
expect(value).to.equal("42");
expect(options.qos).to.equal(1);
expect(options.messageId).to.equal(5);
done();
}, function() {
that.instance.publish("hello/123", "42", { qos: 1, messageId: 5 });
});
});
});
14 changes: 13 additions & 1 deletion test/redis_ascoltatore_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describeAscoltatore("redis", function() {
var that = this;
var expected = fs.readFileSync(__dirname + "/image.png");
that.instance.sub("image", function(topic, value) {
expect(value).to.eql(expected);
expect(value).to.be.deep.equal(expected);
done();
}, function() {
that.instance.pub("image", expected);
Expand Down Expand Up @@ -64,4 +64,16 @@ describeAscoltatore("redis", function() {
that.instance.publish("hello");
});
});

it("should publish with options", function(done) {
var that = this;
that.instance.subscribe("hello/*", function(topic, value, options) {
expect(value).to.equal("42");
expect(options.qos).to.equal(1);
expect(options.messageId).to.equal(5);
done();
}, function() {
that.instance.publish("hello/123", "42", { qos: 1, messageId: 5 });
});
});
});

0 comments on commit cb3d15d

Please sign in to comment.