From 21cddc1e68e43fc9925c252cec00f712eaec1f8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paw=20Figg=C3=A9=20Kjeldgaard?= Date: Tue, 15 Oct 2024 08:47:15 +0200 Subject: [PATCH] Fixed not reporting health down when not able to subscripe to NATS --- .github/project.yml | 2 +- ...ctiveMesssagingNatsJetstreamProcessor.java | 4 +- .../test/DeadLetterConsumingBean.java | 5 +- .../test/ExponentialBackoffConsumingBean.java | 4 +- .../jetstream/test/KeyValueStoreResource.java | 5 +- .../test/ReaderSubscribeConnectionTest.java | 39 +-- .../jetstream/test/RequestReplyResource.java | 5 +- .../ROOT/pages/includes/attributes.adoc | 2 +- runtime/pom.xml | 4 + .../nats/jetstream/JetStreamConnector.java | 12 +- .../nats/jetstream/client/Connection.java | 11 +- .../jetstream/client/ConnectionEvent.java | 3 +- .../jetstream/client/ConnectionFactory.java | 43 +-- .../jetstream/client/ConnectionListener.java | 2 +- .../jetstream/client/DefaultConnection.java | 90 ++++++- .../client/DefaultConnectionListener.java | 17 ++ .../client/PushSubscribeConnection.java | 197 -------------- .../jetstream/client/PushSubscribtion.java | 103 ++++++++ .../client/ReaderSubscribeConnection.java | 248 ------------------ .../jetstream/client/ReaderSubscribtion.java | 121 +++++++++ ...cribeConnection.java => Subscription.java} | 2 +- .../configuration/DefaultErrorListener.java | 2 +- .../mapper/DefaultMessageMapper.java | 6 +- .../processors/MessageProcessor.java | 27 +- .../publisher/MessagePublisherProcessor.java | 47 +++- .../MessagePullPublisherProcessor.java | 13 +- .../MessagePushPublisherProcessor.java | 12 +- .../MessageSubscriberProcessor.java | 16 +- ...rumenter.java => JetStreamInstrument.java} | 4 +- 29 files changed, 453 insertions(+), 593 deletions(-) create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionListener.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribtion.java delete mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribtion.java rename runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/{SubscribeConnection.java => Subscription.java} (76%) rename runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/{JetStreamInstrumenter.java => JetStreamInstrument.java} (96%) diff --git a/.github/project.yml b/.github/project.yml index f56af920..3d464bf3 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "3.15.5" + current-version: "3.15.6" next-version: "3.16.0-SNAPSHOT" diff --git a/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java b/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java index 5a1e1c28..c894cc10 100644 --- a/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java +++ b/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamProcessor.java @@ -11,7 +11,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultMessageMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultPayloadMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapperImpl; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter; +import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; @@ -51,7 +51,7 @@ void initializeSecureRandomRelatedClassesAtRuntime( @BuildStep void createNatsConnector(BuildProducer buildProducer) { buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamConnector.class)); - buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamInstrumenter.class)); + buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamInstrument.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ExecutionHolder.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ConnectionFactory.class)); buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultPayloadMapper.class)); diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java index 9ce62378..908c20df 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java @@ -17,6 +17,7 @@ import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.smallrye.mutiny.Uni; @@ -83,8 +84,8 @@ private Uni getOrEstablishConnection() { .filter(Connection::isConnected) .orElse(null)) .onItem().ifNull() - .switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> { - })) + .switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), + new DefaultConnectionListener())) .onItem().invoke(this.connection::set); } } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java index 917910fb..00cd8696 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java @@ -17,6 +17,7 @@ import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.smallrye.mutiny.Uni; @@ -93,8 +94,7 @@ private Uni getOrEstablishConnection() { .filter(Connection::isConnected) .orElse(null)) .onItem().ifNull().switchTo(() -> connectionFactory - .create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> { - })) + .create(ConnectionConfiguration.of(natsConfiguration), new DefaultConnectionListener())) .onItem().invoke(this.connection::set); } } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/KeyValueStoreResource.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/KeyValueStoreResource.java index 3a795b7c..c09d3007 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/KeyValueStoreResource.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/KeyValueStoreResource.java @@ -15,6 +15,7 @@ import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.smallrye.mutiny.Uni; @@ -84,8 +85,8 @@ private Uni getOrEstablishConnection() { .filter(Connection::isConnected) .orElse(null)) .onItem().ifNull() - .switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> { - })) + .switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), + new DefaultConnectionListener())) .onItem().invoke(this.connection::set); } } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java index 7b469e57..b7bbe046 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java @@ -19,13 +19,14 @@ import io.nats.client.api.ReplayPolicy; import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; import io.quarkus.test.QuarkusUnitTest; public class ReaderSubscribeConnectionTest { - private Logger logger = Logger.getLogger(ReaderSubscribeConnectionTest.class); + private final static Logger logger = Logger.getLogger(ReaderSubscribeConnectionTest.class); @RegisterExtension static QuarkusUnitTest runner = new QuarkusUnitTest() @@ -42,30 +43,30 @@ void createConnectionWithModifiedConfiguration() throws Exception { var consumerConfiguration = createConsumerConfiguration(List.of(Duration.ofSeconds(10)), 2L); try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), - (event, message) -> { - }, - consumerConfiguration).await().atMost(Duration.ofSeconds(30))) { + new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { logger.info("Connected to NATS"); - final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name()) - .await().atMost(Duration.ofSeconds(30)); - assertThat(consumer).isNotNull(); - assertThat(consumer.configuration().backoff()).isEqualTo(List.of(Duration.ofSeconds(10))); - assertThat(consumer.configuration().maxDeliver()).isEqualTo(2L); + try (final var ignored = connection.subscribtion(consumerConfiguration).await().atMost(Duration.ofSeconds(30))) { + final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name()) + .await().atMost(Duration.ofSeconds(30)); + assertThat(consumer).isNotNull(); + assertThat(consumer.configuration().backoff()).isEqualTo(List.of(Duration.ofSeconds(10))); + assertThat(consumer.configuration().maxDeliver()).isEqualTo(2L); + } } consumerConfiguration = createConsumerConfiguration(List.of(Duration.ofSeconds(10), Duration.ofSeconds(30)), 3L); try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), - (event, message) -> { - }, - consumerConfiguration).await().atMost(Duration.ofSeconds(30))) { - logger.info("Connected to NATS"); - final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name()) - .await().atMost(Duration.ofSeconds(30)); - assertThat(consumer).isNotNull(); - assertThat(consumer.configuration().backoff()).isEqualTo(List.of(Duration.ofSeconds(10), Duration.ofSeconds(30))); - assertThat(consumer.configuration().maxDeliver()).isEqualTo(3L); + new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { + try (final var ignored = connection.subscribtion(consumerConfiguration).await().atMost(Duration.ofSeconds(30))) { + logger.info("Connected to NATS"); + final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name()) + .await().atMost(Duration.ofSeconds(30)); + assertThat(consumer).isNotNull(); + assertThat(consumer.configuration().backoff()) + .isEqualTo(List.of(Duration.ofSeconds(10), Duration.ofSeconds(30))); + assertThat(consumer.configuration().maxDeliver()).isEqualTo(3L); + } } - } private ReaderConsumerConfiguration createConsumerConfiguration(List backoff, Long maxDeliver) { diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java index 5bbdd94d..98d95582 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java @@ -27,6 +27,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata; import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration; @@ -108,8 +109,8 @@ private Uni getOrEstablishMessageConnection() { .filter(Connection::isConnected) .orElse(null)) .onItem().ifNull() - .switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> { - })) + .switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), + new DefaultConnectionListener())) .onItem().invoke(this.messageConnection::set); } diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index d9f61f8a..f0852b3e 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 3.15.5 +:project-version: 3.15.6 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/runtime/pom.xml b/runtime/pom.xml index 87ad344d..83263d58 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -29,6 +29,10 @@ io.opentelemetry.instrumentation opentelemetry-instrumentation-api-semconv + + org.apache.commons + commons-lang3 + io.nats jnats diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java index fcd4227f..41f8419f 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java @@ -17,6 +17,7 @@ import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import org.jboss.logging.Logger; import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; @@ -77,6 +78,8 @@ public class JetStreamConnector implements InboundConnector, OutboundConnector, HealthReporter { public static final String CONNECTOR_NAME = "quarkus-jetstream"; + private final static Logger logger = Logger.getLogger(JetStreamConnector.class); + private final List processors; private final NatsConfiguration natsConfiguration; private final ConnectionFactory connectionFactory; @@ -132,7 +135,14 @@ public HealthReport getLiveness() { public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object ignored) { - this.processors.forEach(processor -> processor.close().await().indefinitely()); + this.processors.forEach(processor -> { + try { + processor.close(); + } catch (Throwable failure) { + logger.warnf(failure, "Failed to close the processor: %s", failure.getMessage()); + } + }); + this.processors.clear(); } private MessagePublisherProcessor createMessagePublisherProcessor( diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java index caae73c5..b5069d09 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java @@ -8,8 +8,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -23,6 +22,8 @@ public interface Connection extends AutoCloseable { void addListener(ConnectionListener listener); + void removeListener(ConnectionListener listener); + default void fireEvent(ConnectionEvent event, String message) { listeners().forEach(listener -> listener.onEvent(event, message)); } @@ -70,4 +71,10 @@ Uni> publish(final Message message, final PublishConfiguration Uni deleteKeyValue(String bucketName, String key); Uni> resolve(String streamName, long sequence); + + Uni> subscribtion(PushConsumerConfiguration configuration); + + Uni> subscribtion(ReaderConsumerConfiguration configuration); + + void close(Subscription subscribtion); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java index b38426c2..e33175aa 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java @@ -5,6 +5,5 @@ public enum ConnectionEvent { Connected, Closed, Reconnected, - CommunicationFailed, - SubscriptionInactive, + CommunicationFailed } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java index 9b748dbf..918871ad 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java @@ -9,13 +9,11 @@ import org.jboss.logging.Logger; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter; +import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.unchecked.Unchecked; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; @@ -31,7 +29,7 @@ public class ConnectionFactory { private final ExecutionHolder executionHolder; private final MessageMapper messageMapper; - private final JetStreamInstrumenter instrumenter; + private final JetStreamInstrument instrumenter; private final PayloadMapper payloadMapper; private final ConsumerMapper consumerMapper; private final StreamStateMapper streamStateMapper; @@ -39,7 +37,7 @@ public class ConnectionFactory { @Inject public ConnectionFactory(ExecutionHolder executionHolder, MessageMapper messageMapper, - JetStreamInstrumenter instrumenter, + JetStreamInstrument instrumenter, PayloadMapper payloadMapper, ConsumerMapper consumerMapper, StreamStateMapper streamStateMapper) { @@ -51,41 +49,6 @@ public ConnectionFactory(ExecutionHolder executionHolder, this.streamStateMapper = streamStateMapper; } - public Uni> create(ConnectionConfiguration connectionConfiguration, - ConnectionListener connectionListener, - ReaderConsumerConfiguration consumerConfiguration) { - return getContext() - .onItem() - .transformToUni(context -> Uni.createFrom() - .item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, connectionListener, - context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, instrumenter)))) - .onItem().transformToUni(connection -> Uni.createFrom() - .item(Unchecked.supplier(() -> new ReaderSubscribeConnection<>(connection, consumerConfiguration)))) - .onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage())) - .onFailure() - .retry() - .withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF)) - .atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS)); - } - - public Uni> create(ConnectionConfiguration connectionConfiguration, - ConnectionListener connectionListener, - PushConsumerConfiguration consumerConfiguration) { - - return getContext() - .onItem() - .transformToUni(context -> Uni.createFrom() - .item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, connectionListener, - context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, instrumenter)))) - .onItem().transformToUni(connection -> Uni.createFrom() - .item(Unchecked.supplier(() -> new PushSubscribeConnection<>(connection, consumerConfiguration)))) - .onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage())) - .onFailure() - .retry() - .withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF)) - .atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS)); - } - public Uni create(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) { return getContext() diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionListener.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionListener.java index 94d2fcd9..d760a766 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionListener.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionListener.java @@ -1,6 +1,6 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.client; -public interface ConnectionListener { +public interface ConnectionListener extends AutoCloseable { void onEvent(ConnectionEvent event, String message); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java index 8ed212e9..3c3d4545 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java @@ -9,6 +9,7 @@ import java.util.*; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.reactive.messaging.Message; import org.jboss.logging.Logger; @@ -29,7 +30,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter; +import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -46,7 +47,7 @@ public class DefaultConnection implements Connection { private final ConsumerMapper consumerMapper; private final MessageMapper messageMapper; private final PayloadMapper payloadMapper; - private final JetStreamInstrumenter instrumenter; + private final JetStreamInstrument instrument; DefaultConnection(final ConnectionConfiguration configuration, final ConnectionListener connectionListener, @@ -55,7 +56,7 @@ public class DefaultConnection implements Connection { final PayloadMapper payloadMapper, final ConsumerMapper consumerMapper, final StreamStateMapper streamStateMapper, - final JetStreamInstrumenter instrumenter) throws ConnectionException { + final JetStreamInstrument instrumenter) throws ConnectionException { this.connection = connect(configuration); this.listeners = new ArrayList<>(List.of(connectionListener)); this.context = context; @@ -63,7 +64,7 @@ public class DefaultConnection implements Connection { this.consumerMapper = consumerMapper; this.messageMapper = messageMapper; this.payloadMapper = payloadMapper; - this.instrumenter = instrumenter; + this.instrument = instrumenter; fireEvent(ConnectionEvent.Connected, "Connection established"); } @@ -95,8 +96,20 @@ public void addListener(ConnectionListener listener) { listeners.add(listener); } + @Override + public void removeListener(ConnectionListener listener) { + this.listeners.remove(listener); + } + @Override public void close() { + new ArrayList<>(listeners).forEach(listener -> { + try { + listener.close(); + } catch (Throwable failure) { + logger.warnf(failure, "Error closing listener: %s", failure.getMessage()); + } + }); try { connection.close(); } catch (Throwable throwable) { @@ -226,7 +239,7 @@ public Uni> publish(Message message, PublishConfiguration conf if (configuration.traceEnabled()) { // Create a new span for the outbound message and record updated tracing information in // the headers; this has to be done before we build the properties below - traceOutgoing(instrumenter.publisher(), message, + traceOutgoing(instrument.publisher(), message, new JetStreamTrace(configuration.stream(), subject, messageId, headers, new String(payload))); } @@ -338,16 +351,71 @@ public Uni> resolve(String streamName, long sequence) { .emitOn(context::runOnContext); } - io.nats.client.Connection connection() { - return connection; + @Override + public Uni> subscribtion(PushConsumerConfiguration configuration) { + return Uni.createFrom().item(() -> new PushSubscribtion<>(this, configuration, connection, messageMapper, context)); } - Context context() { - return context; + @Override + public Uni> subscribtion(ReaderConsumerConfiguration configuration) { + return createSubscription(configuration) + .onItem().transformToUni(subscription -> createReader(configuration, subscription)) + .onItem() + .transformToUni(pair -> Uni.createFrom() + .> item(Unchecked.supplier(() -> new ReaderSubscribtion<>(this, configuration, + pair.getLeft(), pair.getRight(), messageMapper, context)))) + .onItem().invoke(this::addListener); } - MessageMapper messageMapper() { - return messageMapper; + @Override + public void close(Subscription subscription) { + try { + subscription.close(); + } catch (Throwable failure) { + logger.warnf(failure, "Failed to close subscription: %s", failure.getMessage()); + } + removeListener(subscription); + } + + private Uni> createReader(ReaderConsumerConfiguration configuration, + JetStreamSubscription subscription) { + return Uni.createFrom() + .item(Unchecked.supplier(() -> subscription.reader(configuration.maxRequestBatch(), configuration.rePullAt()))) + .onItem().transform(reader -> Pair.of(subscription, reader)); + } + + /** + * Creates a subscription. + * If an IllegalArgumentException is thrown the consumer configuration is modified. + */ + private Uni createSubscription(ReaderConsumerConfiguration configuration) { + return Uni.createFrom().item(Unchecked.supplier(connection::jetStream)) + .onItem().transformToUni(jetStream -> createSubscription(jetStream, configuration)); + } + + private Uni createSubscription(JetStream jetStream, + ReaderConsumerConfiguration configuration) { + return subscribe(jetStream, configuration) + .onFailure().recoverWithUni(failure -> { + if (failure instanceof IllegalArgumentException) { + return deleteConsumer(configuration.consumerConfiguration().stream(), + configuration.consumerConfiguration().name()) + .onItem().transformToUni(v -> subscribe(jetStream, configuration)); + } else { + return Uni.createFrom().failure(failure); + } + }); + } + + private Uni subscribe(JetStream jetStream, ReaderConsumerConfiguration configuration) { + return Uni.createFrom().emitter(emitter -> { + try { + final var optionsFactory = new PullSubscribeOptionsFactory(); + emitter.complete(jetStream.subscribe(configuration.subject(), optionsFactory.create(configuration))); + } catch (Throwable failure) { + emitter.fail(failure); + } + }); } private Uni getStreamInfo(String streamName) { diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionListener.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionListener.java new file mode 100644 index 00000000..5238fa3b --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnectionListener.java @@ -0,0 +1,17 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +import org.jboss.logging.Logger; + +public class DefaultConnectionListener implements ConnectionListener { + private final static Logger logger = Logger.getLogger(DefaultConnectionListener.class); + + @Override + public void onEvent(ConnectionEvent event, String message) { + logger.infof("Event: %s, message: %s", event, message); + } + + @Override + public void close() throws Exception { + + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java deleted file mode 100644 index e7c89f45..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java +++ /dev/null @@ -1,197 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.client; - -import java.time.Duration; -import java.util.List; - -import org.eclipse.microprofile.reactive.messaging.Message; -import org.jboss.logging.Logger; - -import io.nats.client.Dispatcher; -import io.nats.client.JetStreamSubscription; -import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; - -public class PushSubscribeConnection

implements SubscribeConnection

{ - private static final Logger logger = Logger.getLogger(PushSubscribeConnection.class); - - private final DefaultConnection delegate; - private final PushConsumerConfiguration

consumerConfiguration; - private final PushSubscribeOptionsFactory pushSubscribeOptionsFactory; - - private volatile JetStreamSubscription subscription; - private volatile Dispatcher dispatcher; - - PushSubscribeConnection(DefaultConnection delegate, - PushConsumerConfiguration

consumerConfiguration) { - this.delegate = delegate; - this.consumerConfiguration = consumerConfiguration; - this.pushSubscribeOptionsFactory = new PushSubscribeOptionsFactory(); - } - - @Override - public Multi> subscribe() { - boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); - Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); - final var subject = consumerConfiguration.subject(); - return Multi.createFrom(). emitter(emitter -> { - try { - final var jetStream = delegate.connection().jetStream(); - dispatcher = delegate.connection().createDispatcher(); - final var pushOptions = pushSubscribeOptionsFactory.create(consumerConfiguration); - subscription = jetStream.subscribe( - subject, dispatcher, - emitter::emit, - false, - pushOptions); - } catch (Throwable e) { - logger.errorf( - e, - "Failed subscribing to stream: %s, subject: %s with message: %s", - consumerConfiguration.consumerConfiguration().stream(), - subject, - e.getMessage()); - emitter.fail(e); - } - }) - .emitOn(runnable -> delegate.context().runOnContext(runnable)) - .map(message -> delegate.messageMapper().of( - message, - traceEnabled, - payloadType, - delegate.context(), - new ExponentialBackoff( - consumerConfiguration.consumerConfiguration().exponentialBackoff(), - consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), - consumerConfiguration.consumerConfiguration().ackTimeout())); - } - - @Override - public boolean isConnected() { - return delegate.isConnected(); - } - - @Override - public Uni flush(Duration duration) { - return delegate.flush(duration); - } - - @Override - public List listeners() { - return delegate.listeners(); - } - - @Override - public void addListener(ConnectionListener listener) { - delegate.addListener(listener); - } - - @Override - public Uni getConsumer(String stream, String consumerName) { - return delegate.getConsumer(stream, consumerName); - } - - @Override - public Uni deleteConsumer(String streamName, String consumerName) { - return delegate.deleteConsumer(streamName, consumerName); - } - - @Override - public Uni> getStreams() { - return delegate.getStreams(); - } - - @Override - public Uni> getSubjects(String streamName) { - return delegate.getSubjects(streamName); - } - - @Override - public Uni> getConsumerNames(String streamName) { - return delegate.getConsumerNames(streamName); - } - - @Override - public Uni purgeStream(String streamName) { - return delegate.purgeStream(streamName); - } - - @Override - public Uni deleteMessage(String stream, long sequence, boolean erase) { - return delegate.deleteMessage(stream, sequence, erase); - } - - @Override - public Uni getStreamState(String streamName) { - return delegate.getStreamState(streamName); - } - - @Override - public Uni> purgeAllStreams() { - return delegate.purgeAllStreams(); - } - - @Override - public Uni> publish(Message message, PublishConfiguration configuration) { - return delegate.publish(message, configuration); - } - - @Override - public Uni> publish(Message message, PublishConfiguration publishConfiguration, - FetchConsumerConfiguration consumerConfiguration) { - return delegate.publish(message, publishConfiguration, consumerConfiguration); - } - - @Override - public Uni> nextMessage(FetchConsumerConfiguration configuration) { - return delegate.nextMessage(configuration); - } - - @Override - public Multi> nextMessages(FetchConsumerConfiguration configuration) { - return delegate.nextMessages(configuration); - } - - @Override - public Uni getKeyValue(String bucketName, String key, Class valueType) { - return delegate.getKeyValue(bucketName, key, valueType); - } - - @Override - public Uni putKeyValue(String bucketName, String key, T value) { - return delegate.putKeyValue(bucketName, key, value); - } - - @Override - public Uni deleteKeyValue(String bucketName, String key) { - return delegate.deleteKeyValue(bucketName, key); - } - - @Override - public Uni> resolve(String streamName, long sequence) { - return delegate.resolve(streamName, sequence); - } - - @Override - public void close() { - try { - if (subscription.isActive()) { - subscription.drain(Duration.ofMillis(1000)); - } - } catch (Throwable failure) { - logger.warnf(failure, "Interrupted while draining subscription: %s", failure.getMessage()); - } - try { - if (subscription != null && dispatcher != null && dispatcher.isActive()) { - dispatcher.unsubscribe(subscription); - } - } catch (Throwable failure) { - logger.warnf(failure, "Failed to shutdown pull executor: %s", failure.getMessage()); - } - delegate.close(); - } -} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribtion.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribtion.java new file mode 100644 index 00000000..6b4e15aa --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribtion.java @@ -0,0 +1,103 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +import java.time.Duration; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.jboss.logging.Logger; + +import io.nats.client.Dispatcher; +import io.nats.client.JetStreamSubscription; +import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; +import io.smallrye.mutiny.Multi; +import io.vertx.mutiny.core.Context; + +public class PushSubscribtion

implements Subscription

{ + private static final Logger logger = Logger.getLogger(PushSubscribtion.class); + + private final Connection connection; + private final PushConsumerConfiguration

consumerConfiguration; + private final PushSubscribeOptionsFactory pushSubscribeOptionsFactory; + private final io.nats.client.Connection natsConnection; + private final MessageMapper messageMapper; + private final Context context; + + private volatile JetStreamSubscription subscription; + private volatile Dispatcher dispatcher; + + PushSubscribtion(final Connection connection, + final PushConsumerConfiguration

consumerConfiguration, + final io.nats.client.Connection natsConnection, + final MessageMapper messageMapper, + final Context context) { + this.connection = connection; + this.consumerConfiguration = consumerConfiguration; + this.pushSubscribeOptionsFactory = new PushSubscribeOptionsFactory(); + this.natsConnection = natsConnection; + this.messageMapper = messageMapper; + this.context = context; + } + + @Override + public Multi> subscribe() { + boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); + Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); + final var subject = consumerConfiguration.subject(); + return Multi.createFrom(). emitter(emitter -> { + try { + final var jetStream = natsConnection.jetStream(); + dispatcher = natsConnection.createDispatcher(); + final var pushOptions = pushSubscribeOptionsFactory.create(consumerConfiguration); + subscription = jetStream.subscribe( + subject, dispatcher, + emitter::emit, + false, + pushOptions); + } catch (Throwable e) { + logger.errorf( + e, + "Failed subscribing to stream: %s, subject: %s with message: %s", + consumerConfiguration.consumerConfiguration().stream(), + subject, + e.getMessage()); + emitter.fail(e); + } + }) + .emitOn(context::runOnContext) + .map(message -> messageMapper.of( + message, + traceEnabled, + payloadType, + context, + new ExponentialBackoff( + consumerConfiguration.consumerConfiguration().exponentialBackoff(), + consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), + consumerConfiguration.consumerConfiguration().ackTimeout())); + } + + @Override + public void onEvent(ConnectionEvent event, String message) { + + } + + @Override + public void close() { + try { + if (subscription.isActive()) { + subscription.drain(Duration.ofMillis(1000)); + } + } catch (Throwable failure) { + logger.warnf(failure, "Interrupted while draining subscription: %s", failure.getMessage()); + } + try { + if (subscription != null && dispatcher != null && dispatcher.isActive()) { + dispatcher.unsubscribe(subscription); + } + } catch (Throwable failure) { + logger.warnf(failure, "Failed to shutdown pull executor: %s", failure.getMessage()); + } + connection.removeListener(this); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java deleted file mode 100644 index 9d9b1ecd..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java +++ /dev/null @@ -1,248 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.client; - -import java.io.IOException; -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.eclipse.microprofile.reactive.messaging.Message; -import org.jboss.logging.Logger; - -import io.nats.client.JetStream; -import io.nats.client.JetStreamApiException; -import io.nats.client.JetStreamStatusException; -import io.nats.client.JetStreamSubscription; -import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; -import io.vertx.mutiny.core.Context; - -public class ReaderSubscribeConnection

implements SubscribeConnection

{ - private final static Logger logger = Logger.getLogger(ReaderSubscribeConnection.class); - - private final DefaultConnection delegate; - private final ReaderConsumerConfiguration

consumerConfiguration; - private final io.nats.client.JetStreamReader reader; - private final JetStreamSubscription subscription; - - ReaderSubscribeConnection(DefaultConnection delegate, - ReaderConsumerConfiguration

consumerConfiguration) throws ConnectionException { - this.delegate = delegate; - this.consumerConfiguration = consumerConfiguration; - try { - final var jetStream = delegate.connection().jetStream(); - final var optionsFactory = new PullSubscribeOptionsFactory(); - this.subscription = createSubscription(jetStream, consumerConfiguration, optionsFactory); - this.reader = subscription.reader(consumerConfiguration.maxRequestBatch(), consumerConfiguration.rePullAt()); - } catch (Throwable failure) { - throw new ConnectionException(failure); - } - } - - @Override - public Multi> subscribe() { - boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); - Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); - ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); - return Multi.createBy().repeating() - .uni(this::readNextMessage) - .whilst(message -> isConnected() && subscription.isActive()) - .runSubscriptionOn(pullExecutor) - .emitOn(runable -> delegate.context().runOnContext(runable)) - .flatMap(message -> createMulti(message.orElse(null), traceEnabled, payloadType, delegate.context())) - .onCompletion().invoke(() -> fireEvent(ConnectionEvent.SubscriptionInactive, "Subscription became inactive")); - } - - @Override - public boolean isConnected() { - return delegate.isConnected(); - } - - @Override - public List listeners() { - return delegate.listeners(); - } - - @Override - public void addListener(ConnectionListener listener) { - delegate.addListener(listener); - } - - @Override - public Uni getConsumer(String stream, String consumerName) { - return delegate.getConsumer(stream, consumerName); - } - - @Override - public Uni> getStreams() { - return delegate.getStreams(); - } - - @Override - public Uni> getSubjects(String streamName) { - return delegate.getSubjects(streamName); - } - - @Override - public Uni> getConsumerNames(String streamName) { - return delegate.getConsumerNames(streamName); - } - - @Override - public Uni deleteConsumer(String streamName, String consumerName) { - return delegate.deleteConsumer(streamName, consumerName); - } - - @Override - public Uni purgeStream(String streamName) { - return delegate.purgeStream(streamName); - } - - @Override - public Uni deleteMessage(String stream, long sequence, boolean erase) { - return delegate.deleteMessage(stream, sequence, erase); - } - - @Override - public Uni getStreamState(String streamName) { - return delegate.getStreamState(streamName); - } - - @Override - public Uni> purgeAllStreams() { - return delegate.purgeAllStreams(); - } - - @Override - public Uni> publish(Message message, PublishConfiguration configuration) { - return delegate.publish(message, configuration); - } - - @Override - public Uni> publish(Message message, PublishConfiguration publishConfiguration, - FetchConsumerConfiguration consumerConfiguration) { - return delegate.publish(message, publishConfiguration, consumerConfiguration); - } - - @Override - public Uni> nextMessage(FetchConsumerConfiguration configuration) { - return delegate.nextMessage(configuration); - } - - @Override - public Multi> nextMessages(FetchConsumerConfiguration configuration) { - return delegate.nextMessages(configuration); - } - - @Override - public Uni getKeyValue(String bucketName, String key, Class valueType) { - return delegate.getKeyValue(bucketName, key, valueType); - } - - @Override - public Uni putKeyValue(String bucketName, String key, T value) { - return delegate.putKeyValue(bucketName, key, value); - } - - @Override - public Uni deleteKeyValue(String bucketName, String key) { - return delegate.deleteKeyValue(bucketName, key); - } - - @Override - public Uni> resolve(String streamName, long sequence) { - return delegate.resolve(streamName, sequence); - } - - @Override - public Uni flush(Duration duration) { - return delegate.flush(duration); - } - - @Override - public void close() { - try { - reader.stop(); - } catch (Throwable e) { - logger.warnf("Failed to stop reader with message %s", e.getMessage()); - } - try { - if (subscription.isActive()) { - subscription.drain(Duration.ofMillis(1000)); - } - } catch (Throwable e) { - logger.warnf("Interrupted while draining subscription"); - } - try { - if (subscription.isActive()) { - subscription.unsubscribe(); - } - } catch (Throwable e) { - logger.warnf("Failed to unsubscribe subscription with message %s", e.getMessage()); - } - delegate.close(); - } - - private Uni> readNextMessage() { - return Uni.createFrom().emitter(emitter -> { - try { - emitter.complete(Optional - .ofNullable(reader.nextMessage(consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO)))); - } catch (JetStreamStatusException e) { - emitter.fail(new ReaderException(e)); - } catch (IllegalStateException e) { - logger.warnf("The subscription became inactive for stream: %s", - consumerConfiguration.consumerConfiguration().stream()); - emitter.complete(Optional.empty()); - } catch (InterruptedException e) { - emitter.fail(new ReaderException(String.format("The reader was interrupted for stream: %s", - consumerConfiguration.consumerConfiguration().stream()), e)); - } catch (Throwable throwable) { - emitter.fail(new ReaderException(String.format("Error reading next message from stream: %s", - consumerConfiguration.consumerConfiguration().stream()), throwable)); - } - }); - } - - @SuppressWarnings("unchecked") - private Multi> createMulti(io.nats.client.Message message, - boolean tracingEnabled, Class

payloadType, Context context) { - if (message == null || message.getData() == null) { - return Multi.createFrom().empty(); - } else { - return Multi.createFrom() - .item(() -> delegate.messageMapper().of(message, tracingEnabled, payloadType, context, - new ExponentialBackoff( - consumerConfiguration.consumerConfiguration().exponentialBackoff(), - consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), - consumerConfiguration.consumerConfiguration().ackTimeout())); - } - } - - /** - * Creates a subscription. - * If an IllegalArgumentException is thrown the consumer configuration is modified. - */ - private JetStreamSubscription createSubscription(JetStream jetStream, - ReaderConsumerConfiguration

consumerConfiguration, - PullSubscribeOptionsFactory optionsFactory) throws IOException, JetStreamApiException { - try { - return jetStream.subscribe(consumerConfiguration.subject(), - optionsFactory.create(consumerConfiguration)); - } catch (IllegalArgumentException e) { // consumer is modified, Existing consumer cannot be modified - deleteConsumer(consumerConfiguration.consumerConfiguration().stream(), - consumerConfiguration.consumerConfiguration().name()).await().indefinitely(); - return jetStream.subscribe(consumerConfiguration.subject(), - optionsFactory.create(consumerConfiguration)); - } - } -} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribtion.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribtion.java new file mode 100644 index 00000000..9be955a4 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribtion.java @@ -0,0 +1,121 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.jboss.logging.Logger; + +import io.nats.client.JetStreamReader; +import io.nats.client.JetStreamStatusException; +import io.nats.client.JetStreamSubscription; +import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.vertx.mutiny.core.Context; + +class ReaderSubscribtion

implements Subscription

{ + private final static Logger logger = Logger.getLogger(ReaderSubscribtion.class); + + private final Connection connection; + private final ReaderConsumerConfiguration

consumerConfiguration; + private final JetStreamReader reader; + private final JetStreamSubscription subscription; + private final MessageMapper messageMapper; + private final Context context; + + ReaderSubscribtion(Connection connection, + ReaderConsumerConfiguration

consumerConfiguration, + JetStreamSubscription subscription, + JetStreamReader reader, + MessageMapper messageMapper, + Context context) { + this.connection = connection; + this.consumerConfiguration = consumerConfiguration; + this.subscription = subscription; + this.reader = reader; + this.messageMapper = messageMapper; + this.context = context; + } + + @Override + public Multi> subscribe() { + boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); + Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); + ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); + return Multi.createBy().repeating() + .uni(this::readNextMessage) + .whilst(message -> connection.isConnected() && subscription.isActive()) + .runSubscriptionOn(pullExecutor) + .emitOn(context::runOnContext) + .flatMap(message -> createMulti(message.orElse(null), traceEnabled, payloadType, context)); + } + + @Override + public void onEvent(ConnectionEvent event, String message) { + + } + + @Override + public void close() { + try { + reader.stop(); + } catch (Throwable e) { + logger.warnf("Failed to stop reader with message %s", e.getMessage()); + } + try { + if (subscription.isActive()) { + subscription.drain(Duration.ofMillis(1000)); + } + } catch (Throwable e) { + logger.warnf("Interrupted while draining subscription"); + } + try { + if (subscription.isActive()) { + subscription.unsubscribe(); + } + } catch (Throwable e) { + logger.warnf("Failed to unsubscribe subscription with message %s", e.getMessage()); + } + connection.removeListener(this); + } + + private Uni> readNextMessage() { + return Uni.createFrom().emitter(emitter -> { + try { + emitter.complete(Optional + .ofNullable(reader.nextMessage(consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO)))); + } catch (JetStreamStatusException e) { + emitter.fail(new ReaderException(e)); + } catch (IllegalStateException e) { + logger.warnf("The subscription became inactive for stream: %s", + consumerConfiguration.consumerConfiguration().stream()); + emitter.complete(Optional.empty()); + } catch (InterruptedException e) { + emitter.fail(new ReaderException(String.format("The reader was interrupted for stream: %s", + consumerConfiguration.consumerConfiguration().stream()), e)); + } catch (Throwable throwable) { + emitter.fail(new ReaderException(String.format("Error reading next message from stream: %s", + consumerConfiguration.consumerConfiguration().stream()), throwable)); + } + }); + } + + private Multi> createMulti(io.nats.client.Message message, + boolean tracingEnabled, Class

payloadType, Context context) { + if (message == null || message.getData() == null) { + return Multi.createFrom().empty(); + } else { + return Multi.createFrom() + .item(() -> messageMapper.of(message, tracingEnabled, payloadType, context, + new ExponentialBackoff( + consumerConfiguration.consumerConfiguration().exponentialBackoff(), + consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), + consumerConfiguration.consumerConfiguration().ackTimeout())); + } + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Subscription.java similarity index 76% rename from runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeConnection.java rename to runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Subscription.java index c0e4692c..bfd943e3 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Subscription.java @@ -4,7 +4,7 @@ import io.smallrye.mutiny.Multi; -public interface SubscribeConnection extends Connection { +public interface Subscription extends ConnectionListener { Multi> subscribe(); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultErrorListener.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultErrorListener.java index 365a7262..4bf02af4 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultErrorListener.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultErrorListener.java @@ -18,7 +18,7 @@ public void errorOccurred(io.nats.client.Connection conn, String error) { @Override public void exceptionOccurred(io.nats.client.Connection conn, Exception exp) { - logger.errorf("Caught exception connecting to %s with message: %s", conn.getServers(), exp.getMessage()); + logger.debugf("Caught exception connecting to %s with message: %s", conn.getServers(), exp.getMessage()); } @Override diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java index cf5e7776..b6c6dc8b 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/DefaultMessageMapper.java @@ -11,7 +11,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessage; -import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter; +import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument; import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace; import io.quarkus.arc.DefaultBean; import io.vertx.mutiny.core.Context; @@ -23,11 +23,11 @@ public class DefaultMessageMapper implements MessageMapper { public static final String MESSAGE_TYPE_HEADER = "message.type"; private final PayloadMapper payloadMapper; - private final JetStreamInstrumenter instrumenter; + private final JetStreamInstrument instrumenter; @Inject public DefaultMessageMapper(PayloadMapper payloadMapper, - JetStreamInstrumenter instrumenter) { + JetStreamInstrument instrumenter) { this.payloadMapper = payloadMapper; this.instrumenter = instrumenter; } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java index 4f230f89..e4ee8d4e 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java @@ -1,35 +1,10 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors; -import java.util.concurrent.atomic.AtomicReference; - -import org.jboss.logging.Logger; - -import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; -import io.smallrye.mutiny.Uni; - -public interface MessageProcessor { - Logger logger = Logger.getLogger(MessageProcessor.class); +public interface MessageProcessor extends AutoCloseable { String channel(); Status readiness(); Status liveness(); - - AtomicReference connection(); - - default Uni close() { - return Uni.createFrom().item(() -> { - try { - final var connection = connection().getAndSet(null); - if (connection != null) { - connection.close(); - } - return null; - } catch (Throwable failure) { - logger.warnf(failure, "Failed to close connection with message: %s", failure.getMessage()); - return null; - } - }); - } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java index 7253d6ba..0ffecad0 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java @@ -6,6 +6,7 @@ import org.jboss.logging.Logger; import io.quarkiverse.reactive.messaging.nats.jetstream.client.*; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; import io.smallrye.mutiny.Multi; @@ -16,14 +17,21 @@ public abstract class MessagePublisherProcessor implements MessageProcessor, private final AtomicReference readiness; private final AtomicReference liveness; - private final AtomicReference> connection; + private final AtomicReference connection; + private final ConnectionFactory connectionFactory; + private final ConnectionConfiguration connectionConfiguration; + private final AtomicReference> subscription; - public MessagePublisherProcessor() { + public MessagePublisherProcessor(final ConnectionFactory connectionFactory, + final ConnectionConfiguration connectionConfiguration) { this.readiness = new AtomicReference<>( Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build()); this.liveness = new AtomicReference<>( Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build()); this.connection = new AtomicReference<>(); + this.connectionFactory = connectionFactory; + this.connectionConfiguration = connectionConfiguration; + this.subscription = new AtomicReference<>(); } @Override @@ -42,8 +50,9 @@ public Status liveness() { } @Override - public AtomicReference connection() { - return connection; + public void close() { + close(this.connection.getAndSet(null)); + close(this.subscription.getAndSet(null)); } public Multi> publisher() { @@ -62,7 +71,7 @@ public void onEvent(ConnectionEvent event, String message) { this.readiness.set(Status.builder().event(event).message(message).healthy(true).build()); this.liveness.set(Status.builder().event(event).message(message).healthy(true).build()); } - case Closed, CommunicationFailed, Disconnected, SubscriptionInactive -> + case Closed, CommunicationFailed, Disconnected -> this.readiness.set(Status.builder().event(event).message(message).healthy(false).build()); case Reconnected -> this.readiness.set(Status.builder().event(event).message(message).healthy(true).build()); @@ -71,23 +80,43 @@ public void onEvent(ConnectionEvent event, String message) { protected abstract MessagePublisherConfiguration configuration(); - protected abstract Uni> connect(); + protected abstract Uni> subscription(Connection connection); private Multi> recover(Throwable failure) { - return close().onItem().transformToMulti(v -> subscribe()); + return Uni.createFrom(). item(() -> { + close(this.subscription.getAndSet(null)); + return null; + }) + .onItem().transformToMulti(v -> subscribe()); } private Multi> subscribe() { return getOrEstablishConnection() - .onItem().transformToMulti(SubscribeConnection::subscribe) + .onItem().transformToUni(this::subscription) + .onItem().invoke(this.subscription::set) + .onItem().transformToMulti(Subscription::subscribe) .onSubscription().invoke(() -> logger.infof("Subscribed to channel %s", configuration().channel())); } - private Uni> getOrEstablishConnection() { + private Uni getOrEstablishConnection() { return Uni.createFrom().item(() -> Optional.ofNullable(connection.get()) .filter(Connection::isConnected) .orElse(null)) .onItem().ifNull().switchTo(this::connect) .onItem().invoke(this.connection::set); } + + private Uni connect() { + return connectionFactory.create(connectionConfiguration, this); + } + + private void close(AutoCloseable closeable) { + try { + if (closeable != null) { + closeable.close(); + } + } catch (Throwable failure) { + logger.warnf(failure, "Failed to close resource with message: %s", failure.getMessage()); + } + } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java index f6794d20..0f2305fb 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java @@ -1,21 +1,18 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeConnection; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.smallrye.mutiny.Uni; public class MessagePullPublisherProcessor extends MessagePublisherProcessor { private final MessagePullPublisherConfiguration configuration; - private final ConnectionFactory connectionFactory; - private final ConnectionConfiguration connectionConfiguration; public MessagePullPublisherProcessor(final ConnectionFactory connectionFactory, final ConnectionConfiguration connectionConfiguration, final MessagePullPublisherConfiguration configuration) { - super(); - this.connectionFactory = connectionFactory; - this.connectionConfiguration = connectionConfiguration; + super(connectionFactory, connectionConfiguration); this.configuration = configuration; } @@ -25,7 +22,7 @@ protected MessagePublisherConfiguration configuration() { } @Override - protected Uni> connect() { - return connectionFactory.create(connectionConfiguration, this, configuration); + protected Uni> subscription(Connection connection) { + return connection.subscribtion(configuration); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java index dc7826bc..932b4e67 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java @@ -1,20 +1,18 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeConnection; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.smallrye.mutiny.Uni; public class MessagePushPublisherProcessor extends MessagePublisherProcessor { private final MessagePushPublisherConfiguration configuration; - private final ConnectionFactory connectionFactory; - private final ConnectionConfiguration connectionConfiguration; public MessagePushPublisherProcessor(final ConnectionFactory connectionFactory, final ConnectionConfiguration connectionConfiguration, final MessagePushPublisherConfiguration configuration) { - this.connectionConfiguration = connectionConfiguration; - this.connectionFactory = connectionFactory; + super(connectionFactory, connectionConfiguration); this.configuration = configuration; } @@ -24,7 +22,7 @@ protected MessagePublisherConfiguration configuration() { } @Override - protected Uni> connect() { - return connectionFactory.create(connectionConfiguration, this, configuration); + protected Uni> subscription(Connection connection) { + return connection.subscribtion(configuration); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java index fac512bf..981821af 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java @@ -62,8 +62,15 @@ public Status liveness() { } @Override - public AtomicReference connection() { - return connection; + public void close() { + try { + final var connection = this.connection.getAndSet(null); + if (connection != null) { + connection.close(); + } + } catch (Throwable failure) { + logger.warnf(failure, "Failed to close connection with message: %s", failure.getMessage()); + } } @Override @@ -81,7 +88,10 @@ private Uni> publish(final Message message) { } private Uni> recover(final Message message) { - return close() + return Uni.createFrom(). item(() -> { + close(); + return null; + }) .onItem().transformToUni(v -> publish(message)); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrumenter.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrument.java similarity index 96% rename from runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrumenter.java rename to runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrument.java index aa55452b..17f35307 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrumenter.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/tracing/JetStreamInstrument.java @@ -17,11 +17,11 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; @ApplicationScoped -public class JetStreamInstrumenter { +public class JetStreamInstrument { private final Instance openTelemetryInstance; @Inject - public JetStreamInstrumenter(Instance openTelemetryInstance) { + public JetStreamInstrument(Instance openTelemetryInstance) { this.openTelemetryInstance = openTelemetryInstance; }