Skip to content

Commit

Permalink
changed listen so it now adds nack for adding messages back to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCox007 committed Nov 21, 2017
1 parent e6c5f19 commit 78b3958
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions rabbitmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ let connection;
}
});
};*/
function connect(config,cb)
{
const connect = (config,cb) => {

exchange = config.exchange;
exchangeFanout = config.exchange+".fanout";
Expand Down Expand Up @@ -72,7 +71,7 @@ function connect(config,cb)
});
}

const publish = (msg,key) => {
const publish = (msg, key) => {
return new Promise(function(resolve, reject) {
try {

Expand Down Expand Up @@ -150,7 +149,7 @@ const generateUuid = () => {
Math.random().toString();
};

function RPCListen(queue,cb, ...args) {
const RPCListen = (queue,cb, ...args) => {
const q = "RPC." + queue;
channel.assertQueue(q, {durable: false});
channel.prefetch(1);
Expand Down Expand Up @@ -182,20 +181,20 @@ function RPCListen(queue,cb, ...args) {
});
}(msg))
});
}
function listen(queue,key,cb){
};
const listen = (queue,key,cb) => {
channel.assertQueue(queue, {durable:true},function(err, q) {
console.log(' [*] Waiting for data on'+q.queue);
channel.bindQueue(q.queue, exchange, key);
// channel.bindQueue(q.queue, exchangeFanout, key);
//Fetch 5 messages in a time and wait for ack on those
channel.prefetch(5);
channel.consume(q.queue, function(msg) {
cb(function(channel,msg) {channel.ack(msg);}.bind(this,channel,msg),msg.content.toString());
cb(() => {channel.ack(msg);},() => {channel.nack();},msg.content.toString());
}, {noAck: false});
});
}
function listenFanout(queue,key,cb){
};
const listenFanout = (queue,key,cb) => {
channel.assertQueue(queue, {durable:true},function(err, q) {
console.log(' [*] Waiting for data on'+q.queue);

Expand All @@ -206,7 +205,7 @@ function listenFanout(queue,key,cb){
cb(() => {channel.ack(msg)},msg.content.toString());
}, {noAck: false});
});
}
};



Expand Down

0 comments on commit 78b3958

Please sign in to comment.