-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(transport): add mqtt #4
Merged
Merged
Changes from 1 commit
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
8226b62
feat(transport): add mqtt
MaximeMRF ea3608f
refacto(mqtt): handle options
MaximeMRF a6e2089
test(redis): wait the test to finish properly
MaximeMRF 33b01e1
tests(mqtt): correct tests and add two broker
MaximeMRF 675695d
docs(mqtt)
MaximeMRF File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/** | ||
* @boringnode/bus | ||
* | ||
* @license MIT | ||
* @copyright Boring Node | ||
*/ | ||
|
||
import Mqtt from 'mqtt' | ||
import { assert } from '@poppinss/utils/assert' | ||
|
||
import debug from '../debug.js' | ||
import { | ||
Transport, | ||
TransportEncoder, | ||
TransportMessage, | ||
Serializable, | ||
SubscribeHandler, | ||
MqttProtocol, | ||
MqttTransportConfig, | ||
} from '../types/main.js' | ||
import { JsonEncoder } from '../encoders/json_encoder.js' | ||
|
||
export function mqtt(config: MqttTransportConfig) { | ||
return () => new MqttTransport(config) | ||
} | ||
|
||
export class MqttTransport implements Transport { | ||
#id: string | undefined | ||
#client: any | ||
#url: string | ||
readonly #encoder: TransportEncoder | ||
|
||
constructor(options: MqttTransportConfig, encoder?: TransportEncoder) { | ||
this.#encoder = encoder ?? new JsonEncoder() | ||
this.#url = `${options.protocol || MqttProtocol.MQTT}://${options.host}${options.port ? `:${options.port}` : ''}` | ||
|
||
this.#client = Mqtt.connect(this.#url) | ||
} | ||
|
||
setId(id: string): Transport { | ||
this.#id = id | ||
|
||
return this | ||
} | ||
|
||
async disconnect(): Promise<void> { | ||
await this.#client.endAsync() | ||
} | ||
|
||
async publish(channel: string, message: any): Promise<void> { | ||
assert(this.#id, 'You must set an id before publishing a message') | ||
|
||
const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) | ||
|
||
await this.#client.publishAsync(channel, encoded) | ||
} | ||
|
||
async subscribe<T extends Serializable>( | ||
channel: string, | ||
handler: SubscribeHandler<T> | ||
): Promise<void> { | ||
this.#client.subscribe(channel, (err) => { | ||
if (err) { | ||
throw err | ||
} | ||
}) | ||
|
||
this.#client.on('message', (receivedChannel: string, message: Buffer | string) => { | ||
if (channel !== receivedChannel) return | ||
|
||
debug('received message for channel "%s"', channel) | ||
|
||
const data = this.#encoder.decode<TransportMessage<T>>(message) | ||
|
||
/** | ||
* Ignore messages published by this bus instance | ||
*/ | ||
if (data.busId === this.#id) { | ||
debug('ignoring message published by the same bus instance') | ||
return | ||
} | ||
|
||
// @ts-expect-error - TODO: Weird typing issue | ||
handler(data.payload) | ||
}) | ||
} | ||
|
||
onReconnect(): void { | ||
this.#client.reconnect(this.#url) | ||
} | ||
|
||
async unsubscribe(channel: string): Promise<void> { | ||
await this.#client.unsubscribeAsync(channel) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
/** | ||
* @boringnode/bus | ||
* | ||
* @license MIT | ||
* @copyright Boring Node | ||
*/ | ||
|
||
import { setTimeout } from 'node:timers/promises' | ||
import { test } from '@japa/runner' | ||
import { HiveMQContainer, StartedHiveMQContainer } from '@testcontainers/hivemq' | ||
import { MqttTransport } from '../../src/transports/mqtt.js' | ||
import { JsonEncoder } from '../../src/encoders/json_encoder.js' | ||
import { TransportEncoder, TransportMessage } from '../../src/types/main.js' | ||
|
||
test.group('Mqtt Transport', (group) => { | ||
let container: StartedHiveMQContainer | ||
|
||
group.setup(async () => { | ||
container = await new HiveMQContainer().start() | ||
|
||
return async () => { | ||
await container.stop() | ||
} | ||
}) | ||
|
||
test('transport should not receive message emitted by itself', async ({ assert, cleanup }) => { | ||
const transport = new MqttTransport({ | ||
host: container.getHost(), | ||
port: container.getPort(), | ||
}).setId('bus') | ||
cleanup(() => transport.disconnect()) | ||
|
||
await transport.subscribe('testing-channel', () => { | ||
assert.fail('Bus should not receive message emitted by itself') | ||
}) | ||
|
||
await transport.publish('testing-channel', 'test') | ||
await setTimeout(1000) | ||
}).disableTimeout() | ||
|
||
test('transport should receive message emitted by another bus', async ({ | ||
assert, | ||
cleanup, | ||
}, done) => { | ||
assert.plan(1) | ||
|
||
const transport1 = new MqttTransport({ | ||
host: container.getHost(), | ||
port: container.getPort(), | ||
}).setId('bus1') | ||
const transport2 = new MqttTransport({ | ||
host: container.getHost(), | ||
port: container.getPort(), | ||
}).setId('bus2') | ||
|
||
cleanup(async () => { | ||
await transport1.disconnect() | ||
await transport2.disconnect() | ||
}) | ||
|
||
await transport1.subscribe('testing-channel', (payload) => { | ||
assert.equal(payload, 'test') | ||
done() | ||
}) | ||
|
||
await setTimeout(200) | ||
|
||
await transport2.publish('testing-channel', 'test') | ||
}).waitForDone() | ||
|
||
test('message should be encoded and decoded correctly when using JSON encoder', async ({ | ||
assert, | ||
cleanup, | ||
}) => { | ||
const transport = new MqttTransport( | ||
{ | ||
host: container.getHost(), | ||
port: container.getPort(), | ||
}, | ||
new JsonEncoder() | ||
).setId('bus') | ||
cleanup(() => transport.disconnect()) | ||
|
||
const data = { test: 'test' } | ||
|
||
await transport.subscribe('testing-channel', (payload) => { | ||
assert.deepEqual(payload, data) | ||
}) | ||
|
||
await setTimeout(200) | ||
|
||
await transport.publish('testing-channel', data) | ||
}) | ||
|
||
test('send binary data', async ({ assert, cleanup }, done) => { | ||
assert.plan(1) | ||
|
||
class BinaryEncoder implements TransportEncoder { | ||
encode(message: TransportMessage<any>) { | ||
return Buffer.from(JSON.stringify(message)) | ||
} | ||
|
||
decode(data: string | Buffer) { | ||
const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary') | ||
return JSON.parse(buffer.toString()) | ||
} | ||
} | ||
|
||
const transport1 = new MqttTransport( | ||
{ host: container.getHost(), port: container.getMappedPort(1883) }, | ||
new BinaryEncoder() | ||
).setId('bus1') | ||
|
||
const transport2 = new MqttTransport( | ||
{ host: container.getHost(), port: container.getMappedPort(1883) }, | ||
new BinaryEncoder() | ||
).setId('bus2') | ||
|
||
cleanup(() => { | ||
transport1.disconnect() | ||
transport2.disconnect() | ||
}) | ||
|
||
const data = ['foo', '👍'] | ||
|
||
await transport1.subscribe('testing-channel', (payload) => { | ||
assert.deepEqual(payload, data) | ||
done() | ||
}) | ||
|
||
await setTimeout(200) | ||
await transport2.publish('testing-channel', data) | ||
}).waitForDone() | ||
}) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test is invalid : If the
subscribe
is never called then the assertion will never be executed and the test will create a false positive. We should usedone
or the assertion planningshttps://japa.dev/docs/plugins/assert#plan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, is the
"message should be encoded and decoded correctly when using JSON encoder"
test from Redis also invalid? The test doesn't checkassert.plan(1)
because there is nodone()
call, and it uses only one transport, which can't receive its proper data.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one ?
https://github.com/boringnode/bus/blob/main/tests/drivers/redis_transport.spec.ts#L80
Yeah, definitely invalid too. This line will never be called https://github.com/boringnode/bus/blob/main/tests/drivers/redis_transport.spec.ts#L92
and this is a good example of what I was trying to explain. The test passes even though the assertion was never called