diff --git a/.github/project.yml b/.github/project.yml index a6a085e..0418941 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "3.17.9" + current-version: "3.17.10" next-version: "3.18.0-SNAPSHOT" diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index b335529..2b14b46 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 3.17.9 +:project-version: 3.17.10 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java index 6351b20..a92deed 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java @@ -7,6 +7,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; public interface Connection extends AutoCloseable { @@ -24,7 +25,7 @@ Uni> publish(Message message, PublishConfiguration publishConfigur Uni> next(ConsumerConfiguration configuration, Duration timeout); - Uni>> fetch(FetchConsumerConfiguration configuration); + Multi> fetch(FetchConsumerConfiguration configuration); Uni> resolve(String streamName, long sequence); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java index 72e4b14..8489459 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java @@ -121,21 +121,27 @@ public Uni> next(ConsumerConfiguration configuration, Duration tim .transformToUni( consumerContext -> Uni.createFrom().item(Unchecked.supplier(() -> consumerContext.next(timeout)))) .emitOn(context::runOnContext) - .onItem().transformToUni(message -> transformMessage(message, configuration, context())) + .onItem().ifNull().failWith(MessageNotFoundException::new) + .onItem().ifNotNull().transformToUni(message -> transformMessage(message, configuration, context())) .onItem().transformToUni(message -> tracerFactory. create(TracerType.Subscribe).withTrace(message, - new AttachContextTraceSupplier<>()))); + new AttachContextTraceSupplier<>()))) + .onFailure().transform(failure -> { + if (failure instanceof MessageNotFoundException) { + return failure; + } + return new FetchException(failure); + }); } @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override - public Uni>> fetch(FetchConsumerConfiguration configuration) { + public Multi> fetch(FetchConsumerConfiguration configuration) { final var context = context(); - return context.executeBlocking(addOrUpdateConsumer(configuration.consumerConfiguration()) - .onItem().transformToUni(consumerContext -> fetchMessages(consumerContext, configuration, context)) - .onItem().transformToMulti(messages -> Multi.createFrom().items(messages.stream())) + return addOrUpdateConsumer(configuration.consumerConfiguration()) + .onItem().transformToMulti(consumerContext -> fetchMessages(consumerContext, configuration, context)) .onItem().transformToUniAndMerge(message -> tracerFactory. create(TracerType.Subscribe).withTrace(message, new AttachContextTraceSupplier<>())) - .collect().asList()); + .onFailure().transform(FetchException::new); } @Override @@ -243,7 +249,7 @@ private Uni> addPublishMetadata(final Message message, final Publi })); } - private Uni>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration configuration, + private Multi> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration configuration, Context context) { return Multi.createFrom(). emitter(emitter -> { try { @@ -261,8 +267,7 @@ private Uni>> fetchMessages(ConsumerContext consumerContext, Fet }) .emitOn(context::runOnContext) .onItem() - .transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context)) - .collect().asList(); + .transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context)); } private FetchConsumer fetchConsumer(final ConsumerContext consumerContext,