Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): add mqtt #4

Merged
merged 5 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
"@japa/expect-type": "^2.0.2",
"@japa/runner": "^3.1.4",
"@swc/core": "^1.5.7",
"@testcontainers/hivemq": "^10.9.0",
"@testcontainers/redis": "^10.9.0",
"@types/node": "^20.12.12",
"@types/object-hash": "^3.0.6",
"c8": "^9.1.0",
"del-cli": "^5.1.0",
"eslint": "^8.57.0",
"ioredis": "^5.4.1",
"mqtt": "^5.6.1",
"prettier": "^3.2.5",
"release-it": "^17.2.1",
"ts-node": "^10.9.2",
Expand Down
95 changes: 95 additions & 0 deletions src/transports/mqtt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* @boringnode/bus
*
* @license MIT
* @copyright Boring Node
*/

import Mqtt from 'mqtt'
import { assert } from '@poppinss/utils/assert'

import debug from '../debug.js'
import {
Transport,
TransportEncoder,
TransportMessage,
Serializable,
SubscribeHandler,
MqttProtocol,
MqttTransportConfig,
} from '../types/main.js'
import { JsonEncoder } from '../encoders/json_encoder.js'

export function mqtt(config: MqttTransportConfig) {
return () => new MqttTransport(config)
}

export class MqttTransport implements Transport {
#id: string | undefined
#client: any
#url: string
readonly #encoder: TransportEncoder

constructor(options: MqttTransportConfig, encoder?: TransportEncoder) {
this.#encoder = encoder ?? new JsonEncoder()
this.#url = `${options.protocol || MqttProtocol.MQTT}://${options.host}${options.port ? `:${options.port}` : ''}`

this.#client = Mqtt.connect(this.#url)
}

setId(id: string): Transport {
this.#id = id

return this
}

async disconnect(): Promise<void> {
await this.#client.endAsync()
}

async publish(channel: string, message: any): Promise<void> {
assert(this.#id, 'You must set an id before publishing a message')

const encoded = this.#encoder.encode({ payload: message, busId: this.#id })

await this.#client.publishAsync(channel, encoded)
}

async subscribe<T extends Serializable>(
channel: string,
handler: SubscribeHandler<T>
): Promise<void> {
this.#client.subscribe(channel, (err) => {
if (err) {
throw err
}
})

this.#client.on('message', (receivedChannel: string, message: Buffer | string) => {
if (channel !== receivedChannel) return

debug('received message for channel "%s"', channel)

const data = this.#encoder.decode<TransportMessage<T>>(message)

/**
* Ignore messages published by this bus instance
*/
if (data.busId === this.#id) {
debug('ignoring message published by the same bus instance')
return
}

// @ts-expect-error - TODO: Weird typing issue
handler(data.payload)
})
}

onReconnect(): void {
this.#client.reconnect(this.#url)
}

async unsubscribe(channel: string): Promise<void> {
await this.#client.unsubscribeAsync(channel)
}
}
18 changes: 18 additions & 0 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ export interface RedisTransportConfig extends RedisOptions {
useMessageBuffer?: boolean
}

export enum MqttProtocol {
MQTT = 'mqtt',
MQTTS = 'mqtts',
TCP = 'tcp',
TLS = 'tls',
WS = 'ws',
WSS = 'wss',
WXS = 'wxs',
ALIS = 'alis',
}

export interface MqttTransportConfig {
host: string
port?: number
protocol?: MqttProtocol
qos?: number
}

export interface Transport {
setId: (id: string) => Transport
onReconnect: (callback: () => void) => void
Expand Down
134 changes: 134 additions & 0 deletions tests/drivers/mqtt_transport.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* @boringnode/bus
*
* @license MIT
* @copyright Boring Node
*/

import { setTimeout } from 'node:timers/promises'
import { test } from '@japa/runner'
import { HiveMQContainer, StartedHiveMQContainer } from '@testcontainers/hivemq'
import { MqttTransport } from '../../src/transports/mqtt.js'
import { JsonEncoder } from '../../src/encoders/json_encoder.js'
import { TransportEncoder, TransportMessage } from '../../src/types/main.js'

test.group('Mqtt Transport', (group) => {
let container: StartedHiveMQContainer

group.setup(async () => {
container = await new HiveMQContainer().start()

return async () => {
await container.stop()
}
})

test('transport should not receive message emitted by itself', async ({ assert, cleanup }) => {
const transport = new MqttTransport({
host: container.getHost(),
port: container.getPort(),
}).setId('bus')
cleanup(() => transport.disconnect())

await transport.subscribe('testing-channel', () => {
assert.fail('Bus should not receive message emitted by itself')
})

await transport.publish('testing-channel', 'test')
await setTimeout(1000)
}).disableTimeout()

test('transport should receive message emitted by another bus', async ({
assert,
cleanup,
}, done) => {
assert.plan(1)

const transport1 = new MqttTransport({
host: container.getHost(),
port: container.getPort(),
}).setId('bus1')
const transport2 = new MqttTransport({
host: container.getHost(),
port: container.getPort(),
}).setId('bus2')

cleanup(async () => {
await transport1.disconnect()
await transport2.disconnect()
})

await transport1.subscribe('testing-channel', (payload) => {
assert.equal(payload, 'test')
done()
})

await setTimeout(200)

await transport2.publish('testing-channel', 'test')
}).waitForDone()

test('message should be encoded and decoded correctly when using JSON encoder', async ({
assert,
cleanup,
}) => {
const transport = new MqttTransport(
{
host: container.getHost(),
port: container.getPort(),
},
new JsonEncoder()
).setId('bus')
cleanup(() => transport.disconnect())

const data = { test: 'test' }

await transport.subscribe('testing-channel', (payload) => {
assert.deepEqual(payload, data)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is invalid : If the subscribe is never called then the assertion will never be executed and the test will create a false positive. We should use done or the assertion plannings

https://japa.dev/docs/plugins/assert#plan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, is the "message should be encoded and decoded correctly when using JSON encoder" test from Redis also invalid? The test doesn't check assert.plan(1) because there is no done() call, and it uses only one transport, which can't receive its proper data.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one ?
https://github.com/boringnode/bus/blob/main/tests/drivers/redis_transport.spec.ts#L80

Yeah, definitely invalid too. This line will never be called https://github.com/boringnode/bus/blob/main/tests/drivers/redis_transport.spec.ts#L92

and this is a good example of what I was trying to explain. The test passes even though the assertion was never called

})

await setTimeout(200)

await transport.publish('testing-channel', data)
})

test('send binary data', async ({ assert, cleanup }, done) => {
assert.plan(1)

class BinaryEncoder implements TransportEncoder {
encode(message: TransportMessage<any>) {
return Buffer.from(JSON.stringify(message))
}

decode(data: string | Buffer) {
const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary')
return JSON.parse(buffer.toString())
}
}

const transport1 = new MqttTransport(
{ host: container.getHost(), port: container.getMappedPort(1883) },
new BinaryEncoder()
).setId('bus1')

const transport2 = new MqttTransport(
{ host: container.getHost(), port: container.getMappedPort(1883) },
new BinaryEncoder()
).setId('bus2')

cleanup(() => {
transport1.disconnect()
transport2.disconnect()
})

const data = ['foo', '👍']

await transport1.subscribe('testing-channel', (payload) => {
assert.deepEqual(payload, data)
done()
})

await setTimeout(200)
await transport2.publish('testing-channel', data)
}).waitForDone()
})