From 95e44b5585ebd464907d41b77d56d1ce2020e94f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paw=20Figg=C3=A9=20Kjeldgaard?= Date: Tue, 8 Oct 2024 08:09:58 +0200 Subject: [PATCH] Implemented liveness and readiness health report --- .github/project.yml | 2 +- .../nats/jetstream/test/HealthTest.java | 57 +++++++++ ...iveMesssagingNatsJetstreamDevModeTest.java | 2 +- ...activeMesssagingNatsJetstreamPullTest.java | 6 - ...activeMesssagingNatsJetstreamPushTest.java | 7 -- .../resources/application-health.properties | 10 ++ .../ROOT/pages/includes/attributes.adoc | 2 +- .../nats/jetstream/it/DataResourceTest.java | 34 ++++-- .../messaging/nats/NatsConfiguration.java | 11 ++ .../JetStreamBuildConfiguration.java | 42 +++---- .../nats/jetstream/JetStreamConnector.java | 17 +-- .../nats/jetstream/client/Connection.java | 3 + .../jetstream/client/ConnectionFactory.java | 34 +++++- .../jetstream/client/DefaultConnection.java | 14 +-- .../client/PushSubscribeConnection.java | 20 ++-- .../client/ReaderSubscribeConnection.java | 18 +-- .../jetstream/client/SubscribeConnection.java | 4 +- .../jetstream/client/SubscribeException.java | 9 ++ .../ConnectionConfiguration.java | 31 ++--- .../ConnectionOptionsFactory.java | 26 ++--- .../DefaultConnectionConfiguration.java | 37 +++--- .../KeyValueSetupConfiguration.java | 2 +- .../configuration/SetupConfiguration.java | 6 +- .../processors/MessageProcessor.java | 6 +- .../nats/jetstream/processors/Status.java | 2 + ...aultMessagePullPublisherConfiguration.java | 13 ++- .../publisher/MessagePublisherProcessor.java | 108 +++++++++++++++++- .../MessagePullPublisherProcessor.java | 76 ++---------- .../MessagePushPublisherProcessor.java | 76 +----------- .../MessageSubscriberProcessor.java | 73 ++++++++---- 30 files changed, 439 insertions(+), 309 deletions(-) create mode 100644 deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java create mode 100644 deployment/src/test/resources/application-health.properties create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeException.java diff --git a/.github/project.yml b/.github/project.yml index 9ff9c967..36074e7d 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "3.15.1" + current-version: "3.15.2" next-version: "3.16.0-SNAPSHOT" diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java new file mode 100644 index 00000000..fec1c4de --- /dev/null +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/HealthTest.java @@ -0,0 +1,57 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.test; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; + +import java.util.concurrent.TimeUnit; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.filter.log.RequestLoggingFilter; +import io.restassured.filter.log.ResponseLoggingFilter; + +public class HealthTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class)) + .withConfigurationResource("application-health.properties"); + + @Test + public void readiness() { + await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> { + try { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/ready") + .then() + .statusCode(200); + return true; + } catch (AssertionError e) { + return false; + } + }); + } + + @Test + public void liveness() { + await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> { + try { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/live") + .then() + .statusCode(200); + return true; + } catch (AssertionError e) { + return false; + } + }); + } + +} diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java index 503adf2d..abc0e9b8 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamDevModeTest.java @@ -13,7 +13,7 @@ public class ReactiveMesssagingNatsJetstreamDevModeTest { @RegisterExtension - static QuarkusDevModeTest devModeTest = new QuarkusDevModeTest() + final static QuarkusDevModeTest devModeTest = new QuarkusDevModeTest() .withApplicationRoot((jar) -> jar .addClasses(ValueConsumingBean.class, ValueProducingBean.class, ValueResource.class, TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class, diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java index 7149a740..36d10f82 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java @@ -33,12 +33,6 @@ public void setup() { defaultParser = Parser.JSON; } - @Test - public void health() { - given().get("/q/health/ready").then().statusCode(200); - given().get("/q/health/live").then().statusCode(200); - } - @Test public void metadata() { final var messageId = "4dc58197-8cfb-4099-a211-25d5c2d04f4b"; diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java index 905e26b4..1d38cea5 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java @@ -14,7 +14,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.test.QuarkusUnitTest; -import io.restassured.filter.log.RequestLoggingFilter; import io.restassured.parsing.Parser; public class ReactiveMesssagingNatsJetstreamPushTest { @@ -36,12 +35,6 @@ public void setup() { defaultParser = Parser.JSON; } - @Test - public void health() { - given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/ready").then().statusCode(200); - given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/live").then().statusCode(200); - } - @Test public void metadata() { final var messageId = "4e54818a-c624-495a-81c8-0145ad4c9925"; diff --git a/deployment/src/test/resources/application-health.properties b/deployment/src/test/resources/application-health.properties new file mode 100644 index 00000000..f36c74a7 --- /dev/null +++ b/deployment/src/test/resources/application-health.properties @@ -0,0 +1,10 @@ +quarkus.messaging.nats.jet-stream.streams[0].name=test +quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=data + +mp.messaging.outgoing.data.connector=quarkus-jetstream +mp.messaging.outgoing.data.stream=test +mp.messaging.outgoing.data.subject=data + +mp.messaging.incoming.data-consumer.connector=quarkus-jetstream +mp.messaging.incoming.data-consumer.stream=test +mp.messaging.incoming.data-consumer.subject=data \ No newline at end of file diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index a73e7218..36da8f57 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 3.15.1 +:project-version: 3.15.2 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java b/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java index 48f1e808..bbbd9d18 100644 --- a/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java +++ b/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java @@ -9,6 +9,8 @@ import org.junit.jupiter.api.Test; import io.quarkus.test.junit.QuarkusTest; +import io.restassured.filter.log.RequestLoggingFilter; +import io.restassured.filter.log.ResponseLoggingFilter; @QuarkusTest public class DataResourceTest { @@ -35,17 +37,33 @@ public void data() { @Test public void healthLive() { - given() - .when().get("/q/health/live") - .then() - .statusCode(200); + await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> { + try { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/live") + .then() + .statusCode(200); + return true; + } catch (AssertionError e) { + return false; + } + }); } @Test public void healthReady() { - given() - .when().get("/q/health/ready") - .then() - .statusCode(200); + await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> { + try { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/ready") + .then() + .statusCode(200); + return true; + } catch (AssertionError e) { + return false; + } + }); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/NatsConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/NatsConfiguration.java index 1bdc8123..5b9ad8e2 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/NatsConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/NatsConfiguration.java @@ -1,5 +1,6 @@ package io.quarkiverse.reactive.messaging.nats; +import java.time.Duration; import java.util.Optional; import io.nats.client.AuthHandler; @@ -49,6 +50,16 @@ public interface NatsConfiguration { */ Optional connectionTimeout(); + /** + * Back-off delay between to attempt to re-connect to NATS + */ + Optional connectionBackoff(); + + /** + * The maximum number of attempts to attempt to re-connect to NATS + */ + Optional connectionAttempts(); + /** * The classname for the error listener */ diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java index fbcf8821..098194cb 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamBuildConfiguration.java @@ -4,6 +4,8 @@ import java.util.Optional; import java.util.Set; +import io.nats.client.api.RetentionPolicy; +import io.nats.client.api.StorageType; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; import io.smallrye.config.ConfigMapping; @@ -19,25 +21,6 @@ public interface JetStreamBuildConfiguration { @WithDefault("true") Boolean autoConfigure(); - /** - * The number of replicas a message must be stored. Default value is 1. - */ - @WithDefault("1") - Integer replicas(); - - /** - * The storage type for stream data (File or Memory). - */ - @WithDefault("File") - String storageType(); - - /** - * Declares the retention policy for the stream. @see - * Retention Policy - */ - @WithDefault("Interest") - String retentionPolicy(); - /** * If auto-configure is true the streams are created on Nats server. */ @@ -63,7 +46,7 @@ interface KeyValueStore { * The storage type (File or Memory). */ @WithDefault("File") - String storageType(); + StorageType storageType(); /** * The maximum number of bytes for this bucket @@ -108,5 +91,24 @@ interface Stream { * Stream subjects */ Set subjects(); + + /** + * The number of replicas a message must be stored. Default value is 1. + */ + @WithDefault("1") + Integer replicas(); + + /** + * The storage type for stream data (File or Memory). + */ + @WithDefault("File") + StorageType storageType(); + + /** + * Declares the retention policy for the stream. @see + * Retention Policy + */ + @WithDefault("Interest") + RetentionPolicy retentionPolicy(); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java index 2203ec0d..51f45372 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java @@ -112,20 +112,21 @@ public Flow.Subscriber> getSubscriber(Config config) { @Override public HealthReport getReadiness() { - return getHealth(); + final HealthReport.HealthReportBuilder builder = HealthReport.builder(); + processors.forEach(client -> builder.add(new HealthReport.ChannelInfo( + client.channel(), + client.readiness().healthy(), + client.readiness().message()))); + return builder.build(); } @Override public HealthReport getLiveness() { - return getHealth(); - } - - HealthReport getHealth() { final HealthReport.HealthReportBuilder builder = HealthReport.builder(); processors.forEach(client -> builder.add(new HealthReport.ChannelInfo( - client.getChannel(), - client.getStatus().healthy(), - client.getStatus().message()))); + client.channel(), + client.liveness().healthy(), + client.liveness().message()))); return builder.build(); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java index 4ee6e8f6..5d95f93e 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java @@ -4,6 +4,7 @@ import java.util.List; import org.eclipse.microprofile.reactive.messaging.Message; +import org.jboss.logging.Logger; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult; @@ -14,6 +15,7 @@ import io.smallrye.mutiny.Uni; public interface Connection extends AutoCloseable { + Logger logger = Logger.getLogger(Connection.class); boolean isConnected(); @@ -24,6 +26,7 @@ public interface Connection extends AutoCloseable { void addListener(ConnectionListener listener); default void fireEvent(ConnectionEvent event, String message) { + logger.infof("Event: %s, message: %s", event, message); listeners().forEach(listener -> listener.onEvent(event, message)); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java index dc90d66c..9b748dbf 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java @@ -1,10 +1,13 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.client; +import java.time.Duration; import java.util.Optional; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import org.jboss.logging.Logger; + import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; @@ -21,6 +24,11 @@ @ApplicationScoped public class ConnectionFactory { + private final static Logger logger = Logger.getLogger(ConnectionFactory.class); + + private final static Duration DEFAULT_CONNECTION_BACKOFF = Duration.ofMillis(500); + private final static Long DEFAULT_CONNECTION_ATTEMPTS = 10L; + private final ExecutionHolder executionHolder; private final MessageMapper messageMapper; private final JetStreamInstrumenter instrumenter; @@ -43,7 +51,7 @@ public ConnectionFactory(ExecutionHolder executionHolder, this.streamStateMapper = streamStateMapper; } - public Uni create(ConnectionConfiguration connectionConfiguration, + public Uni> create(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, ReaderConsumerConfiguration consumerConfiguration) { return getContext() @@ -52,10 +60,15 @@ public Uni create(ConnectionConfiguration con .item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, connectionListener, context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, instrumenter)))) .onItem().transformToUni(connection -> Uni.createFrom() - .item(Unchecked.supplier(() -> new ReaderSubscribeConnection<>(connection, consumerConfiguration)))); + .item(Unchecked.supplier(() -> new ReaderSubscribeConnection<>(connection, consumerConfiguration)))) + .onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage())) + .onFailure() + .retry() + .withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF)) + .atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS)); } - public Uni create(ConnectionConfiguration connectionConfiguration, + public Uni> create(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, PushConsumerConfiguration consumerConfiguration) { @@ -65,7 +78,12 @@ public Uni create(ConnectionConfiguration con .item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, connectionListener, context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, instrumenter)))) .onItem().transformToUni(connection -> Uni.createFrom() - .item(Unchecked.supplier(() -> new PushSubscribeConnection<>(connection, consumerConfiguration)))); + .item(Unchecked.supplier(() -> new PushSubscribeConnection<>(connection, consumerConfiguration)))) + .onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage())) + .onFailure() + .retry() + .withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF)) + .atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS)); } public Uni create(ConnectionConfiguration connectionConfiguration, @@ -74,8 +92,12 @@ public Uni create(ConnectionConfiguration connectionConfig .onItem().transformToUni( context -> Uni.createFrom().item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, connectionListener, context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, - instrumenter)))); - + instrumenter)))) + .onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage())) + .onFailure() + .retry() + .withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF)) + .atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS)); } private Optional getVertx() { diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java index 6a662783..8f8a017b 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java @@ -238,8 +238,7 @@ public Uni> publish(Message message, PublishConfiguration conf } }) .emitOn(context::runOnContext) - .onItem().transformToUni(this::acknowledge) - .onFailure().recoverWithUni(throwable -> notAcknowledge(message, throwable)); + .onFailure().transform(failure -> new PublishException(failure.getMessage(), failure)); } @Override @@ -397,17 +396,6 @@ private FetchConsumer fetchConsumer(final ConsumerContext consumerContext, final } } - private Uni> acknowledge(final Message message) { - return Uni.createFrom().completionStage(message.ack()) - .onItem().transform(v -> message); - } - - private Uni> notAcknowledge(final Message message, final Throwable throwable) { - return Uni.createFrom().completionStage(message.nack(throwable)) - .onItem().invoke(() -> logger.warnf(throwable, "Message not published: %s", throwable.getMessage())) - .onItem().transformToUni(v -> Uni.createFrom().item(message)); - } - private Uni getConsumerContext(final FetchConsumerConfiguration configuration) { return Uni.createFrom().item(Unchecked.supplier(() -> { try { diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java index d6945ef6..b8daf4df 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java @@ -16,27 +16,27 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -public class PushSubscribeConnection implements SubscribeConnection { +public class PushSubscribeConnection

implements SubscribeConnection

{ private static final Logger logger = Logger.getLogger(PushSubscribeConnection.class); private final DefaultConnection delegate; - private final PushConsumerConfiguration consumerConfiguration; + private final PushConsumerConfiguration

consumerConfiguration; private final PushSubscribeOptionsFactory pushSubscribeOptionsFactory; private volatile JetStreamSubscription subscription; private volatile Dispatcher dispatcher; PushSubscribeConnection(DefaultConnection delegate, - PushConsumerConfiguration consumerConfiguration) { + PushConsumerConfiguration

consumerConfiguration) { this.delegate = delegate; this.consumerConfiguration = consumerConfiguration; this.pushSubscribeOptionsFactory = new PushSubscribeOptionsFactory(); } @Override - public Multi> subscribe() { + public Multi> subscribe() { boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); - Class payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); + Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); final var subject = consumerConfiguration.subject(); return Multi.createFrom(). emitter(emitter -> { try { @@ -172,20 +172,20 @@ public Uni> resolve(String streamName, long sequence) { } @Override - public void close() throws Exception { + public void close() { try { if (subscription.isActive()) { subscription.drain(Duration.ofMillis(1000)); } - } catch (InterruptedException | IllegalStateException e) { - logger.warnf("Interrupted while draining subscription"); + } catch (Throwable failure) { + logger.warnf(failure, "Interrupted while draining subscription: %s", failure.getMessage()); } try { if (subscription != null && dispatcher != null && dispatcher.isActive()) { dispatcher.unsubscribe(subscription); } - } catch (Exception e) { - logger.errorf(e, "Failed to shutdown pull executor"); + } catch (Throwable failure) { + logger.warnf(failure, "Failed to shutdown pull executor: %s", failure.getMessage()); } delegate.close(); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java index 5f9071f0..91c1ec6c 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java @@ -20,16 +20,16 @@ import io.smallrye.mutiny.Uni; import io.vertx.mutiny.core.Context; -public class ReaderSubscribeConnection implements SubscribeConnection { +public class ReaderSubscribeConnection

implements SubscribeConnection

{ private final static Logger logger = Logger.getLogger(ReaderSubscribeConnection.class); private final DefaultConnection delegate; - private final ReaderConsumerConfiguration consumerConfiguration; + private final ReaderConsumerConfiguration

consumerConfiguration; private final io.nats.client.JetStreamReader reader; private final JetStreamSubscription subscription; ReaderSubscribeConnection(DefaultConnection delegate, - ReaderConsumerConfiguration consumerConfiguration) throws ConnectionException { + ReaderConsumerConfiguration

consumerConfiguration) throws ConnectionException { this.delegate = delegate; this.consumerConfiguration = consumerConfiguration; try { @@ -44,9 +44,9 @@ public class ReaderSubscribeConnection implements SubscribeConnection { } @Override - public Multi> subscribe() { + public Multi> subscribe() { boolean traceEnabled = consumerConfiguration.consumerConfiguration().traceEnabled(); - Class payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); + Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); return Multi.createBy().repeating() .supplier(this::nextMessage) @@ -158,7 +158,7 @@ public Uni flush(Duration duration) { } @Override - public void close() throws Exception { + public void close() { try { reader.stop(); } catch (Throwable e) { @@ -203,13 +203,13 @@ private Optional nextMessage() { } @SuppressWarnings("unchecked") - private Multi> createMulti(io.nats.client.Message message, - boolean tracingEnabled, Class payloadType, Context context) { + private Multi> createMulti(io.nats.client.Message message, + boolean tracingEnabled, Class

payloadType, Context context) { if (message == null || message.getData() == null) { return Multi.createFrom().empty(); } else { return Multi.createFrom() - .item(() -> delegate.messageMapper().of(message, tracingEnabled, (Class) payloadType, context, + .item(() -> delegate.messageMapper().of(message, tracingEnabled, payloadType, context, new ExponentialBackoff( consumerConfiguration.consumerConfiguration().exponentialBackoff(), consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeConnection.java index 72ca4723..c0e4692c 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeConnection.java @@ -4,8 +4,8 @@ import io.smallrye.mutiny.Multi; -public interface SubscribeConnection extends Connection { +public interface SubscribeConnection extends Connection { - Multi> subscribe(); + Multi> subscribe(); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeException.java new file mode 100644 index 00000000..54c3cfe0 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeException.java @@ -0,0 +1,9 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +public class SubscribeException extends RuntimeException { + + public SubscribeException(Throwable cause) { + super(cause); + } + +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionConfiguration.java index d2c941d0..1555ef3e 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionConfiguration.java @@ -1,5 +1,6 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration; +import java.time.Duration; import java.util.Optional; import io.nats.client.ErrorListener; @@ -7,33 +8,37 @@ public interface ConnectionConfiguration { - String getServers(); + String servers(); - Optional getPassword(); + Optional password(); - Optional getUsername(); + Optional username(); - Optional getToken(); + Optional token(); boolean sslEnabled(); - Optional getBufferSize(); + Optional bufferSize(); - Optional getErrorListener(); + Optional errorListener(); - Optional getConnectionTimeout(); + Optional connectionTimeout(); - Optional getCredentialPath(); + Optional credentialPath(); - Optional getKeystorePath(); + Optional keystorePath(); - Optional getKeystorePassword(); + Optional keystorePassword(); - Optional getTruststorePath(); + Optional truststorePath(); - Optional getTruststorePassword(); + Optional truststorePassword(); - Optional getTlsAlgorithm(); + Optional tlsAlgorithm(); + + Optional connectionBackoff(); + + Optional connectionAttempts(); static ConnectionConfiguration of(NatsConfiguration configuration) { return new DefaultConnectionConfiguration(configuration); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionOptionsFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionOptionsFactory.java index 8ab5ddff..9ab8bbc3 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionOptionsFactory.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConnectionOptionsFactory.java @@ -11,7 +11,7 @@ public class ConnectionOptionsFactory { public Options create(ConnectionConfiguration configuration, io.nats.client.ConnectionListener connectionListener) throws NoSuchAlgorithmException { - final var servers = configuration.getServers().split(","); + final var servers = configuration.servers().split(","); final var optionsBuilder = new Options.Builder(); optionsBuilder.servers(servers); optionsBuilder.maxReconnects(0); @@ -19,21 +19,21 @@ public Options create(ConnectionConfiguration configuration, optionsBuilder.connectionListener(connectionListener); } optionsBuilder.errorListener(getErrorListener(configuration)); - configuration.getUsername() - .ifPresent(username -> optionsBuilder.userInfo(username, configuration.getPassword().orElse(""))); - configuration.getToken().map(String::toCharArray).ifPresent(optionsBuilder::token); - configuration.getCredentialPath().ifPresent(optionsBuilder::credentialPath); - configuration.getKeystorePath().ifPresent(optionsBuilder::keystorePath); - configuration.getKeystorePassword().map(String::toCharArray).ifPresent(optionsBuilder::keystorePassword); - configuration.getTruststorePath().ifPresent(optionsBuilder::truststorePath); - configuration.getKeystorePassword().map(String::toCharArray).ifPresent(optionsBuilder::truststorePassword); - configuration.getBufferSize().ifPresent(optionsBuilder::bufferSize); - configuration.getConnectionTimeout() + configuration.username() + .ifPresent(username -> optionsBuilder.userInfo(username, configuration.password().orElse(""))); + configuration.token().map(String::toCharArray).ifPresent(optionsBuilder::token); + configuration.credentialPath().ifPresent(optionsBuilder::credentialPath); + configuration.keystorePath().ifPresent(optionsBuilder::keystorePath); + configuration.keystorePassword().map(String::toCharArray).ifPresent(optionsBuilder::keystorePassword); + configuration.truststorePath().ifPresent(optionsBuilder::truststorePath); + configuration.truststorePassword().map(String::toCharArray).ifPresent(optionsBuilder::truststorePassword); + configuration.bufferSize().ifPresent(optionsBuilder::bufferSize); + configuration.connectionTimeout() .ifPresent(connectionTimeout -> optionsBuilder.connectionTimeout(Duration.ofMillis(connectionTimeout))); if (configuration.sslEnabled()) { optionsBuilder.opentls(); } - configuration.getTlsAlgorithm().ifPresent(optionsBuilder::tlsAlgorithm); + configuration.tlsAlgorithm().ifPresent(optionsBuilder::tlsAlgorithm); return optionsBuilder.build(); } @@ -43,7 +43,7 @@ public Options create(ConnectionConfiguration configuration) } private ErrorListener getErrorListener(ConnectionConfiguration configuration) { - return configuration.getErrorListener() + return configuration.errorListener() .orElseGet(DefaultErrorListener::new); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultConnectionConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultConnectionConfiguration.java index 0bbbded8..1320cb38 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultConnectionConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/DefaultConnectionConfiguration.java @@ -1,6 +1,7 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration; import java.lang.reflect.InvocationTargetException; +import java.time.Duration; import java.util.Optional; import io.nats.client.ErrorListener; @@ -15,27 +16,27 @@ class DefaultConnectionConfiguration implements ConnectionConfiguration { } @Override - public String getServers() { + public String servers() { return configuration.servers(); } @Override - public Optional getPassword() { + public Optional password() { return configuration.password(); } @Override - public Optional getUsername() { + public Optional username() { return configuration.username(); } @Override - public Optional getToken() { + public Optional token() { return configuration.token(); } @Override - public Optional getCredentialPath() { + public Optional credentialPath() { return configuration.credentialPath(); } @@ -45,45 +46,55 @@ public boolean sslEnabled() { } @Override - public Optional getBufferSize() { + public Optional bufferSize() { return configuration.bufferSize(); } @Override - public Optional getErrorListener() { + public Optional errorListener() { return configuration.errorListener().map(this::getInstanceOfErrorListener); } @Override - public Optional getConnectionTimeout() { + public Optional connectionTimeout() { return configuration.connectionTimeout(); } @Override - public Optional getKeystorePath() { + public Optional keystorePath() { return configuration.keystorePath(); } @Override - public Optional getKeystorePassword() { + public Optional keystorePassword() { return configuration.keystorePassword(); } @Override - public Optional getTruststorePath() { + public Optional truststorePath() { return configuration.truststorePath(); } @Override - public Optional getTruststorePassword() { + public Optional truststorePassword() { return configuration.truststorePassword(); } @Override - public Optional getTlsAlgorithm() { + public Optional tlsAlgorithm() { return configuration.tlsAlgorithm(); } + @Override + public Optional connectionBackoff() { + return configuration.connectionBackoff(); + } + + @Override + public Optional connectionAttempts() { + return configuration.connectionAttempts(); + } + private ErrorListener getInstanceOfErrorListener(String className) { try { var clazz = DefaultPayloadMapper.loadClass(className); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/KeyValueSetupConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/KeyValueSetupConfiguration.java index 0a75488e..fa875260 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/KeyValueSetupConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/KeyValueSetupConfiguration.java @@ -58,7 +58,7 @@ static List of(JetStreamBuildConfiguration return configuration.keyValueStores().stream().map(store -> DefaultKeyValueSetupConfiguration.builder() .bucketName(store.bucketName()) .description(store.description()) - .storageType(StorageType.valueOf(store.storageType())) + .storageType(store.storageType()) .maxBucketSize(store.maxBucketSize()) .maxHistoryPerKey(store.maxHistoryPerKey()) .maxValueSize(store.maxValueSize()) diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/SetupConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/SetupConfiguration.java index caf9345d..cad52f45 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/SetupConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/SetupConfiguration.java @@ -23,9 +23,9 @@ static List of(JetStreamBuildConfiguration configuration) { return configuration.streams().stream().map(stream -> new DefaultSetupConfiguration( stream.name(), stream.subjects(), - configuration.replicas(), - StorageType.valueOf(configuration.storageType()), - RetentionPolicy.valueOf(configuration.retentionPolicy()))) + stream.replicas(), + stream.storageType(), + stream.retentionPolicy())) .collect(Collectors.toUnmodifiableList()); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java index f7c96dac..d86ed871 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java @@ -2,9 +2,11 @@ public interface MessageProcessor { - String getChannel(); + String channel(); - Status getStatus(); + Status readiness(); + + Status liveness(); void close(); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/Status.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/Status.java index 87f2decb..0a836767 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/Status.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/Status.java @@ -1,6 +1,8 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent; +import lombok.Builder; +@Builder public record Status(boolean healthy, String message, ConnectionEvent event) { } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java index 6f50564c..b3953fc1 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java @@ -2,7 +2,6 @@ import java.time.Duration; import java.time.ZonedDateTime; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -13,6 +12,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamConnectorIncomingConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultPayloadMapper; +import io.quarkus.runtime.configuration.DurationConverter; public class DefaultMessagePullPublisherConfiguration implements MessagePullPublisherConfiguration { private final JetStreamConnectorIncomingConfiguration configuration; @@ -145,8 +145,8 @@ public List backoff() { return configuration.getBackOff() .map(backoff -> backoff.split(",")) .map(List::of) - .map(this::getBackOff) - .orElseGet(Collections::emptyList); + .map(this::of) + .orElseGet(List::of); } @Override @@ -191,11 +191,12 @@ public Duration ackTimeout() { return Duration.parse(configuration.getAckTimeout()); } - private List getBackOff(List backoff) { - if (backoff == null || backoff.isEmpty()) { + private List of(List values) { + final var converter = new DurationConverter(); + if (values == null || values.isEmpty()) { return List.of(); } else { - return backoff.stream().map(Duration::parse).toList(); + return values.stream().map(converter::convert).toList(); } } }; diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java index 59495694..6ac3e501 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java @@ -1,19 +1,117 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import org.jboss.logging.Logger; + import io.nats.client.JetStreamApiException; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.*; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; +import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public abstract class MessagePublisherProcessor implements MessageProcessor, ConnectionListener { + private final static Logger logger = Logger.getLogger(MessagePublisherProcessor.class); + + private final static int CONSUMER_ALREADY_IN_USE = 10013; + + private final AtomicReference readiness; + private final AtomicReference liveness; + private final AtomicReference> connection; + + public MessagePublisherProcessor() { + this.readiness = new AtomicReference<>( + Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build()); + this.liveness = new AtomicReference<>( + Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build()); + this.connection = new AtomicReference<>(); + } + + @Override + public String channel() { + return configuration().channel(); + } + + @Override + public Status readiness() { + return readiness.get(); + } + + @Override + public Status liveness() { + return liveness.get(); + } + + @Override + public void close() { + try { + final var connection = this.connection.getAndSet(null); + if (connection != null) { + connection.close(); + } + } catch (Throwable failure) { + logger.warnf(failure, "Failed to close connection", failure); + } + } + + public Multi> publisher() { + return subscribe() + .onFailure().transform(this::transformFailure) + .onFailure().retry().withBackOff(configuration().retryBackoff()).indefinitely(); + } + + @Override + public void onEvent(ConnectionEvent event, String message) { + switch (event) { + case Connected -> { + this.readiness.set(Status.builder().event(event).message(message).healthy(true).build()); + this.liveness.set(Status.builder().event(event).message(message).healthy(true).build()); + } + case Closed, CommunicationFailed, Disconnected -> + this.readiness.set(Status.builder().event(event).message(message).healthy(false).build()); + case Reconnected -> + this.readiness.set(Status.builder().event(event).message(message).healthy(true).build()); + } + } -public interface MessagePublisherProcessor extends MessageProcessor, ConnectionListener { - int CONSUMER_ALREADY_IN_USE = 10013; + protected abstract MessagePublisherConfiguration configuration(); - Multi> publisher(); + protected abstract Uni> connect(); + + private Multi> subscribe() { + return getOrEstablishConnection() + .onItem().transformToMulti(SubscribeConnection::subscribe); + } - default boolean isConsumerAlreadyInUse(Throwable throwable) { + private Uni> getOrEstablishConnection() { + return Uni.createFrom().item(() -> Optional.ofNullable(connection.get()) + .filter(Connection::isConnected) + .orElse(null)) + .onItem().ifNull().switchTo(this::connect) + .onItem().invoke(this.connection::set); + } + + private SubscribeException transformFailure(final Throwable failure) { + if (isCommunicationFailure(failure) && !isConsumerAlreadyInUse(failure)) { + logger.errorf(failure, "Failed to publish messages: %s", failure.getMessage()); + Optional.ofNullable(this.connection.get()) + .ifPresent(connection -> connection.fireEvent(ConnectionEvent.CommunicationFailed, failure.getMessage())); + close(); + } + return new SubscribeException(failure); + } + + private boolean isConsumerAlreadyInUse(Throwable throwable) { if (throwable instanceof JetStreamApiException jetStreamApiException) { return jetStreamApiException.getApiErrorCode() == CONSUMER_ALREADY_IN_USE; } return false; } + + private boolean isCommunicationFailure(Throwable failure) { + return failure instanceof JetStreamApiException || failure instanceof IOException; + } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java index 67b0cc91..f6794d20 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java @@ -1,89 +1,31 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; - -import org.jboss.logging.Logger; - -import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; import io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeConnection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; -import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -public class MessagePullPublisherProcessor implements MessagePublisherProcessor { - private final static Logger logger = Logger.getLogger(MessagePullPublisherProcessor.class); - - private final MessagePullPublisherConfiguration configuration; +public class MessagePullPublisherProcessor extends MessagePublisherProcessor { + private final MessagePullPublisherConfiguration configuration; private final ConnectionFactory connectionFactory; - private final AtomicReference status; - private final AtomicReference connection; private final ConnectionConfiguration connectionConfiguration; public MessagePullPublisherProcessor(final ConnectionFactory connectionFactory, final ConnectionConfiguration connectionConfiguration, - final MessagePullPublisherConfiguration configuration) { - this.configuration = configuration; + final MessagePullPublisherConfiguration configuration) { + super(); this.connectionFactory = connectionFactory; - this.status = new AtomicReference<>(new Status(true, "Not connected", ConnectionEvent.Closed)); - this.connection = new AtomicReference<>(); this.connectionConfiguration = connectionConfiguration; + this.configuration = configuration; } @Override - public Status getStatus() { - return status.get(); - } - - @Override - public void close() { - try { - final var connection = this.connection.get(); - if (connection != null) { - connection.close(); - } - } catch (Throwable failure) { - logger.warnf(failure, "Failed to close connection", failure); - } - } - - @Override - public String getChannel() { - return configuration.channel(); - } - - @Override - public Multi> publisher() { - return getOrEstablishConnection() - .onItem().transformToMulti(SubscribeConnection::subscribe) - .onFailure().invoke(throwable -> { - if (!isConsumerAlreadyInUse(throwable)) { - logger.errorf(throwable, "Failed to publish messages: %s", throwable.getMessage()); - status.set(new Status(false, throwable.getMessage(), ConnectionEvent.CommunicationFailed)); - } - }) - .onFailure().retry().withBackOff(configuration.retryBackoff()).indefinitely(); + protected MessagePublisherConfiguration configuration() { + return configuration; } @Override - public void onEvent(ConnectionEvent event, String message) { - switch (event) { - case Connected -> this.status.set(new Status(true, message, event)); - case Closed -> this.status.set(new Status(false, message, event)); - case Disconnected -> this.status.set(new Status(false, message, event)); - case Reconnected -> this.status.set(new Status(true, message, event)); - case CommunicationFailed -> this.status.set(new Status(false, message, event)); - } - } - - private Uni getOrEstablishConnection() { - return Uni.createFrom().item(() -> Optional.ofNullable(connection.get()) - .filter(Connection::isConnected) - .orElse(null)) - .onItem().ifNull().switchTo(() -> connectionFactory.create(connectionConfiguration, this, configuration)) - .onItem().invoke(this.connection::set); + protected Uni> connect() { + return connectionFactory.create(connectionConfiguration, this, configuration); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java index af546ba0..021e5a2e 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java @@ -1,94 +1,30 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; - -import org.eclipse.microprofile.reactive.messaging.Message; -import org.jboss.logging.Logger; - -import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent; import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; import io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeConnection; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; -import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; -import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -public class MessagePushPublisherProcessor implements MessagePublisherProcessor { - private final static Logger logger = Logger.getLogger(MessagePushPublisherProcessor.class); - +public class MessagePushPublisherProcessor extends MessagePublisherProcessor { private final MessagePushPublisherConfiguration configuration; private final ConnectionFactory connectionFactory; - private final AtomicReference status; - private final AtomicReference connection; private final ConnectionConfiguration connectionConfiguration; public MessagePushPublisherProcessor(final ConnectionFactory connectionFactory, final ConnectionConfiguration connectionConfiguration, final MessagePushPublisherConfiguration configuration) { this.connectionConfiguration = connectionConfiguration; - this.configuration = configuration; this.connectionFactory = connectionFactory; - this.status = new AtomicReference<>(new Status(true, "Not connected", ConnectionEvent.Closed)); - this.connection = new AtomicReference<>(); - } - - @Override - public Multi> publisher() { - return getOrEstablishConnection() - .onItem().transformToMulti(SubscribeConnection::subscribe) - .onFailure().invoke(throwable -> { - if (!isConsumerAlreadyInUse(throwable)) { - logger.errorf(throwable, "Failed to publish messages: %s", throwable.getMessage()); - final var connection = this.connection.get(); - if (connection != null) { - connection.fireEvent(ConnectionEvent.CommunicationFailed, throwable.getMessage()); - } - } - }) - .onFailure().retry().withBackOff(configuration.retryBackoff()).indefinitely(); - } - - @Override - public Status getStatus() { - return status.get(); - } - - @Override - public void close() { - try { - final var connection = this.connection.get(); - if (connection != null) { - connection.close(); - } - } catch (Throwable failure) { - logger.warnf(failure, "Failed to close connection", failure); - } + this.configuration = configuration; } @Override - public String getChannel() { - return configuration.channel(); + protected MessagePublisherConfiguration configuration() { + return configuration; } @Override - public void onEvent(ConnectionEvent event, String message) { - switch (event) { - case Connected -> this.status.set(new Status(true, message, event)); - case Closed -> this.status.set(new Status(false, message, event)); - case Disconnected -> this.status.set(new Status(false, message, event)); - case Reconnected -> this.status.set(new Status(true, message, event)); - case CommunicationFailed -> this.status.set(new Status(false, message, event)); - } - } - - private Uni getOrEstablishConnection() { - return Uni.createFrom().item(() -> Optional.ofNullable(connection.get()) - .filter(Connection::isConnected) - .orElse(null)) - .onItem().ifNull() - .switchTo(() -> connectionFactory.create(connectionConfiguration, this, configuration)) - .onItem().invoke(this.connection::set); + protected Uni connect() { + return connectionFactory.create(connectionConfiguration, this, configuration); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java index e1dabb5d..a091f1fa 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java @@ -11,7 +11,9 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.unchecked.Unchecked; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; public class MessageSubscriberProcessor implements MessageProcessor, ConnectionListener { @@ -30,29 +32,37 @@ public MessageSubscriberProcessor( this.connectionConfiguration = connectionConfiguration; this.connectionFactory = connectionFactory; this.configuration = configuration; - this.status = new AtomicReference<>(new Status(true, "Connection closed", ConnectionEvent.Closed)); + this.status = new AtomicReference<>(new Status(true, "Subscriber processor inactive", ConnectionEvent.Closed)); this.connection = new AtomicReference<>(); } public Flow.Subscriber> subscriber() { - return MultiUtils.via(m -> m.onSubscription() - .call(this::getOrEstablishConnection) - .onItem() - .transformToUniAndConcatenate(this::publish) - .onFailure() - .invoke(throwable -> connection.get().fireEvent(ConnectionEvent.CommunicationFailed, throwable.getMessage()))); + return MultiUtils.via(this::subscribe); + } + + private Multi> subscribe(Multi> subscription) { + return subscription.onItem().transformToUniAndConcatenate(this::publish); + } + @Override + public String channel() { + return configuration.getChannel(); } @Override - public Status getStatus() { - return this.status.get(); + public Status readiness() { + return status.get(); + } + + @Override + public Status liveness() { + return status.get(); } @Override public void close() { try { - final var connection = this.connection.get(); + final var connection = this.connection.getAndSet(null); if (connection != null) { connection.close(); } @@ -61,25 +71,29 @@ public void close() { } } - @Override - public String getChannel() { - return configuration.getChannel(); - } - @Override public void onEvent(ConnectionEvent event, String message) { - switch (event) { - case Connected -> this.status.set(new Status(true, message, event)); - case Closed -> this.status.set(new Status(true, message, event)); - case Disconnected -> this.status.set(new Status(false, message, event)); - case Reconnected -> this.status.set(new Status(true, message, event)); - case CommunicationFailed -> this.status.set(new Status(false, message, event)); - } + this.status.set(Status.builder().healthy(true).message(message).event(event).build()); } - private Uni> publish(Message message) { + private Uni> publish(final Message message) { return getOrEstablishConnection() - .onItem().transformToUni(connection -> connection.publish(message, configuration)); + .onItem().transformToUni(connection -> connection.publish(message, configuration)) + .onItem().transformToUni(this::acknowledge) + .onFailure().recoverWithUni(failure -> recover(message, failure)); + } + + private Uni> recover(final Message message, final Throwable failure) { + return notAcknowledge(message, failure) + .onItem().transformToUni(this::closeConnection) + .onFailure().recoverWithUni(() -> closeConnection(message)); + } + + private Uni> closeConnection(final Message message) { + return Uni.createFrom().item(Unchecked.supplier(() -> { + close(); + return message; + })); } private Uni getOrEstablishConnection() { @@ -89,4 +103,15 @@ private Uni getOrEstablishConnection() { .onItem().ifNull().switchTo(() -> connectionFactory.create(connectionConfiguration, this)) .onItem().invoke(this.connection::set); } + + private Uni> acknowledge(final Message message) { + return Uni.createFrom().completionStage(message.ack()) + .onItem().transform(v -> message); + } + + private Uni> notAcknowledge(final Message message, final Throwable throwable) { + return Uni.createFrom().completionStage(message.nack(throwable)) + .onItem().invoke(() -> logger.warnf(throwable, "Message not published: %s", throwable.getMessage())) + .onItem().transformToUni(v -> Uni.createFrom().item(message)); + } }