A base worker class for connecting to and dealing with RabbitMQ.
This is mostly a convenience wrapper around amqplib.
npm install @showrunner/amqp-worker
const QueueWorker = require('@showrunner/amqp-worker')
class MyWorker extends QueueWorker {
constructor () {
super()
this.queue = 'my-queue'
}
messageHandler (msg) {
const data = msg.content
this.channel.ack(data)
}
}
const worker = new MyWorker()
worker.listen()
.then(() => console.log('listening!'))
.catch((err) => console.log('ERROR!', err))
If you're not into ES6 Classes, you can use a function and the prototype chain
to manage this as well. Since this is an ES6 class however, prototyping
requires the use of Reflect
:
const QueueWorker = require('@showrunner/amqp-worker')
function MyWorker () {
Object.assign(this, Reflect.construct(QueueWorker, arguments, MyWorker))
this.queue = 'my-queue'
}
Reflect.setPrototypeOf(MyWorker.prototype, QueueWorker.prototype)
MyWorker.prototype.messageHandler = function (msg) {
const data = msg.content
this.channel.ack(data)
}
const worker = new MyWorker()
worker.listen()
.then(() => console.log('listening!'))
.catch((err) => console.log('ERROR!', err))
The upside to this construct is that it works for both ES6 style classes and traditional JS function prototypes.
Create a new instance of a queue worker. Optionally pass in a hostname and a port for the RabbitMQ server. These may also be specified in environment variables:
RABBIT_MQ_HOST
- default:localhost
RABBIT_MQ_POST
- default:5672
An object of objects may be passed in to provide default options for asserting a queue, consuming a queue or sending a message to a queue. Options may be overridden or additional options may be provided at runtime
opts.assertOpts
- default options forthis.channel.assertQueue
opts.consumeOpts
- default options forthis.channel.consume
opts.sendOpts
- default options forthis.channel.sendToQueue
All queue worker instances must implement:
messageHandler(msg:object):undefined
- the message handlerqueue:string
- the name of the queue
They may as well implement:
handleError(err:Error):undefined
- error handler (default: throws errors)beforeDisconnect():Promise
- do something before disconnectionserializeMessage():Buffer
- called on the msg passed in tosendMessage
. noop by default, allows you to control how messages get changed into Buffers
Connect to the specified RabbitMQ server and attach a listener for error messages on the connection
Create a channel on the active connection, or, make a connection if one doesn't exist and create a channel
Disconnect the channel and the server, running the optional beforeDisconnect
handler.
Assert the specified queue and attach the messageHandler as a queue consumer.
Will create a connection and/or a channel as necessary. The options for assertQueue
and consume
are the same as for the amqplib functions.
Send a message to the queue, using the specified options, if any. Your message
must either be an instance of the Buffer object, or you must have overridden
QueueWorker#serializeMessage
to return a Buffer.
You may use a different channel to send a message to a queue by passing in a channel
key on the sendOpts
object.
This MUST be implemented!
Handle messages coming in from RabbitMQ. The msg
object is the same
as provided for the consume
method in amqplib:
{
content: Buffer,
fields: Object,
properties: Object
}
You can use this.channel.ack
and this.channel.nack
(or any of the channel
methods) to tell the server you've handled (or rejected) the message.
This MAY be implemented
Do something before disconnecting the channel and the connecftion
This MAY be implemented
Do something with any errors that might be thrown
What queue to use when asserting
Copyright © 2018, Scripto LLC. Apache-2.0 licensed.