From 73754218b67d92b20e16c9de34ecc0e720ab2959 Mon Sep 17 00:00:00 2001 From: brunobat Date: Wed, 3 Jul 2024 11:20:41 +0100 Subject: [PATCH] Bump otel instrumentation to 2.5.0 --- .../tracing/MyAttributesExtractor.java | 41 +++++++++++++++++- .../tracing/MyOpenTelemetryInstrumenter.java | 8 ++-- pom.xml | 6 +-- .../amqp/tracing/AmqpAttributesExtractor.java | 42 ++++++++++++++---- .../AmqpOpenTelemetryInstrumenter.java | 8 ++-- ..._connectorPrefix__AttributesExtractor.java | 36 ++++++++++++++-- ...ctorPrefix__OpenTelemetryInstrumenter.java | 8 ++-- .../tracing/KafkaAttributesExtractor.java | 43 +++++++++++++++---- .../KafkaOpenTelemetryInstrumenter.java | 8 ++-- .../kafka/tracing/TracingPropagationTest.java | 17 +++++--- smallrye-reactive-messaging-otel/pom.xml | 2 +- .../tracing/PulsarAttributesExtractor.java | 38 +++++++++++++--- .../PulsarOpenTelemetryInstrumenter.java | 8 ++-- .../RabbitMQOpenTelemetryInstrumenter.java | 8 ++-- .../RabbitMQTraceAttributesExtractor.java | 36 ++++++++++++++-- 15 files changed, 244 insertions(+), 65 deletions(-) diff --git a/documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java b/documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java index c9d76a1cea..a9cdc896fa 100644 --- a/documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java +++ b/documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java @@ -1,9 +1,12 @@ package connectors.tracing; +import java.util.Collections; +import java.util.List; + import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; public class MyAttributesExtractor implements AttributesExtractor { private final MessagingAttributesGetter messagingAttributesGetter; @@ -66,5 +69,41 @@ public Long getMessagePayloadCompressedSize(final MyTrace myTrace) { public String getMessageId(final MyTrace myTrace, final Void unused) { return null; } + + @Override + public List getMessageHeader(MyTrace myTrace, String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(MyTrace myTrace) { + return null; + } + + @Override + public boolean isAnonymousDestination(MyTrace myTrace) { + return false; + } + + @Override + public Long getMessageBodySize(MyTrace myTrace) { + return null; + } + + @Override + public Long getMessageEnvelopeSize(MyTrace myTrace) { + return null; + } + + @Override + public String getClientId(MyTrace myTrace) { + return null; + } + + @Override + public Long getBatchMessageCount(MyTrace myTrace, Void unused) { + return null; + } + } } diff --git a/documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java b/documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java index 99d320ee68..f2b2bde5dd 100644 --- a/documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java +++ b/documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java @@ -3,12 +3,12 @@ import org.eclipse.microprofile.reactive.messaging.Message; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.reactive.messaging.tracing.TracingUtils; /** diff --git a/pom.xml b/pom.xml index 4b834db429..68e627373d 100644 --- a/pom.xml +++ b/pom.xml @@ -90,8 +90,8 @@ 3.7.1 - 1.32.0-alpha - 1.21.0-alpha + 2.5.0-alpha + 1.25.0-alpha 3.13.0 3.0.0 @@ -282,7 +282,7 @@ io.opentelemetry.instrumentation opentelemetry-instrumentation-bom-alpha - ${opentelemetry.version} + ${opentelemetry.instrumentation.version} pom import diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpAttributesExtractor.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpAttributesExtractor.java index 9264180466..987007ed7a 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpAttributesExtractor.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpAttributesExtractor.java @@ -1,9 +1,12 @@ package io.smallrye.reactive.messaging.amqp.tracing; +import java.util.Collections; +import java.util.List; + import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.smallrye.reactive.messaging.amqp.AmqpMessage; public class AmqpAttributesExtractor implements AttributesExtractor, Void> { @@ -61,21 +64,44 @@ public String getConversationId(final AmqpMessage amqpMessage) { // Recommended @Override - public Long getMessagePayloadSize(final AmqpMessage amqpMessage) { + public String getMessageId(final AmqpMessage amqpMessage, final Void unused) { + Object messageId = amqpMessage.getMessageId(); + return messageId instanceof String ? (String) messageId : null; + } + + @Override + public List getMessageHeader(AmqpMessage amqpMessage, String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(AmqpMessage amqpMessage) { return null; } - // Recommended @Override - public Long getMessagePayloadCompressedSize(final AmqpMessage amqpMessage) { + public boolean isAnonymousDestination(AmqpMessage amqpMessage) { + return false; + } + + @Override + public Long getMessageBodySize(AmqpMessage amqpMessage) { return null; } - // Recommended @Override - public String getMessageId(final AmqpMessage amqpMessage, final Void unused) { - Object messageId = amqpMessage.getMessageId(); - return messageId instanceof String ? (String) messageId : null; + public Long getMessageEnvelopeSize(AmqpMessage amqpMessage) { + return null; + } + + @Override + public String getClientId(AmqpMessage amqpMessage) { + return null; + } + + @Override + public Long getBatchMessageCount(AmqpMessage amqpMessage, Void unused) { + return null; } } } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java index 49a1b1f542..8d4cbd6dd9 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java @@ -5,12 +5,12 @@ import org.eclipse.microprofile.reactive.messaging.Message; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.reactive.messaging.amqp.AmqpMessage; import io.smallrye.reactive.messaging.tracing.TracingUtils; diff --git a/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__AttributesExtractor.java b/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__AttributesExtractor.java index ab5e5ef9e5..b213bf1569 100644 --- a/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__AttributesExtractor.java +++ b/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__AttributesExtractor.java @@ -3,7 +3,10 @@ import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; + +import java.util.Collections; +import java.util.List; public class ${connectorPrefix}AttributesExtractor implements AttributesExtractor<${connectorPrefix}Trace, Void> { private final MessagingAttributesGetter<${connectorPrefix}Trace, Void> messagingAttributesGetter; @@ -53,17 +56,42 @@ public String getConversationId(final ${connectorPrefix}Trace myTrace) { } @Override - public Long getMessagePayloadSize(final ${connectorPrefix}Trace myTrace) { + public String getMessageId(final ${connectorPrefix}Trace myTrace, final Void unused) { return null; } @Override - public Long getMessagePayloadCompressedSize(final ${connectorPrefix}Trace myTrace) { + public List getMessageHeader(final ${connectorPrefix}Trace myTrace, final String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(final ${connectorPrefix}Trace myTrace) { return null; } @Override - public String getMessageId(final ${connectorPrefix}Trace myTrace, final Void unused) { + public boolean isAnonymousDestination(final ${connectorPrefix}Trace myTrace) { + return false; + } + + @Override + public Long getMessageBodySize(final ${connectorPrefix}Trace myTrace) { + return null; + } + + @Override + public Long getMessageEnvelopeSize(final ${connectorPrefix}Trace myTrace) { + return null; + } + + @Override + public String getClientId(final ${connectorPrefix}Trace myTrace) { + return null; + } + + @Override + public Long getBatchMessageCount(final ${connectorPrefix}Trace myTrace, final Void unused) { return null; } } diff --git a/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__OpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__OpenTelemetryInstrumenter.java index 51bafcd696..7ed0df32e2 100644 --- a/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__OpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/tracing/__connectorPrefix__OpenTelemetryInstrumenter.java @@ -5,10 +5,10 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.smallrye.reactive.messaging.tracing.TracingUtils; /** diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaAttributesExtractor.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaAttributesExtractor.java index 631d449987..d93449268f 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaAttributesExtractor.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaAttributesExtractor.java @@ -1,15 +1,17 @@ package io.smallrye.reactive.messaging.kafka.tracing; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_CONSUMER_ID; -import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_PARTITION; +import java.util.Collections; +import java.util.List; + import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; public class KafkaAttributesExtractor implements AttributesExtractor { private final MessagingAttributesGetter messagingAttributesGetter; @@ -39,9 +41,6 @@ public void onStart(final AttributesBuilder attributes, final Context parentCont if (groupId != null) { attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, groupId); } - if (clientId != null) { - attributes.put(MESSAGING_KAFKA_CLIENT_ID, clientId); - } } @Override @@ -80,17 +79,45 @@ public String getConversationId(final KafkaTrace kafkaTrace) { } @Override - public Long getMessagePayloadSize(final KafkaTrace kafkaTrace) { + public String getMessageId(final KafkaTrace kafkaTrace, final Void unused) { return null; } @Override - public Long getMessagePayloadCompressedSize(final KafkaTrace kafkaTrace) { + public List getMessageHeader(KafkaTrace kafkaTrace, String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(KafkaTrace kafkaTrace) { return null; } @Override - public String getMessageId(final KafkaTrace kafkaTrace, final Void unused) { + public boolean isAnonymousDestination(KafkaTrace kafkaTrace) { + return false; + } + + @Override + public Long getMessageBodySize(KafkaTrace kafkaTrace) { + return null; + } + + @Override + public Long getMessageEnvelopeSize(KafkaTrace kafkaTrace) { + return null; + } + + @Override + public String getClientId(KafkaTrace kafkaTrace) { + if (kafkaTrace.getClientId() == null) { + return null; + } + return kafkaTrace.getClientId(); + } + + @Override + public Long getBatchMessageCount(KafkaTrace kafkaTrace, Void unused) { return null; } } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java index 4f64fbf397..d91112762f 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java @@ -5,12 +5,12 @@ import org.eclipse.microprofile.reactive.messaging.Message; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.reactive.messaging.tracing.TracingUtils; /** diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/tracing/TracingPropagationTest.java index 5173f04675..e4bec10fe2 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/tracing/TracingPropagationTest.java @@ -1,7 +1,7 @@ package io.smallrye.reactive.messaging.kafka.tracing; +import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_CLIENT_ID; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_DESTINATION_NAME; -import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_SYSTEM; @@ -117,7 +117,7 @@ public void testFromAppToKafka() { assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM)); assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID)); + assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_CLIENT_ID)); assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); assertEquals(topic + " publish", span.getName()); @@ -155,7 +155,8 @@ public void testFromAppToKafkaWithStructuredCloudEvents() { assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM)); assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID)); + assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_CLIENT_ID)); assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); assertEquals(topic + " publish", span.getName()); }); @@ -192,7 +193,8 @@ public void testFromAppToKafkaWithBinaryCloudEvents() { assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM)); assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID)); + assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_CLIENT_ID)); assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); assertEquals(topic + " publish", span.getName()); }); @@ -238,7 +240,7 @@ public void testFromKafkaToAppToKafka() { assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM)); assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("kafka-consumer-source", consumer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID)); + assertEquals("kafka-consumer-source", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); assertEquals(parentTopic + " receive", consumer.getName()); @@ -249,7 +251,8 @@ public void testFromKafkaToAppToKafka() { assertEquals("kafka", producer.getAttributes().get(MESSAGING_SYSTEM)); assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION)); assertEquals(resultTopic, producer.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("kafka-producer-kafka", producer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID)); + assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("kafka-producer-kafka", producer.getAttributes().get(MESSAGING_CLIENT_ID)); assertEquals(0, producer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); assertEquals(resultTopic + " publish", producer.getName()); }); @@ -303,7 +306,7 @@ public void testFromKafkaToAppWithParentSpan() { assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM)); assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID)); + assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); assertEquals(parentTopic + " receive", consumer.getName()); }); diff --git a/smallrye-reactive-messaging-otel/pom.xml b/smallrye-reactive-messaging-otel/pom.xml index 6cb9b8af19..3b6e9c3542 100644 --- a/smallrye-reactive-messaging-otel/pom.xml +++ b/smallrye-reactive-messaging-otel/pom.xml @@ -37,7 +37,7 @@ io.opentelemetry.instrumentation - opentelemetry-instrumentation-api-semconv + opentelemetry-instrumentation-api-incubator io.opentelemetry.semconv diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarAttributesExtractor.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarAttributesExtractor.java index bf9510795e..06ddfd4ed4 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarAttributesExtractor.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarAttributesExtractor.java @@ -2,10 +2,13 @@ import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_CONSUMER_ID; +import java.util.Collections; +import java.util.List; + import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; public class PulsarAttributesExtractor implements AttributesExtractor { private final MessagingAttributesGetter messagingAttributesGetter; @@ -56,18 +59,43 @@ public String getConversationId(PulsarTrace pulsarTrace) { } @Override - public Long getMessagePayloadSize(PulsarTrace pulsarTrace) { + public String getMessageId(PulsarTrace pulsarTrace, Void unused) { + return pulsarTrace.getMessageId(); + } + + @Override + public List getMessageHeader(PulsarTrace pulsarTrace, String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(PulsarTrace pulsarTrace) { + return null; + } + + @Override + public boolean isAnonymousDestination(PulsarTrace pulsarTrace) { + return false; + } + + @Override + public Long getMessageBodySize(PulsarTrace pulsarTrace) { return pulsarTrace.getUncompressedPayloadSize(); } @Override - public Long getMessagePayloadCompressedSize(PulsarTrace pulsarTrace) { + public Long getMessageEnvelopeSize(PulsarTrace pulsarTrace) { return null; } @Override - public String getMessageId(PulsarTrace pulsarTrace, Void unused) { - return pulsarTrace.getMessageId(); + public String getClientId(PulsarTrace pulsarTrace) { + return null; + } + + @Override + public Long getBatchMessageCount(PulsarTrace pulsarTrace, Void unused) { + return null; } } } diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java index 9c4df7dd5a..db8502d560 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java @@ -5,12 +5,12 @@ import org.eclipse.microprofile.reactive.messaging.Message; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.reactive.messaging.tracing.TracingUtils; public class PulsarOpenTelemetryInstrumenter { diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java index a646a7bb6e..3f8a836cbf 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java @@ -5,12 +5,12 @@ import org.eclipse.microprofile.reactive.messaging.Message; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.reactive.messaging.tracing.TracingUtils; public class RabbitMQOpenTelemetryInstrumenter { diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceAttributesExtractor.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceAttributesExtractor.java index 26d1db474e..d6cf9f3a1a 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceAttributesExtractor.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceAttributesExtractor.java @@ -2,10 +2,13 @@ import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY; +import java.util.Collections; +import java.util.List; + import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; public class RabbitMQTraceAttributesExtractor implements AttributesExtractor { private final MessagingAttributesGetter messagingAttributesGetter; @@ -54,17 +57,42 @@ public String getConversationId(final RabbitMQTrace rabbitMQTrace) { } @Override - public Long getMessagePayloadSize(final RabbitMQTrace rabbitMQTrace) { + public String getMessageId(final RabbitMQTrace rabbitMQTrace, final Void unused) { return null; } @Override - public Long getMessagePayloadCompressedSize(final RabbitMQTrace rabbitMQTrace) { + public List getMessageHeader(RabbitMQTrace rabbitMQTrace, String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(RabbitMQTrace rabbitMQTrace) { return null; } @Override - public String getMessageId(final RabbitMQTrace rabbitMQTrace, final Void unused) { + public boolean isAnonymousDestination(RabbitMQTrace rabbitMQTrace) { + return false; + } + + @Override + public Long getMessageBodySize(RabbitMQTrace rabbitMQTrace) { + return null; + } + + @Override + public Long getMessageEnvelopeSize(RabbitMQTrace rabbitMQTrace) { + return null; + } + + @Override + public String getClientId(RabbitMQTrace rabbitMQTrace) { + return null; + } + + @Override + public Long getBatchMessageCount(RabbitMQTrace rabbitMQTrace, Void unused) { return null; } }