Skip to content

Commit

Permalink
Merge pull request #187 from quarkiverse/feature/consumer-not-found
Browse files Browse the repository at this point in the history
Create consumer if not exists on nextMessage
  • Loading branch information
kjeldpaw authored Sep 11, 2024
2 parents 51e7def + d7c8ac4 commit 138bd81
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "1.13.1"
current-version: "1.13.2"
next-version: "1.14.0-SNAPSHOT"

2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 1.13.1
:project-version: 1.13.2

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ public <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfigu

public Uni<? extends AdministrationConnection> administration(ConnectionConfiguration connectionConfiguration,
ConnectionListener connectionListener) {
return Uni.createFrom()
.item(Unchecked.supplier(
() -> new io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.AdministrationConnection(
connectionConfiguration,
connectionListener)));
return getContext()
.onFailure().invoke(failure -> logger.warn(failure.getMessage(), failure))
.onItem().transformToUni(context -> administration(connectionConfiguration, connectionListener, context));
}

public Uni<? extends MessageConnection> message(ConnectionConfiguration connectionConfiguration,
Expand Down Expand Up @@ -109,4 +107,15 @@ private Uni<? extends MessageConnection> message(ConnectionConfiguration connect
connectionListener, messageFactory, context, instrumenter)))
.emitOn(context::runOnContext);
}

private Uni<? extends AdministrationConnection> administration(ConnectionConfiguration connectionConfiguration,
ConnectionListener connectionListener, Context context) {
return Uni.createFrom()
.item(Unchecked
.supplier(
() -> new io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.AdministrationConnection(
connectionConfiguration,
connectionListener, context)))
.emitOn(context::runOnContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

public class ConsumerNotFoundException extends RuntimeException {

public ConsumerNotFoundException(final String stream, final String consumerName) {
super(String.format("Consumer with name = %S not found in stream = %s", consumerName, stream));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx;

import java.util.List;

import io.nats.client.api.ConsumerInfo;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.SetupResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.SetupConfiguration;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;

public class AdministrationConnection
extends io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.AdministrationConnection {
private final Context context;

public AdministrationConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener,
Context context) {
super(connectionConfiguration, connectionListener);
this.context = context;
}

@Override
public Uni<ConsumerInfo> getConsumerInfo(String stream, String consumerName) {
return super.getConsumerInfo(stream, consumerName)
.emitOn(context::runOnContext);
}

@Override
public Uni<List<String>> getStreams() {
return super.getStreams()
.emitOn(context::runOnContext);
}

@Override
public Uni<List<String>> getSubjects(String streamName) {
return super.getSubjects(streamName)
.emitOn(context::runOnContext);
}

@Override
public Uni<List<String>> getConsumerNames(String streamName) {
return super.getConsumerNames(streamName)
.emitOn(context::runOnContext);
}

@Override
public Uni<PurgeResult> purgeStream(String streamName) {
return super.purgeStream(streamName)
.emitOn(context::runOnContext);
}

@Override
public Uni<Void> deleteMessage(String stream, long sequence, boolean erase) {
return super.deleteMessage(stream, sequence, erase)
.emitOn(context::runOnContext);
}

@Override
public Uni<StreamState> getStreamState(String streamName) {
return super.getStreamState(streamName)
.emitOn(context::runOnContext);
}

@Override
public Uni<List<PurgeResult>> purgeAllStreams() {
return super.purgeAllStreams()
.emitOn(context::runOnContext);
}

@Override
public Uni<SetupResult> addOrUpdateStream(SetupConfiguration setupConfiguration) {
return super.addOrUpdateStream(setupConfiguration)
.emitOn(context::runOnContext);
}

@Override
public Uni<Void> addOrUpdateKeyValueStore(KeyValueSetupConfiguration keyValueSetupConfiguration) {
return super.addOrUpdateKeyValueStore(keyValueSetupConfiguration)
.emitOn(context::runOnContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -109,18 +107,14 @@ public <T> Uni<Message<T>> publish(Message<T> message,

@Override
public <T> Uni<Message<T>> nextMessage(FetchConsumerConfiguration<T> configuration) {
ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
return getConsumerContext(connection, context, configuration.stream(),
configuration.name().orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured")))
.runSubscriptionOn(executor)
return getConsumerContext(configuration)
.onItem()
.transformToUni(consumerContext -> nextMessage(consumerContext, configuration));
}

@Override
public <T> Multi<Message<T>> nextMessages(FetchConsumerConfiguration<T> configuration) {
return getConsumerContext(connection, context, configuration.stream(),
configuration.name().orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured")))
return getConsumerContext(configuration)
.onItem().transformToMulti(consumerContext -> nextMessages(consumerContext, configuration))
.emitOn(context::runOnContext);
}
Expand Down Expand Up @@ -209,21 +203,35 @@ private <T> Uni<Message<T>> notAcknowledge(final Message<T> message, final Throw
.onItem().transformToUni(v -> Uni.createFrom().item(message));
}

private Uni<ConsumerContext> getConsumerContext(final io.nats.client.Connection connection,
final Context context,
final String stream,
final String consumerName) {
private <T> Uni<ConsumerContext> getConsumerContext(final FetchConsumerConfiguration<T> configuration) {
return Uni.createFrom().item(Unchecked.supplier(() -> {
try {
final var streamContext = connection.getStreamContext(stream);
return streamContext.getConsumerContext(consumerName);
} catch (IOException | JetStreamApiException e) {
final var streamContext = connection.getStreamContext(configuration.stream());
return streamContext.getConsumerContext(configuration.name()
.orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured")));
} catch (JetStreamApiException e) {
if (e.getApiErrorCode() == 10014) { // consumer not found
throw new ConsumerNotFoundException(configuration.stream(), configuration.name().orElse(null));
} else {
throw new FetchException(e);
}
} catch (IOException e) {
throw new FetchException(e);
}
}))
.onFailure().recoverWithUni(failure -> handleConsumerContextFailure(configuration, failure))
.emitOn(context::runOnContext);
}

private <T> Uni<ConsumerContext> handleConsumerContextFailure(final FetchConsumerConfiguration<T> configuration,
Throwable failure) {
if (failure instanceof ConsumerNotFoundException) {
return addOrUpdateConsumer(configuration);
} else {
return Uni.createFrom().failure(failure);
}
}

private Uni<io.nats.client.Message> nextMessage(final ConsumerContext consumerContext,
final Duration timeout) {
return Uni.createFrom().item(Unchecked.supplier(() -> {
Expand Down Expand Up @@ -262,15 +270,15 @@ private Headers toJetStreamHeaders(Map<String, List<String>> headers) {
return result;
}

private <T> Uni<Void> addOrUpdateConsumer(FetchConsumerConfiguration<T> configuration) {
return Uni.createFrom().<Void> item(Unchecked.supplier(() -> {
private <T> Uni<ConsumerContext> addOrUpdateConsumer(FetchConsumerConfiguration<T> configuration) {
return Uni.createFrom().item(Unchecked.supplier(() -> {
try {
final var factory = new ConsumerConfigurtationFactory();
final var consumerConfiguration = factory.create(configuration);
final var streamContext = connection.getStreamContext(configuration.stream());
streamContext.createOrUpdateConsumer(consumerConfiguration);
final var consumerContext = streamContext.createOrUpdateConsumer(consumerConfiguration);
connection.flush(Duration.ZERO);
return null;
return consumerContext;
} catch (IOException | JetStreamApiException e) {
throw new FetchException(e);
}
Expand All @@ -280,7 +288,6 @@ private <T> Uni<Void> addOrUpdateConsumer(FetchConsumerConfiguration<T> configur

private <T> Multi<Message<T>> nextMessages(final ConsumerContext consumerContext,
FetchConsumerConfiguration<T> configuration) {
ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
return Multi.createFrom().<Message<T>> emitter(emitter -> {
try {
try (final var fetchConsumer = fetchConsumer(consumerContext, configuration.fetchTimeout().orElse(null))) {
Expand All @@ -301,7 +308,6 @@ private <T> Multi<Message<T>> nextMessages(final ConsumerContext consumerContext
emitter.fail(new FetchException(failure));
}
})
.emitOn(context::runOnContext)
.runSubscriptionOn(executor);
.emitOn(context::runOnContext);
}
}

0 comments on commit 138bd81

Please sign in to comment.