From 552afebac37d3b85d18b33642f47d5b264097187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paw=20Figg=C3=A9=20Kjeldgaard?= Date: Wed, 18 Dec 2024 16:35:07 +0100 Subject: [PATCH] Fixed NPE getting next message --- .github/project.yml | 2 +- .../jetstream/test/FetchMessagesTest.java | 82 +++++++++++++++---- .../ROOT/pages/includes/attributes.adoc | 2 +- .../nats/jetstream/client/Connection.java | 3 +- .../jetstream/client/DefaultConnection.java | 47 +++++++---- .../jetstream/client/ResolveException.java | 8 ++ .../jetstream/client/SubscripeException.java | 8 ++ 7 files changed, 118 insertions(+), 34 deletions(-) create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ResolveException.java create mode 100644 runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscripeException.java diff --git a/.github/project.yml b/.github/project.yml index a6a085e..0418941 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "3.17.9" + current-version: "3.17.10" next-version: "3.18.0-SNAPSHOT" diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java index 9dab07b..4ce7118 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/FetchMessagesTest.java @@ -26,6 +26,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration; import io.quarkiverse.reactive.messaging.nats.jetstream.test.resources.Data; import io.quarkiverse.reactive.messaging.nats.jetstream.test.resources.TestSpanExporter; @@ -61,7 +62,7 @@ void fetchOneMessage() throws Exception { final var data = new Data("test", "52b13992-749a-4943-ab8f-2403c734c648", "46c818c9-8915-48a6-9378-b8f540b0afe2"); publish(data, "fetch-data"); - final var received = fetch("fetch-data", true); + final var received = next("fetch-data", true); assertThat(received).isEqualTo(data); } @@ -73,10 +74,10 @@ void fetchTwoMessages() throws Exception { publish(data1, "fetch-data"); publish(data2, "fetch-data"); - final var received1 = fetch("fetch-data", true); + final var received1 = next("fetch-data", true); assertThat(received1).isEqualTo(data1); - final var received2 = fetch("fetch-data", true); + final var received2 = next("fetch-data", true); assertThat(received2).isEqualTo(data2); } @@ -88,10 +89,10 @@ void fetchOneNotAcknowledgedMessage() throws Exception { publish(data1, "fetch-data"); publish(data2, "fetch-data"); - final var received1 = fetch("fetch-data", false); + final var received1 = next("fetch-data", false); assertThat(received1).isEqualTo(data1); - final var received2 = fetch("fetch-data", true); + final var received2 = next("fetch-data", true); assertThat(received2).isEqualTo(data1); } @@ -107,16 +108,16 @@ void subjectTokens() throws Exception { publish(data3, "resources." + data3.resourceId()); publish(data4, "resources." + data4.resourceId()); - final var received1 = fetch("resources." + data1.resourceId(), true); + final var received1 = next("resources." + data1.resourceId(), true); assertThat(received1).isEqualTo(data1); - final var received2 = fetch("resources." + data2.resourceId(), true); + final var received2 = next("resources." + data2.resourceId(), true); assertThat(received2).isEqualTo(data2); - final var received3 = fetch("resources." + data3.resourceId(), true); + final var received3 = next("resources." + data3.resourceId(), true); assertThat(received3).isEqualTo(data3); - final var received4 = fetch("resources." + data4.resourceId(), true); + final var received4 = next("resources." + data4.resourceId(), true); assertThat(received4).isEqualTo(data4); } @@ -131,16 +132,30 @@ void addAndRemoveSubject() throws Exception { publish(data1, data1.resourceId()); publish(data2, data2.resourceId()); - final var received1 = fetch(data1.resourceId(), true); + final var received1 = next(data1.resourceId(), true); assertThat(received1).isEqualTo(data1); - final var received2 = fetch(data2.resourceId(), true); + final var received2 = next(data2.resourceId(), true); assertThat(received2).isEqualTo(data2); removeSubject(data1.resourceId()); removeSubject(data2.resourceId()); } + @Test + void fetchMessages() throws Exception { + final var data1 = new Data("test1", "64a8903f-983a-4775-8c41-e59c1a40ca08", "5a6af883-2be2-4c73-9d5d-7cdc4157f2fb"); + final var data2 = new Data("test2", "64a8903f-983a-4775-8c41-e59c1a40ca08", "d38ddb6f-3b9c-4a6c-978e-e97c0b66a2fd"); + + addSubject(data1.resourceId()); + + publish(data1, data1.resourceId()); + publish(data2, data2.resourceId()); + + final var received = fetch(data1.resourceId(), true); + assertThat(received).containsExactly(data1, data2); + } + private void addSubject(String subject) throws Exception { try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { @@ -160,16 +175,17 @@ private void removeSubject(String subject) throws Exception { } private void publish(Data data, String subject) throws Exception { - try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), + try (final var connection = connectionFactory. create(ConnectionConfiguration.of(natsConfiguration), new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { final var publishConfiguragtion = createPublishConfiguration(subject); - connection.publish(Message.of(data), publishConfiguragtion) + final var consumerConfiguration = createConsumerConfiguration(subject); + connection.publish(Message.of(data), publishConfiguragtion, consumerConfiguration) .await() .atMost(Duration.ofSeconds(30)); } } - private Data fetch(String subject, boolean ack) throws Exception { + private Data next(String subject, boolean ack) throws Exception { try (final var connection = connectionFactory. create(ConnectionConfiguration.of(natsConfiguration), new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { final var consumerConfiguration = createConsumerConfiguration(subject); @@ -184,6 +200,44 @@ private Data fetch(String subject, boolean ack) throws Exception { } } + private List fetch(String subject, boolean ack) throws Exception { + try (final var connection = connectionFactory. create(ConnectionConfiguration.of(natsConfiguration), + new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) { + final var consumerConfiguration = createFetchConsumerConfiguration(subject); + final var received = connection.fetch(consumerConfiguration) + .onItem().transformToUniAndMerge(message -> { + if (ack) { + return Uni.createFrom().completionStage(message.ack()) + .onItem().transform(ignored -> message); + } else { + return Uni.createFrom().completionStage(message.nack(new RuntimeException())) + .onItem().transform(ignored -> message); + } + }).collect().asList() + .await().atMost(Duration.ofSeconds(30)); + return received.stream().map(Message::getPayload).toList(); + } + } + + private FetchConsumerConfiguration createFetchConsumerConfiguration(String subject) { + return new FetchConsumerConfiguration<>() { + @Override + public Duration timeout() { + return Duration.ofSeconds(3); + } + + @Override + public Integer batchSize() { + return 10; + } + + @Override + public ConsumerConfiguration consumerConfiguration() { + return createConsumerConfiguration(subject); + } + }; + } + private ConsumerConfiguration createConsumerConfiguration(String subject) { return new ConsumerConfiguration<>() { diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index b335529..2b14b46 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 3.17.9 +:project-version: 3.17.10 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java index 6351b20..a92deed 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java @@ -7,6 +7,7 @@ import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer; import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; public interface Connection extends AutoCloseable { @@ -24,7 +25,7 @@ Uni> publish(Message message, PublishConfiguration publishConfigur Uni> next(ConsumerConfiguration configuration, Duration timeout); - Uni>> fetch(FetchConsumerConfiguration configuration); + Multi> fetch(FetchConsumerConfiguration configuration); Uni> resolve(String streamName, long sequence); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java index 72e4b14..096fdce 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java @@ -7,6 +7,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.eclipse.microprofile.reactive.messaging.Message; @@ -110,7 +112,8 @@ public Uni> publish(Message message, PublishConfiguration publishC public Uni addConsumer(ConsumerConfiguration configuration) { return context().executeBlocking(addOrUpdateConsumer(configuration) .onItem() - .transform(Unchecked.function(consumerContext -> consumerMapper.of(consumerContext.getConsumerInfo())))); + .transform(Unchecked.function(consumerContext -> consumerMapper.of(consumerContext.getConsumerInfo())))) + .onFailure().transform(SystemException::new); } @Override @@ -121,41 +124,49 @@ public Uni> next(ConsumerConfiguration configuration, Duration tim .transformToUni( consumerContext -> Uni.createFrom().item(Unchecked.supplier(() -> consumerContext.next(timeout)))) .emitOn(context::runOnContext) - .onItem().transformToUni(message -> transformMessage(message, configuration, context())) + .onItem().ifNull().failWith(MessageNotFoundException::new) + .onItem().ifNotNull().transformToUni(message -> transformMessage(message, configuration, context())) .onItem().transformToUni(message -> tracerFactory. create(TracerType.Subscribe).withTrace(message, - new AttachContextTraceSupplier<>()))); + new AttachContextTraceSupplier<>()))) + .onFailure().transform(failure -> { + if (failure instanceof MessageNotFoundException) { + return failure; + } + return new FetchException(failure); + }); } @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override - public Uni>> fetch(FetchConsumerConfiguration configuration) { + public Multi> fetch(FetchConsumerConfiguration configuration) { final var context = context(); - return context.executeBlocking(addOrUpdateConsumer(configuration.consumerConfiguration()) - .onItem().transformToUni(consumerContext -> fetchMessages(consumerContext, configuration, context)) - .onItem().transformToMulti(messages -> Multi.createFrom().items(messages.stream())) + return addOrUpdateConsumer(configuration.consumerConfiguration()) + .onItem().transformToMulti(consumerContext -> fetchMessages(consumerContext, configuration, context)) .onItem().transformToUniAndMerge(message -> tracerFactory. create(TracerType.Subscribe).withTrace(message, new AttachContextTraceSupplier<>())) - .collect().asList()); + .onFailure().transform(FetchException::new); } @Override public Uni> resolve(String streamName, long sequence) { - return context().executeBlocking(Uni.createFrom().item(Unchecked.supplier(() -> { + return context().executeBlocking(Uni.createFrom().> item(Unchecked.supplier(() -> { final var jetStream = connection.jetStream(); final var streamContext = jetStream.getStreamContext(streamName); final var messageInfo = streamContext.getMessage(sequence); return new ResolvedMessage<>(messageInfo, payloadMapper. of(messageInfo).orElse(null)); - }))); + }))) + .onFailure().transform(ResolveException::new); } @Override public Uni> subscribe(PushConsumerConfiguration configuration) { final var context = context(); - return context.executeBlocking(Uni.createFrom().item(Unchecked.supplier(() -> { + return context.executeBlocking(Uni.createFrom().> item(Unchecked.supplier(() -> { final var subscription = new PushSubscription<>(connection, configuration, messageMapper, tracerFactory, context); subscriptions.put(configuration.subject(), subscription); return subscription; - }))); + }))) + .onFailure().transform(SubscripeException::new); } @Override @@ -166,10 +177,11 @@ public Uni> subscribe(PullConsumerConfiguration configuration .onItem() .transform(reader -> new PullSubscription<>(configuration, subscription, reader, messageMapper, tracerFactory, context))) - .onItem().transform(subscription -> { + .onItem().> transform(subscription -> { subscriptions.put(configuration.consumerConfiguration().subject(), subscription); return subscription; - })); + })) + .onFailure().transform(SubscripeException::new); } @Override @@ -243,8 +255,9 @@ private Uni> addPublishMetadata(final Message message, final Publi })); } - private Uni>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration configuration, + private Multi> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration configuration, Context context) { + ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new); return Multi.createFrom(). emitter(emitter -> { try { try (final var fetchConsumer = fetchConsumer(consumerContext, configuration)) { @@ -259,10 +272,10 @@ private Uni>> fetchMessages(ConsumerContext consumerContext, Fet emitter.fail(new FetchException(failure)); } }) + .runSubscriptionOn(executor) .emitOn(context::runOnContext) .onItem() - .transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context)) - .collect().asList(); + .transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context)); } private FetchConsumer fetchConsumer(final ConsumerContext consumerContext, diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ResolveException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ResolveException.java new file mode 100644 index 0000000..1847331 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ResolveException.java @@ -0,0 +1,8 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +public class ResolveException extends RuntimeException { + + public ResolveException(Throwable cause) { + super(cause); + } +} diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscripeException.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscripeException.java new file mode 100644 index 0000000..7569ea9 --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/SubscripeException.java @@ -0,0 +1,8 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.client; + +public class SubscripeException extends RuntimeException { + + public SubscripeException(Throwable cause) { + super(cause); + } +}