diff --git a/.github/project.yml b/.github/project.yml index 36074e7d..bfd2b043 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "3.15.2" + current-version: "3.15.3" next-version: "3.16.0-SNAPSHOT" diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/Advisory.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/Advisory.java index 31026e79..1ca91878 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/Advisory.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/Advisory.java @@ -1,70 +1,10 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.test; -public class Advisory { - private String type; - private String id; - private String timestamp; - private String stream; - private String consumer; - private long stream_seq; - private long deliveries; - - public Advisory() { - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getTimestamp() { - return timestamp; - } - - public void setTimestamp(String timestamp) { - this.timestamp = timestamp; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public String getConsumer() { - return consumer; - } - - public void setConsumer(String consumer) { - this.consumer = consumer; - } - - public long getStream_seq() { - return stream_seq; - } - - public void setStream_seq(long stream_seq) { - this.stream_seq = stream_seq; - } - - public long getDeliveries() { - return deliveries; - } - - public void setDeliveries(long deliveries) { - this.deliveries = deliveries; - } +public record Advisory(String type, + String id, + String timestamp, + String stream, + String consumer, + long stream_seq, + long deliveries) { } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java index cd039133..9ce62378 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/DeadLetterConsumingBean.java @@ -72,7 +72,7 @@ public void terminate( public Uni deadLetter(Connection connection, Message message) { logger.infof("Received dead letter on dead-letter-consumer channel: %s", message); final var advisory = message.getPayload(); - return connection. resolve(advisory.getStream(), advisory.getStream_seq()) + return connection. resolve(advisory.stream(), advisory.stream_seq()) .onItem().invoke(dataMessage -> lastData.set(dataMessage.getPayload())) .onItem().transformToUni(m -> Uni.createFrom().completionStage(message.ack())) .onFailure().recoverWithUni(throwable -> Uni.createFrom().completionStage(message.nack(throwable))); diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java index fe1e7e7f..917910fb 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ExponentialBackoffConsumingBean.java @@ -79,7 +79,7 @@ public void terminate( private Uni maxDeliveries(Connection connection, Message message) { final var advisory = message.getPayload(); - return connection. resolve(advisory.getStream(), advisory.getStream_seq()) + return connection. resolve(advisory.stream(), advisory.stream_seq()) .onItem().invoke(msg -> { maxDeliveries.get().add(msg.getPayload()); message.ack(); diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java index 36d10f82..df7c68d0 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTest.java @@ -13,6 +13,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.test.QuarkusUnitTest; +import io.restassured.filter.log.RequestLoggingFilter; +import io.restassured.filter.log.ResponseLoggingFilter; import io.restassured.parsing.Parser; public class ReactiveMesssagingNatsJetstreamPullTest { @@ -33,6 +35,24 @@ public void setup() { defaultParser = Parser.JSON; } + @Test + public void healthLive() { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/live") + .then() + .statusCode(200); + } + + @Test + public void healthReady() { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/ready") + .then() + .statusCode(200); + } + @Test public void metadata() { final var messageId = "4dc58197-8cfb-4099-a211-25d5c2d04f4b"; diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java index 53995628..c3cd1ff2 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPullTracingTest.java @@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.List; -import java.util.stream.Collectors; import jakarta.inject.Inject; @@ -48,7 +47,7 @@ public void tracing() { assertThat(spans).isNotEmpty(); List parentSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) - .collect(Collectors.toList()); + .toList(); assertEquals(1, parentSpans.size()); for (var parentSpan : parentSpans) { diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java index 1d38cea5..853e63f4 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java @@ -14,6 +14,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.test.QuarkusUnitTest; +import io.restassured.filter.log.RequestLoggingFilter; +import io.restassured.filter.log.ResponseLoggingFilter; import io.restassured.parsing.Parser; public class ReactiveMesssagingNatsJetstreamPushTest { @@ -35,6 +37,24 @@ public void setup() { defaultParser = Parser.JSON; } + @Test + public void healthLive() { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/live") + .then() + .statusCode(200); + } + + @Test + public void healthReady() { + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/ready") + .then() + .statusCode(200); + } + @Test public void metadata() { final var messageId = "4e54818a-c624-495a-81c8-0145ad4c9925"; diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java index 83dfde39..650eb858 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTracingTest.java @@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.List; -import java.util.stream.Collectors; import jakarta.inject.Inject; @@ -48,7 +47,7 @@ public void tracing() { assertThat(spans).isNotEmpty(); List parentSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) - .collect(Collectors.toList()); + .toList(); assertEquals(1, parentSpans.size()); for (var parentSpan : parentSpans) { diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index 36da8f57..aed3936b 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 3.15.2 +:project-version: 3.15.3 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java b/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java index bbbd9d18..5b3de904 100644 --- a/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java +++ b/integration-tests/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/it/DataResourceTest.java @@ -37,33 +37,19 @@ public void data() { @Test public void healthLive() { - await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> { - try { - given() - .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) - .when().get("/q/health/live") - .then() - .statusCode(200); - return true; - } catch (AssertionError e) { - return false; - } - }); + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/live") + .then() + .statusCode(200); } @Test public void healthReady() { - await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> { - try { - given() - .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) - .when().get("/q/health/ready") - .then() - .statusCode(200); - return true; - } catch (AssertionError e) { - return false; - } - }); + given() + .filters(new RequestLoggingFilter(), new ResponseLoggingFilter()) + .when().get("/q/health/ready") + .then() + .statusCode(200); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java index 51f45372..fcd4227f 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/JetStreamConnector.java @@ -132,18 +132,19 @@ public HealthReport getLiveness() { public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object ignored) { - this.processors.forEach(MessageProcessor::close); + this.processors.forEach(processor -> processor.close().await().indefinitely()); } - private MessagePublisherProcessor createMessagePublisherProcessor(JetStreamConnectorIncomingConfiguration configuration) { + private MessagePublisherProcessor createMessagePublisherProcessor( + JetStreamConnectorIncomingConfiguration configuration) { final var connectionConfiguration = ConnectionConfiguration.of(natsConfiguration); final var type = ConsumerType.valueOf(configuration.getPublisherType()); if (ConsumerType.Pull.equals(type)) { - return new MessagePullPublisherProcessor(connectionFactory, + return new MessagePullPublisherProcessor<>(connectionFactory, connectionConfiguration, MessagePullPublisherConfiguration.of(configuration)); } else { - return new MessagePushPublisherProcessor(connectionFactory, + return new MessagePushPublisherProcessor<>(connectionFactory, connectionConfiguration, MessagePushPublisherConfiguration.of(configuration)); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java index e33175aa..b38426c2 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionEvent.java @@ -5,5 +5,6 @@ public enum ConnectionEvent { Connected, Closed, Reconnected, - CommunicationFailed + CommunicationFailed, + SubscriptionInactive, } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/InternalConnectionListener.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/InternalConnectionListener.java index b5af6a67..ea89fb53 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/InternalConnectionListener.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/InternalConnectionListener.java @@ -11,12 +11,10 @@ class InternalConnectionListener implements io.nats.client.ConnectionListener { public void connectionEvent(io.nats.client.Connection connection, Events type) { switch (type) { case CONNECTED -> this.connection.fireEvent(ConnectionEvent.Connected, "Connection established"); - case RECONNECTED -> + case RECONNECTED, RESUBSCRIBED -> this.connection.fireEvent(ConnectionEvent.Reconnected, "Connection reestablished to server"); case CLOSED -> this.connection.fireEvent(ConnectionEvent.Closed, "Connection closed"); case DISCONNECTED -> this.connection.fireEvent(ConnectionEvent.Disconnected, "Connection disconnected"); - case RESUBSCRIBED -> - this.connection.fireEvent(ConnectionEvent.Reconnected, "Connection reestablished to server"); case LAME_DUCK -> this.connection.fireEvent(ConnectionEvent.CommunicationFailed, "Lame duck mode"); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderException.java index fab921fa..d50c1b67 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderException.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderException.java @@ -5,4 +5,8 @@ public class ReaderException extends RuntimeException { public ReaderException(Throwable cause) { super(cause); } + + public ReaderException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java index 91c1ec6c..300ce582 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java @@ -15,7 +15,10 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult; import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.vertx.mutiny.core.Context; @@ -49,11 +52,12 @@ public Multi> subscribe() { Class

payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null); ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); return Multi.createBy().repeating() - .supplier(this::nextMessage) - .until(message -> !subscription.isActive()) + .uni(this::readNextMessage) + .whilst(message -> isConnected() && subscription.isActive()) .runSubscriptionOn(pullExecutor) .emitOn(runable -> delegate.context().runOnContext(runable)) - .flatMap(message -> createMulti(message.orElse(null), traceEnabled, payloadType, delegate.context())); + .flatMap(message -> createMulti(message.orElse(null), traceEnabled, payloadType, delegate.context())) + .onCompletion().invoke(() -> fireEvent(ConnectionEvent.SubscriptionInactive, "Subscription became inactive")); } @Override @@ -181,25 +185,25 @@ public void close() { delegate.close(); } - private Optional nextMessage() { - try { - return Optional.ofNullable(reader.nextMessage(consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO))); - } catch (JetStreamStatusException e) { - logger.debugf(e, e.getMessage()); - return Optional.empty(); - } catch (IllegalStateException e) { - logger.debugf(e, "The subscription became inactive for stream: %s", - consumerConfiguration.consumerConfiguration().stream()); - return Optional.empty(); - } catch (InterruptedException e) { - logger.debugf(e, "The reader was interrupted for stream: %s", - consumerConfiguration.consumerConfiguration().stream()); - return Optional.empty(); - } catch (Throwable throwable) { - logger.warnf(throwable, "Error reading next message from stream: %s", - consumerConfiguration.consumerConfiguration().stream()); - return Optional.empty(); - } + private Uni> readNextMessage() { + return Uni.createFrom().emitter(emitter -> { + try { + emitter.complete(Optional + .ofNullable(reader.nextMessage(consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO)))); + } catch (JetStreamStatusException e) { + emitter.fail(new ReaderException(e)); + } catch (IllegalStateException e) { + logger.warnf("The subscription became inactive for stream: %s", + consumerConfiguration.consumerConfiguration().stream()); + emitter.complete(Optional.empty()); + } catch (InterruptedException e) { + emitter.fail(new ReaderException(String.format("The reader was interrupted for stream: %s", + consumerConfiguration.consumerConfiguration().stream()), e)); + } catch (Throwable throwable) { + emitter.fail(new ReaderException(String.format("Error reading next message from stream: %s", + consumerConfiguration.consumerConfiguration().stream()), throwable)); + } + }); } @SuppressWarnings("unchecked") diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SetupException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SetupException.java index c13a84b1..c95b1fc8 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SetupException.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SetupException.java @@ -6,7 +6,4 @@ public SetupException(String message, Throwable cause) { super(message, cause); } - public SetupException(Throwable cause) { - super(cause); - } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeException.java deleted file mode 100644 index 54c3cfe0..00000000 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscribeException.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.quarkiverse.reactive.messaging.nats.jetstream.client; - -public class SubscribeException extends RuntimeException { - - public SubscribeException(Throwable cause) { - super(cause); - } - -} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SystemException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SystemException.java index 3d163775..2440ba45 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SystemException.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SystemException.java @@ -6,10 +6,6 @@ public SystemException(Throwable cause) { super(cause); } - public SystemException(String message) { - super(message); - } - public SystemException(String message, Throwable cause) { super(message, cause); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PullSubscribeOptionsFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PullSubscribeOptionsFactory.java index ea56371c..032a0966 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PullSubscribeOptionsFactory.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/PullSubscribeOptionsFactory.java @@ -11,7 +11,7 @@ public PullSubscribeOptionsFactory() { public PullSubscribeOptions create(final PullConsumerConfiguration configuration) { var builder = PullSubscribeOptions.builder(); - configuration.consumerConfiguration().durable().map(builder::durable).orElse(builder); + builder = configuration.consumerConfiguration().durable().map(builder::durable).orElse(builder); builder = builder.stream(configuration.consumerConfiguration().stream()); builder = builder.configuration(consumerConfigurationFactory.create(configuration.consumerConfiguration())); return builder.build(); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java index d86ed871..4f230f89 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/MessageProcessor.java @@ -1,6 +1,14 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors; +import java.util.concurrent.atomic.AtomicReference; + +import org.jboss.logging.Logger; + +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; +import io.smallrye.mutiny.Uni; + public interface MessageProcessor { + Logger logger = Logger.getLogger(MessageProcessor.class); String channel(); @@ -8,5 +16,20 @@ public interface MessageProcessor { Status liveness(); - void close(); + AtomicReference connection(); + + default Uni close() { + return Uni.createFrom().item(() -> { + try { + final var connection = connection().getAndSet(null); + if (connection != null) { + connection.close(); + } + return null; + } catch (Throwable failure) { + logger.warnf(failure, "Failed to close connection with message: %s", failure.getMessage()); + return null; + } + }); + } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java index 6ac3e501..bef43886 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.java @@ -1,12 +1,10 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher; -import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import org.jboss.logging.Logger; -import io.nats.client.JetStreamApiException; import io.quarkiverse.reactive.messaging.nats.jetstream.client.*; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; @@ -16,8 +14,6 @@ public abstract class MessagePublisherProcessor implements MessageProcessor, ConnectionListener { private final static Logger logger = Logger.getLogger(MessagePublisherProcessor.class); - private final static int CONSUMER_ALREADY_IN_USE = 10013; - private final AtomicReference readiness; private final AtomicReference liveness; private final AtomicReference> connection; @@ -46,21 +42,16 @@ public Status liveness() { } @Override - public void close() { - try { - final var connection = this.connection.getAndSet(null); - if (connection != null) { - connection.close(); - } - } catch (Throwable failure) { - logger.warnf(failure, "Failed to close connection", failure); - } + public AtomicReference connection() { + return connection; } public Multi> publisher() { return subscribe() - .onFailure().transform(this::transformFailure) - .onFailure().retry().withBackOff(configuration().retryBackoff()).indefinitely(); + .onFailure() + .invoke(failure -> logger.errorf(failure, "Failed to subscribe with message: %s", failure.getMessage())) + .onFailure().recoverWithMulti(this::recover); + } @Override @@ -70,7 +61,7 @@ public void onEvent(ConnectionEvent event, String message) { this.readiness.set(Status.builder().event(event).message(message).healthy(true).build()); this.liveness.set(Status.builder().event(event).message(message).healthy(true).build()); } - case Closed, CommunicationFailed, Disconnected -> + case Closed, CommunicationFailed, Disconnected, SubscriptionInactive -> this.readiness.set(Status.builder().event(event).message(message).healthy(false).build()); case Reconnected -> this.readiness.set(Status.builder().event(event).message(message).healthy(true).build()); @@ -81,9 +72,14 @@ public void onEvent(ConnectionEvent event, String message) { protected abstract Uni> connect(); + private Multi> recover(Throwable failure) { + return close().onItem().transformToMulti(v -> subscribe()); + } + private Multi> subscribe() { return getOrEstablishConnection() - .onItem().transformToMulti(SubscribeConnection::subscribe); + .onItem().transformToMulti(SubscribeConnection::subscribe) + .onSubscription().invoke(() -> logger.infof("Subscribed to channel %s", configuration().channel())); } private Uni> getOrEstablishConnection() { @@ -93,25 +89,4 @@ private Uni> getOrEstablishConnection() { .onItem().ifNull().switchTo(this::connect) .onItem().invoke(this.connection::set); } - - private SubscribeException transformFailure(final Throwable failure) { - if (isCommunicationFailure(failure) && !isConsumerAlreadyInUse(failure)) { - logger.errorf(failure, "Failed to publish messages: %s", failure.getMessage()); - Optional.ofNullable(this.connection.get()) - .ifPresent(connection -> connection.fireEvent(ConnectionEvent.CommunicationFailed, failure.getMessage())); - close(); - } - return new SubscribeException(failure); - } - - private boolean isConsumerAlreadyInUse(Throwable throwable) { - if (throwable instanceof JetStreamApiException jetStreamApiException) { - return jetStreamApiException.getApiErrorCode() == CONSUMER_ALREADY_IN_USE; - } - return false; - } - - private boolean isCommunicationFailure(Throwable failure) { - return failure instanceof JetStreamApiException || failure instanceof IOException; - } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java index 021e5a2e..dc7826bc 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java @@ -5,14 +5,14 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.smallrye.mutiny.Uni; -public class MessagePushPublisherProcessor extends MessagePublisherProcessor { - private final MessagePushPublisherConfiguration configuration; +public class MessagePushPublisherProcessor extends MessagePublisherProcessor { + private final MessagePushPublisherConfiguration configuration; private final ConnectionFactory connectionFactory; private final ConnectionConfiguration connectionConfiguration; public MessagePushPublisherProcessor(final ConnectionFactory connectionFactory, final ConnectionConfiguration connectionConfiguration, - final MessagePushPublisherConfiguration configuration) { + final MessagePushPublisherConfiguration configuration) { this.connectionConfiguration = connectionConfiguration; this.connectionFactory = connectionFactory; this.configuration = configuration; @@ -24,7 +24,7 @@ protected MessagePublisherConfiguration configuration() { } @Override - protected Uni connect() { + protected Uni> connect() { return connectionFactory.create(connectionConfiguration, this, configuration); } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java index a091f1fa..1f527375 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.java @@ -7,13 +7,15 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.jboss.logging.Logger; -import io.quarkiverse.reactive.messaging.nats.jetstream.client.*; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor; import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.unchecked.Unchecked; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; public class MessageSubscriberProcessor implements MessageProcessor, ConnectionListener { @@ -36,11 +38,11 @@ public MessageSubscriberProcessor( this.connection = new AtomicReference<>(); } - public Flow.Subscriber> subscriber() { + public Flow.Subscriber> subscriber() { return MultiUtils.via(this::subscribe); } - private Multi> subscribe(Multi> subscription) { + private Multi> subscribe(Multi> subscription) { return subscription.onItem().transformToUniAndConcatenate(this::publish); } @@ -60,15 +62,8 @@ public Status liveness() { } @Override - public void close() { - try { - final var connection = this.connection.getAndSet(null); - if (connection != null) { - connection.close(); - } - } catch (Throwable failure) { - logger.warnf(failure, "Failed to close connection", failure); - } + public AtomicReference connection() { + return connection; } @Override @@ -79,21 +74,14 @@ public void onEvent(ConnectionEvent event, String message) { private Uni> publish(final Message message) { return getOrEstablishConnection() .onItem().transformToUni(connection -> connection.publish(message, configuration)) - .onItem().transformToUni(this::acknowledge) - .onFailure().recoverWithUni(failure -> recover(message, failure)); + .onFailure() + .invoke(failure -> logger.errorf(failure, "Failed to publish with message: %s", failure.getMessage())) + .onFailure().recoverWithUni(() -> recover(message)); } - private Uni> recover(final Message message, final Throwable failure) { - return notAcknowledge(message, failure) - .onItem().transformToUni(this::closeConnection) - .onFailure().recoverWithUni(() -> closeConnection(message)); - } - - private Uni> closeConnection(final Message message) { - return Uni.createFrom().item(Unchecked.supplier(() -> { - close(); - return message; - })); + private Uni> recover(final Message message) { + return close() + .onItem().transformToUni(v -> publish(message)); } private Uni getOrEstablishConnection() { @@ -103,15 +91,4 @@ private Uni getOrEstablishConnection() { .onItem().ifNull().switchTo(() -> connectionFactory.create(connectionConfiguration, this)) .onItem().invoke(this.connection::set); } - - private Uni> acknowledge(final Message message) { - return Uni.createFrom().completionStage(message.ack()) - .onItem().transform(v -> message); - } - - private Uni> notAcknowledge(final Message message, final Throwable throwable) { - return Uni.createFrom().completionStage(message.nack(throwable)) - .onItem().invoke(() -> logger.warnf(throwable, "Message not published: %s", throwable.getMessage())) - .onItem().transformToUni(v -> Uni.createFrom().item(message)); - } }