Skip to content

Commit

Permalink
Fixed tracing of outgoing messages (#243)
Browse files Browse the repository at this point in the history
* Fixed tracing outgoing messages
* Bumped io.nats:jnats from 2.20.4 to 2.20.5
  • Loading branch information
kjeldpaw authored Dec 6, 2024
1 parent 7756b55 commit 72cd003
Show file tree
Hide file tree
Showing 74 changed files with 1,096 additions and 900 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.17.1"
current-version: "3.17.3"
next-version: "3.18.0-SNAPSHOT"

Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamBuildConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamConnector;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamRecorder;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapperImpl;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultMessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultPayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapperImpl;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
Expand Down Expand Up @@ -53,13 +54,14 @@ void initializeSecureRandomRelatedClassesAtRuntime(
@BuildStep
void createNatsConnector(BuildProducer<AdditionalBeanBuildItem> buildProducer) {
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamConnector.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamInstrument.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(TracerFactory.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ExecutionHolder.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultConnectionFactory.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultPayloadMapper.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultMessageMapper.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ConsumerMapperImpl.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(StreamStateMapperImpl.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(Context.class));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.test;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class DataCollectorBean implements MessageConsumer<Data> {
private final static Logger logger = Logger.getLogger(DataCollectorBean.class);

private final AtomicReference<Data> lastData = new AtomicReference<>();

@Incoming("data-collector")
public Uni<Void> data(Message<Data> message) {
return Uni.createFrom().item(message)
.onItem().invoke(m -> logger.infof("Received message: %s", message))
.onItem().transformToUni(this::setLast)
.onItem().transformToUni(this::acknowledge)
.onFailure().recoverWithUni(throwable -> notAcknowledge(message, throwable));
}

private Uni<Message<Data>> setLast(Message<Data> message) {
return Uni.createFrom().item(() -> {
lastData.set(message.getPayload());
return message;
});
}

public Optional<Data> getLast() {
return Optional.ofNullable(lastData.get());
}

}
Original file line number Diff line number Diff line change
@@ -1,40 +1,49 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.test;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.mutiny.tuples.Tuple2;

@ApplicationScoped
public class DataConsumingBean {
public class DataConsumingBean implements MessageConsumer<String> {
private final static Logger logger = Logger.getLogger(DataConsumingBean.class);

private final AtomicReference<Data> lastData = new AtomicReference<>();
private final Emitter<Data> dataEmitter;

public DataConsumingBean(@Channel("data-emitter") Emitter<Data> dataEmitter) {
this.dataEmitter = dataEmitter;
}

@Blocking
@Incoming("data-consumer")
public Uni<Void> data(Message<String> message) {
return Uni.createFrom().item(message)
.onItem().invoke(m -> {
logger.infof("Received message: %s", message);
message.getMetadata(JetStreamIncomingMessageMetadata.class)
.ifPresent(metadata -> lastData.set(
new Data(message.getPayload(), metadata.headers().get("RESOURCE_ID").get(0),
metadata.messageId())));
})
.onItem().transformToUni(m -> Uni.createFrom().completionStage(m.ack()))
.onFailure().recoverWithUni(throwable -> Uni.createFrom().completionStage(message.nack(throwable)));
.onItem().invoke(m -> logger.infof("Received message: %s", message))
.onItem().transformToUni(this::publish)
.onItem().transformToUni(this::acknowledge)
.onFailure().recoverWithUni(throwable -> notAcknowledge(message, throwable));
}

public Optional<Data> getLast() {
return Optional.ofNullable(lastData.get());
private Uni<Message<String>> publish(Message<String> message) {
try {
return Uni.createFrom()
.item(() -> message.getMetadata(PublishMessageMetadata.class)
.map(metadata -> Tuple2.of(metadata.headers().get("RESOURCE_ID").get(0), metadata.messageId()))
.orElse(Tuple2.of(null, null)))
.onItem()
.transformToUni(tuple -> Uni.createFrom()
.completionStage(
dataEmitter.send(new Data(message.getPayload(), tuple.getItem1(), tuple.getItem2()))))
.onItem().transform(ignore -> message);
} catch (Exception e) {
return Uni.createFrom().failure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata;
import io.smallrye.mutiny.Uni;

@Path("/data")
@Produces("application/json")
public class DataResource {

@Inject
DataConsumingBean bean;
DataCollectorBean bean;

@Channel("data")
Emitter<String> emitter;
Expand All @@ -40,7 +40,7 @@ private Uni<Message<String>> emitData(String id, String data) {
return Uni.createFrom().item(() -> {
final var headers = new HashMap<String, List<String>>();
headers.put("RESOURCE_ID", List.of(data));
final var message = Message.of(data, Metadata.of(JetStreamOutgoingMessageMetadata.of(id, headers, null)));
final var message = Message.of(data, Metadata.of(SubscribeMessageMetadata.of(id, headers)));
emitter.send(message);
return message;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import io.nats.client.api.ReplayPolicy;
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

Expand All @@ -44,6 +46,12 @@ public class FetchMessagesTest {
@Inject
ConnectionFactory connectionFactory;

@Inject
Context context;

@Inject
TracerFactory tracerFactory;

@BeforeEach
public void setup() throws Exception {
try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration),
Expand Down Expand Up @@ -158,7 +166,10 @@ private void publish(Data data, String subject) throws Exception {
new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) {
final var publishConfiguragtion = createPublishConfiguration(subject);
final var fetchConsumerConfiguration = createFetchConsumerConfiguration(subject);
connection.publish(Message.of(data), publishConfiguragtion, fetchConsumerConfiguration).await()
context.withContext(
ctx -> connection.publish(Message.of(data), publishConfiguragtion, fetchConsumerConfiguration,
tracerFactory.create(), ctx))
.await()
.atMost(Duration.ofSeconds(30));
}
}
Expand All @@ -167,7 +178,9 @@ private Data fetch(String subject, boolean ack) throws Exception {
try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration),
new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) {
final var fetchConsumerConfiguration = createFetchConsumerConfiguration(subject);
final var received = connection.nextMessage(fetchConsumerConfiguration).await().atMost(Duration.ofSeconds(30));
final var received = context
.withContext(ctx -> connection.nextMessage(fetchConsumerConfiguration, tracerFactory.create(), ctx)).await()
.atMost(Duration.ofSeconds(30));
if (ack) {
Uni.createFrom().completionStage(received.ack()).await().atMost(Duration.ofSeconds(30));
} else {
Expand All @@ -184,11 +197,6 @@ public Optional<Duration> fetchTimeout() {
return Optional.of(Duration.ofSeconds(10));
}

@Override
public boolean traceEnabled() {
return false;
}

@Override
public boolean exponentialBackoff() {
return false;
Expand Down Expand Up @@ -313,10 +321,6 @@ public Optional<Class<Data>> payloadType() {

private PublishConfiguration createPublishConfiguration(String subject) {
return new PublishConfiguration() {
@Override
public boolean traceEnabled() {
return false;
}

@Override
public String stream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class HealthTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class))
.addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class,
DataCollectorBean.class, MessageConsumer.class))
.withConfigurationResource("application-health.properties");

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.test;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;

public interface MessageConsumer<T> {

default Uni<Void> acknowledge(Message<T> message) {
return Uni.createFrom().completionStage(message.ack());
}

default Uni<Void> notAcknowledge(Message<T> message, Throwable throwable) {
return Uni.createFrom().completionStage(message.nack(throwable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class ReactiveMesssagingNatsJetstreamDevModeTest {
TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class,
Advisory.class, DeadLetterResource.class, DeadLetterConsumingBean.class,
DurableResource.class, DurableConsumingBean.class, RedeliveryResource.class,
RedeliveryConsumingBean.class)
RedeliveryConsumingBean.class, DataCollectorBean.class, MessageConsumer.class)
.addAsResource("application.properties"));

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public class ReactiveMesssagingNatsJetstreamPullTest {
TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class,
Advisory.class, DeadLetterResource.class, DeadLetterConsumingBean.class,
DurableResource.class, DurableConsumingBean.class, RedeliveryResource.class,
RedeliveryConsumingBean.class, SubtopicsResource.class, SubtopicsConsumingBean.class,
SubjectData.class))
RedeliveryConsumingBean.class, DataCollectorBean.class, MessageConsumer.class))
.withConfigurationResource("application.properties");

@BeforeEach
Expand Down Expand Up @@ -102,22 +101,4 @@ void redelivery() {
return value.equalsIgnoreCase("42");
});
}

@Test
public void subtopic() {
final var messageId = "9a99811a-ef82-468e-9f0b-7879f7be16a9";
final var data = "N6cXzM";
final var subject = "subtopic";
final var subtopic = "data";

given().pathParam("subtopic", subtopic).pathParam("id", messageId).pathParam("data", data)
.post("/subtopics/{subtopic}/{id}/{data}").then().statusCode(204);

await().atMost(1, TimeUnit.MINUTES).until(() -> {
final var dataValue = get("/subtopics/last").as(SubjectData.class);
return data.equals(dataValue.getData()) && data.equals(dataValue.getResourceId())
&& messageId.equals(dataValue.getMessageId())
&& dataValue.getSubject().equals(subject + "." + subtopic);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class ReactiveMesssagingNatsJetstreamPullTracingTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class))
.addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class,
DataCollectorBean.class, MessageConsumer.class))
.withConfigurationResource("application-pull-tracing.properties");

@Inject
Expand All @@ -43,16 +44,17 @@ public void tracing() {

RestAssured.given().pathParam("id", messageId).pathParam("data", data).post("/data/{id}/{data}").then().statusCode(204);

final var spans = spanExporter.getFinishedSpanItems(3);
final var spans = spanExporter.getFinishedSpanItems(5);
assertThat(spans).isNotEmpty();

List<SpanData> parentSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid()))
.toList();
assertEquals(1, parentSpans.size());
assertEquals(2, parentSpans.size());

for (var parentSpan : parentSpans) {
assertThat(spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count())
.isEqualTo(1);
final var parentSpanId = parentSpan.getSpanId();
final var childSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpanId)).toList();
assertThat(childSpans).hasSize(1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public class ReactiveMesssagingNatsJetstreamPushTest {
Advisory.class, DeadLetterResource.class, DeadLetterConsumingBean.class,
DurableResource.class, DurableConsumingBean.class, RedeliveryResource.class,
RedeliveryConsumingBean.class, ExponentialBackoffConsumingBean.class,
ExponentialBackoffResource.class, SubtopicsResource.class, SubtopicsConsumingBean.class,
SubjectData.class))
ExponentialBackoffResource.class, DataCollectorBean.class, MessageConsumer.class))
.withConfigurationResource("application-push.properties");

@BeforeEach
Expand Down Expand Up @@ -126,22 +125,4 @@ void redelivery() {
return value.equalsIgnoreCase("42");
});
}

@Test
public void subtopic() {
final var messageId = "9a99811a-ef82-468e-9f0b-7879f7be16a9";
final var data = "N6cXzM";
final var subject = "subtopic";
final var subtopic = "data";

given().pathParam("subtopic", subtopic).pathParam("id", messageId).pathParam("data", data)
.post("/subtopics/{subtopic}/{id}/{data}").then().statusCode(204);

await().atMost(1, TimeUnit.MINUTES).until(() -> {
final var dataValue = get("/subtopics/last").as(SubjectData.class);
return data.equals(dataValue.getData()) && data.equals(dataValue.getResourceId())
&& messageId.equals(dataValue.getMessageId())
&& dataValue.getSubject().equals(subject + "." + subtopic);
});
}
}
Loading

0 comments on commit 72cd003

Please sign in to comment.