-
Notifications
You must be signed in to change notification settings - Fork 5
/
amqp.js
141 lines (132 loc) · 4.32 KB
/
amqp.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
const amqp = require('amqplib');
const stringifysafe = require('json-stringify-safe');
const queueSetup = require('./lib/queue-setup');
const { promisify } = require('util');
/**
* Class to contain an instantiated connection/channel to AMQP with a given
* config.
*/
class AMQPWrapper {
/**
* Instantiate an AMQP wrapper with a given config.
* @param {object} config
* @param {string} config.url
* @param {string} config.exchange
* @param {object} config.queue
* @param {string} config.queue.name
* @param {Array<string>|string} config.queue.routingKey
* @param {object} config.queue.options
*/
constructor (config) {
if (!config || !config.url || !config.exchange) {
throw new Error('simple-amqplib: Invalid config');
}
this.config = config;
this.connection = null;
this.channel = null;
this.prefetch = config.prefetch || 10;
}
/**
* @async
* @description Connects, establishes a channel, sets up exchange/queues/bindings/dead
* lettering.
* @returns {Promise}
*/
async connect () {
const { config } = this;
this.connection = await amqp.connect(config.url);
this.channel = await this.connection.createConfirmChannel();
this.channel.prefetch(this.prefetch);
await this.channel.assertExchange(config.exchange, 'topic', {});
if (config.queue && config.queue.name) {
return queueSetup.setupForConsume(this.channel, config);
}
}
/**
* @async
* @description
* Closes connection.
* @returns {Promise}
*/
async close () {
if (this.connection) {
return this.connection.close();
}
}
/**
* Publish a message using the specified routing key.
* @param {string} routingKey The name of the queue to use.
* @param {string} message The message to publish.
* @param {Object} options Any options to pass through to the underlying
* publish.
* @param {Function(err)} callback The callback to call when done.
*/
/**
* @async
* @description
* Publish a message to the given routing key, with given options.
* @param {string} routingKey
* @param {object|string} message
* @param {object} options
* @returns {Promise}
*/
async publish (routingKey, message, options = {}) {
if (typeof message === 'object') {
message = stringifysafe(message);
}
// NB: amqplib's ConfirmChannel.publish does not actually return a promise.
// See https://www.squaremobius.net/amqp.node/channel_api.html#flowcontrol
return promisify(this.channel.publish.bind(this.channel, this.config.exchange, routingKey, Buffer.from(message), options))();
}
/**
* @async
* Start consuming on the queue specified in the config you passed on
* instantiation, using the given message handler callback.
* @param {function} handleMessage
* @param {object} options
* @description
* handleMessage() is expected to be of the form:
* handleMessage(parsedMessage, callback).
* If callback is called with a non-null error, then the message will be
* nacked. You can call it like:
* callback(err, requeue) in order
* to instruct rabbit whether to requeue the message
* (or discard/dead letter).
*
* If not given, requeue is assumed to be false.
*
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
* @returns {Promise}
*/
async consume (handleMessage, _options = {}) {
const { returnRawMessage = false, ...options } = _options;
const { channel } = this;
function callback (message) {
function done (err, requeue) {
if (requeue === undefined) {
requeue = false;
}
if (err) {
return channel.nack(message, false, requeue);
}
channel.ack(message);
}
try {
const messagePayload = message.content.toString();
const parsedPayload = JSON.parse(messagePayload);
if (returnRawMessage) {
handleMessage(parsedPayload, message, done);
} else {
handleMessage(parsedPayload, done);
}
} catch (error) {
console.log(error);
// Do not requeue on exception - it means something unexpected
// (and prob. non-transitory) happened.
done(error, false);
}
}
return channel.consume(this.config.queue.name, callback, options);
}
}
module.exports = AMQPWrapper;