From 44cbaef2cdb832c601310903e1cb2ac6565a4916 Mon Sep 17 00:00:00 2001 From: getlarge Date: Sun, 8 Oct 2023 09:03:04 +0200 Subject: [PATCH] feat: patch new `aedes.wrapDeliveryFunc` to correctly modify delivery function --- package.json | 4 +- src/aedes-instrumentation.ts | 94 +++++++++++++++--------------------- 2 files changed, 42 insertions(+), 56 deletions(-) diff --git a/package.json b/package.json index 890cae6..1661cf6 100644 --- a/package.json +++ b/package.json @@ -31,9 +31,9 @@ }, "homepage": "https://github.com/moscajs/aedes-otel-instrumentation#readme", "scripts": { - "test": "node --require @esbuild-kit/cjs-loader --test-reporter=spec --test test/*.spec.ts", + "otel-test": "node --require @esbuild-kit/cjs-loader --test-reporter=spec --test test/*.spec.ts", "aedes-test": "npm run build && node --require ./test/tracing --test test-aedes/*.js", - "aedes-test-bail": "npm run build && node --require ./test/tracing --test-reporter=@reporters/bail --test-reporter-destination=stderr --test-reporter=spec --test-reporter-destination=stdout --test test-aedes/*.js", + "test": "npm run otel-test && npm run aedes-test", "lint": "eslint . --ext .ts", "prettier": "prettier --write .", "build": "tsc -p tsconfig.build.json", diff --git a/src/aedes-instrumentation.ts b/src/aedes-instrumentation.ts index f33269b..9b133c7 100644 --- a/src/aedes-instrumentation.ts +++ b/src/aedes-instrumentation.ts @@ -283,14 +283,6 @@ export class AedesInstrumentation extends InstrumentationBase { this.getAedesListenerPatch.bind(this) as any ) } - - if (!this.isWrapped(moduleExports.prototype, 'subscribe')) { - this._wrap( - moduleExports.prototype, - 'subscribe', - this.getAedesSubscribePatch.bind(this) - ) - } if (!this.isWrapped(moduleExports.prototype, 'preConnect')) { this._wrap( moduleExports.prototype, @@ -319,7 +311,13 @@ export class AedesInstrumentation extends InstrumentationBase { this.getAedesAuthorizeSubscribePatch.bind(this) ) } - + if (!this.isWrapped(moduleExports.prototype, 'wrapDeliveryFunc')) { + this._wrap( + moduleExports.prototype, + 'wrapDeliveryFunc', + this.getAedesWrapDeliveryFuncPatch.bind(this) + ) + } return moduleExports } @@ -330,9 +328,6 @@ export class AedesInstrumentation extends InstrumentationBase { if (isWrapped(moduleExports.prototype.on)) { this._unwrap(moduleExports.prototype, 'on') } - if (isWrapped(moduleExports.prototype.subscribe)) { - this._unwrap(moduleExports.prototype, 'subscribe') - } if (isWrapped(moduleExports.prototype.preConnect)) { this._unwrap(moduleExports.prototype, 'preConnect') } @@ -345,6 +340,9 @@ export class AedesInstrumentation extends InstrumentationBase { if (isWrapped(moduleExports.prototype.authorizeSubscribe)) { this._unwrap(moduleExports.prototype, 'authorizeSubscribe') } + if (isWrapped(moduleExports.prototype.wrapDeliveryFunc)) { + this._unwrap(moduleExports.prototype, 'wrapDeliveryFunc') + } return moduleExports } @@ -430,51 +428,51 @@ export class AedesInstrumentation extends InstrumentationBase { } } - private getAedesSubscribePatch(original: Aedes['subscribe']) { + private getAedesWrapDeliveryFuncPatch(original: Aedes['wrapDeliveryFunc']) { // eslint-disable-next-line @typescript-eslint/no-this-alias const instrumentation = this - return function patchedSubscribe( + return function patchedWrapDeliveryFunc( this: Aedes, - ...args: Parameters + ...args: Parameters ) { - const [topic, deliver] = args + const [, func] = args // still unclear how to set the kind https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/#span-kind const kind = SpanKind.SERVER const handleSubscribeCtx = context.active() const client = handleSubscribeCtx.getValue(CLIENT_CONTEXT_KEY) as | AedesClient | undefined - const attributes = { - ...(client && client[CONNECTION_ATTRIBUTES]), - ...(client && { [AedesAttributes.CLIENT_ID]: client.id }), - [SemanticAttributes.MESSAGING_OPERATION]: - MessagingOperationValues.RECEIVE, - // source attribute is present in semantic conventions but missing in implementation - // @see https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/ - 'messaging.source': topic, - 'messaging.source.kind': MessagingDestinationKindValues.TOPIC, - } + const currentContext = context.active() function patchedDeliverFunc( this: unknown, // default to MQEmitter packet: AedesPublishPacket, callback: () => void ) { - const currentContext = context.active() - if (packet.messageId) { - attributes[SemanticAttributes.MESSAGING_MESSAGE_ID] = - packet.messageId.toString() - } - attributes[SemanticAttributes.MESSAGING_DESTINATION] = packet.topic - attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND] = - MessagingDestinationKindValues.TOPIC - attributes[SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] = - packet.payload.length.toString() - const parentContext = getContextFromPacket(packet, currentContext) const startTime = hrTime() + const topic = packet.topic + const attributes = { + ...(client && client[CONNECTION_ATTRIBUTES]), + ...(client && { [AedesAttributes.CLIENT_ID]: client.id }), + ...(packet.messageId && { + [SemanticAttributes.MESSAGING_MESSAGE_ID]: + packet.messageId.toString(), + }), + [SemanticAttributes.MESSAGING_OPERATION]: + MessagingOperationValues.RECEIVE, + [SemanticAttributes.MESSAGING_DESTINATION]: topic, + [SemanticAttributes.MESSAGING_DESTINATION_KIND]: + MessagingDestinationKindValues.TOPIC, + // source attribute is present in semantic conventions but missing in implementation + // @see https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/ + 'messaging.source': topic, + 'messaging.source.kind': MessagingDestinationKindValues.TOPIC, + [SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: + packet.payload.length.toString(), + } const span = instrumentation.startSpan( - `${packet.topic} receive`, + `${topic} receive`, { kind, attributes, @@ -482,14 +480,13 @@ export class AedesInstrumentation extends InstrumentationBase { client, parentContext ) - const messageContext = trace.setSpan( parentContext || context.active(), span ) instrumentation.callConsumeHook(span, packet) - function wrappedCallback(this: unknown) { + function patchedCallback(this: unknown) { instrumentation.callConsumeEndHook(span, packet, null) instrumentation.endSpan( span, @@ -499,24 +496,13 @@ export class AedesInstrumentation extends InstrumentationBase { getMetricAttributes(attributes) ) const cb = context.bind(messageContext, callback) - cb.apply(this) + return cb.apply(this) } - // TODO depending on QoS : - // - span should be ended in this function for QoS 0 and 1 - // - span should be ended in packet ack for QoS 2 - - return context.with( - messageContext, - deliver, - this, - packet, - wrappedCallback - ) + return func.call(this, packet, patchedCallback) } - args[1] = patchedDeliverFunc - return context.with(context.active(), original, this, ...args) + return original.apply(this, args) } }