Skip to content

Commit

Permalink
Fixed NPE getting next message
Browse files Browse the repository at this point in the history
  • Loading branch information
kjeldpaw committed Dec 18, 2024
1 parent 7aeff54 commit 4122bfb
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.17.9"
current-version: "3.17.10"
next-version: "3.18.0-SNAPSHOT"

2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.17.9
:project-version: 3.17.10

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public interface Connection<T> extends AutoCloseable {
Expand All @@ -24,7 +25,7 @@ Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfigur

Uni<Message<T>> next(ConsumerConfiguration<T> configuration, Duration timeout);

Uni<List<Message<T>>> fetch(FetchConsumerConfiguration<T> configuration);
Multi<Message<T>> fetch(FetchConsumerConfiguration<T> configuration);

Uni<Message<T>> resolve(String streamName, long sequence);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,27 @@ public Uni<Message<T>> next(ConsumerConfiguration<T> configuration, Duration tim
.transformToUni(
consumerContext -> Uni.createFrom().item(Unchecked.supplier(() -> consumerContext.next(timeout))))
.emitOn(context::runOnContext)
.onItem().transformToUni(message -> transformMessage(message, configuration, context()))
.onItem().ifNull().failWith(MessageNotFoundException::new)
.onItem().ifNotNull().transformToUni(message -> transformMessage(message, configuration, context()))
.onItem().transformToUni(message -> tracerFactory.<T> create(TracerType.Subscribe).withTrace(message,
new AttachContextTraceSupplier<>())));
new AttachContextTraceSupplier<>())))
.onFailure().transform(failure -> {
if (failure instanceof MessageNotFoundException) {
return failure;
}
return new FetchException(failure);
});
}

@SuppressWarnings("ReactiveStreamsUnusedPublisher")
@Override
public Uni<List<Message<T>>> fetch(FetchConsumerConfiguration<T> configuration) {
public Multi<Message<T>> fetch(FetchConsumerConfiguration<T> configuration) {
final var context = context();
return context.executeBlocking(addOrUpdateConsumer(configuration.consumerConfiguration())
.onItem().transformToUni(consumerContext -> fetchMessages(consumerContext, configuration, context))
.onItem().transformToMulti(messages -> Multi.createFrom().items(messages.stream()))
return addOrUpdateConsumer(configuration.consumerConfiguration())
.onItem().transformToMulti(consumerContext -> fetchMessages(consumerContext, configuration, context))
.onItem().transformToUniAndMerge(message -> tracerFactory.<T> create(TracerType.Subscribe).withTrace(message,
new AttachContextTraceSupplier<>()))
.collect().asList());
.onFailure().transform(FetchException::new);
}

@Override
Expand Down Expand Up @@ -243,7 +249,7 @@ private Uni<Message<T>> addPublishMetadata(final Message<T> message, final Publi
}));
}

private Uni<List<Message<T>>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration,
private Multi<Message<T>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration,
Context context) {
return Multi.createFrom().<io.nats.client.Message> emitter(emitter -> {
try {
Expand All @@ -261,8 +267,7 @@ private Uni<List<Message<T>>> fetchMessages(ConsumerContext consumerContext, Fet
})
.emitOn(context::runOnContext)
.onItem()
.transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context))
.collect().asList();
.transformToUniAndMerge(message -> transformMessage(message, configuration.consumerConfiguration(), context));
}

private FetchConsumer fetchConsumer(final ConsumerContext consumerContext,
Expand Down

0 comments on commit 4122bfb

Please sign in to comment.