Skip to content

Commit

Permalink
Bump otel instrumentation to 2.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
brunobat committed Jul 3, 2024
1 parent 198a76d commit 50e205a
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 61 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@

<kafka.version>3.7.1</kafka.version>

<opentelemetry.version>1.32.0-alpha</opentelemetry.version>
<opentelemetry-semconv.version>1.21.0-alpha</opentelemetry-semconv.version>
<opentelemetry.instrumentation.version>2.5.0-alpha</opentelemetry.instrumentation.version>
<opentelemetry-semconv.version>1.25.0-alpha</opentelemetry-semconv.version>

<smallrye-vertx-mutiny-clients.version>3.13.0</smallrye-vertx-mutiny-clients.version>
<smallrye-reactive-converters.version>3.0.0</smallrye-reactive-converters.version>
Expand Down Expand Up @@ -282,7 +282,7 @@
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-bom-alpha</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.instrumentation.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.smallrye.reactive.messaging.amqp.tracing;

import java.util.Collections;
import java.util.List;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.smallrye.reactive.messaging.amqp.AmqpMessage;

public class AmqpAttributesExtractor implements AttributesExtractor<AmqpMessage<?>, Void> {
Expand Down Expand Up @@ -61,21 +64,44 @@ public String getConversationId(final AmqpMessage<?> amqpMessage) {

// Recommended
@Override
public Long getMessagePayloadSize(final AmqpMessage<?> amqpMessage) {
public String getMessageId(final AmqpMessage<?> amqpMessage, final Void unused) {
Object messageId = amqpMessage.getMessageId();
return messageId instanceof String ? (String) messageId : null;
}

@Override
public List<String> getMessageHeader(AmqpMessage<?> amqpMessage, String name) {
return Collections.emptyList();
}

@Override
public String getDestinationTemplate(AmqpMessage<?> amqpMessage) {
return null;
}

// Recommended
@Override
public Long getMessagePayloadCompressedSize(final AmqpMessage<?> amqpMessage) {
public boolean isAnonymousDestination(AmqpMessage<?> amqpMessage) {
return false;
}

@Override
public Long getMessageBodySize(AmqpMessage<?> amqpMessage) {
return null;
}

// Recommended
@Override
public String getMessageId(final AmqpMessage<?> amqpMessage, final Void unused) {
Object messageId = amqpMessage.getMessageId();
return messageId instanceof String ? (String) messageId : null;
public Long getMessageEnvelopeSize(AmqpMessage<?> amqpMessage) {
return null;
}

@Override
public String getClientId(AmqpMessage<?> amqpMessage) {
return null;
}

@Override
public Long getBatchMessageCount(AmqpMessage<?> amqpMessage, Void unused) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.reactive.messaging.amqp.AmqpMessage;
import io.smallrye.reactive.messaging.tracing.TracingUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;

import java.util.Collections;
import java.util.List;

public class ${connectorPrefix}AttributesExtractor implements AttributesExtractor<${connectorPrefix}Trace, Void> {
private final MessagingAttributesGetter<${connectorPrefix}Trace, Void> messagingAttributesGetter;
Expand Down Expand Up @@ -53,17 +56,42 @@ public String getConversationId(final ${connectorPrefix}Trace myTrace) {
}

@Override
public Long getMessagePayloadSize(final ${connectorPrefix}Trace myTrace) {
public String getMessageId(final ${connectorPrefix}Trace myTrace, final Void unused) {
return null;
}

@Override
public Long getMessagePayloadCompressedSize(final ${connectorPrefix}Trace myTrace) {
public List<String> getMessageHeader(final ${connectorPrefix}Trace myTrace, final String name) {
return Collections.emptyList();
}

@Override
public String getDestinationTemplate(final ${connectorPrefix}Trace myTrace) {
return null;
}

@Override
public String getMessageId(final ${connectorPrefix}Trace myTrace, final Void unused) {
public boolean isAnonymousDestination(final ${connectorPrefix}Trace myTrace) {
return false;
}

@Override
public Long getMessageBodySize(final ${connectorPrefix}Trace myTrace) {
return null;
}

@Override
public Long getMessageEnvelopeSize(final ${connectorPrefix}Trace myTrace) {
return null;
}

@Override
public String getClientId(final ${connectorPrefix}Trace myTrace) {
return null;
}

@Override
public Long getBatchMessageCount(final ${connectorPrefix}Trace myTrace, final Void unused) {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.smallrye.reactive.messaging.tracing.TracingUtils;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package io.smallrye.reactive.messaging.kafka.tracing;

import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_CONSUMER_ID;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_PARTITION;

import java.util.Collections;
import java.util.List;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;

public class KafkaAttributesExtractor implements AttributesExtractor<KafkaTrace, Void> {
private final MessagingAttributesGetter<KafkaTrace, Void> messagingAttributesGetter;
Expand Down Expand Up @@ -39,9 +41,6 @@ public void onStart(final AttributesBuilder attributes, final Context parentCont
if (groupId != null) {
attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, groupId);
}
if (clientId != null) {
attributes.put(MESSAGING_KAFKA_CLIENT_ID, clientId);
}
}

@Override
Expand Down Expand Up @@ -80,17 +79,45 @@ public String getConversationId(final KafkaTrace kafkaTrace) {
}

@Override
public Long getMessagePayloadSize(final KafkaTrace kafkaTrace) {
public String getMessageId(final KafkaTrace kafkaTrace, final Void unused) {
return null;
}

@Override
public Long getMessagePayloadCompressedSize(final KafkaTrace kafkaTrace) {
public List<String> getMessageHeader(KafkaTrace kafkaTrace, String name) {
return Collections.emptyList();
}

@Override
public String getDestinationTemplate(KafkaTrace kafkaTrace) {
return null;
}

@Override
public String getMessageId(final KafkaTrace kafkaTrace, final Void unused) {
public boolean isAnonymousDestination(KafkaTrace kafkaTrace) {
return false;
}

@Override
public Long getMessageBodySize(KafkaTrace kafkaTrace) {
return null;
}

@Override
public Long getMessageEnvelopeSize(KafkaTrace kafkaTrace) {
return null;
}

@Override
public String getClientId(KafkaTrace kafkaTrace) {
if (kafkaTrace.getClientId() == null) {
return null;
}
return kafkaTrace.getClientId();
}

@Override
public Long getBatchMessageCount(KafkaTrace kafkaTrace, Void unused) {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.reactive.messaging.tracing.TracingUtils;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.reactive.messaging.kafka.tracing;

import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_CLIENT_ID;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_SYSTEM;
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testFromAppToKafka() {
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION));
assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_CLIENT_ID));
assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));

assertEquals(topic + " publish", span.getName());
Expand Down Expand Up @@ -155,7 +155,8 @@ public void testFromAppToKafkaWithStructuredCloudEvents() {
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION));
assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION));
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_CLIENT_ID));
assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(topic + " publish", span.getName());
});
Expand Down Expand Up @@ -192,7 +193,8 @@ public void testFromAppToKafkaWithBinaryCloudEvents() {
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION));
assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION));
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_CLIENT_ID));
assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(topic + " publish", span.getName());
});
Expand Down Expand Up @@ -238,7 +240,7 @@ public void testFromKafkaToAppToKafka() {
assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-consumer-source", consumer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals("kafka-consumer-source", consumer.getAttributes().get(MESSAGING_CLIENT_ID));
assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(parentTopic + " receive", consumer.getName());

Expand All @@ -249,7 +251,8 @@ public void testFromKafkaToAppToKafka() {
assertEquals("kafka", producer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION));
assertEquals(resultTopic, producer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-producer-kafka", producer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION));
assertEquals("kafka-producer-kafka", producer.getAttributes().get(MESSAGING_CLIENT_ID));
assertEquals(0, producer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(resultTopic + " publish", producer.getName());
});
Expand Down Expand Up @@ -303,7 +306,7 @@ public void testFromKafkaToAppWithParentSpan() {
assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_CLIENT_ID));
assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(parentTopic + " receive", consumer.getName());
});
Expand Down
2 changes: 1 addition & 1 deletion smallrye-reactive-messaging-otel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<artifactId>opentelemetry-instrumentation-api-incubator</artifactId>
<exclusions>
<exclusion>
<groupId>io.opentelemetry.semconv</groupId>
Expand Down
Loading

0 comments on commit 50e205a

Please sign in to comment.