Skip to content

Commit

Permalink
feat: patch new aedes.wrapDeliveryFunc to correctly modify delivery…
Browse files Browse the repository at this point in the history
… function
  • Loading branch information
getlarge committed Oct 8, 2023
1 parent d790cad commit 44cbaef
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 56 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
},
"homepage": "https://github.com/moscajs/aedes-otel-instrumentation#readme",
"scripts": {
"test": "node --require @esbuild-kit/cjs-loader --test-reporter=spec --test test/*.spec.ts",
"otel-test": "node --require @esbuild-kit/cjs-loader --test-reporter=spec --test test/*.spec.ts",
"aedes-test": "npm run build && node --require ./test/tracing --test test-aedes/*.js",
"aedes-test-bail": "npm run build && node --require ./test/tracing --test-reporter=@reporters/bail --test-reporter-destination=stderr --test-reporter=spec --test-reporter-destination=stdout --test test-aedes/*.js",
"test": "npm run otel-test && npm run aedes-test",
"lint": "eslint . --ext .ts",
"prettier": "prettier --write .",
"build": "tsc -p tsconfig.build.json",
Expand Down
94 changes: 40 additions & 54 deletions src/aedes-instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,6 @@ export class AedesInstrumentation extends InstrumentationBase {
this.getAedesListenerPatch.bind(this) as any
)
}

if (!this.isWrapped(moduleExports.prototype, 'subscribe')) {
this._wrap(
moduleExports.prototype,
'subscribe',
this.getAedesSubscribePatch.bind(this)
)
}
if (!this.isWrapped(moduleExports.prototype, 'preConnect')) {
this._wrap(
moduleExports.prototype,
Expand Down Expand Up @@ -319,7 +311,13 @@ export class AedesInstrumentation extends InstrumentationBase {
this.getAedesAuthorizeSubscribePatch.bind(this)
)
}

if (!this.isWrapped(moduleExports.prototype, 'wrapDeliveryFunc')) {
this._wrap(
moduleExports.prototype,
'wrapDeliveryFunc',
this.getAedesWrapDeliveryFuncPatch.bind(this)
)
}
return moduleExports
}

Expand All @@ -330,9 +328,6 @@ export class AedesInstrumentation extends InstrumentationBase {
if (isWrapped(moduleExports.prototype.on)) {
this._unwrap(moduleExports.prototype, 'on')
}
if (isWrapped(moduleExports.prototype.subscribe)) {
this._unwrap(moduleExports.prototype, 'subscribe')
}
if (isWrapped(moduleExports.prototype.preConnect)) {
this._unwrap(moduleExports.prototype, 'preConnect')
}
Expand All @@ -345,6 +340,9 @@ export class AedesInstrumentation extends InstrumentationBase {
if (isWrapped(moduleExports.prototype.authorizeSubscribe)) {
this._unwrap(moduleExports.prototype, 'authorizeSubscribe')
}
if (isWrapped(moduleExports.prototype.wrapDeliveryFunc)) {
this._unwrap(moduleExports.prototype, 'wrapDeliveryFunc')
}

return moduleExports
}
Expand Down Expand Up @@ -430,66 +428,65 @@ export class AedesInstrumentation extends InstrumentationBase {
}
}

private getAedesSubscribePatch(original: Aedes['subscribe']) {
private getAedesWrapDeliveryFuncPatch(original: Aedes['wrapDeliveryFunc']) {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const instrumentation = this
return function patchedSubscribe(
return function patchedWrapDeliveryFunc(
this: Aedes,
...args: Parameters<Aedes['subscribe']>
...args: Parameters<Aedes['wrapDeliveryFunc']>
) {
const [topic, deliver] = args
const [, func] = args
// still unclear how to set the kind https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/#span-kind
const kind = SpanKind.SERVER
const handleSubscribeCtx = context.active()
const client = handleSubscribeCtx.getValue(CLIENT_CONTEXT_KEY) as
| AedesClient
| undefined
const attributes = {
...(client && client[CONNECTION_ATTRIBUTES]),
...(client && { [AedesAttributes.CLIENT_ID]: client.id }),
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.RECEIVE,
// source attribute is present in semantic conventions but missing in implementation
// @see https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/
'messaging.source': topic,
'messaging.source.kind': MessagingDestinationKindValues.TOPIC,
}
const currentContext = context.active()

function patchedDeliverFunc(
this: unknown, // default to MQEmitter
packet: AedesPublishPacket,
callback: () => void
) {
const currentContext = context.active()
if (packet.messageId) {
attributes[SemanticAttributes.MESSAGING_MESSAGE_ID] =
packet.messageId.toString()
}
attributes[SemanticAttributes.MESSAGING_DESTINATION] = packet.topic
attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND] =
MessagingDestinationKindValues.TOPIC
attributes[SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] =
packet.payload.length.toString()

const parentContext = getContextFromPacket(packet, currentContext)
const startTime = hrTime()
const topic = packet.topic
const attributes = {
...(client && client[CONNECTION_ATTRIBUTES]),
...(client && { [AedesAttributes.CLIENT_ID]: client.id }),
...(packet.messageId && {
[SemanticAttributes.MESSAGING_MESSAGE_ID]:
packet.messageId.toString(),
}),
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.RECEIVE,
[SemanticAttributes.MESSAGING_DESTINATION]: topic,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.TOPIC,
// source attribute is present in semantic conventions but missing in implementation
// @see https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/
'messaging.source': topic,
'messaging.source.kind': MessagingDestinationKindValues.TOPIC,
[SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]:
packet.payload.length.toString(),
}
const span = instrumentation.startSpan(
`${packet.topic} receive`,
`${topic} receive`,
{
kind,
attributes,
},
client,
parentContext
)

const messageContext = trace.setSpan(
parentContext || context.active(),
span
)
instrumentation.callConsumeHook(span, packet)

function wrappedCallback(this: unknown) {
function patchedCallback(this: unknown) {
instrumentation.callConsumeEndHook(span, packet, null)
instrumentation.endSpan(
span,
Expand All @@ -499,24 +496,13 @@ export class AedesInstrumentation extends InstrumentationBase {
getMetricAttributes(attributes)
)
const cb = context.bind(messageContext, callback)
cb.apply(this)
return cb.apply(this)
}

// TODO depending on QoS :
// - span should be ended in this function for QoS 0 and 1
// - span should be ended in packet ack for QoS 2

return context.with(
messageContext,
deliver,
this,
packet,
wrappedCallback
)
return func.call(this, packet, patchedCallback)
}

args[1] = patchedDeliverFunc
return context.with(context.active(), original, this, ...args)
return original.apply(this, args)
}
}

Expand Down

0 comments on commit 44cbaef

Please sign in to comment.