diff --git a/.github/project.yml b/.github/project.yml index 7d0a1a8c..dad974ad 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "1.13.1" + current-version: "1.13.2" next-version: "1.14.0-SNAPSHOT" diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index 8e8cddec..5ccde4b6 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 1.13.1 +:project-version: 1.13.2 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java index df1abbe8..be9fca29 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConnectionFactory.java @@ -58,11 +58,9 @@ public Uni subscribe(ConnectionConfigu public Uni administration(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) { - return Uni.createFrom() - .item(Unchecked.supplier( - () -> new io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.AdministrationConnection( - connectionConfiguration, - connectionListener))); + return getContext() + .onFailure().invoke(failure -> logger.warn(failure.getMessage(), failure)) + .onItem().transformToUni(context -> administration(connectionConfiguration, connectionListener, context)); } public Uni message(ConnectionConfiguration connectionConfiguration, @@ -109,4 +107,15 @@ private Uni message(ConnectionConfiguration connect connectionListener, messageFactory, context, instrumenter))) .emitOn(context::runOnContext); } + + private Uni administration(ConnectionConfiguration connectionConfiguration, + ConnectionListener connectionListener, Context context) { + return Uni.createFrom() + .item(Unchecked + .supplier( + () -> new io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.AdministrationConnection( + connectionConfiguration, + connectionListener, context))) + .emitOn(context::runOnContext); + } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConsumerNotFoundException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConsumerNotFoundException.java new file mode 100644 index 00000000..89acdfe3 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ConsumerNotFoundException.java @@ -0,0 +1,8 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +public class ConsumerNotFoundException extends RuntimeException { + + public ConsumerNotFoundException(final String stream, final String consumerName) { + super(String.format("Consumer with name = %S not found in stream = %s", consumerName, stream)); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/AdministrationConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/AdministrationConnection.java new file mode 100644 index 00000000..d7df6663 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/AdministrationConnection.java @@ -0,0 +1,85 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx; + +import java.util.List; + +import io.nats.client.api.ConsumerInfo; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.PurgeResult; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.SetupResult; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.StreamState; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueSetupConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.SetupConfiguration; +import io.smallrye.mutiny.Uni; +import io.vertx.mutiny.core.Context; + +public class AdministrationConnection + extends io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.AdministrationConnection { + private final Context context; + + public AdministrationConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, + Context context) { + super(connectionConfiguration, connectionListener); + this.context = context; + } + + @Override + public Uni getConsumerInfo(String stream, String consumerName) { + return super.getConsumerInfo(stream, consumerName) + .emitOn(context::runOnContext); + } + + @Override + public Uni> getStreams() { + return super.getStreams() + .emitOn(context::runOnContext); + } + + @Override + public Uni> getSubjects(String streamName) { + return super.getSubjects(streamName) + .emitOn(context::runOnContext); + } + + @Override + public Uni> getConsumerNames(String streamName) { + return super.getConsumerNames(streamName) + .emitOn(context::runOnContext); + } + + @Override + public Uni purgeStream(String streamName) { + return super.purgeStream(streamName) + .emitOn(context::runOnContext); + } + + @Override + public Uni deleteMessage(String stream, long sequence, boolean erase) { + return super.deleteMessage(stream, sequence, erase) + .emitOn(context::runOnContext); + } + + @Override + public Uni getStreamState(String streamName) { + return super.getStreamState(streamName) + .emitOn(context::runOnContext); + } + + @Override + public Uni> purgeAllStreams() { + return super.purgeAllStreams() + .emitOn(context::runOnContext); + } + + @Override + public Uni addOrUpdateStream(SetupConfiguration setupConfiguration) { + return super.addOrUpdateStream(setupConfiguration) + .emitOn(context::runOnContext); + } + + @Override + public Uni addOrUpdateKeyValueStore(KeyValueSetupConfiguration keyValueSetupConfiguration) { + return super.addOrUpdateKeyValueStore(keyValueSetupConfiguration) + .emitOn(context::runOnContext); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java index d922d9eb..445f839a 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java @@ -6,8 +6,6 @@ import java.io.IOException; import java.time.Duration; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.eclipse.microprofile.reactive.messaging.Message; import org.jboss.logging.Logger; @@ -109,18 +107,14 @@ public Uni> publish(Message message, @Override public Uni> nextMessage(FetchConsumerConfiguration configuration) { - ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); - return getConsumerContext(connection, context, configuration.stream(), - configuration.name().orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured"))) - .runSubscriptionOn(executor) + return getConsumerContext(configuration) .onItem() .transformToUni(consumerContext -> nextMessage(consumerContext, configuration)); } @Override public Multi> nextMessages(FetchConsumerConfiguration configuration) { - return getConsumerContext(connection, context, configuration.stream(), - configuration.name().orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured"))) + return getConsumerContext(configuration) .onItem().transformToMulti(consumerContext -> nextMessages(consumerContext, configuration)) .emitOn(context::runOnContext); } @@ -209,21 +203,35 @@ private Uni> notAcknowledge(final Message message, final Throw .onItem().transformToUni(v -> Uni.createFrom().item(message)); } - private Uni getConsumerContext(final io.nats.client.Connection connection, - final Context context, - final String stream, - final String consumerName) { + private Uni getConsumerContext(final FetchConsumerConfiguration configuration) { return Uni.createFrom().item(Unchecked.supplier(() -> { try { - final var streamContext = connection.getStreamContext(stream); - return streamContext.getConsumerContext(consumerName); - } catch (IOException | JetStreamApiException e) { + final var streamContext = connection.getStreamContext(configuration.stream()); + return streamContext.getConsumerContext(configuration.name() + .orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured"))); + } catch (JetStreamApiException e) { + if (e.getApiErrorCode() == 10014) { // consumer not found + throw new ConsumerNotFoundException(configuration.stream(), configuration.name().orElse(null)); + } else { + throw new FetchException(e); + } + } catch (IOException e) { throw new FetchException(e); } })) + .onFailure().recoverWithUni(failure -> handleConsumerContextFailure(configuration, failure)) .emitOn(context::runOnContext); } + private Uni handleConsumerContextFailure(final FetchConsumerConfiguration configuration, + Throwable failure) { + if (failure instanceof ConsumerNotFoundException) { + return addOrUpdateConsumer(configuration); + } else { + return Uni.createFrom().failure(failure); + } + } + private Uni nextMessage(final ConsumerContext consumerContext, final Duration timeout) { return Uni.createFrom().item(Unchecked.supplier(() -> { @@ -262,15 +270,15 @@ private Headers toJetStreamHeaders(Map> headers) { return result; } - private Uni addOrUpdateConsumer(FetchConsumerConfiguration configuration) { - return Uni.createFrom(). item(Unchecked.supplier(() -> { + private Uni addOrUpdateConsumer(FetchConsumerConfiguration configuration) { + return Uni.createFrom().item(Unchecked.supplier(() -> { try { final var factory = new ConsumerConfigurtationFactory(); final var consumerConfiguration = factory.create(configuration); final var streamContext = connection.getStreamContext(configuration.stream()); - streamContext.createOrUpdateConsumer(consumerConfiguration); + final var consumerContext = streamContext.createOrUpdateConsumer(consumerConfiguration); connection.flush(Duration.ZERO); - return null; + return consumerContext; } catch (IOException | JetStreamApiException e) { throw new FetchException(e); } @@ -280,7 +288,6 @@ private Uni addOrUpdateConsumer(FetchConsumerConfiguration configur private Multi> nextMessages(final ConsumerContext consumerContext, FetchConsumerConfiguration configuration) { - ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); return Multi.createFrom().> emitter(emitter -> { try { try (final var fetchConsumer = fetchConsumer(consumerContext, configuration.fetchTimeout().orElse(null))) { @@ -301,7 +308,6 @@ private Multi> nextMessages(final ConsumerContext consumerContext emitter.fail(new FetchException(failure)); } }) - .emitOn(context::runOnContext) - .runSubscriptionOn(executor); + .emitOn(context::runOnContext); } }