Skip to content

Commit

Permalink
Added nextMessages method to connection (#185)
Browse files Browse the repository at this point in the history
* Added nextMessages method to connection

---------

Co-authored-by: Paw Figgé Kjeldgaard <[email protected]>
  • Loading branch information
kjeldpaw and Paw Figgé Kjeldgaard authored Sep 9, 2024
1 parent 5a7aedb commit e3054db
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "1.13.0"
current-version: "1.13.1"
next-version: "1.14.0-SNAPSHOT"

2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 1.13.0
:project-version: 1.13.1

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public interface MessageConnection extends Connection {
Expand All @@ -15,6 +16,8 @@ <T> Uni<Message<T>> publish(final Message<T> message, final PublishConfiguration

<T> Uni<Message<T>> nextMessage(FetchConsumerConfiguration<T> configuration);

<T> Multi<Message<T>> nextMessages(FetchConsumerConfiguration<T> configuration);

<T> Uni<T> getKeyValue(String bucketName, String key, Class<T> valueType);

<T> Uni<Void> putKeyValue(String bucketName, String key, T value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.mutiny.core.Context;
Expand Down Expand Up @@ -116,6 +117,14 @@ public <T> Uni<Message<T>> nextMessage(FetchConsumerConfiguration<T> configurati
.transformToUni(consumerContext -> nextMessage(consumerContext, configuration));
}

@Override
public <T> Multi<Message<T>> nextMessages(FetchConsumerConfiguration<T> configuration) {
return getConsumerContext(connection, context, configuration.stream(),
configuration.name().orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured")))
.onItem().transformToMulti(consumerContext -> nextMessages(consumerContext, configuration))
.emitOn(context::runOnContext);
}

@Override
public <T> Uni<T> getKeyValue(String bucketName, String key, Class<T> valueType) {
return Uni.createFrom().item(Unchecked.supplier(() -> {
Expand Down Expand Up @@ -268,4 +277,31 @@ private <T> Uni<Void> addOrUpdateConsumer(FetchConsumerConfiguration<T> configur
}))
.emitOn(context::runOnContext);
}

private <T> Multi<Message<T>> nextMessages(final ConsumerContext consumerContext,
FetchConsumerConfiguration<T> configuration) {
ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
return Multi.createFrom().<Message<T>> emitter(emitter -> {
try {
try (final var fetchConsumer = fetchConsumer(consumerContext, configuration.fetchTimeout().orElse(null))) {
var message = fetchConsumer.nextMessage();
while (message != null) {
emitter.emit(messageFactory.create(
message,
configuration.traceEnabled(),
configuration.payloadType().orElse(null),
context,
new ExponentialBackoff(false, Duration.ZERO),
configuration.ackTimeout()));
message = fetchConsumer.nextMessage();
}
emitter.complete();
}
} catch (Throwable failure) {
emitter.fail(new FetchException(failure));
}
})
.emitOn(context::runOnContext)
.runSubscriptionOn(executor);
}
}

0 comments on commit e3054db

Please sign in to comment.