-
-
Notifications
You must be signed in to change notification settings - Fork 232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feat!]: change to named exports to enable monkey patching #888
Comments
This could be an opportunity to revise how modules are exported and solve #878 at the same time. |
I will work on a PR soon. For reference, I am copying the patch that I applied for aedes-otel-instrumentation. diff --git a/node_modules/aedes/aedes.js b/node_modules/aedes/aedes.js
index c02d289..668b162 100644
--- a/node_modules/aedes/aedes.js
+++ b/node_modules/aedes/aedes.js
@@ -7,7 +7,7 @@ const series = require('fastseries')
const { v4: uuidv4 } = require('uuid')
const reusify = require('reusify')
const { pipeline } = require('stream')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
const memory = require('aedes-persistence')
const mqemitter = require('mqemitter')
const Client = require('./lib/client')
@@ -45,6 +45,7 @@ function Aedes (opts) {
// +1 when construct a new aedes-packet
// internal track for last brokerCounter
this.counter = 0
+ this.concurrency = opts.concurrency
this.queueLimit = opts.queueLimit
this.connectTimeout = opts.connectTimeout
this.maxClientsIdLength = opts.maxClientsIdLength
@@ -52,24 +53,19 @@ function Aedes (opts) {
concurrency: opts.concurrency,
matchEmptyLevels: true // [MQTT-4.7.1-3]
})
- this.handle = function handle (conn, req) {
- conn.setMaxListeners(opts.concurrency * 2)
- // create a new Client instance for a new connection
- // return, just to please standard
- return new Client(that, conn, req)
- }
+
this.persistence = opts.persistence || memory()
this.persistence.broker = this
this._parallel = parallel()
this._series = series()
this._enqueuers = reusify(DoEnqueues)
- this.preConnect = opts.preConnect
- this.authenticate = opts.authenticate
- this.authorizePublish = opts.authorizePublish
- this.authorizeSubscribe = opts.authorizeSubscribe
- this.authorizeForward = opts.authorizeForward
- this.published = opts.published
+ this._preConnect = opts.preConnect
+ this._authenticate = opts.authenticate
+ this._authorizePublish = opts.authorizePublish
+ this._authorizeSubscribe = opts.authorizeSubscribe
+ this._authorizeForward = opts.authorizeForward
+ this._published = opts.published
this.decodeProtocol = opts.decodeProtocol
this.trustProxy = opts.trustProxy
@@ -250,6 +246,15 @@ function removeSharp (sub) {
return code !== 43 && code !== 35
}
+// assigning to the prototype is a breaking change, as callers are now required to bind the Aedes instance to the function
+// @example net.createServer(broker.handle.bind(broker)) or net.createServer((socket) => broker.handle(socket))
+Aedes.prototype.handle = function handle (conn, req) {
+ conn.setMaxListeners(this.concurrency * 2)
+ // create a new Client instance for a new connection
+ // return, just to please standard
+ return new Client(this, conn, req)
+}
+
function callPublished (_, done) {
this.broker.published(this.packet, this.client, done)
this.broker.emit('publish', this.packet, this.client)
@@ -338,6 +343,30 @@ Aedes.prototype.close = function (cb = noop) {
Aedes.prototype.version = require('./package.json').version
+Aedes.prototype.preConnect = function (client, packet, callback) {
+ this._preConnect(client, packet, callback)
+}
+
+Aedes.prototype.authenticate = function (client, username, password, callback) {
+ this._authenticate(client, username, password, callback)
+}
+
+Aedes.prototype.authorizePublish = function (client, packet, callback) {
+ this._authorizePublish(client, packet, callback)
+}
+
+Aedes.prototype.authorizeSubscribe = function (client, sub, callback) {
+ this._authorizeSubscribe(client, sub, callback)
+}
+
+Aedes.prototype.authorizeForward = function (client, packet) {
+ return this._authorizeForward(client, packet)
+}
+
+Aedes.prototype.published = function (packet, client, callback) {
+ this._published(packet, client, callback)
+}
+
function defaultPreConnect (client, packet, callback) {
callback(null, true)
}
diff --git a/node_modules/aedes/lib/client.js b/node_modules/aedes/lib/client.js
index 414d8e5..e525712 100644
--- a/node_modules/aedes/lib/client.js
+++ b/node_modules/aedes/lib/client.js
@@ -4,12 +4,12 @@ const mqtt = require('mqtt-packet')
const EventEmitter = require('events')
const util = require('util')
const eos = require('end-of-stream')
-const Packet = require('aedes-packet')
-const write = require('./write')
+const { Packet } = require('aedes-packet')
+const { write } = require('./write')
const QoSPacket = require('./qos-packet')
-const handleSubscribe = require('./handlers/subscribe')
-const handleUnsubscribe = require('./handlers/unsubscribe')
-const handle = require('./handlers')
+const { handleSubscribe } = require('./handlers/subscribe')
+const { handleUnsubscribe } = require('./handlers/unsubscribe')
+const { handle } = require('./handlers')
const { pipeline } = require('stream')
const { through } = require('./utils')
diff --git a/node_modules/aedes/lib/handlers/connect.js b/node_modules/aedes/lib/handlers/connect.js
index a4c32d0..bd2d8cb 100644
--- a/node_modules/aedes/lib/handlers/connect.js
+++ b/node_modules/aedes/lib/handlers/connect.js
@@ -2,10 +2,10 @@
const retimer = require('retimer')
const { pipeline } = require('stream')
-const write = require('../write')
+const { write } = require('../write')
const QoSPacket = require('../qos-packet')
const { through } = require('../utils')
-const handleSubscribe = require('./subscribe')
+const { handleSubscribe } = require('./subscribe')
const uniqueId = require('hyperid')()
function Connack (arg) {
@@ -264,4 +264,4 @@ function emptyQueueFilter (err, client, packet) {
}
}
-module.exports = handleConnect
+module.exports = { handleConnect }
diff --git a/node_modules/aedes/lib/handlers/index.js b/node_modules/aedes/lib/handlers/index.js
index a5dfaa8..b611293 100644
--- a/node_modules/aedes/lib/handlers/index.js
+++ b/node_modules/aedes/lib/handlers/index.js
@@ -1,13 +1,13 @@
'use strict'
-const handleConnect = require('./connect')
-const handleSubscribe = require('./subscribe')
-const handleUnsubscribe = require('./unsubscribe')
-const handlePublish = require('./publish')
-const handlePuback = require('./puback')
-const handlePubrel = require('./pubrel')
-const handlePubrec = require('./pubrec')
-const handlePing = require('./ping')
+const { handleConnect } = require('./connect')
+const { handleSubscribe } = require('./subscribe')
+const { handleUnsubscribe } = require('./unsubscribe')
+const { handlePublish } = require('./publish')
+const { handlePuback } = require('./puback')
+const { handlePubrel } = require('./pubrel')
+const { handlePubrec } = require('./pubrec')
+const { handlePing } = require('./ping')
function handle (client, packet, done) {
if (packet.cmd === 'connect') {
@@ -74,4 +74,4 @@ function finish (conn, packet, done) {
done(error)
}
-module.exports = handle
+module.exports = { handle }
diff --git a/node_modules/aedes/lib/handlers/ping.js b/node_modules/aedes/lib/handlers/ping.js
index a4c042c..69b3ded 100644
--- a/node_modules/aedes/lib/handlers/ping.js
+++ b/node_modules/aedes/lib/handlers/ping.js
@@ -1,6 +1,6 @@
'use strict'
-const write = require('../write')
+const { write } = require('../write')
const pingResp = {
cmd: 'pingresp'
}
@@ -10,4 +10,4 @@ function handlePing (client, packet, done) {
write(client, pingResp, done)
}
-module.exports = handlePing
+module.exports = { handlePing }
diff --git a/node_modules/aedes/lib/handlers/puback.js b/node_modules/aedes/lib/handlers/puback.js
index e4b419c..8376861 100644
--- a/node_modules/aedes/lib/handlers/puback.js
+++ b/node_modules/aedes/lib/handlers/puback.js
@@ -8,4 +8,4 @@ function handlePuback (client, packet, done) {
})
}
-module.exports = handlePuback
+module.exports = { handlePuback }
diff --git a/node_modules/aedes/lib/handlers/publish.js b/node_modules/aedes/lib/handlers/publish.js
index e30c9db..5c3e167 100644
--- a/node_modules/aedes/lib/handlers/publish.js
+++ b/node_modules/aedes/lib/handlers/publish.js
@@ -1,6 +1,6 @@
'use strict'
-const write = require('../write')
+const { write } = require('../write')
function PubAck (packet) {
this.cmd = 'puback'
@@ -62,4 +62,4 @@ function authorizePublish (packet, done) {
this.broker.authorizePublish(this, packet, done)
}
-module.exports = handlePublish
+module.exports = { handlePublish }
diff --git a/node_modules/aedes/lib/handlers/pubrec.js b/node_modules/aedes/lib/handlers/pubrec.js
index 5c914dd..dc7a7f0 100644
--- a/node_modules/aedes/lib/handlers/pubrec.js
+++ b/node_modules/aedes/lib/handlers/pubrec.js
@@ -1,6 +1,6 @@
'use strict'
-const write = require('../write')
+const { write } = require('../write')
function PubRel (packet) {
this.cmd = 'pubrel'
@@ -27,4 +27,4 @@ function handlePubrec (client, packet, done) {
}
}
-module.exports = handlePubrec
+module.exports = { handlePubrec }
diff --git a/node_modules/aedes/lib/handlers/pubrel.js b/node_modules/aedes/lib/handlers/pubrel.js
index 09dcc86..672b697 100644
--- a/node_modules/aedes/lib/handlers/pubrel.js
+++ b/node_modules/aedes/lib/handlers/pubrel.js
@@ -1,6 +1,6 @@
'use strict'
-const write = require('../write')
+const { write } = require('../write')
function ClientPacketStatus (client, packet) {
this.client = client
@@ -47,4 +47,4 @@ function pubrelDel (arg, done) {
persistence.incomingDelPacket(this.client, arg.packet, done)
}
-module.exports = handlePubrel
+module.exports = { handlePubrel }
diff --git a/node_modules/aedes/lib/handlers/subscribe.js b/node_modules/aedes/lib/handlers/subscribe.js
index 2470427..e2007aa 100644
--- a/node_modules/aedes/lib/handlers/subscribe.js
+++ b/node_modules/aedes/lib/handlers/subscribe.js
@@ -1,10 +1,10 @@
'use strict'
const fastfall = require('fastfall')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
const { through } = require('../utils')
const { validateTopic, $SYS_PREFIX } = require('../utils')
-const write = require('../write')
+const { write } = require('../write')
const subscribeTopicActions = fastfall([
authorize,
@@ -245,4 +245,4 @@ function completeSubscribe (err) {
function noop () { }
-module.exports = handleSubscribe
+module.exports = { handleSubscribe }
diff --git a/node_modules/aedes/lib/handlers/unsubscribe.js b/node_modules/aedes/lib/handlers/unsubscribe.js
index e08c317..b9cd7ef 100644
--- a/node_modules/aedes/lib/handlers/unsubscribe.js
+++ b/node_modules/aedes/lib/handlers/unsubscribe.js
@@ -1,6 +1,6 @@
'use strict'
-const write = require('../write')
+const { write } = require('../write')
const { validateTopic, $SYS_PREFIX } = require('../utils')
function UnSubAck (packet) {
@@ -101,4 +101,4 @@ function completeUnsubscribe (err) {
function noop () { }
-module.exports = handleUnsubscribe
+module.exports = { handleUnsubscribe }
diff --git a/node_modules/aedes/lib/qos-packet.js b/node_modules/aedes/lib/qos-packet.js
index 5527fe1..07c1581 100644
--- a/node_modules/aedes/lib/qos-packet.js
+++ b/node_modules/aedes/lib/qos-packet.js
@@ -1,6 +1,6 @@
'use strict'
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
const util = require('util')
function QoSPacket (original, client) {
diff --git a/node_modules/aedes/lib/write.js b/node_modules/aedes/lib/write.js
index 716d81a..a5d186c 100644
--- a/node_modules/aedes/lib/write.js
+++ b/node_modules/aedes/lib/write.js
@@ -21,4 +21,4 @@ function write (client, packet, done) {
setImmediate(done, error, client)
}
-module.exports = write
+module.exports = { write }
diff --git a/node_modules/aedes/types/client.d.ts b/node_modules/aedes/types/client.d.ts
index 2906213..c415fce 100644
--- a/node_modules/aedes/types/client.d.ts
+++ b/node_modules/aedes/types/client.d.ts
@@ -6,10 +6,10 @@ import {
Subscriptions,
UnsubscribePacket
} from './packet'
-import { Connection } from './instance'
+import Aedes, { Connection } from './instance'
import { EventEmitter } from 'node:events'
-export interface Client extends EventEmitter {
+export class Client extends EventEmitter {
id: Readonly<string>;
clean: Readonly<boolean>;
version: Readonly<number>;
@@ -19,6 +19,8 @@ export interface Client extends EventEmitter {
connected: Readonly<boolean>;
closed: Readonly<boolean>;
+ constructor(broker: Aedes, conn: Connection, req?: IncomingMessage)
+
on(event: 'connected', listener: () => void): this;
on(event: 'error', listener: (error: Error) => void): this;
|
This seems like a massive change to make (not opposed). I think modifying aedes-packet is incorrect, as the transaction should be decoupled in the tracing flow: publishing terminates when mqemitter terminates, and delivery starts as another trace. This is needed to correctly support multi-process systems with Redis or MongoDB |
Indeed, this implies some breaking changes. aedes-packet does not need to be modified, BUT mqtt-packet has to be patched at runtime to propagate the context and enable distributed traces. Those traces are composed of multiple spans, which SHOULD be related in order to trace the communication between multiple services. To reformulate your statement: regarding your concern about multi-process systems, the packets are stored by those systems (in aedes-persistence-X and aedes-emitter-X), right? So as long as the context is correctly serialized into the packet, it should be fine? |
Changing mqtt-packet is a non-starter, unfortunately :(. |
Maybe I wasn’t clear: no change will be requested in the mqtt-packet source code. |
@mcollina what we would like to know is mostly if the approach could have some performance implications and/or if there are better alternatives |
I think the changes here are not really needed, as the "in-the-middle" approach would be sufficient. However I think it would make things easier. As for perf, it should be neutral when no monkeypatching is used. A better approach would be to design a system that requires no monkeypatching at all. This would definitely be faster. How do you plan to propagate the trace over the MQTT protocol? |
How do you imagine this design?
As said in one of the message above:
|
IMHO this makes sense only with MQTT5 |
I agree that it would incur less of a performance penalty with MQTT 5. For MQTT 3, if some users truly wish to propagate traces between their systems, I don't see another alternative (except prepending the trace context to the whole MQTT packet, in the same way this is done for the PROXY protocol). |
Is your feature request related to a problem? Please describe.
I am currently trying to build an OpenTelemetry instrumentation library for Aedes to enable traces to be completed with what happens inside Aedes.
OpenTelemetry provides a library that contains a base class which provides many helpers to patch the module to instrument.
It relies heavily on shimmer, require-in-the-middle and import-in-the-middle to achieve this.
My plan is to start by patching:
- the packet handlers (handleConnect, handlePublish, handleSubscribe, handleUnsubscribe)
- the Aedes methods (handle, preConnect, subscribe)

in order to write a first working PoC that would create and close spans at publish, subscribe and message delivery for QoS 0.
After giving it a first try, I noticed a few obstacles:
handle
) cannot be patched as they are not explicitly assigned to Aedes.prototype
So aedes-packet would need to be patched as well, and updated to use a named export.
So mqtt-packet needs to be patched at runtime to store and retrieve the serialized context following this proposal
Describe the solution you'd like
handle
and preConnect
to Aedes.prototype would solve (2), at the cost of assigning extra variables to the Aedes instance.
Describe alternatives you've considered
Additional context
Add any other context or screenshots about the feature request here.
The text was updated successfully, but these errors were encountered: