From 72cd003464ee9e835454628d5a6bae9622a717d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paw=20Figg=C3=A9=20Kjeldgaard?= Date: Fri, 6 Dec 2024 15:58:49 +0100 Subject: [PATCH] Fixed tracing of outgoing messages (#243) * Fixed tracing outgoing messages * Bumped io.nats:jnats from 2.20.4 to 2.20.5 --- .github/project.yml | 2 +- ...ctiveMesssagingNatsJetstreamProcessor.java | 6 +- .../jetstream/test/DataCollectorBean.java | 40 +++ .../jetstream/test/DataConsumingBean.java | 47 ++-- .../nats/jetstream/test/DataResource.java | 6 +- .../jetstream/test/FetchMessagesTest.java | 26 +- .../nats/jetstream/test/HealthTest.java | 3 +- .../nats/jetstream/test/MessageConsumer.java | 16 ++ ...iveMesssagingNatsJetstreamDevModeTest.java | 2 +- ...activeMesssagingNatsJetstreamPullTest.java | 21 +- ...esssagingNatsJetstreamPullTracingTest.java | 12 +- ...activeMesssagingNatsJetstreamPushTest.java | 21 +- ...esssagingNatsJetstreamPushTracingTest.java | 7 +- .../test/ReaderSubscribeConnectionTest.java | 5 - .../test/RedeliveryConsumingBean.java | 4 +- .../jetstream/test/RequestReplyResource.java | 31 +-- .../nats/jetstream/test/SubjectData.java | 34 --- .../test/SubtopicsConsumingBean.java | 42 --- .../jetstream/test/SubtopicsResource.java | 49 ---- .../resources/application-health.properties | 14 +- .../application-pull-tracing.properties | 12 +- .../application-push-tracing.properties | 13 +- .../resources/application-push.properties | 20 +- .../src/test/resources/application.properties | 19 +- .../ROOT/pages/includes/attributes.adoc | 2 +- .../nats/jetstream/it/DataConsumingBean.java | 4 +- .../nats/jetstream/it/DataResource.java | 4 +- pom.xml | 2 +- .../JetStreamBuildConfiguration.java | 6 + .../nats/jetstream/JetStreamConnector.java | 26 +- .../nats/jetstream/JetStreamMessage.java | 17 -- .../JetStreamOutgoingMessageMetadata.java | 38 --- .../nats/jetstream/client/Connection.java | 14 +- .../nats/jetstream/client/Context.java | 30 +++ .../jetstream/client/ContextSupplier.java | 9 + .../jetstream/client/DefaultConnection.java | 248 ++++++++---------- .../client/DefaultConnectionFactory.java | 29 +- .../jetstream/client/JetStreamMessage.java | 57 ++++ .../jetstream/client/PushSubscription.java | 15 +- .../jetstream/client/ReaderSubscription.java | 20 +- .../nats/jetstream/client/Subscription.java | 4 +- .../{ => client/api}/ExponentialBackoff.java | 2 +- .../client/api/JetStreamMessage.java | 108 +------- .../nats/jetstream/client/api/Message.java | 55 ++++ .../api/PublishMessage.java} | 16 +- .../api/PublishMessageMetadata.java} | 16 +- .../jetstream/client/api/ResolvedMessage.java | 111 ++++++++ .../client/api/SubscribeMessage.java | 47 ++++ .../client/api/SubscribeMessageMetadata.java | 27 ++ .../configuration/ConsumerConfiguration.java | 2 - .../configuration/PublishConfiguration.java | 2 - .../client/tracing/DefaultTracer.java | 105 ++++++++ .../PublishMessageAttributesExtractor.java | 88 +++++++ .../tracing/PublishMessageTextMapGetter.java} | 12 +- .../SubscribeMessageAttributesExtractor.java | 72 +++++ .../SubscribeMessageTextMapSetter.java | 19 ++ .../nats/jetstream/client/tracing/Tracer.java | 15 ++ .../client/tracing/TracerFactory.java | 27 ++ .../mapper/DefaultMessageMapper.java | 34 +-- .../mapper/DefaultPayloadMapper.java | 2 +- .../nats/jetstream/mapper/MessageMapper.java | 3 +- ...aultMessagePullPublisherConfiguration.java | 5 - ...aultMessagePushPublisherConfiguration.java | 5 - .../publisher/MessagePublisherProcessor.java | 13 +- .../MessagePullPublisherProcessor.java | 8 +- .../MessagePushPublisherProcessor.java | 8 +- ...DefaultMessageSubscriberConfiguration.java | 5 - .../MessageSubscriberProcessor.java | 13 +- .../tracing/JetStreamInstrument.java | 55 ---- .../jetstream/tracing/JetStreamTrace.java | 33 --- .../JetStreamTraceAttributesExtractor.java | 85 ------ .../tracing/JetStreamTraceTextMapSetter.java | 19 -- .../jetstream/ExponentialBackoffTest.java | 2 + .../MessagePublisherProcessorTest.java | 5 - 74 files changed, 1096 insertions(+), 900 deletions(-) create mode 100644 deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataCollectorBean.java create mode 100644 deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/MessageConsumer.java delete mode 100644 deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubjectData.java delete mode 100644 deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsConsumingBean.java delete mode 100644 deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsResource.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamMessage.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamOutgoingMessageMetadata.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Context.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ContextSupplier.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/JetStreamMessage.java rename runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/{ => client/api}/ExponentialBackoff.java (83%) create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/Message.java rename runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/{JetStreamIncomingMessage.java => client/api/PublishMessage.java} (88%) rename runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/{JetStreamIncomingMessageMetadata.java => client/api/PublishMessageMetadata.java} (79%) create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ResolvedMessage.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessage.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessageMetadata.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/DefaultTracer.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/PublishMessageAttributesExtractor.java rename runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/{tracing/JetStreamTraceTextMapGetter.java => client/tracing/PublishMessageTextMapGetter.java} (62%) create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageAttributesExtractor.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageTextMapSetter.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/Tracer.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/TracerFactory.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrument.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTrace.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceAttributesExtractor.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceTextMapSetter.java diff --git a/.github/project.yml b/.github/project.yml index f2849d3a..a7ca5896 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "3.17.1" + current-version: "3.17.3" next-version: "3.18.0-SNAPSHOT" diff --git a/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java b/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java index 44646e54..ce7f81ec 100644 --- a/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java +++ b/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java @@ -6,12 +6,13 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamBuildConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamConnector; import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamRecorder; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context; import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapperImpl; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultMessageMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultPayloadMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapperImpl; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem; import io.quarkus.deployment.annotations.BuildProducer; @@ -53,13 +54,14 @@ void initializeSecureRandomRelatedClassesAtRuntime( @BuildStep void createNatsConnector(BuildProducer buildProducer) { buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamConnector.class)); - buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamInstrument.class)); + buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(TracerFactory.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ExecutionHolder.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultConnectionFactory.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultPayloadMapper.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultMessageMapper.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ConsumerMapperImpl.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(StreamStateMapperImpl.class)); + buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(Context.class)); } @BuildStep diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataCollectorBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataCollectorBean.java new file mode 100644 index 00000000..a2c53951 --- /dev/null +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataCollectorBean.java @@ -0,0 +1,40 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.test; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.jboss.logging.Logger; + +import io.smallrye.mutiny.Uni; + +@ApplicationScoped +public class DataCollectorBean implements MessageConsumer { + private final static Logger logger = Logger.getLogger(DataCollectorBean.class); + + private final AtomicReference lastData = new AtomicReference<>(); + + @Incoming("data-collector") + public Uni data(Message message) { + return Uni.createFrom().item(message) + .onItem().invoke(m -> logger.infof("Received message: %s", message)) + .onItem().transformToUni(this::setLast) + .onItem().transformToUni(this::acknowledge) + .onFailure().recoverWithUni(throwable -> notAcknowledge(message, throwable)); + } + + private Uni> setLast(Message message) { + return Uni.createFrom().item(() -> { + lastData.set(message.getPayload()); + return message; + }); + } + + public Optional getLast() { + return Optional.ofNullable(lastData.get()); + } + +} diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataConsumingBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataConsumingBean.java index 3b6139da..1166d12b 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataConsumingBean.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataConsumingBean.java @@ -1,40 +1,49 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.test; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; - import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; import org.jboss.logging.Logger; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessageMetadata; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata; import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.mutiny.tuples.Tuple2; @ApplicationScoped -public class DataConsumingBean { +public class DataConsumingBean implements MessageConsumer { private final static Logger logger = Logger.getLogger(DataConsumingBean.class); - private final AtomicReference lastData = new AtomicReference<>(); + private final Emitter dataEmitter; + + public DataConsumingBean(@Channel("data-emitter") Emitter dataEmitter) { + this.dataEmitter = dataEmitter; + } - @Blocking @Incoming("data-consumer") public Uni data(Message message) { return Uni.createFrom().item(message) - .onItem().invoke(m -> { - logger.infof("Received message: %s", message); - message.getMetadata(JetStreamIncomingMessageMetadata.class) - .ifPresent(metadata -> lastData.set( - new Data(message.getPayload(), metadata.headers().get("RESOURCE_ID").get(0), - metadata.messageId()))); - }) - .onItem().transformToUni(m -> Uni.createFrom().completionStage(m.ack())) - .onFailure().recoverWithUni(throwable -> Uni.createFrom().completionStage(message.nack(throwable))); + .onItem().invoke(m -> logger.infof("Received message: %s", message)) + .onItem().transformToUni(this::publish) + .onItem().transformToUni(this::acknowledge) + .onFailure().recoverWithUni(throwable -> notAcknowledge(message, throwable)); } - public Optional getLast() { - return Optional.ofNullable(lastData.get()); + private Uni> publish(Message message) { + try { + return Uni.createFrom() + .item(() -> message.getMetadata(PublishMessageMetadata.class) + .map(metadata -> Tuple2.of(metadata.headers().get("RESOURCE_ID").get(0), metadata.messageId())) + .orElse(Tuple2.of(null, null))) + .onItem() + .transformToUni(tuple -> Uni.createFrom() + .completionStage( + dataEmitter.send(new Data(message.getPayload(), tuple.getItem1(), tuple.getItem2())))) + .onItem().transform(ignore -> message); + } catch (Exception e) { + return Uni.createFrom().failure(e); + } } } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataResource.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataResource.java index dd8c0f92..a8ab936a 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataResource.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DataResource.java @@ -11,7 +11,7 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Metadata; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata; import io.smallrye.mutiny.Uni; @Path("/data") @@ -19,7 +19,7 @@ public class DataResource { @Inject - DataConsumingBean bean; + DataCollectorBean bean; @Channel("data") Emitter emitter; @@ -40,7 +40,7 @@ private Uni> emitData(String id, String data) { return Uni.createFrom().item(() -> { final var headers = new HashMap>(); headers.put("RESOURCE_ID", List.of(data)); - final var message = Message.of(data, Metadata.of(JetStreamOutgoingMessageMetadata.of(id, headers, null))); + final var message = Message.of(data, Metadata.of(SubscribeMessageMetadata.of(id, headers))); emitter.send(message); return message; }); diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java index 88405725..3a229d8f 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java @@ -22,10 +22,12 @@ import io.nats.client.api.ReplayPolicy; import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context; import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Uni; @@ -44,6 +46,12 @@ public class FetchMessagesTest { @Inject ConnectionFactory connectionFactory; + @Inject + Context context; + + @Inject + TracerFactory tracerFactory; + @BeforeEach public void setup() throws Exception { try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), @@ -158,7 +166,10 @@ private void publish(Data data, String subject) throws Exception { new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { final var publishConfiguragtion = createPublishConfiguration(subject); final var fetchConsumerConfiguration = createFetchConsumerConfiguration(subject); - connection.publish(Message.of(data), publishConfiguragtion, fetchConsumerConfiguration).await() + context.withContext( + ctx -> connection.publish(Message.of(data), publishConfiguragtion, fetchConsumerConfiguration, + tracerFactory.create(), ctx)) + .await() .atMost(Duration.ofSeconds(30)); } } @@ -167,7 +178,9 @@ private Data fetch(String subject, boolean ack) throws Exception { try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { final var fetchConsumerConfiguration = createFetchConsumerConfiguration(subject); - final var received = connection.nextMessage(fetchConsumerConfiguration).await().atMost(Duration.ofSeconds(30)); + final var received = context + .withContext(ctx -> connection.nextMessage(fetchConsumerConfiguration, tracerFactory.create(), ctx)).await() + .atMost(Duration.ofSeconds(30)); if (ack) { Uni.createFrom().completionStage(received.ack()).await().atMost(Duration.ofSeconds(30)); } else { @@ -184,11 +197,6 @@ public Optional fetchTimeout() { return Optional.of(Duration.ofSeconds(10)); } - @Override - public boolean traceEnabled() { - return false; - } - @Override public boolean exponentialBackoff() { return false; @@ -313,10 +321,6 @@ public Optional> payloadType() { private PublishConfiguration createPublishConfiguration(String subject) { return new PublishConfiguration() { - @Override - public boolean traceEnabled() { - return false; - } @Override public String stream() { diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java index fec1c4de..5f7191e1 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java @@ -19,7 +19,8 @@ public class HealthTest { @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( () -> ShrinkWrap.create(JavaArchive.class) - .addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class)) + .addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class, + DataCollectorBean.class, MessageConsumer.class)) .withConfigurationResource("application-health.properties"); @Test diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/MessageConsumer.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/MessageConsumer.java new file mode 100644 index 00000000..a58a6e4d --- /dev/null +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/MessageConsumer.java @@ -0,0 +1,16 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.test; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; + +public interface MessageConsumer { + + default Uni acknowledge(Message message) { + return Uni.createFrom().completionStage(message.ack()); + } + + default Uni notAcknowledge(Message message, Throwable throwable) { + return Uni.createFrom().completionStage(message.nack(throwable)); + } +} diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java index abc0e9b8..b17ff2c7 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java @@ -19,7 +19,7 @@ public class ReactiveMesssagingNatsJetstreamDevModeTest { TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class, Advisory.class, DeadLetterResource.class, DeadLetterConsumingBean.class, DurableResource.class, DurableConsumingBean.class, RedeliveryResource.class, - RedeliveryConsumingBean.class) + RedeliveryConsumingBean.class, DataCollectorBean.class, MessageConsumer.class) .addAsResource("application.properties")); @Test diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java index 110ffddc..54cad5ea 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java @@ -26,8 +26,7 @@ public class ReactiveMesssagingNatsJetstreamPullTest { TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class, Advisory.class, DeadLetterResource.class, DeadLetterConsumingBean.class, DurableResource.class, DurableConsumingBean.class, RedeliveryResource.class, - RedeliveryConsumingBean.class, SubtopicsResource.class, SubtopicsConsumingBean.class, - SubjectData.class)) + RedeliveryConsumingBean.class, DataCollectorBean.class, MessageConsumer.class)) .withConfigurationResource("application.properties"); @BeforeEach @@ -102,22 +101,4 @@ void redelivery() { return value.equalsIgnoreCase("42"); }); } - - @Test - public void subtopic() { - final var messageId = "9a99811a-ef82-468e-9f0b-7879f7be16a9"; - final var data = "N6cXzM"; - final var subject = "subtopic"; - final var subtopic = "data"; - - given().pathParam("subtopic", subtopic).pathParam("id", messageId).pathParam("data", data) - .post("/subtopics/{subtopic}/{id}/{data}").then().statusCode(204); - - await().atMost(1, TimeUnit.MINUTES).until(() -> { - final var dataValue = get("/subtopics/last").as(SubjectData.class); - return data.equals(dataValue.getData()) && data.equals(dataValue.getResourceId()) - && messageId.equals(dataValue.getMessageId()) - && dataValue.getSubject().equals(subject + "." + subtopic); - }); - } } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java index c3cd1ff2..800ed34d 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java @@ -24,7 +24,8 @@ public class ReactiveMesssagingNatsJetstreamPullTracingTest { @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( () -> ShrinkWrap.create(JavaArchive.class) - .addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class)) + .addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class, + DataCollectorBean.class, MessageConsumer.class)) .withConfigurationResource("application-pull-tracing.properties"); @Inject @@ -43,16 +44,17 @@ public void tracing() { RestAssured.given().pathParam("id", messageId).pathParam("data", data).post("/data/{id}/{data}").then().statusCode(204); - final var spans = spanExporter.getFinishedSpanItems(3); + final var spans = spanExporter.getFinishedSpanItems(5); assertThat(spans).isNotEmpty(); List parentSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) .toList(); - assertEquals(1, parentSpans.size()); + assertEquals(2, parentSpans.size()); for (var parentSpan : parentSpans) { - assertThat(spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()) - .isEqualTo(1); + final var parentSpanId = parentSpan.getSpanId(); + final var childSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpanId)).toList(); + assertThat(childSpans).hasSize(1); } } } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java index 31f14acc..b4a8d9dc 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java @@ -28,8 +28,7 @@ public class ReactiveMesssagingNatsJetstreamPushTest { Advisory.class, DeadLetterResource.class, DeadLetterConsumingBean.class, DurableResource.class, DurableConsumingBean.class, RedeliveryResource.class, RedeliveryConsumingBean.class, ExponentialBackoffConsumingBean.class, - ExponentialBackoffResource.class, SubtopicsResource.class, SubtopicsConsumingBean.class, - SubjectData.class)) + ExponentialBackoffResource.class, DataCollectorBean.class, MessageConsumer.class)) .withConfigurationResource("application-push.properties"); @BeforeEach @@ -126,22 +125,4 @@ void redelivery() { return value.equalsIgnoreCase("42"); }); } - - @Test - public void subtopic() { - final var messageId = "9a99811a-ef82-468e-9f0b-7879f7be16a9"; - final var data = "N6cXzM"; - final var subject = "subtopic"; - final var subtopic = "data"; - - given().pathParam("subtopic", subtopic).pathParam("id", messageId).pathParam("data", data) - .post("/subtopics/{subtopic}/{id}/{data}").then().statusCode(204); - - await().atMost(1, TimeUnit.MINUTES).until(() -> { - final var dataValue = get("/subtopics/last").as(SubjectData.class); - return data.equals(dataValue.getData()) && data.equals(dataValue.getResourceId()) - && messageId.equals(dataValue.getMessageId()) - && dataValue.getSubject().equals(subject + "." + subtopic); - }); - } } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java index 650eb858..eee1f3f2 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java @@ -24,7 +24,8 @@ public class ReactiveMesssagingNatsJetstreamPushTracingTest { @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( () -> ShrinkWrap.create(JavaArchive.class) - .addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class)) + .addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class, + DataCollectorBean.class, MessageConsumer.class)) .withConfigurationResource("application-pull-tracing.properties"); @Inject @@ -43,12 +44,12 @@ public void tracing() { RestAssured.given().pathParam("id", messageId).pathParam("data", data).post("/data/{id}/{data}").then().statusCode(204); - final var spans = spanExporter.getFinishedSpanItems(3); + final var spans = spanExporter.getFinishedSpanItems(5); assertThat(spans).isNotEmpty(); List parentSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) .toList(); - assertEquals(1, parentSpans.size()); + assertEquals(2, parentSpans.size()); for (var parentSpan : parentSpans) { assertThat(spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()) diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java index 69fcb303..e8efc07d 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java @@ -204,11 +204,6 @@ public Optional> payloadType() { return Optional.of(Object.class); } - @Override - public boolean traceEnabled() { - return false; - } - @Override public boolean exponentialBackoff() { return false; diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RedeliveryConsumingBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RedeliveryConsumingBean.java index b245e133..e0954e51 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RedeliveryConsumingBean.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RedeliveryConsumingBean.java @@ -6,7 +6,7 @@ import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessageMetadata; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.unchecked.Unchecked; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -21,7 +21,7 @@ public class RedeliveryConsumingBean { public Uni unstable(Message message) { return Uni.createFrom().item(message) .onItem().transformToUni(m -> Uni.createFrom().item(Unchecked.supplier(() -> { - final var metadata = message.getMetadata(JetStreamIncomingMessageMetadata.class) + final var metadata = message.getMetadata(PublishMessageMetadata.class) .orElseThrow(() -> new RuntimeException("No metadata")); if (metadata.deliveredCount() < 3) { throw new RuntimeException("Redeliver message"); diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java index 98d95582..8bb363db 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java @@ -14,7 +14,6 @@ import jakarta.enterprise.context.RequestScoped; import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Reception; -import jakarta.inject.Inject; import jakarta.ws.rs.*; import org.eclipse.microprofile.reactive.messaging.Message; @@ -24,14 +23,16 @@ import io.nats.client.api.DeliverPolicy; import io.nats.client.api.ReplayPolicy; import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata; import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context; import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.smallrye.mutiny.Uni; @Path("/request-reply") @@ -42,14 +43,17 @@ public class RequestReplyResource { private final NatsConfiguration natsConfiguration; private final String streamName; private final AtomicReference messageConnection; + private final TracerFactory tracerFactory; + private final Context context; - @Inject public RequestReplyResource(ConnectionFactory connectionFactory, - NatsConfiguration natsConfiguration) { + NatsConfiguration natsConfiguration, TracerFactory tracerFactory, Context context) { this.connectionFactory = connectionFactory; this.natsConfiguration = natsConfiguration; this.streamName = "request-reply"; this.messageConnection = new AtomicReference<>(); + this.tracerFactory = tracerFactory; + this.context = context; } @GET @@ -115,13 +119,9 @@ private Uni getOrEstablishMessageConnection() { } private Uni produceData(Connection connection, String subject, String id, String data, String messageId) { - return connection.publish( - Message.of(new Data(data, id, messageId), Metadata.of(JetStreamOutgoingMessageMetadata.of(messageId))), + return context.withContext(ctx -> connection.publish( + Message.of(new Data(data, id, messageId), Metadata.of(SubscribeMessageMetadata.of(messageId))), new PublishConfiguration() { - @Override - public boolean traceEnabled() { - return true; - } @Override public String stream() { @@ -132,12 +132,14 @@ public String stream() { public String subject() { return "events." + subject; } - }, getConsumerConfiguration(streamName, subject)) + }, getConsumerConfiguration(streamName, subject), tracerFactory.create(), ctx)) .onItem().transformToUni(m -> Uni.createFrom().voidItem()); } public Uni consumeData(Connection connection, String subject) { - return connection.nextMessage(getConsumerConfiguration(streamName, subject)) + return context + .withContext( + c -> connection.nextMessage(getConsumerConfiguration(streamName, subject), tracerFactory.create(), c)) .map(message -> { message.ack(); return message.getPayload(); @@ -172,11 +174,6 @@ public String name() { return subject; } - @Override - public boolean traceEnabled() { - return true; - } - @Override public Optional> payloadType() { return Optional.empty(); diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubjectData.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubjectData.java deleted file mode 100644 index 242a49d3..00000000 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubjectData.java +++ /dev/null @@ -1,34 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.test; - -public class SubjectData { - private String data; - private String resourceId; - private String messageId; - private String subject; - - public SubjectData(String data, String resourceId, String messageId, String subject) { - this.data = data; - this.resourceId = resourceId; - this.messageId = messageId; - this.subject = subject; - } - - public SubjectData() { - } - - public String getData() { - return data; - } - - public String getResourceId() { - return resourceId; - } - - public String getMessageId() { - return messageId; - } - - public String getSubject() { - return subject; - } -} diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsConsumingBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsConsumingBean.java deleted file mode 100644 index e099f432..00000000 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsConsumingBean.java +++ /dev/null @@ -1,42 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.test; - -import java.util.Optional; - -import jakarta.enterprise.context.ApplicationScoped; - -import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.jboss.logging.Logger; - -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessageMetadata; -import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.annotations.Blocking; - -@ApplicationScoped -public class SubtopicsConsumingBean { - private final static Logger logger = Logger.getLogger(DataConsumingBean.class); - - volatile Optional lastData = Optional.empty(); - - @Blocking - @Incoming("subtopics-consumer") - public Uni data(Message message) { - return Uni.createFrom().item(() -> handleData(message)) - .onItem().transformToUni(m -> Uni.createFrom().completionStage(m.ack())) - .onFailure().recoverWithUni(throwable -> Uni.createFrom().completionStage(message.nack(throwable))); - } - - public Optional getLast() { - return lastData; - } - - private Message handleData(Message message) { - logger.infof("Received message: %s", message); - message.getMetadata(JetStreamIncomingMessageMetadata.class) - .ifPresent(metadata -> lastData = Optional.of( - new SubjectData(message.getPayload(), metadata.headers().get("RESOURCE_ID").get(0), - metadata.messageId(), - metadata.subject()))); - return message; - } -} diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsResource.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsResource.java deleted file mode 100644 index 7e02f8a8..00000000 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/SubtopicsResource.java +++ /dev/null @@ -1,49 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.test; - -import java.util.HashMap; -import java.util.List; - -import jakarta.inject.Inject; -import jakarta.ws.rs.*; - -import org.eclipse.microprofile.reactive.messaging.Channel; -import org.eclipse.microprofile.reactive.messaging.Emitter; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.messaging.Metadata; - -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata; -import io.smallrye.mutiny.Uni; - -@Path("/subtopics") -@Produces("application/json") -public class SubtopicsResource { - - @Inject - SubtopicsConsumingBean bean; - - @Channel("subtopic") - Emitter emitter; - - @GET - @Path("/last") - public SubjectData getLast() { - return bean.getLast().orElseGet(SubjectData::new); - } - - @POST - @Path("/{subtopic}/{id}/{data}") - public Uni produceData(@PathParam("subtopic") String subtopic, @PathParam("id") String id, - @PathParam("data") String data) { - return Uni.createFrom().item(() -> emitData(subtopic, id, data)) - .onItem().ignore().andContinueWithNull(); - } - - private Message emitData(String subtopic, String id, String data) { - final var headers = new HashMap>(); - headers.put("RESOURCE_ID", List.of(data)); - final var message = Message.of(data, - Metadata.of(JetStreamOutgoingMessageMetadata.of(id, headers, subtopic))); - emitter.send(message); - return message; - } -} diff --git a/deployment/src/test/resources/application-health.properties b/deployment/src/test/resources/application-health.properties index f36c74a7..d93846cf 100644 --- a/deployment/src/test/resources/application-health.properties +++ b/deployment/src/test/resources/application-health.properties @@ -1,5 +1,6 @@ quarkus.messaging.nats.jet-stream.streams[0].name=test -quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=data +quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=data +quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=data-stream mp.messaging.outgoing.data.connector=quarkus-jetstream mp.messaging.outgoing.data.stream=test @@ -7,4 +8,13 @@ mp.messaging.outgoing.data.subject=data mp.messaging.incoming.data-consumer.connector=quarkus-jetstream mp.messaging.incoming.data-consumer.stream=test -mp.messaging.incoming.data-consumer.subject=data \ No newline at end of file +mp.messaging.incoming.data-consumer.subject=data + +mp.messaging.outgoing.data-emitter.connector=quarkus-jetstream +mp.messaging.outgoing.data-emitter.stream=test +mp.messaging.outgoing.data-emitter.subject=data-stream + +mp.messaging.incoming.data-collector.connector=quarkus-jetstream +mp.messaging.incoming.data-collector.subject=data-stream +mp.messaging.incoming.data-collector.stream=test +mp.messaging.incoming.data-collector.max-deliver=1 \ No newline at end of file diff --git a/deployment/src/test/resources/application-pull-tracing.properties b/deployment/src/test/resources/application-pull-tracing.properties index 41306bdf..bc5a013e 100644 --- a/deployment/src/test/resources/application-pull-tracing.properties +++ b/deployment/src/test/resources/application-pull-tracing.properties @@ -1,5 +1,6 @@ quarkus.messaging.nats.jet-stream.streams[0].name=test-tracing quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=data-tracing +quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=data-stream mp.messaging.outgoing.data.connector=quarkus-jetstream mp.messaging.outgoing.data.stream=test-tracing @@ -7,4 +8,13 @@ mp.messaging.outgoing.data.subject=data-tracing mp.messaging.incoming.data-consumer.connector=quarkus-jetstream mp.messaging.incoming.data-consumer.stream=test-tracing -mp.messaging.incoming.data-consumer.subject=data-tracing \ No newline at end of file +mp.messaging.incoming.data-consumer.subject=data-tracing + +mp.messaging.outgoing.data-emitter.connector=quarkus-jetstream +mp.messaging.outgoing.data-emitter.stream=test-tracing +mp.messaging.outgoing.data-emitter.subject=data-stream + +mp.messaging.incoming.data-collector.connector=quarkus-jetstream +mp.messaging.incoming.data-collector.subject=data-stream +mp.messaging.incoming.data-collector.stream=test-tracing +mp.messaging.incoming.data-collector.max-deliver=1 \ No newline at end of file diff --git a/deployment/src/test/resources/application-push-tracing.properties b/deployment/src/test/resources/application-push-tracing.properties index 7a6406b3..f1c8bfa9 100644 --- a/deployment/src/test/resources/application-push-tracing.properties +++ b/deployment/src/test/resources/application-push-tracing.properties @@ -1,5 +1,6 @@ quarkus.messaging.nats.jet-stream.streams[0].name=test-tracing quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=data-tracing +quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=data-stream mp.messaging.outgoing.data.connector=quarkus-jetstream mp.messaging.outgoing.data.stream=test-tracing @@ -8,4 +9,14 @@ mp.messaging.outgoing.data.subject=data-tracing mp.messaging.incoming.data-consumer.connector=quarkus-jetstream mp.messaging.incoming.data-consumer.stream=test-tracing mp.messaging.incoming.data-consumer.subject=data-tracing -mp.messaging.incoming.data-consumer.publisher-type=Push \ No newline at end of file +mp.messaging.incoming.data-consumer.publisher-type=Push + +mp.messaging.outgoing.data-emitter.connector=quarkus-jetstream +mp.messaging.outgoing.data-emitter.stream=test-tracing +mp.messaging.outgoing.data-emitter.subject=data-stream + +mp.messaging.incoming.data-collector.connector=quarkus-jetstream +mp.messaging.incoming.data-collector.subject=data-stream +mp.messaging.incoming.data-collector.stream=test-tracing +mp.messaging.incoming.data-collector.max-deliver=1 +mp.messaging.incoming.data-collector.publisher-type=Push \ No newline at end of file diff --git a/deployment/src/test/resources/application-push.properties b/deployment/src/test/resources/application-push.properties index 0519b0d5..7ea55118 100644 --- a/deployment/src/test/resources/application-push.properties +++ b/deployment/src/test/resources/application-push.properties @@ -8,6 +8,7 @@ quarkus.messaging.nats.jet-stream.streams[0].subjects[5]=redelivery-data quarkus.messaging.nats.jet-stream.streams[0].subjects[6]=$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.test.unstable-data-consumer quarkus.messaging.nats.jet-stream.streams[0].subjects[7]=eb quarkus.messaging.nats.jet-stream.streams[0].subjects[8]=$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.test.exponential-backoff-consumer +quarkus.messaging.nats.jet-stream.streams[0].subjects[9]=data-stream mp.messaging.outgoing.source.connector=quarkus-jetstream mp.messaging.outgoing.source.stream=test @@ -27,6 +28,16 @@ mp.messaging.incoming.data-consumer.stream=test mp.messaging.incoming.data-consumer.subject=data mp.messaging.incoming.data-consumer.publisher-type=Push +mp.messaging.outgoing.data-emitter.connector=quarkus-jetstream +mp.messaging.outgoing.data-emitter.stream=test +mp.messaging.outgoing.data-emitter.subject=data-stream + +mp.messaging.incoming.data-collector.connector=quarkus-jetstream +mp.messaging.incoming.data-collector.subject=data-stream +mp.messaging.incoming.data-collector.stream=test +mp.messaging.incoming.data-collector.max-deliver=1 +mp.messaging.incoming.data-collector.publisher-type=Push + mp.messaging.outgoing.unstable-data.connector=quarkus-jetstream mp.messaging.outgoing.unstable-data.stream=test mp.messaging.outgoing.unstable-data.subject=unstable-data @@ -93,12 +104,3 @@ mp.messaging.incoming.unstable.stream=test mp.messaging.incoming.unstable.max-deliver=5 mp.messaging.incoming.unstable.back-off=PT1S,PT5S mp.messaging.incoming.unstable.publisher-type=Push - -mp.messaging.outgoing.subtopic.connector=quarkus-jetstream -mp.messaging.outgoing.subtopic.subject=subtopic -mp.messaging.outgoing.subtopic.stream=test - -mp.messaging.incoming.subtopics-consumer.connector=quarkus-jetstream -mp.messaging.incoming.subtopics-consumer.subject=subtopic.> -mp.messaging.incoming.subtopics-consumer.stream=test -mp.messaging.incoming.subtopics-consumer.publisher-type=Push diff --git a/deployment/src/test/resources/application.properties b/deployment/src/test/resources/application.properties index bb31671c..80f8ee6a 100644 --- a/deployment/src/test/resources/application.properties +++ b/deployment/src/test/resources/application.properties @@ -6,6 +6,7 @@ quarkus.messaging.nats.jet-stream.streams[0].subjects[3]=unstable-data quarkus.messaging.nats.jet-stream.streams[0].subjects[4]=dc quarkus.messaging.nats.jet-stream.streams[0].subjects[5]=redelivery-data quarkus.messaging.nats.jet-stream.streams[0].subjects[6]=$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.test.unstable-data-consumer +quarkus.messaging.nats.jet-stream.streams[0].subjects[7]=data-stream mp.messaging.outgoing.source.connector=quarkus-jetstream mp.messaging.outgoing.source.stream=test @@ -23,6 +24,16 @@ mp.messaging.incoming.data-consumer.connector=quarkus-jetstream mp.messaging.incoming.data-consumer.stream=test mp.messaging.incoming.data-consumer.subject=data +mp.messaging.outgoing.data-emitter.connector=quarkus-jetstream +mp.messaging.outgoing.data-emitter.stream=test +mp.messaging.outgoing.data-emitter.subject=data-stream + +mp.messaging.incoming.data-collector.connector=quarkus-jetstream +mp.messaging.incoming.data-collector.subject=data-stream +mp.messaging.incoming.data-collector.stream=test +mp.messaging.incoming.data-collector.max-deliver=1 +mp.messaging.incoming.data-collector.durable=data-collector + mp.messaging.outgoing.unstable-data.connector=quarkus-jetstream mp.messaging.outgoing.unstable-data.stream=test mp.messaging.outgoing.unstable-data.subject=unstable-data @@ -65,11 +76,3 @@ mp.messaging.incoming.unstable.subject=redelivery-data mp.messaging.incoming.unstable.stream=test mp.messaging.incoming.unstable.max-deliver=5 mp.messaging.incoming.unstable.back-off=PT1S,PT5S - -mp.messaging.outgoing.subtopic.connector=quarkus-jetstream -mp.messaging.outgoing.subtopic.subject=subtopic -mp.messaging.outgoing.subtopic.stream=test - -mp.messaging.incoming.subtopics-consumer.connector=quarkus-jetstream -mp.messaging.incoming.subtopics-consumer.subject=subtopic.> -mp.messaging.incoming.subtopics-consumer.stream=test \ No newline at end of file diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index 83aec4d4..78fe955b 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 3.17.1 +:project-version: 3.17.3 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataConsumingBean.java b/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataConsumingBean.java index 9caaa6f7..8d6f7cfc 100644 --- a/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataConsumingBean.java +++ b/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataConsumingBean.java @@ -8,7 +8,7 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.jboss.logging.Logger; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessageMetadata; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -32,7 +32,7 @@ public Optional getLast() { private void handleData(Message message) { logger.infof("Received message: %s", message); - message.getMetadata(JetStreamIncomingMessageMetadata.class) + message.getMetadata(PublishMessageMetadata.class) .ifPresent(metadata -> lastData = Optional.of( new Data(message.getPayload().getData(), metadata.headers().get("RESOURCE_ID").get(0), metadata.messageId(), message.getPayload().getCreationTime()))); diff --git a/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResource.java b/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResource.java index a7f8b377..987f60de 100644 --- a/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResource.java +++ b/integration-tests/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResource.java @@ -30,7 +30,7 @@ import com.fasterxml.jackson.annotation.JsonView; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata; import io.smallrye.mutiny.Uni; @Path("/data") @@ -68,7 +68,7 @@ private Message emitData(String messageId, Data data) { final var headers = new HashMap>(); headers.put("RESOURCE_ID", List.of(data.getResourceId())); final var message = Message.of(data, - Metadata.of(JetStreamOutgoingMessageMetadata.of(messageId, headers, null))); + Metadata.of(SubscribeMessageMetadata.of(messageId, headers))); emitter.send(message); return message; } diff --git a/pom.xml b/pom.xml index 23bd4be8..c20bc6c1 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ UTF-8 3.17.3 - 2.20.4 + 2.20.5 0.3.0 4.25.0 4.24.0 diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java index 8ae77fb1..aa6e10c1 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java @@ -21,6 +21,12 @@ public interface JetStreamBuildConfiguration { @WithDefault("true") Boolean autoConfigure(); + /** + * Enable tracing for JetStream + */ + @WithDefault("true") + Boolean trace(); + /** * If auto-configure is true the streams are created on Nats server. */ diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java index 41f8419f..5c18e43d 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java @@ -12,7 +12,6 @@ import jakarta.enterprise.context.BeforeDestroyed; import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Reception; -import jakarta.inject.Inject; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; @@ -21,8 +20,10 @@ import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerType; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher.*; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.subscriber.MessageSubscriberConfiguration; @@ -39,7 +40,6 @@ // Publish and subscriber processor attributes @ConnectorAttribute(name = "stream", description = "The stream to subscribe or publish messages to", direction = INCOMING_AND_OUTGOING, type = "String") @ConnectorAttribute(name = "subject", description = "The subject to subscribe or publish messages to", direction = INCOMING_AND_OUTGOING, type = "String") -@ConnectorAttribute(name = "trace-enabled", description = "Enable traces for publisher or subscriber", direction = INCOMING_AND_OUTGOING, type = "Boolean", defaultValue = "true") // Publish common processor attributes @ConnectorAttribute(name = "name", description = "The name of the NATS consumer", direction = INCOMING, type = "String") @@ -83,16 +83,22 @@ public class JetStreamConnector implements InboundConnector, OutboundConnector, private final List processors; private final NatsConfiguration natsConfiguration; private final ConnectionFactory connectionFactory; + private final TracerFactory tracerFactory; + private final Context context; - @Inject public JetStreamConnector( NatsConfiguration natsConfiguration, - ConnectionFactory connectionFactory) { + ConnectionFactory connectionFactory, + TracerFactory tracerFactory, + Context context) { this.processors = new CopyOnWriteArrayList<>(); this.natsConfiguration = natsConfiguration; this.connectionFactory = connectionFactory; + this.tracerFactory = tracerFactory; + this.context = context; } + @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override public Flow.Publisher> getPublisher(Config config) { final var configuration = new JetStreamConnectorIncomingConfiguration(config); @@ -108,7 +114,9 @@ public Flow.Subscriber> getSubscriber(Config config) { final var processor = new MessageSubscriberProcessor( connectionConfiguration, connectionFactory, - MessageSubscriberConfiguration.of(configuration)); + MessageSubscriberConfiguration.of(configuration), + tracerFactory, + context); processors.add(processor); return processor.subscriber(); } @@ -152,11 +160,15 @@ private MessagePublisherProcessor createMessagePublisherProcessor( if (ConsumerType.Pull.equals(type)) { return new MessagePullPublisherProcessor<>(connectionFactory, connectionConfiguration, - MessagePullPublisherConfiguration.of(configuration)); + MessagePullPublisherConfiguration.of(configuration), + tracerFactory, + context); } else { return new MessagePushPublisherProcessor<>(connectionFactory, connectionConfiguration, - MessagePushPublisherConfiguration.of(configuration)); + MessagePushPublisherConfiguration.of(configuration), + tracerFactory, + context); } } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamMessage.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamMessage.java deleted file mode 100644 index b140f5d3..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamMessage.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream; - -import java.util.List; -import java.util.Map; - -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; -import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; - -public interface JetStreamMessage extends Message, ContextAwareMessage, MetadataInjectableMessage { - - String getMessageId(); - - Map> getHeaders(); - -} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamOutgoingMessageMetadata.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamOutgoingMessageMetadata.java deleted file mode 100644 index 34c78548..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamOutgoingMessageMetadata.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import lombok.Builder; - -@Builder -public record JetStreamOutgoingMessageMetadata(String messageId, Map> headers, - Optional subtopic) { - - public static JetStreamOutgoingMessageMetadata of(final String messageId, - final Map> headers, - final String subtopic) { - return JetStreamOutgoingMessageMetadata.builder() - .messageId(messageId) - .headers(headers != null ? headers : Collections.emptyMap()) - .subtopic(Optional.ofNullable(subtopic)).build(); - } - - public static JetStreamOutgoingMessageMetadata of(final String messageId, - final Map> headers) { - return JetStreamOutgoingMessageMetadata.builder() - .messageId(messageId) - .headers(headers != null ? headers : Collections.emptyMap()) - .subtopic(Optional.empty()).build(); - } - - public static JetStreamOutgoingMessageMetadata of(final String messageId) { - return JetStreamOutgoingMessageMetadata.builder() - .messageId(messageId) - .headers(Collections.emptyMap()) - .subtopic(Optional.empty()).build(); - } - -} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java index 2ebec205..f9aede60 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java @@ -10,8 +10,10 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.vertx.mutiny.core.Context; public interface Connection extends StreamSetup, KeyValueStoreSetup, AutoCloseable { @@ -64,14 +66,16 @@ default void fireEvent(ConnectionEvent event, String message) { Uni> purgeAllStreams(); - Uni> publish(final Message message, final PublishConfiguration configuration); + Uni> publish(Message message, PublishConfiguration publishConfiguration, + Tracer tracer, Context context); - Uni> publish(final Message message, final PublishConfiguration publishConfiguration, - FetchConsumerConfiguration consumerConfiguration); + Uni> publish(Message message, + PublishConfiguration publishConfiguration, + FetchConsumerConfiguration consumerConfiguration, Tracer tracer, Context context); - Uni> nextMessage(FetchConsumerConfiguration configuration); + Uni> nextMessage(FetchConsumerConfiguration configuration, Tracer tracer, Context context); - Multi> nextMessages(FetchConsumerConfiguration configuration); + Multi> nextMessages(FetchConsumerConfiguration configuration, Tracer tracer, Context context); Uni getKeyValue(String bucketName, String key, Class valueType); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Context.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Context.java new file mode 100644 index 00000000..e903a58a --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Context.java @@ -0,0 +1,30 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.vertx.mutiny.core.Vertx; + +@ApplicationScoped +public class Context { + private final ExecutionHolder executionHolder; + + public Context(ExecutionHolder executionHolder) { + this.executionHolder = executionHolder; + } + + public T withContext(ContextSupplier supplier) { + final var context = getContext(); + return supplier.get(context); + } + + private Optional getVertx() { + return Optional.ofNullable(executionHolder.vertx()); + } + + private io.vertx.mutiny.core.Context getContext() { + return getVertx().map(Vertx::getOrCreateContext).orElseThrow(() -> new ContextException("No Vertx available")); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ContextSupplier.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ContextSupplier.java new file mode 100644 index 00000000..7195df06 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ContextSupplier.java @@ -0,0 +1,9 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +import io.vertx.mutiny.core.Context; + +@FunctionalInterface +public interface ContextSupplier { + + T get(Context context); +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java index 0181492e..20be2ba5 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java @@ -1,8 +1,6 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.client; import static io.nats.client.Connection.Status.CONNECTED; -import static io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultMessageMapper.MESSAGE_TYPE_HEADER; -import static io.smallrye.reactive.messaging.tracing.TracingUtils.traceOutgoing; import java.io.IOException; import java.time.Duration; @@ -21,22 +19,19 @@ import io.nats.client.api.StreamInfo; import io.nats.client.api.StreamInfoOptions; import io.nats.client.impl.Headers; -import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.*; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.tuples.Tuple2; -import io.smallrye.mutiny.tuples.Tuple5; import io.smallrye.mutiny.unchecked.Unchecked; import io.vertx.mutiny.core.Context; @@ -45,29 +40,23 @@ public class DefaultConnection implements Connection { private final io.nats.client.Connection connection; private final List listeners; - private final Context context; private final StreamStateMapper streamStateMapper; private final ConsumerMapper consumerMapper; private final MessageMapper messageMapper; private final PayloadMapper payloadMapper; - private final JetStreamInstrument instrument; DefaultConnection(final ConnectionConfiguration configuration, final ConnectionListener connectionListener, - final Context context, final MessageMapper messageMapper, final PayloadMapper payloadMapper, final ConsumerMapper consumerMapper, - final StreamStateMapper streamStateMapper, - final JetStreamInstrument instrumenter) throws ConnectionException { + final StreamStateMapper streamStateMapper) throws ConnectionException { this.connection = connect(configuration); this.listeners = new ArrayList<>(List.of(connectionListener)); - this.context = context; this.streamStateMapper = streamStateMapper; this.consumerMapper = consumerMapper; this.messageMapper = messageMapper; this.payloadMapper = payloadMapper; - this.instrument = instrumenter; fireEvent(ConnectionEvent.Connected, "Connection established"); } @@ -78,15 +67,14 @@ public boolean isConnected() { @Override public Uni flush(Duration duration) { - return Uni.createFrom(). item(Unchecked.supplier(() -> { + return Uni.createFrom().item(Unchecked.supplier(() -> { try { connection.flush(duration); return null; } catch (TimeoutException | InterruptedException e) { throw new ConnectionException(e); } - })) - .emitOn(context::runOnContext); + })); } @Override @@ -130,8 +118,7 @@ public Uni getConsumer(String stream, String consumerName) { emitter.fail(new SystemException(e)); } })) - .onItem().transform(consumerMapper::of) - .emitOn(context::runOnContext); + .onItem().transform(consumerMapper::of); } @Override @@ -144,8 +131,7 @@ public Uni deleteConsumer(String streamName, String consumerName) { } catch (Throwable failure) { emitter.fail(new SystemException(failure)); } - })) - .emitOn(context::runOnContext); + })); } @Override @@ -162,8 +148,7 @@ public Uni pauseConsumer(String streamName, String consumerName, ZonedDate } catch (Throwable failure) { emitter.fail(new SystemException(failure)); } - })) - .emitOn(context::runOnContext); + })); } @Override @@ -180,29 +165,25 @@ public Uni resumeConsumer(String streamName, String consumerName) { } catch (Throwable failure) { emitter.fail(new SystemException(failure)); } - })) - .emitOn(context::runOnContext); + })); } @Override public Uni getFirstSequence(String streamName) { return getStreamInfo(streamName) - .onItem().transform(tuple -> tuple.getItem2().getStreamState().getFirstSequence()) - .emitOn(context::runOnContext); + .onItem().transform(tuple -> tuple.getItem2().getStreamState().getFirstSequence()); } @Override public Uni> getStreams() { return getJetStreamManagement() - .onItem().transformToUni(jsm -> Uni.createFrom().item(Unchecked.supplier((jsm::getStreamNames)))) - .emitOn(context::runOnContext); + .onItem().transformToUni(jsm -> Uni.createFrom().item(Unchecked.supplier((jsm::getStreamNames)))); } @Override public Uni> getSubjects(String streamName) { return getStreamInfo(streamName) - .onItem().transform(tuple -> tuple.getItem2().getConfiguration().getSubjects()) - .emitOn(context::runOnContext); + .onItem().transform(tuple -> tuple.getItem2().getConfiguration().getSubjects()); } @Override @@ -214,8 +195,7 @@ public Uni> getConsumerNames(String streamName) { } catch (Throwable failure) { emitter.fail(new SystemException(failure)); } - })) - .emitOn(context::runOnContext); + })); } @Override @@ -228,8 +208,7 @@ public Uni purgeStream(String streamName) { } catch (Throwable failure) { emitter.fail(new SystemException(failure)); } - })) - .emitOn(context::runOnContext); + })); } @Override @@ -248,106 +227,73 @@ public Uni deleteMessage(String stream, long sequence, boolean erase) { sequence, failure.getMessage()), failure)); } - })) - .emitOn(context::runOnContext); + })); } @Override public Uni getStreamState(String streamName) { return getStreamInfo(streamName) - .onItem().transform(tuple -> streamStateMapper.of(tuple.getItem2().getStreamState())) - .emitOn(context::runOnContext); + .onItem().transform(tuple -> streamStateMapper.of(tuple.getItem2().getStreamState())); } @Override public Uni getStreamConfiguration(String streamName) { return getStreamInfo(streamName) - .onItem().transform(tuple -> StreamConfiguration.of(tuple.getItem2().getConfiguration())) - .emitOn(context::runOnContext); + .onItem().transform(tuple -> StreamConfiguration.of(tuple.getItem2().getConfiguration())); } @Override public Uni> purgeAllStreams() { return getStreams() - .onItem().transformToUni(this::purgeAllStreams) - .emitOn(context::runOnContext); + .onItem().transformToUni(this::purgeAllStreams); } @Override - public Uni> publish(final Message message, final PublishConfiguration configuration) { - return Uni.createFrom().> emitter(emitter -> { - try { - final var metadata = message.getMetadata(JetStreamOutgoingMessageMetadata.class); - final var messageId = metadata.map(JetStreamOutgoingMessageMetadata::messageId) - .orElseGet(() -> UUID.randomUUID().toString()); - final var payload = payloadMapper.of(message.getPayload()); - final var subject = metadata.flatMap(JetStreamOutgoingMessageMetadata::subtopic) - .map(subtopic -> configuration.subject() + "." + subtopic).orElseGet(configuration::subject); - final var headers = new HashMap>(); - metadata.ifPresent(m -> headers.putAll(m.headers())); - if (message.getPayload() != null) { - headers.putIfAbsent(MESSAGE_TYPE_HEADER, List.of(message.getPayload().getClass().getTypeName())); - } - - if (configuration.traceEnabled()) { - // Create a new span for the outbound message and record updated tracing information in - // the headers; this has to be done before we build the properties below - traceOutgoing(instrument.publisher(), message, - JetStreamTrace.builder() - .stream(configuration.stream()) - .subject(subject) - .messageId(messageId) - .headers(headers) - .payload(new String(payload)) - .build()); - } - - final var jetStream = connection.jetStream(); - final var options = createPublishOptions(messageId, configuration.stream()); - - emitter.complete(Tuple5.of(jetStream, subject, toJetStreamHeaders(headers), payload, options)); - } catch (Throwable failure) { - emitter.fail( - new PublishException(String.format("Failed to publish message: %s", failure.getMessage()), failure)); - } - }) + public Uni> publish(final Message message, PublishConfiguration publishConfiguration, Tracer tracer, + Context context) { + return Uni.createFrom().voidItem() + .emitOn(context::runOnContext) + .onItem().transformToUni(ignore -> tracer.withTrace(message, publishConfiguration)) .onItem() - .transformToUni(tuple -> Uni.createFrom().completionStage( - tuple.getItem1().publishAsync(tuple.getItem2(), tuple.getItem3(), tuple.getItem4(), tuple.getItem5()))) - .onItem() - .invoke(ack -> logger.debugf("Message published to stream: %s with sequence number: %d", ack.getStream(), - ack.getSeqno())) + .transformToUni(subscribeMessage -> getJetStream().onItem() + .transform(jetStream -> Tuple2.of(jetStream, subscribeMessage))) + .onItem().transformToUni(tuple -> Uni.createFrom().completionStage( + tuple.getItem1().publishAsync(tuple.getItem2().configuration().subject(), + toJetStreamHeaders(tuple.getItem2().headers()), + tuple.getItem2().payload(), + createPublishOptions(tuple.getItem2().messageId(), tuple.getItem2().configuration().stream())))) + .onItem().transform(ack -> { + logger.debugf("Message published to stream: %s with sequence number: %d", ack.getStream(), ack.getSeqno()); + return message; + }) .onItem().transformToUni(ignore -> acknowledge(message)) .onFailure().recoverWithUni(failure -> notAcknowledge(message, failure)) - .onFailure().transform(failure -> new PublishException(failure.getMessage(), failure)) - .emitOn(context::runOnContext); + .onFailure().transform(failure -> new PublishException(failure.getMessage(), failure)); } @Override - public Uni> publish(Message message, - PublishConfiguration publishConfiguration, - FetchConsumerConfiguration consumerConfiguration) { + public Uni> publish(Message message, PublishConfiguration publishConfiguration, + FetchConsumerConfiguration consumerConfiguration, Tracer tracer, Context context) { return addOrUpdateConsumer(consumerConfiguration) - .onItem().transformToUni(v -> publish(message, publishConfiguration)); + .onItem().transformToUni(v -> publish(message, publishConfiguration, tracer, context)); } @Override - public Uni> nextMessage(FetchConsumerConfiguration configuration) { + public Uni> nextMessage(FetchConsumerConfiguration configuration, Tracer tracer, Context context) { ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); return addOrUpdateConsumer(configuration) .onItem() - .transformToUni(consumerContext -> nextMessage(consumerContext, configuration)) - .runSubscriptionOn(pullExecutor) - .emitOn(context::runOnContext); + .transformToUni(consumerContext -> nextMessage(consumerContext, configuration, tracer, context)) + .runSubscriptionOn(pullExecutor); } + @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override - public Multi> nextMessages(FetchConsumerConfiguration configuration) { + public Multi> nextMessages(FetchConsumerConfiguration configuration, Tracer tracer, Context context) { ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); return addOrUpdateConsumer(configuration) - .onItem().transformToMulti(consumerContext -> nextMessages(consumerContext, configuration)) - .runSubscriptionOn(pullExecutor) - .emitOn(context::runOnContext); + .onItem().transformToMulti(consumerContext -> nextMessages(consumerContext, configuration, tracer, context)) + .runSubscriptionOn(pullExecutor); } @Override @@ -361,8 +307,7 @@ public Uni getKeyValue(String bucketName, String key, Class valueType) } }) .onItem().ifNull().failWith(() -> new KeyValueNotFoundException(bucketName, key)) - .onItem().ifNotNull().transform(keyValueEntry -> payloadMapper.of(keyValueEntry.getValue(), valueType)) - .emitOn(context::runOnContext); + .onItem().ifNotNull().transform(keyValueEntry -> payloadMapper.of(keyValueEntry.getValue(), valueType)); } @Override @@ -375,8 +320,7 @@ public Uni putKeyValue(String bucketName, String key, T value) { } catch (Throwable failure) { emitter.fail(new KeyValueException(failure)); } - }) - .emitOn(context::runOnContext); + }); } @Override @@ -389,8 +333,7 @@ public Uni deleteKeyValue(String bucketName, String key) { } catch (Throwable failure) { emitter.fail(new KeyValueException(failure)); } - }) - .emitOn(context::runOnContext); + }); } @Override @@ -400,17 +343,16 @@ public Uni> resolve(String streamName, long sequence) { final var jetStream = connection.jetStream(); final var streamContext = jetStream.getStreamContext(streamName); final var messageInfo = streamContext.getMessage(sequence); - emitter.complete(new JetStreamMessage<>(messageInfo, payloadMapper. of(messageInfo).orElse(null))); + emitter.complete(new ResolvedMessage<>(messageInfo, payloadMapper. of(messageInfo).orElse(null))); } catch (IOException | JetStreamApiException e) { emitter.fail(e); } - }) - .emitOn(context::runOnContext); + }); } @Override public Uni> subscription(PushConsumerConfiguration configuration) { - return Uni.createFrom().item(() -> new PushSubscription<>(this, configuration, connection, messageMapper, context)); + return Uni.createFrom().item(() -> new PushSubscription<>(this, configuration, connection, messageMapper)); } @Override @@ -420,7 +362,7 @@ public Uni> subscription(ReaderConsumerConfiguration conf .onItem() .transformToUni(pair -> Uni.createFrom() .> item(Unchecked.supplier(() -> new ReaderSubscription<>(this, configuration, - pair.getItem1(), pair.getItem2(), messageMapper, context)))) + pair.getItem1(), pair.getItem2(), messageMapper)))) .onItem().invoke(this::addListener); } @@ -438,10 +380,10 @@ public void close(Subscription subscription) { public Uni addOrUpdateKeyValueStores(List keyValueConfigurations) { return Multi.createFrom().items(keyValueConfigurations.stream()) .onItem().transformToUniAndMerge(this::addOrUpdateKeyValueStore) - .collect().last() - .emitOn(context::runOnContext); + .collect().last(); } + @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override public Uni> addStreams(List streamConfigurations) { return getJetStreamManagement() @@ -450,8 +392,7 @@ public Uni> addStreams(List streamC .items(streamConfigurations.stream() .map(streamConfiguration -> Tuple2.of(jetStreamManagement, streamConfiguration)))) .onItem().transformToUniAndMerge(tuple -> addOrUpdateStream(tuple.getItem1(), tuple.getItem2())) - .collect().asList() - .emitOn(context::runOnContext); + .collect().asList(); } @Override @@ -472,8 +413,7 @@ public Uni addSubject(String streamName, String subject) { } catch (Throwable failure) { emitter.fail(new SystemException(failure)); } - })) - .emitOn(context::runOnContext); + })); } @Override @@ -494,8 +434,7 @@ public Uni removeSubject(String streamName, String subject) { } catch (Throwable failure) { emitter.fail(new SystemException(failure)); } - })) - .emitOn(context::runOnContext); + })); } private Uni> createReader(ReaderConsumerConfiguration configuration, @@ -586,6 +525,17 @@ private Uni getJetStreamManagement() { }); } + private Uni getJetStream() { + return Uni.createFrom().emitter(emitter -> { + try { + emitter.complete(connection.jetStream()); + } catch (Throwable failure) { + emitter.fail( + new SystemException(String.format("Unable to get JetStream: %s", failure.getMessage()), failure)); + } + }); + } + private PublishOptions createPublishOptions(final String messageId, final String streamName) { return PublishOptions.builder() .messageId(messageId) @@ -633,15 +583,19 @@ private Uni nextMessage(final ConsumerContext consumerCo } private Uni> nextMessage(final ConsumerContext consumerContext, - final FetchConsumerConfiguration configuration) { - return nextMessage(consumerContext, configuration.fetchTimeout().orElse(null)) + final FetchConsumerConfiguration configuration, + final Tracer tracer, + final Context context) { + return Uni.createFrom().voidItem() + .emitOn(context::runOnContext) + .onItem().transformToUni(ignore -> nextMessage(consumerContext, configuration.fetchTimeout().orElse(null))) .map(message -> messageMapper.of( message, - configuration.traceEnabled(), configuration.payloadType().orElse(null), context, new ExponentialBackoff(false, Duration.ZERO), - configuration.ackTimeout())); + configuration.ackTimeout())) + .onItem().transformToUni(message -> tracer.withTrace(message)); } private Headers toJetStreamHeaders(Map> headers) { @@ -662,33 +616,37 @@ private Uni addOrUpdateConsumer(ConsumerConfiguration co } catch (Throwable failure) { throw new FetchException(failure); } - })) - .emitOn(context::runOnContext); + })); } + @SuppressWarnings("ReactiveStreamsUnusedPublisher") private Multi> nextMessages(final ConsumerContext consumerContext, - FetchConsumerConfiguration configuration) { - return Multi.createFrom().> emitter(emitter -> { - try { - try (final var fetchConsumer = fetchConsumer(consumerContext, configuration.fetchTimeout().orElse(null))) { - var message = fetchConsumer.nextMessage(); - while (message != null) { - emitter.emit(messageMapper.of( - message, - configuration.traceEnabled(), - configuration.payloadType().orElse(null), - context, - new ExponentialBackoff(false, Duration.ZERO), - configuration.ackTimeout())); - message = fetchConsumer.nextMessage(); + final FetchConsumerConfiguration configuration, + final Tracer tracer, + final Context context) { + return Uni.createFrom().voidItem() + .emitOn(context::runOnContext) + .onItem().transformToMulti(ignore -> Multi.createFrom().> emitter(emitter -> { + try { + try (final var fetchConsumer = fetchConsumer(consumerContext, + configuration.fetchTimeout().orElse(null))) { + var message = fetchConsumer.nextMessage(); + while (message != null) { + emitter.emit(messageMapper.of( + message, + configuration.payloadType().orElse(null), + context, + new ExponentialBackoff(false, Duration.ZERO), + configuration.ackTimeout())); + message = fetchConsumer.nextMessage(); + } + emitter.complete(); + } + } catch (Throwable failure) { + emitter.fail(new FetchException(failure)); } - emitter.complete(); - } - } catch (Throwable failure) { - emitter.fail(new FetchException(failure)); - } - }) - .emitOn(context::runOnContext); + })) + .onItem().transformToUniAndMerge(tracer::withTrace); } private io.nats.client.Connection connect(ConnectionConfiguration configuration) throws ConnectionException { diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionFactory.java index bda8765f..8c079d6d 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionFactory.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionFactory.java @@ -3,8 +3,6 @@ import static io.nats.client.Options.DEFAULT_MAX_RECONNECT; import static io.nats.client.Options.DEFAULT_RECONNECT_WAIT; -import java.util.Optional; - import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -15,34 +13,24 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.unchecked.Unchecked; -import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; -import io.vertx.mutiny.core.Context; -import io.vertx.mutiny.core.Vertx; @ApplicationScoped public class DefaultConnectionFactory implements ConnectionFactory { private final static Logger logger = Logger.getLogger(DefaultConnectionFactory.class); - private final ExecutionHolder executionHolder; private final MessageMapper messageMapper; - private final JetStreamInstrument instrumenter; private final PayloadMapper payloadMapper; private final ConsumerMapper consumerMapper; private final StreamStateMapper streamStateMapper; @Inject - public DefaultConnectionFactory(final ExecutionHolder executionHolder, - final MessageMapper messageMapper, - final JetStreamInstrument instrumenter, + public DefaultConnectionFactory(final MessageMapper messageMapper, final PayloadMapper payloadMapper, final ConsumerMapper consumerMapper, final StreamStateMapper streamStateMapper) { - this.executionHolder = executionHolder; this.messageMapper = messageMapper; - this.instrumenter = instrumenter; this.payloadMapper = payloadMapper; this.consumerMapper = consumerMapper; this.streamStateMapper = streamStateMapper; @@ -51,11 +39,8 @@ public DefaultConnectionFactory(final ExecutionHolder executionHolder, @Override public Uni create(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) { - return getContext() - .onItem().transformToUni( - context -> Uni.createFrom().item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, - connectionListener, context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, - instrumenter)))) + return Uni.createFrom().item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, + connectionListener, messageMapper, payloadMapper, consumerMapper, streamStateMapper))) .onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage())) .onFailure() .retry() @@ -63,12 +48,4 @@ public Uni create(ConnectionConfiguration connectionConfig .atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_MAX_RECONNECT)); } - private Optional getVertx() { - return Optional.ofNullable(executionHolder.vertx()); - } - - private Uni getContext() { - return Uni.createFrom().item(Unchecked.supplier( - () -> getVertx().map(Vertx::getOrCreateContext).orElseThrow(() -> new ContextException("No Vertx available")))); - } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/JetStreamMessage.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/JetStreamMessage.java new file mode 100644 index 00000000..7c242e77 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/JetStreamMessage.java @@ -0,0 +1,57 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessage; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import lombok.Builder; + +@Builder +public record JetStreamMessage(String stream, + String subject, + String messageId, + Map> headers, + byte[] payload, + Long streamSequence, + Long consumerSequence, + String consumer, + Long deliveredCount) { + + public static final String MESSAGE_TYPE_HEADER = "message.type"; + + public static JetStreamMessage of(PublishMessage message) { + return JetStreamMessage.builder() + .stream(message.getStream()) + .subject(message.getSubject()) + .messageId(message.messageId()) + .headers(message.headers()) + .payload(message.getData()) + .streamSequence(message.getStreamSequence()) + .consumerSequence(message.getConsumerSequence()) + .consumer(message.getConsumer()) + .deliveredCount(message.getDeliveredCount()) + .build(); + } + + public static JetStreamMessage of(byte[] payload, Class type, SubscribeMessageMetadata metadata, + PublishConfiguration configuration) { + final var messageId = metadata != null && metadata.messageId() != null ? metadata.messageId() + : UUID.randomUUID().toString(); + final var subject = configuration.subject(); + final var headers = new HashMap>(); + if (type != null) { + headers.putIfAbsent(MESSAGE_TYPE_HEADER, List.of(type.getTypeName())); + } + return JetStreamMessage.builder() + .stream(configuration.stream()) + .subject(subject) + .messageId(messageId) + .headers(headers) + .payload(payload) + .build(); + } +} \ No newline at end of file diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscription.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscription.java index 77f57744..b248455a 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscription.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscription.java @@ -7,9 +7,10 @@ import io.nats.client.Dispatcher; import io.nats.client.JetStreamSubscription; -import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; import io.smallrye.mutiny.Multi; import io.vertx.mutiny.core.Context; @@ -22,7 +23,6 @@ public class PushSubscription

implements Subscription

{ private final PushSubscribeOptionsFactory pushSubscribeOptionsFactory; private final io.nats.client.Connection natsConnection; private final MessageMapper messageMapper; - private final Context context; private volatile JetStreamSubscription subscription; private volatile Dispatcher dispatcher; @@ -30,19 +30,16 @@ public class PushSubscription

implements Subscription

{ PushSubscription(final Connection connection, final PushConsumerConfiguration

consumerConfiguration, final io.nats.client.Connection natsConnection, - final MessageMapper messageMapper, - final Context context) { + final MessageMapper messageMapper) { this.connection = connection; this.consumerConfiguration = consumerConfiguration; this.pushSubscribeOptionsFactory = new PushSubscribeOptionsFactory(); this.natsConnection = natsConnection; this.messageMapper = messageMapper; - this.context = context; } @Override - public Multi> subscribe() { - boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); + public Multi> subscribe(Tracer

tracer, Context context) { Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); final var subject = consumerConfiguration.subject(); return Multi.createFrom(). emitter(emitter -> { @@ -68,13 +65,13 @@ public Multi> subscribe() { .emitOn(context::runOnContext) .map(message -> messageMapper.of( message, - traceEnabled, payloadType, context, new ExponentialBackoff( consumerConfiguration.consumerConfiguration().exponentialBackoff(), consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), - consumerConfiguration.consumerConfiguration().ackTimeout())); + consumerConfiguration.consumerConfiguration().ackTimeout())) + .onItem().transformToUniAndMerge(tracer::withTrace); } @Override diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscription.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscription.java index 9516b864..2c19f0ed 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscription.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscription.java @@ -12,8 +12,9 @@ import io.nats.client.JetStreamReader; import io.nats.client.JetStreamStatusException; import io.nats.client.JetStreamSubscription; -import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -27,35 +28,32 @@ class ReaderSubscription

implements Subscription

{ private final JetStreamReader reader; private final JetStreamSubscription subscription; private final MessageMapper messageMapper; - private final Context context; private final AtomicBoolean closed; ReaderSubscription(Connection connection, ReaderConsumerConfiguration

consumerConfiguration, JetStreamSubscription subscription, JetStreamReader reader, - MessageMapper messageMapper, - Context context) { + MessageMapper messageMapper) { this.connection = connection; this.consumerConfiguration = consumerConfiguration; this.subscription = subscription; this.reader = reader; this.messageMapper = messageMapper; - this.context = context; this.closed = new AtomicBoolean(false); } + @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override - public Multi> subscribe() { - boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); + public Multi> subscribe(Tracer

tracer, Context context) { Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); return Multi.createBy().repeating() .uni(this::readNextMessage) .whilst(message -> true) .runSubscriptionOn(pullExecutor) - .emitOn(context::runOnContext) - .flatMap(message -> createMulti(message.orElse(null), traceEnabled, payloadType, context)); + .flatMap(message -> createMulti(message.orElse(null), payloadType, context)) + .onItem().transformToUniAndMerge(tracer::withTrace); } @Override @@ -110,12 +108,12 @@ private Uni> readNextMessage() { } private Multi> createMulti(io.nats.client.Message message, - boolean tracingEnabled, Class

payloadType, Context context) { + Class

payloadType, Context context) { if (message == null || message.getData() == null) { return Multi.createFrom().empty(); } else { return Multi.createFrom() - .item(() -> messageMapper.of(message, tracingEnabled, payloadType, context, + .item(() -> messageMapper.of(message, payloadType, context, new ExponentialBackoff( consumerConfiguration.consumerConfiguration().exponentialBackoff(), consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Subscription.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Subscription.java index bfd943e3..f651a5c4 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Subscription.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Subscription.java @@ -2,10 +2,12 @@ import org.eclipse.microprofile.reactive.messaging.Message; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer; import io.smallrye.mutiny.Multi; +import io.vertx.mutiny.core.Context; public interface Subscription extends ConnectionListener { - Multi> subscribe(); + Multi> subscribe(Tracer tracer, Context context); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/ExponentialBackoff.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ExponentialBackoff.java similarity index 83% rename from runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/ExponentialBackoff.java rename to runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ExponentialBackoff.java index aa2ff07f..d9bb7167 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/ExponentialBackoff.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ExponentialBackoff.java @@ -1,4 +1,4 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream; +package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; import java.time.Duration; diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/JetStreamMessage.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/JetStreamMessage.java index 4fd8d118..861a5fa9 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/JetStreamMessage.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/JetStreamMessage.java @@ -1,112 +1,18 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; -import static io.quarkiverse.reactive.messaging.nats.jetstream.mapper.HeaderMapper.toMessageHeaders; -import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; - import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.eclipse.microprofile.reactive.messaging.Metadata; -import org.jboss.logging.Logger; - -import io.nats.client.api.MessageInfo; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessageMetadata; - -public class JetStreamMessage implements io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamMessage { - - private static final Logger logger = Logger.getLogger(JetStreamMessage.class); - - private final MessageInfo message; - private Metadata metadata; - private final JetStreamIncomingMessageMetadata incomingMetadata; - private final T payload; - - public JetStreamMessage(final MessageInfo message, final T payload) { - this.message = message; - this.incomingMetadata = JetStreamIncomingMessageMetadata.of(message); - this.metadata = captureContextMetadata(incomingMetadata); - this.payload = payload; - } - - @Override - public Metadata getMetadata() { - return metadata; - } - - public String getMessageId() { - return incomingMetadata.messageId(); - } - - public byte[] getData() { - return message.getData(); - } - - public String getSubject() { - return incomingMetadata.subject(); - } - - public String getStream() { - return incomingMetadata.stream(); - } - - public Map> getHeaders() { - return toMessageHeaders(message.getHeaders()); - } - - @Override - public T getPayload() { - return payload; - } - - @Override - public Supplier> getAck() { - return this::ack; - } - - @Override - public CompletionStage ack() { - return CompletableFuture.supplyAsync(() -> { - logger.debugf("Message with id = %s acknowledged", getMessageId()); - return null; - }); - } - @Override - public CompletionStage nack(Throwable reason, Metadata metadata) { - return CompletableFuture.supplyAsync(() -> { - logger.errorf(reason, "Message with id = %s not acknowledged", getMessageId()); - throw new RuntimeException(reason); - }); - } +import org.eclipse.microprofile.reactive.messaging.Message; - @Override - public Function> getNack() { - return this::nack; - } +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; - @Override - public synchronized void injectMetadata(Object metadataObject) { - this.metadata = metadata.with(metadataObject); - } +public interface JetStreamMessage extends Message, ContextAwareMessage, MetadataInjectableMessage { + String MESSAGE_TYPE_HEADER = "message.type"; - @Override - public org.eclipse.microprofile.reactive.messaging.Message addMetadata(Object metadata) { - injectMetadata(metadata); - return this; - } + String messageId(); - @Override - public String toString() { - return "JetStreamMessage{" + - "message=" + message + - ", metadata=" + metadata + - ", incomingMetadata=" + incomingMetadata + - ", payload=" + payload + - '}'; - } + Map> headers(); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/Message.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/Message.java new file mode 100644 index 00000000..cd794c22 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/Message.java @@ -0,0 +1,55 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import lombok.Builder; + +@Builder +public record Message(String stream, + String subject, + String messageId, + Map> headers, + byte[] payload, + Long streamSequence, + Long consumerSequence, + String consumer, + Long deliveredCount) { + + public static final String MESSAGE_TYPE_HEADER = "message.type"; + + public static io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamMessage of(PublishMessage message) { + return io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamMessage.builder() + .stream(message.getStream()) + .subject(message.getSubject()) + .messageId(message.messageId()) + .headers(message.headers()) + .payload(message.getData()) + .streamSequence(message.getStreamSequence()) + .consumerSequence(message.getConsumerSequence()) + .consumer(message.getConsumer()) + .deliveredCount(message.getDeliveredCount()) + .build(); + } + + public static io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamMessage of(byte[] payload, Class type, + SubscribeMessageMetadata metadata, PublishConfiguration configuration) { + final var messageId = metadata != null && metadata.messageId() != null ? metadata.messageId() + : UUID.randomUUID().toString(); + final var subject = configuration.subject(); + final var headers = new HashMap>(); + if (type != null) { + headers.putIfAbsent(MESSAGE_TYPE_HEADER, List.of(type.getTypeName())); + } + return io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamMessage.builder() + .stream(configuration.stream()) + .subject(subject) + .messageId(messageId) + .headers(headers) + .payload(payload) + .build(); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamIncomingMessage.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/PublishMessage.java similarity index 88% rename from runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamIncomingMessage.java rename to runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/PublishMessage.java index 5047b7c5..0138fe22 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamIncomingMessage.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/PublishMessage.java @@ -1,4 +1,4 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream; +package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; import static io.quarkiverse.reactive.messaging.nats.jetstream.mapper.HeaderMapper.toMessageHeaders; import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; @@ -17,22 +17,22 @@ import io.smallrye.reactive.messaging.providers.helpers.VertxContext; import io.vertx.mutiny.core.Context; -public class JetStreamIncomingMessage implements JetStreamMessage { +public class PublishMessage implements JetStreamMessage { private final Message message; private Metadata metadata; - private final JetStreamIncomingMessageMetadata incomingMetadata; + private final PublishMessageMetadata incomingMetadata; private final T payload; private final Context context; private final ExponentialBackoff exponentialBackoff; private final Duration ackTimeout; - public JetStreamIncomingMessage(final Message message, + public PublishMessage(final Message message, final T payload, Context context, ExponentialBackoff exponentialBackoff, Duration ackTimeout) { this.message = message; - this.incomingMetadata = JetStreamIncomingMessageMetadata.of(message); + this.incomingMetadata = PublishMessageMetadata.of(message); this.exponentialBackoff = exponentialBackoff; this.metadata = captureContextMetadata(incomingMetadata); this.payload = payload; @@ -45,7 +45,7 @@ public Metadata getMetadata() { return metadata; } - public String getMessageId() { + public String messageId() { return incomingMetadata.messageId(); } @@ -77,7 +77,7 @@ public Long getDeliveredCount() { return incomingMetadata.deliveredCount(); } - public Map> getHeaders() { + public Map> headers() { return toMessageHeaders(message.getHeaders()); } @@ -107,7 +107,7 @@ public CompletionStage ack() { public CompletionStage nack(Throwable reason, Metadata metadata) { return VertxContext.runOnContext(context.getDelegate(), f -> { if (exponentialBackoff.enabled()) { - metadata.get(JetStreamIncomingMessageMetadata.class) + metadata.get(PublishMessageMetadata.class) .ifPresentOrElse(m -> message.nakWithDelay(exponentialBackoff.getDuration(m.deliveredCount())), message::nak); } else { diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamIncomingMessageMetadata.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/PublishMessageMetadata.java similarity index 79% rename from runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamIncomingMessageMetadata.java rename to runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/PublishMessageMetadata.java index 23ac5ec5..cc36aa5d 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamIncomingMessageMetadata.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/PublishMessageMetadata.java @@ -1,4 +1,4 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream; +package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; import static io.nats.client.support.NatsJetStreamConstants.MSG_ID_HDR; @@ -15,7 +15,7 @@ import lombok.Builder; @Builder -public record JetStreamIncomingMessageMetadata(String stream, +public record PublishMessageMetadata(String stream, String subject, String messageId, Map> headers, @@ -25,13 +25,13 @@ public record JetStreamIncomingMessageMetadata(String stream, Long consumerSequence, ZonedDateTime timestamp) { - public static JetStreamIncomingMessageMetadata of(@NotNull Message message) { + public static PublishMessageMetadata of(@NotNull Message message) { final var headers = Optional.ofNullable(message.getHeaders()); - return JetStreamIncomingMessageMetadata.builder() + return PublishMessageMetadata.builder() .stream(message.metaData().getStream()) .subject(message.getSubject()) .messageId(headers.map(h -> h.getFirst(MSG_ID_HDR)).orElse(null)) - .headers(headers.map(JetStreamIncomingMessageMetadata::headers).orElseGet(HashMap::new)) + .headers(headers.map(PublishMessageMetadata::headers).orElseGet(HashMap::new)) .deliveredCount(message.metaData().deliveredCount()) .consumer(message.metaData().getConsumer()) .streamSequence(message.metaData().streamSequence()) @@ -40,13 +40,13 @@ public static JetStreamIncomingMessageMetadata of(@NotNull Message message) { .build(); } - public static JetStreamIncomingMessageMetadata of(MessageInfo message) { + public static PublishMessageMetadata of(MessageInfo message) { final var headers = Optional.ofNullable(message.getHeaders()); - return JetStreamIncomingMessageMetadata.builder() + return PublishMessageMetadata.builder() .stream(message.getStream()) .subject(message.getSubject()) .messageId(headers.map(h -> h.getFirst(MSG_ID_HDR)).orElse(null)) - .headers(headers.map(JetStreamIncomingMessageMetadata::headers).orElseGet(HashMap::new)) + .headers(headers.map(PublishMessageMetadata::headers).orElseGet(HashMap::new)) .deliveredCount(null) .consumer(null) .streamSequence(message.getSeq()) diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ResolvedMessage.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ResolvedMessage.java new file mode 100644 index 00000000..726f8609 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ResolvedMessage.java @@ -0,0 +1,111 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; + +import static io.quarkiverse.reactive.messaging.nats.jetstream.mapper.HeaderMapper.toMessageHeaders; +import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.jboss.logging.Logger; + +import io.nats.client.api.MessageInfo; + +public class ResolvedMessage implements JetStreamMessage { + + private static final Logger logger = Logger.getLogger(ResolvedMessage.class); + + private final MessageInfo message; + private Metadata metadata; + private final PublishMessageMetadata incomingMetadata; + private final T payload; + + public ResolvedMessage(final MessageInfo message, final T payload) { + this.message = message; + this.incomingMetadata = PublishMessageMetadata.of(message); + this.metadata = captureContextMetadata(incomingMetadata); + this.payload = payload; + } + + @Override + public Metadata getMetadata() { + return metadata; + } + + public String messageId() { + return incomingMetadata.messageId(); + } + + public byte[] getData() { + return message.getData(); + } + + public String getSubject() { + return incomingMetadata.subject(); + } + + public String getStream() { + return incomingMetadata.stream(); + } + + public Map> headers() { + return toMessageHeaders(message.getHeaders()); + } + + @Override + public T getPayload() { + return payload; + } + + @Override + public Supplier> getAck() { + return this::ack; + } + + @Override + public CompletionStage ack() { + return CompletableFuture.supplyAsync(() -> { + logger.debugf("Message with id = %s acknowledged", messageId()); + return null; + }); + } + + @Override + public CompletionStage nack(Throwable reason, Metadata metadata) { + return CompletableFuture.supplyAsync(() -> { + logger.errorf(reason, "Message with id = %s not acknowledged", messageId()); + throw new RuntimeException(reason); + }); + } + + @Override + public Function> getNack() { + return this::nack; + } + + @Override + public synchronized void injectMetadata(Object metadataObject) { + this.metadata = metadata.with(metadataObject); + } + + @Override + public org.eclipse.microprofile.reactive.messaging.Message addMetadata(Object metadata) { + injectMetadata(metadata); + return this; + } + + @Override + public String toString() { + return "JetStreamMessage{" + + "message=" + message + + ", metadata=" + metadata + + ", incomingMetadata=" + incomingMetadata + + ", payload=" + payload + + '}'; + } + +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessage.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessage.java new file mode 100644 index 00000000..a58c5e6b --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessage.java @@ -0,0 +1,47 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; + +import java.util.*; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; +import lombok.Builder; + +@Builder +public record SubscribeMessage(byte[] payload, String type, PublishConfiguration configuration, String messageId, + Map> headers, Message message) implements JetStreamMessage { + + @Override + public void injectMetadata(Object o) { + if (message instanceof MetadataInjectableMessage metadataInjectableMessage) { + metadataInjectableMessage.injectMetadata(o); + } + } + + @Override + public T getPayload() { + return message.getPayload(); + } + + public static SubscribeMessage of(Message message, byte[] payload, PublishConfiguration configuration) { + final var metadata = message.getMetadata(SubscribeMessageMetadata.class).orElse(null); + final var type = message.getPayload() != null ? message.getPayload().getClass().getTypeName() : null; + final var headers = metadata != null && metadata.headers() != null ? new HashMap<>(metadata.headers()) + : new HashMap>(); + if (type != null) { + headers.put(MESSAGE_TYPE_HEADER, List.of(type)); + } + + return SubscribeMessage. builder() + .payload(payload) + .message(message) + .type(type) + .configuration(configuration) + .messageId( + metadata != null && metadata.messageId() != null ? metadata.messageId() : UUID.randomUUID().toString()) + .headers(headers) + .build(); + } + +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessageMetadata.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessageMetadata.java new file mode 100644 index 00000000..e747e1b7 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/SubscribeMessageMetadata.java @@ -0,0 +1,27 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.api; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import lombok.Builder; + +@Builder +public record SubscribeMessageMetadata(String messageId, Map> headers) { + + public static SubscribeMessageMetadata of(final String messageId, + final Map> headers) { + return SubscribeMessageMetadata.builder() + .messageId(messageId) + .headers(headers != null ? headers : Collections.emptyMap()) + .build(); + } + + public static SubscribeMessageMetadata of(final String messageId) { + return SubscribeMessageMetadata.builder() + .messageId(messageId) + .headers(Collections.emptyMap()) + .build(); + } + +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java index 9caeca5b..8ae4494c 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java @@ -54,8 +54,6 @@ public interface ConsumerConfiguration { Optional> payloadType(); - boolean traceEnabled(); - boolean exponentialBackoff(); Duration exponentialBackoffMaxDuration(); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PublishConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PublishConfiguration.java index dbcaa6ec..84d4a85a 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PublishConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PublishConfiguration.java @@ -2,8 +2,6 @@ public interface PublishConfiguration { - boolean traceEnabled(); - String stream(); String subject(); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/DefaultTracer.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/DefaultTracer.java new file mode 100644 index 00000000..3dcf4d3c --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/DefaultTracer.java @@ -0,0 +1,105 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing; + +import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE; +import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor.create; +import static io.smallrye.reactive.messaging.tracing.TracingUtils.getOpenTelemetry; + +import jakarta.enterprise.inject.Instance; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamBuildConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessage; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessage; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.tuples.Tuple2; +import io.smallrye.mutiny.unchecked.Unchecked; +import io.smallrye.reactive.messaging.tracing.TracingUtils; + +public class DefaultTracer implements Tracer { + private final JetStreamBuildConfiguration configuration; + private final PayloadMapper payloadMapper; + private final OpenTelemetry openTelemetry; + private final Instrumenter, Void> receiver; + private final Instrumenter, Void> publisher; + + public DefaultTracer(Instance openTelemetryInstance, + JetStreamBuildConfiguration configuration, + PayloadMapper payloadMapper) { + this.configuration = configuration; + this.payloadMapper = payloadMapper; + this.openTelemetry = getOpenTelemetry(openTelemetryInstance); + this.receiver = receiver(openTelemetry); + this.publisher = publisher(openTelemetry); + } + + @Override + public Uni> withTrace(Message message, PublishConfiguration configuration) { + return Uni.createFrom().item(Unchecked.supplier(() -> createSubscribeMessage(message, configuration))) + .onItem().transformToUni(this::traceOutgoing); + } + + @Override + public Uni> withTrace(Message message) { + if (configuration.trace()) { + if (message instanceof PublishMessage publishMessage) { + return Uni.createFrom().item(Unchecked.supplier(() -> { + TracingUtils.traceIncoming(receiver, publishMessage, publishMessage); + return message; + })); + } + } + return Uni.createFrom().item(message); + } + + private Uni> traceOutgoing(final Tuple2, Message> tuple) { + if (configuration.trace()) { + return Uni.createFrom().item(Unchecked.supplier(() -> { + TracingUtils.traceOutgoing(publisher, tuple.getItem2(), tuple.getItem1()); + return tuple.getItem1(); + })); + } else { + return Uni.createFrom().item(tuple.getItem1()); + } + } + + private Instrumenter, Void> publisher(OpenTelemetry openTelemetry) { + final var attributesExtractor = new SubscribeMessageAttributesExtractor(); + MessagingAttributesGetter, Void> messagingAttributesGetter = attributesExtractor + .getMessagingAttributesGetter(); + InstrumenterBuilder, Void> builder = Instrumenter.builder(openTelemetry, + "io.smallrye.reactive.messaging.jetstream", + MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.SEND)); + return builder.addAttributesExtractor(create(messagingAttributesGetter, MessageOperation.SEND)) + .addAttributesExtractor(attributesExtractor) + .buildProducerInstrumenter(new SubscribeMessageTextMapSetter<>()); + } + + private Instrumenter, Void> receiver(OpenTelemetry openTelemetry) { + final var attributesExtractor = new PublishMessageAttributesExtractor(); + MessagingAttributesGetter, Void> messagingAttributesGetter = attributesExtractor + .getMessagingAttributesGetter(); + InstrumenterBuilder, Void> builder = Instrumenter.builder(openTelemetry, + "io.smallrye.reactive.messaging.jetstream", + MessagingSpanNameExtractor.create(messagingAttributesGetter, RECEIVE)); + + return builder.addAttributesExtractor(attributesExtractor) + .addAttributesExtractor(MessagingAttributesExtractor.create(messagingAttributesGetter, RECEIVE)) + .buildConsumerInstrumenter(new PublishMessageTextMapGetter<>()); + } + + private Tuple2, Message> createSubscribeMessage(Message message, + PublishConfiguration configuration) { + final var payload = payloadMapper.of(message.getPayload()); + return Tuple2.of(SubscribeMessage.of(message, payload, configuration), message); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/PublishMessageAttributesExtractor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/PublishMessageAttributesExtractor.java new file mode 100644 index 00000000..051fafd5 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/PublishMessageAttributesExtractor.java @@ -0,0 +1,88 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessage; + +@SuppressWarnings("LombokGetterMayBeUsed") +public class PublishMessageAttributesExtractor implements AttributesExtractor, Void> { + private static final String MESSAGE_PAYLOAD = "message.payload"; + private static final String MESSAGE_STREAM_SEQUENCE = "message.stream_sequence"; + private static final String MESSAGE_CONSUMER_SEQUENCE = "message.consumer_sequence"; + private static final String MESSAGE_CONSUMER = "message.consumer"; + private static final String MESSAGE_DELIVERED_COUNT = "message.delivered_count"; + + private final MessagingAttributesGetter, Void> messagingAttributesGetter; + + public PublishMessageAttributesExtractor() { + this.messagingAttributesGetter = new JetStreamMessagingAttributesGetter<>(); + } + + @Override + public void onStart(AttributesBuilder attributesBuilder, Context context, PublishMessage message) { + attributesBuilder.put(MESSAGE_PAYLOAD, new String(message.getData())); + if (message.getStreamSequence() != null) { + attributesBuilder.put(MESSAGE_STREAM_SEQUENCE, message.getStreamSequence()); + } + if (message.getConsumerSequence() != null) { + attributesBuilder.put(MESSAGE_CONSUMER_SEQUENCE, message.getConsumerSequence()); + } + if (message.getConsumer() != null) { + attributesBuilder.put(MESSAGE_CONSUMER, message.getConsumer()); + } + if (message.getDeliveredCount() != null) { + attributesBuilder.put(MESSAGE_DELIVERED_COUNT, message.getDeliveredCount()); + } + } + + @Override + public void onEnd(AttributesBuilder attributes, Context context, PublishMessage message, Void unused, Throwable error) { + + } + + public MessagingAttributesGetter, Void> getMessagingAttributesGetter() { + return messagingAttributesGetter; + } + + private final static class JetStreamMessagingAttributesGetter + implements MessagingAttributesGetter, Void> { + + @Override + public String getSystem(PublishMessage trace) { + return "jetstream"; + } + + @Override + public String getDestination(PublishMessage trace) { + return String.format("%s.%s", trace.getStream(), trace.getSubject()); + } + + @Override + public boolean isTemporaryDestination(PublishMessage trace) { + return false; + } + + @Override + public String getConversationId(PublishMessage trace) { + return null; + } + + @Override + public Long getMessagePayloadSize(PublishMessage trace) { + return null; + } + + @Override + public Long getMessagePayloadCompressedSize(PublishMessage trace) { + return null; + } + + @Override + public String getMessageId(PublishMessage trace, Void unused) { + return trace.messageId(); + } + + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceTextMapGetter.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/PublishMessageTextMapGetter.java similarity index 62% rename from runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceTextMapGetter.java rename to runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/PublishMessageTextMapGetter.java index d246f32d..256639a5 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceTextMapGetter.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/PublishMessageTextMapGetter.java @@ -1,16 +1,14 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.tracing; +package io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing; import java.util.Collections; -import jakarta.annotation.Nullable; - import io.opentelemetry.context.propagation.TextMapGetter; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessage; -public enum JetStreamTraceTextMapGetter implements TextMapGetter { - INSTANCE; +public class PublishMessageTextMapGetter implements TextMapGetter> { @Override - public Iterable keys(JetStreamTrace carrier) { + public Iterable keys(PublishMessage carrier) { final var headers = carrier.headers(); if (headers != null) { return headers.keySet(); @@ -19,7 +17,7 @@ public Iterable keys(JetStreamTrace carrier) { } @Override - public String get(@Nullable JetStreamTrace carrier, String key) { + public String get(PublishMessage carrier, String key) { if (carrier != null) { final var headers = carrier.headers(); if (headers != null) { diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageAttributesExtractor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageAttributesExtractor.java new file mode 100644 index 00000000..6aa48e0e --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageAttributesExtractor.java @@ -0,0 +1,72 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessage; + +public class SubscribeMessageAttributesExtractor implements AttributesExtractor, Void> { + private static final String MESSAGE_PAYLOAD = "message.payload"; + + private final MessagingAttributesGetter, Void> attributesGetter; + + public SubscribeMessageAttributesExtractor() { + this.attributesGetter = new SubscribeMessagingAttributesGetter<>(); + } + + @Override + public void onStart(AttributesBuilder attributes, Context parentContext, SubscribeMessage message) { + attributes.put(MESSAGE_PAYLOAD, new String(message.payload())); + } + + @Override + public void onEnd(AttributesBuilder attributes, Context context, SubscribeMessage tSubscribeMessage, Void unused, + Throwable error) { + + } + + public MessagingAttributesGetter, Void> getMessagingAttributesGetter() { + return attributesGetter; + } + + private final static class SubscribeMessagingAttributesGetter + implements MessagingAttributesGetter, Void> { + + @Override + public String getSystem(SubscribeMessage message) { + return "jetstream"; + } + + @Override + public String getDestination(SubscribeMessage message) { + return String.format("%s.%s", message.configuration().stream(), message.configuration().subject()); + } + + @Override + public boolean isTemporaryDestination(SubscribeMessage message) { + return false; + } + + @Override + public String getConversationId(SubscribeMessage message) { + return null; + } + + @Override + public Long getMessagePayloadSize(SubscribeMessage message) { + return (long) message.payload().length; + } + + @Override + public Long getMessagePayloadCompressedSize(SubscribeMessage message) { + return null; + } + + @Override + public String getMessageId(SubscribeMessage message, Void unused) { + return message.messageId(); + } + + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageTextMapSetter.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageTextMapSetter.java new file mode 100644 index 00000000..3d0ff286 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/SubscribeMessageTextMapSetter.java @@ -0,0 +1,19 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing; + +import java.util.List; + +import io.opentelemetry.context.propagation.TextMapSetter; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessage; + +public class SubscribeMessageTextMapSetter implements TextMapSetter> { + + @Override + public void set(SubscribeMessage carrier, final String key, final String value) { + if (carrier != null) { + final var headers = carrier.headers(); + if (headers != null) { + headers.put(key, List.of(value)); + } + } + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/Tracer.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/Tracer.java new file mode 100644 index 00000000..dc638a7e --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/Tracer.java @@ -0,0 +1,15 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessage; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import io.smallrye.mutiny.Uni; + +public interface Tracer { + + Uni> withTrace(Message message, PublishConfiguration configuration); + + Uni> withTrace(Message message); + +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/TracerFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/TracerFactory.java new file mode 100644 index 00000000..b6d9bd56 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/tracing/TracerFactory.java @@ -0,0 +1,27 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; + +import io.opentelemetry.api.OpenTelemetry; +import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamBuildConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper; + +@ApplicationScoped +public class TracerFactory { + private final JetStreamBuildConfiguration configuration; + private final PayloadMapper payloadMapper; + private final Instance openTelemetry; + + public TracerFactory(Instance openTelemetryInstance, + JetStreamBuildConfiguration configuration, + PayloadMapper payloadMapper) { + this.configuration = configuration; + this.payloadMapper = payloadMapper; + this.openTelemetry = openTelemetryInstance; + } + + public Tracer create() { + return new DefaultTracer<>(openTelemetry, configuration, payloadMapper); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java index b6c6dc8b..517f18c9 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java @@ -1,63 +1,43 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.mapper; -import static io.smallrye.reactive.messaging.tracing.TracingUtils.traceIncoming; - import java.time.Duration; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessage; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessage; import io.quarkus.arc.DefaultBean; import io.vertx.mutiny.core.Context; @ApplicationScoped @DefaultBean public class DefaultMessageMapper implements MessageMapper { - - public static final String MESSAGE_TYPE_HEADER = "message.type"; - private final PayloadMapper payloadMapper; - private final JetStreamInstrument instrumenter; @Inject - public DefaultMessageMapper(PayloadMapper payloadMapper, - JetStreamInstrument instrumenter) { + public DefaultMessageMapper(PayloadMapper payloadMapper) { this.payloadMapper = payloadMapper; - this.instrumenter = instrumenter; } @SuppressWarnings("unchecked") @Override public org.eclipse.microprofile.reactive.messaging.Message of( io.nats.client.Message message, - boolean tracingEnabled, Class payloadType, Context context, ExponentialBackoff exponentialBackoff, Duration ackTimeout) { try { - final var incomingMessage = payloadType != null - ? new JetStreamIncomingMessage<>(message, payloadMapper.of(message, payloadType), context, + return payloadType != null + ? new PublishMessage<>(message, payloadMapper.of(message, payloadType), context, exponentialBackoff, ackTimeout) - : new JetStreamIncomingMessage<>(message, - payloadMapper.of(message).orElseGet(() -> message.getData()), + : new PublishMessage<>(message, + (T) payloadMapper.of(message).orElseGet(message::getData), context, exponentialBackoff, ackTimeout); - if (tracingEnabled) { - return (Message) traceIncoming(instrumenter.receiver(), incomingMessage, - JetStreamTrace.trace(incomingMessage)); - } else { - return (Message) incomingMessage; - } } catch (ClassCastException e) { throw new RuntimeException(e); } } - } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultPayloadMapper.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultPayloadMapper.java index 32ca6507..78bac314 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultPayloadMapper.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultPayloadMapper.java @@ -1,6 +1,6 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.mapper; -import static io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultMessageMapper.MESSAGE_TYPE_HEADER; +import static io.quarkiverse.reactive.messaging.nats.jetstream.client.api.JetStreamMessage.MESSAGE_TYPE_HEADER; import java.io.IOException; import java.util.Optional; diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/MessageMapper.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/MessageMapper.java index 0f5c38d3..8378a8ab 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/MessageMapper.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/MessageMapper.java @@ -2,13 +2,12 @@ import java.time.Duration; -import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff; import io.vertx.mutiny.core.Context; public interface MessageMapper { org.eclipse.microprofile.reactive.messaging.Message of(io.nats.client.Message message, - boolean tracingEnabled, Class payloadType, Context context, ExponentialBackoff exponentialBackoff, diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java index df867a8f..dcff6acd 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java @@ -187,11 +187,6 @@ public Duration exponentialBackoffMaxDuration() { : null; } - @Override - public boolean traceEnabled() { - return configuration.getTraceEnabled(); - } - @Override public Duration ackTimeout() { return Duration.parse(configuration.getAckTimeout()); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java index 3dec3891..076e929b 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java @@ -202,11 +202,6 @@ public Duration exponentialBackoffMaxDuration() { : null; } - @Override - public boolean traceEnabled() { - return configuration.getTraceEnabled(); - } - @Override public Duration ackTimeout() { return Duration.parse(configuration.getAckTimeout()); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java index 0ffecad0..534ea040 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java @@ -7,6 +7,8 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.*; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; import io.smallrye.mutiny.Multi; @@ -21,9 +23,13 @@ public abstract class MessagePublisherProcessor implements MessageProcessor, private final ConnectionFactory connectionFactory; private final ConnectionConfiguration connectionConfiguration; private final AtomicReference> subscription; + private final Tracer tracer; + private final Context context; public MessagePublisherProcessor(final ConnectionFactory connectionFactory, - final ConnectionConfiguration connectionConfiguration) { + final ConnectionConfiguration connectionConfiguration, + final TracerFactory tracerFactory, + final Context context) { this.readiness = new AtomicReference<>( Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build()); this.liveness = new AtomicReference<>( @@ -32,6 +38,8 @@ public MessagePublisherProcessor(final ConnectionFactory connectionFactory, this.connectionFactory = connectionFactory; this.connectionConfiguration = connectionConfiguration; this.subscription = new AtomicReference<>(); + this.tracer = tracerFactory.create(); + this.context = context; } @Override @@ -82,6 +90,7 @@ public void onEvent(ConnectionEvent event, String message) { protected abstract Uni> subscription(Connection connection); + @SuppressWarnings("ReactiveStreamsUnusedPublisher") private Multi> recover(Throwable failure) { return Uni.createFrom(). item(() -> { close(this.subscription.getAndSet(null)); @@ -94,7 +103,7 @@ private Multi> subscribe( return getOrEstablishConnection() .onItem().transformToUni(this::subscription) .onItem().invoke(this.subscription::set) - .onItem().transformToMulti(Subscription::subscribe) + .onItem().transformToMulti(subscription -> context.withContext(ctx -> subscription.subscribe(tracer, ctx))) .onSubscription().invoke(() -> logger.infof("Subscribed to channel %s", configuration().channel())); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java index 67f476ce..9d42b1d9 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java @@ -2,8 +2,10 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context; import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.smallrye.mutiny.Uni; public class MessagePullPublisherProcessor extends MessagePublisherProcessor { @@ -11,8 +13,10 @@ public class MessagePullPublisherProcessor extends MessagePublisherProcessor< public MessagePullPublisherProcessor(final ConnectionFactory connectionFactory, final ConnectionConfiguration connectionConfiguration, - final MessagePullPublisherConfiguration configuration) { - super(connectionFactory, connectionConfiguration); + final MessagePullPublisherConfiguration configuration, + final TracerFactory tracerFactory, + final Context context) { + super(connectionFactory, connectionConfiguration, tracerFactory, context); this.configuration = configuration; } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java index c17c9ef5..354a472a 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java @@ -2,8 +2,10 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context; import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.smallrye.mutiny.Uni; public class MessagePushPublisherProcessor extends MessagePublisherProcessor { @@ -11,8 +13,10 @@ public class MessagePushPublisherProcessor extends MessagePublisherProcessor< public MessagePushPublisherProcessor(final ConnectionFactory connectionFactory, final ConnectionConfiguration connectionConfiguration, - final MessagePushPublisherConfiguration configuration) { - super(connectionFactory, connectionConfiguration); + final MessagePushPublisherConfiguration configuration, + final TracerFactory tracerFactory, + final Context context) { + super(connectionFactory, connectionConfiguration, tracerFactory, context); this.configuration = configuration; } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/DefaultMessageSubscriberConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/DefaultMessageSubscriberConfiguration.java index 575779f8..cfe2149d 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/DefaultMessageSubscriberConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/DefaultMessageSubscriberConfiguration.java @@ -24,9 +24,4 @@ public String subject() { return configuration.getSubject().orElseThrow((() -> new IllegalArgumentException("No subject configured"))); } - @Override - public boolean traceEnabled() { - return configuration.getTraceEnabled(); - } - } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java index 3ae80106..c69804ed 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java @@ -9,6 +9,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.*; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; import io.smallrye.mutiny.Multi; @@ -23,16 +24,22 @@ public class MessageSubscriberProcessor implements MessageProcessor, ConnectionL private final ConnectionFactory connectionFactory; private final AtomicReference status; private final AtomicReference connection; + private final TracerFactory tracerFactory; + private final Context context; public MessageSubscriberProcessor( final ConnectionConfiguration connectionConfiguration, final ConnectionFactory connectionFactory, - final MessageSubscriberConfiguration configuration) { + final MessageSubscriberConfiguration configuration, + final TracerFactory tracerFactory, + final Context context) { this.connectionConfiguration = connectionConfiguration; this.connectionFactory = connectionFactory; this.configuration = configuration; this.status = new AtomicReference<>(new Status(true, "Subscriber processor inactive", ConnectionEvent.Closed)); this.connection = new AtomicReference<>(); + this.tracerFactory = tracerFactory; + this.context = context; } public Flow.Subscriber> subscriber() { @@ -78,7 +85,9 @@ public void onEvent(ConnectionEvent event, String message) { private Uni> publish(final Message message) { return getOrEstablishConnection() - .onItem().transformToUni(connection -> connection.publish(message, configuration)) + .onItem() + .transformToUni(connection -> context + .withContext(ctx -> connection.publish(message, configuration, tracerFactory.create(), ctx))) .onFailure() .invoke(failure -> logger.errorf(failure, "Failed to publish with message: %s", failure.getMessage())) .onFailure().recoverWithUni(() -> recover(message)); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrument.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrument.java deleted file mode 100644 index 17f35307..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrument.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.tracing; - -import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE; -import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor.create; -import static io.smallrye.reactive.messaging.tracing.TracingUtils.getOpenTelemetry; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Instance; -import jakarta.inject.Inject; - -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; - -@ApplicationScoped -public class JetStreamInstrument { - private final Instance openTelemetryInstance; - - @Inject - public JetStreamInstrument(Instance openTelemetryInstance) { - this.openTelemetryInstance = openTelemetryInstance; - } - - public Instrumenter publisher() { - final var attributesExtractor = new JetStreamTraceAttributesExtractor(); - MessagingAttributesGetter messagingAttributesGetter = attributesExtractor - .getMessagingAttributesGetter(); - - InstrumenterBuilder builder = Instrumenter.builder(getOpenTelemetry(openTelemetryInstance), - "io.smallrye.reactive.messaging.jetstream", - MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.SEND)); - - return builder.addAttributesExtractor(create(messagingAttributesGetter, MessageOperation.SEND)) - .addAttributesExtractor(attributesExtractor) - .buildProducerInstrumenter(JetStreamTraceTextMapSetter.INSTANCE); - } - - public Instrumenter receiver() { - final var attributesExtractor = new JetStreamTraceAttributesExtractor(); - MessagingAttributesGetter messagingAttributesGetter = attributesExtractor - .getMessagingAttributesGetter(); - InstrumenterBuilder builder = Instrumenter.builder(getOpenTelemetry(openTelemetryInstance), - "io.smallrye.reactive.messaging.jetstream", - MessagingSpanNameExtractor.create(messagingAttributesGetter, RECEIVE)); - - return builder.addAttributesExtractor(attributesExtractor) - .addAttributesExtractor(MessagingAttributesExtractor.create(messagingAttributesGetter, RECEIVE)) - .buildConsumerInstrumenter(JetStreamTraceTextMapGetter.INSTANCE); - } - -} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTrace.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTrace.java deleted file mode 100644 index 4257e394..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTrace.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.tracing; - -import java.util.List; -import java.util.Map; - -import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessage; -import lombok.Builder; - -@Builder -public record JetStreamTrace(String stream, - String subject, - String messageId, - Map> headers, - String payload, - Long streamSequence, - Long consumerSequence, - String consumer, - Long deliveredCount) { - - public static JetStreamTrace trace(JetStreamIncomingMessage message) { - return JetStreamTrace.builder() - .stream(message.getStream()) - .subject(message.getSubject()) - .messageId(message.getMessageId()) - .headers(message.getHeaders()) - .payload(new String(message.getData())) - .streamSequence(message.getStreamSequence()) - .consumerSequence(message.getConsumerSequence()) - .consumer(message.getConsumer()) - .deliveredCount(message.getDeliveredCount()) - .build(); - } -} \ No newline at end of file diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceAttributesExtractor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceAttributesExtractor.java deleted file mode 100644 index 95d763ff..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceAttributesExtractor.java +++ /dev/null @@ -1,85 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.tracing; - -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; - -public class JetStreamTraceAttributesExtractor implements AttributesExtractor { - private static final String MESSAGE_PAYLOAD = "message.payload"; - private static final String MESSAGE_STREAM_SEQUENCE = "message.stream_sequence"; - private static final String MESSAGE_CONSUMER_SEQUENCE = "message.consumer_sequence"; - private static final String MESSAGE_CONSUMER = "message.consumer"; - private static final String MESSAGE_DELIVERED_COUNT = "message.delivered_count"; - - private final MessagingAttributesGetter messagingAttributesGetter; - - public JetStreamTraceAttributesExtractor() { - this.messagingAttributesGetter = new JetStreamMessagingAttributesGetter(); - } - - @Override - public void onStart(AttributesBuilder attributesBuilder, Context context, JetStreamTrace jetStreamTrace) { - attributesBuilder.put(MESSAGE_PAYLOAD, jetStreamTrace.payload()); - if (jetStreamTrace.streamSequence() != null) { - attributesBuilder.put(MESSAGE_STREAM_SEQUENCE, jetStreamTrace.streamSequence()); - } - if (jetStreamTrace.consumerSequence() != null) { - attributesBuilder.put(MESSAGE_CONSUMER_SEQUENCE, jetStreamTrace.consumerSequence()); - } - if (jetStreamTrace.consumer() != null) { - attributesBuilder.put(MESSAGE_CONSUMER, jetStreamTrace.consumer()); - } - if (jetStreamTrace.deliveredCount() != null) { - attributesBuilder.put(MESSAGE_DELIVERED_COUNT, jetStreamTrace.deliveredCount()); - } - } - - @Override - public void onEnd(AttributesBuilder attributesBuilder, Context context, JetStreamTrace jetStreamTrace, - Void unused, Throwable throwable) { - } - - public MessagingAttributesGetter getMessagingAttributesGetter() { - return messagingAttributesGetter; - } - - private final static class JetStreamMessagingAttributesGetter implements MessagingAttributesGetter { - - @Override - public String getSystem(JetStreamTrace trace) { - return "jetstream"; - } - - @Override - public String getDestination(JetStreamTrace trace) { - return String.format("%s.%s", trace.stream(), trace.subject()); - } - - @Override - public boolean isTemporaryDestination(JetStreamTrace trace) { - return false; - } - - @Override - public String getConversationId(JetStreamTrace trace) { - return null; - } - - @Override - public Long getMessagePayloadSize(JetStreamTrace trace) { - return null; - } - - @Override - public Long getMessagePayloadCompressedSize(JetStreamTrace trace) { - return null; - } - - @Override - public String getMessageId(JetStreamTrace trace, Void unused) { - return trace.messageId(); - } - - } -} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceTextMapSetter.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceTextMapSetter.java deleted file mode 100644 index fc81fa9b..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamTraceTextMapSetter.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.tracing; - -import java.util.List; - -import io.opentelemetry.context.propagation.TextMapSetter; - -public enum JetStreamTraceTextMapSetter implements TextMapSetter { - INSTANCE; - - @Override - public void set(final JetStreamTrace carrier, final String key, final String value) { - if (carrier != null) { - final var headers = carrier.headers(); - if (headers != null) { - headers.put(key, List.of(value)); - } - } - } -} diff --git a/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/ExponentialBackoffTest.java b/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/ExponentialBackoffTest.java index 75ce24b6..8f82f76b 100644 --- a/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/ExponentialBackoffTest.java +++ b/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/ExponentialBackoffTest.java @@ -6,6 +6,8 @@ import org.junit.jupiter.api.Test; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff; + public class ExponentialBackoffTest { @Test public void testGetDuration() { diff --git a/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java b/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java index d6fdd3ba..98513f80 100644 --- a/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java +++ b/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java @@ -197,11 +197,6 @@ public Duration exponentialBackoffMaxDuration() { return null; } - @Override - public boolean traceEnabled() { - return false; - } - @Override public Duration ackTimeout() { return Duration.ofSeconds(3);