Skip to content

Commit

Permalink
Fixed NPE getting next message
Browse files Browse the repository at this point in the history
  • Loading branch information
kjeldpaw committed Dec 18, 2024
1 parent 7aeff54 commit 552afeb
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.17.9"
current-version: "3.17.10"
next-version: "3.18.0-SNAPSHOT"

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.test.resources.Data;
import io.quarkiverse.reactive.messaging.nats.jetstream.test.resources.TestSpanExporter;
Expand Down Expand Up @@ -61,7 +62,7 @@ void fetchOneMessage() throws Exception {
final var data = new Data("test", "52b13992-749a-4943-ab8f-2403c734c648", "46c818c9-8915-48a6-9378-b8f540b0afe2");
publish(data, "fetch-data");

final var received = fetch("fetch-data", true);
final var received = next("fetch-data", true);
assertThat(received).isEqualTo(data);
}

Expand All @@ -73,10 +74,10 @@ void fetchTwoMessages() throws Exception {
publish(data1, "fetch-data");
publish(data2, "fetch-data");

final var received1 = fetch("fetch-data", true);
final var received1 = next("fetch-data", true);
assertThat(received1).isEqualTo(data1);

final var received2 = fetch("fetch-data", true);
final var received2 = next("fetch-data", true);
assertThat(received2).isEqualTo(data2);
}

Expand All @@ -88,10 +89,10 @@ void fetchOneNotAcknowledgedMessage() throws Exception {
publish(data1, "fetch-data");
publish(data2, "fetch-data");

final var received1 = fetch("fetch-data", false);
final var received1 = next("fetch-data", false);
assertThat(received1).isEqualTo(data1);

final var received2 = fetch("fetch-data", true);
final var received2 = next("fetch-data", true);
assertThat(received2).isEqualTo(data1);
}

Expand All @@ -107,16 +108,16 @@ void subjectTokens() throws Exception {
publish(data3, "resources." + data3.resourceId());
publish(data4, "resources." + data4.resourceId());

final var received1 = fetch("resources." + data1.resourceId(), true);
final var received1 = next("resources." + data1.resourceId(), true);
assertThat(received1).isEqualTo(data1);

final var received2 = fetch("resources." + data2.resourceId(), true);
final var received2 = next("resources." + data2.resourceId(), true);
assertThat(received2).isEqualTo(data2);

final var received3 = fetch("resources." + data3.resourceId(), true);
final var received3 = next("resources." + data3.resourceId(), true);
assertThat(received3).isEqualTo(data3);

final var received4 = fetch("resources." + data4.resourceId(), true);
final var received4 = next("resources." + data4.resourceId(), true);
assertThat(received4).isEqualTo(data4);
}

Expand All @@ -131,16 +132,30 @@ void addAndRemoveSubject() throws Exception {
publish(data1, data1.resourceId());
publish(data2, data2.resourceId());

final var received1 = fetch(data1.resourceId(), true);
final var received1 = next(data1.resourceId(), true);
assertThat(received1).isEqualTo(data1);

final var received2 = fetch(data2.resourceId(), true);
final var received2 = next(data2.resourceId(), true);
assertThat(received2).isEqualTo(data2);

removeSubject(data1.resourceId());
removeSubject(data2.resourceId());
}

@Test
void fetchMessages() throws Exception {
final var data1 = new Data("test1", "64a8903f-983a-4775-8c41-e59c1a40ca08", "5a6af883-2be2-4c73-9d5d-7cdc4157f2fb");
final var data2 = new Data("test2", "64a8903f-983a-4775-8c41-e59c1a40ca08", "d38ddb6f-3b9c-4a6c-978e-e97c0b66a2fd");

addSubject(data1.resourceId());

publish(data1, data1.resourceId());
publish(data2, data2.resourceId());

final var received = fetch(data1.resourceId(), true);
assertThat(received).containsExactly(data1, data2);
}

private void addSubject(String subject) throws Exception {
try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration),
new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) {
Expand All @@ -160,16 +175,17 @@ private void removeSubject(String subject) throws Exception {
}

private void publish(Data data, String subject) throws Exception {
try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration),
try (final var connection = connectionFactory.<Data> create(ConnectionConfiguration.of(natsConfiguration),
new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) {
final var publishConfiguragtion = createPublishConfiguration(subject);
connection.publish(Message.of(data), publishConfiguragtion)
final var consumerConfiguration = createConsumerConfiguration(subject);
connection.publish(Message.of(data), publishConfiguragtion, consumerConfiguration)
.await()
.atMost(Duration.ofSeconds(30));
}
}

private Data fetch(String subject, boolean ack) throws Exception {
private Data next(String subject, boolean ack) throws Exception {
try (final var connection = connectionFactory.<Data> create(ConnectionConfiguration.of(natsConfiguration),
new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) {
final var consumerConfiguration = createConsumerConfiguration(subject);
Expand All @@ -184,6 +200,44 @@ private Data fetch(String subject, boolean ack) throws Exception {
}
}

private List<Data> fetch(String subject, boolean ack) throws Exception {
try (final var connection = connectionFactory.<Data> create(ConnectionConfiguration.of(natsConfiguration),
new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) {
final var consumerConfiguration = createFetchConsumerConfiguration(subject);
final var received = connection.fetch(consumerConfiguration)
.onItem().transformToUniAndMerge(message -> {
if (ack) {
return Uni.createFrom().completionStage(message.ack())
.onItem().transform(ignored -> message);
} else {
return Uni.createFrom().completionStage(message.nack(new RuntimeException()))
.onItem().transform(ignored -> message);
}
}).collect().asList()
.await().atMost(Duration.ofSeconds(30));
return received.stream().map(Message::getPayload).toList();
}
}

private FetchConsumerConfiguration<Data> createFetchConsumerConfiguration(String subject) {
return new FetchConsumerConfiguration<>() {
@Override
public Duration timeout() {
return Duration.ofSeconds(3);
}

@Override
public Integer batchSize() {
return 10;
}

@Override
public ConsumerConfiguration<Data> consumerConfiguration() {
return createConsumerConfiguration(subject);
}
};
}

private ConsumerConfiguration<Data> createConsumerConfiguration(String subject) {
return new ConsumerConfiguration<>() {

Expand Down
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.17.9
:project-version: 3.17.10

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public interface Connection<T> extends AutoCloseable {
Expand All @@ -24,7 +25,7 @@ Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfigur

Uni<Message<T>> next(ConsumerConfiguration<T> configuration, Duration timeout);

Uni<List<Message<T>>> fetch(FetchConsumerConfiguration<T> configuration);
Multi<Message<T>> fetch(FetchConsumerConfiguration<T> configuration);

Uni<Message<T>> resolve(String streamName, long sequence);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.eclipse.microprofile.reactive.messaging.Message;

Expand Down Expand Up @@ -110,7 +112,8 @@ public Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishC
public Uni<Consumer> addConsumer(ConsumerConfiguration<T> configuration) {
return context().executeBlocking(addOrUpdateConsumer(configuration)
.onItem()
.transform(Unchecked.function(consumerContext -> consumerMapper.of(consumerContext.getConsumerInfo()))));
.transform(Unchecked.function(consumerContext -> consumerMapper.of(consumerContext.getConsumerInfo()))))
.onFailure().transform(SystemException::new);
}

@Override
Expand All @@ -121,41 +124,49 @@ public Uni<Message<T>> next(ConsumerConfiguration<T> configuration, Duration tim
.transformToUni(
consumerContext -> Uni.createFrom().item(Unchecked.supplier(() -> consumerContext.next(timeout))))
.emitOn(context::runOnContext)
.onItem().transformToUni(message -> transformMessage(message, configuration, context()))
.onItem().ifNull().failWith(MessageNotFoundException::new)
.onItem().ifNotNull().transformToUni(message -> transformMessage(message, configuration, context()))
.onItem().transformToUni(message -> tracerFactory.<T> create(TracerType.Subscribe).withTrace(message,
new AttachContextTraceSupplier<>())));
new AttachContextTraceSupplier<>())))
.onFailure().transform(failure -> {
if (failure instanceof MessageNotFoundException) {
return failure;
}
return new FetchException(failure);
});
}

@SuppressWarnings("ReactiveStreamsUnusedPublisher")
@Override
public Uni<List<Message<T>>> fetch(FetchConsumerConfiguration<T> configuration) {
public Multi<Message<T>> fetch(FetchConsumerConfiguration<T> configuration) {
final var context = context();
return context.executeBlocking(addOrUpdateConsumer(configuration.consumerConfiguration())
.onItem().transformToUni(consumerContext -> fetchMessages(consumerContext, configuration, context))
.onItem().transformToMulti(messages -> Multi.createFrom().items(messages.stream()))
return addOrUpdateConsumer(configuration.consumerConfiguration())
.onItem().transformToMulti(consumerContext -> fetchMessages(consumerContext, configuration, context))
.onItem().transformToUniAndMerge(message -> tracerFactory.<T> create(TracerType.Subscribe).withTrace(message,
new AttachContextTraceSupplier<>()))
.collect().asList());
.onFailure().transform(FetchException::new);
}

@Override
public Uni<Message<T>> resolve(String streamName, long sequence) {
return context().executeBlocking(Uni.createFrom().item(Unchecked.supplier(() -> {
return context().executeBlocking(Uni.createFrom().<Message<T>> item(Unchecked.supplier(() -> {
final var jetStream = connection.jetStream();
final var streamContext = jetStream.getStreamContext(streamName);
final var messageInfo = streamContext.getMessage(sequence);
return new ResolvedMessage<>(messageInfo, payloadMapper.<T> of(messageInfo).orElse(null));
})));
})))
.onFailure().transform(ResolveException::new);
}

@Override
public Uni<Subscription<T>> subscribe(PushConsumerConfiguration<T> configuration) {
final var context = context();
return context.executeBlocking(Uni.createFrom().item(Unchecked.supplier(() -> {
return context.executeBlocking(Uni.createFrom().<Subscription<T>> item(Unchecked.supplier(() -> {
final var subscription = new PushSubscription<>(connection, configuration, messageMapper, tracerFactory, context);
subscriptions.put(configuration.subject(), subscription);
return subscription;
})));
})))
.onFailure().transform(SubscripeException::new);
}

@Override
Expand All @@ -166,10 +177,11 @@ public Uni<Subscription<T>> subscribe(PullConsumerConfiguration<T> configuration
.onItem()
.transform(reader -> new PullSubscription<>(configuration, subscription, reader, messageMapper,
tracerFactory, context)))
.onItem().transform(subscription -> {
.onItem().<Subscription<T>> transform(subscription -> {
subscriptions.put(configuration.consumerConfiguration().subject(), subscription);
return subscription;
}));
}))
.onFailure().transform(SubscripeException::new);
}

@Override
Expand Down Expand Up @@ -243,8 +255,9 @@ private Uni<Message<T>> addPublishMetadata(final Message<T> message, final Publi
}));
}

private Uni<List<Message<T>>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration,
private Multi<Message<T>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration,
Context context) {
ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
return Multi.createFrom().<io.nats.client.Message> emitter(emitter -> {
try {
try (final var fetchConsumer = fetchConsumer(consumerContext, configuration)) {
Expand All @@ -259,10 +272,10 @@ private Uni<List<Message<T>>> fetchMessages(ConsumerContext consumerContext, Fet
emitter.fail(new FetchException(failure));
}
})
.runSubscriptionOn(executor)
.emitOn(context::runOnContext)
.onItem()
.transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context))
.collect().asList();
.transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context));
}

private FetchConsumer fetchConsumer(final ConsumerContext consumerContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

public class ResolveException extends RuntimeException {

public ResolveException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

public class SubscripeException extends RuntimeException {

public SubscripeException(Throwable cause) {
super(cause);
}
}

0 comments on commit 552afeb

Please sign in to comment.