Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deferPublish should accept string msg as well #304

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ options object.
The jitter applied to the start of querying lookupd instances periodically.
* ```lowRdyTimeout: 50``` <br/>
The timeout in milliseconds for switching between connections when the Reader
maxInFlight is less than the number of connected NSQDs.
maxInFlight is less than the number of connected NSQDs.
* ```tls: false``` <br/>
Use TLS if nsqd has TLS support enabled.
* ```tlsVerification: true``` <br/>
Expand Down Expand Up @@ -158,8 +158,9 @@ These methods are available on a Writer object:
* `publish(topic, msgs, [callback])` <br/>
`topic` is a string. `msgs` is either a string, a `Buffer`, JSON serializable
object, a list of strings / `Buffers` / JSON serializable objects. `callback` takes a single `error` argument.
* `deferPublish(topic, msg, timeMs, [callback])` <br/>
`topic` is a string. `msg` is either a string, a `Buffer`, JSON serializable object. `timeMs` is the delay by which the message should be delivered. `callback` takes a single `error` argument.
* `deferPublish(topic, msgs, timeMs, [callback])` <br/>
`topic` is a string. `msgs` is either a string, a `Buffer`, JSON serializable
object, a list of strings / `Buffers` / JSON serializable objects. `timeMs` is the delay by which the message should be delivered, and `-max-req-timeout` on NSQD must larger than `timeMS`. `callback` takes a single `error` argument.

### Simple example

Expand Down Expand Up @@ -274,7 +275,7 @@ w.on('ready', () => {
w.publish('sample_topic', 'it really tied the room together')
w.deferPublish('sample_topic', ['This message gonna arrive 1 sec later.'], 1000)
w.publish('sample_topic', [
'Uh, excuse me. Mark it zero. Next frame.',
'Uh, excuse me. Mark it zero. Next frame.',
'Smokey, this is not \'Nam. This is bowling. There are rules.'
])
w.publish('sample_topic', 'Wu?', err => {
Expand Down Expand Up @@ -356,7 +357,7 @@ w.on('closed', () => {
* Bug: Non-fatal nsqd errors would cause RDY count to decrease and never
return to normal. This will happen for example when finishing messages
that have exceeded their amount of time to process a message.
*
*
* **0.7.10**
* Properly handles non-string errors
* **0.7.9**
Expand Down
15 changes: 11 additions & 4 deletions lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class Writer extends EventEmitter {
* of the messages should either be strings or buffers with the payload encoded.

* @param {String} topic
* @param {String|Buffer|Object} msg - A string, a buffer, a
* @param {String|Buffer|Object|Array} msg - A string, a buffer, a
* JSON serializable object, or a list of string / buffers /
* JSON serializable objects.
* @param {Number} timeMs - defer time
Expand All @@ -152,7 +152,7 @@ class Writer extends EventEmitter {
*/
deferPublish(topic, msg, timeMs, callback) {
let err = this._checkStateValidity()
err = err || this._checkMsgsValidity(msg)
err = err || this._checkMsgsValidity(msgs)
err = err || this._checkTimeMsValidity(timeMs)

if (err) {
Expand All @@ -163,12 +163,19 @@ class Writer extends EventEmitter {
if (!this.ready) {
const onReady = (err) => {
if (err) return callback(err)
this.deferPublish(topic, msg, timeMs, callback)
this.deferPublish(topic, msgs, timeMs, callback)
}
this._callwhenReady(onReady)
}

return this.conn.produceMessages(topic, msg, timeMs, callback)
if (!_.isArray(msgs)) {
msgs = [msgs]
}

// Automatically serialize as JSON if the message isn't a String or a Buffer
msgs = msgs.map(this._serializeMsg)

return this.conn.produceMessages(topic, msgs, timeMs, callback)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion test/writer_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe('writer', () => {
const topic = 'test_topic'
const msg = 'hello world!'

writer.publish(topic, msg, 300, () => {
writer.deferPublish(topic, msg, 300, () => {
should.equal(writer.conn.produceMessages.calledOnce, true)
should.equal(writer.conn.produceMessages.calledWith(topic, [msg]), true)
})
Expand Down