diff --git a/.github/project.yml b/.github/project.yml index 8cade693..f56af920 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "3.15.4" + current-version: "3.15.5" next-version: "3.16.0-SNAPSHOT" diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java new file mode 100644 index 00000000..7b469e57 --- /dev/null +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReaderSubscribeConnectionTest.java @@ -0,0 +1,229 @@ +package io.quarkiverse.reactive.messaging.nats.jetstream.test; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import jakarta.inject.Inject; + +import org.jboss.logging.Logger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.nats.client.api.AckPolicy; +import io.nats.client.api.DeliverPolicy; +import io.nats.client.api.ReplayPolicy; +import io.quarkiverse.reactive.messaging.nats.NatsConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration; +import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration; +import io.quarkus.test.QuarkusUnitTest; + +public class ReaderSubscribeConnectionTest { + private Logger logger = Logger.getLogger(ReaderSubscribeConnectionTest.class); + + @RegisterExtension + static QuarkusUnitTest runner = new QuarkusUnitTest() + .withConfigurationResource("application-reader.properties"); + + @Inject + NatsConfiguration natsConfiguration; + + @Inject + ConnectionFactory connectionFactory; + + @Test + void createConnectionWithModifiedConfiguration() throws Exception { + var consumerConfiguration = createConsumerConfiguration(List.of(Duration.ofSeconds(10)), 2L); + + try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), + (event, message) -> { + }, + consumerConfiguration).await().atMost(Duration.ofSeconds(30))) { + logger.info("Connected to NATS"); + final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name()) + .await().atMost(Duration.ofSeconds(30)); + assertThat(consumer).isNotNull(); + assertThat(consumer.configuration().backoff()).isEqualTo(List.of(Duration.ofSeconds(10))); + assertThat(consumer.configuration().maxDeliver()).isEqualTo(2L); + } + + consumerConfiguration = createConsumerConfiguration(List.of(Duration.ofSeconds(10), Duration.ofSeconds(30)), 3L); + try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), + (event, message) -> { + }, + consumerConfiguration).await().atMost(Duration.ofSeconds(30))) { + logger.info("Connected to NATS"); + final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name()) + .await().atMost(Duration.ofSeconds(30)); + assertThat(consumer).isNotNull(); + assertThat(consumer.configuration().backoff()).isEqualTo(List.of(Duration.ofSeconds(10), Duration.ofSeconds(30))); + assertThat(consumer.configuration().maxDeliver()).isEqualTo(3L); + } + + } + + private ReaderConsumerConfiguration createConsumerConfiguration(List backoff, Long maxDeliver) { + return new ReaderConsumerConfiguration<>() { + @Override + public String subject() { + return "reader-data"; + } + + @Override + public Integer rePullAt() { + return 50; + } + + @Override + public Integer maxRequestBatch() { + return 100; + } + + @Override + public Optional maxWaiting() { + return Optional.empty(); + } + + @Override + public Optional maxRequestExpires() { + return Optional.empty(); + } + + @Override + public ConsumerConfiguration consumerConfiguration() { + return new ConsumerConfiguration<>() { + @Override + public String name() { + return "reader-data-consumer"; + } + + @Override + public String stream() { + return "reader-test"; + } + + @Override + public Optional durable() { + return Optional.of("reader-data-consumer"); + } + + @Override + public List filterSubjects() { + return List.of(); + } + + @Override + public Optional ackWait() { + return Optional.empty(); + } + + @Override + public Optional deliverPolicy() { + return Optional.empty(); + } + + @Override + public Optional startSequence() { + return Optional.empty(); + } + + @Override + public Optional startTime() { + return Optional.empty(); + } + + @Override + public Optional description() { + return Optional.empty(); + } + + @Override + public Optional inactiveThreshold() { + return Optional.empty(); + } + + @Override + public Optional maxAckPending() { + return Optional.empty(); + } + + @Override + public Optional maxDeliver() { + return Optional.of(maxDeliver); + } + + @Override + public Optional replayPolicy() { + return Optional.empty(); + } + + @Override + public Optional replicas() { + return Optional.empty(); + } + + @Override + public Optional memoryStorage() { + return Optional.empty(); + } + + @Override + public Optional sampleFrequency() { + return Optional.empty(); + } + + @Override + public Map metadata() { + return Map.of(); + } + + @Override + public List backoff() { + return backoff; + } + + @Override + public Optional ackPolicy() { + return Optional.empty(); + } + + @Override + public Optional pauseUntil() { + return Optional.empty(); + } + + @Override + public Optional> payloadType() { + return Optional.of(Object.class); + } + + @Override + public boolean traceEnabled() { + return false; + } + + @Override + public boolean exponentialBackoff() { + return false; + } + + @Override + public Duration exponentialBackoffMaxDuration() { + return null; + } + + @Override + public Duration ackTimeout() { + return Duration.ofSeconds(5); + } + }; + } + }; + } +} diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java index 2463591f..5bbdd94d 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/RequestReplyResource.java @@ -167,8 +167,8 @@ public Optional pauseUntil() { } @Override - public Optional name() { - return Optional.of(subject); + public String name() { + return subject; } @Override diff --git a/deployment/src/test/resources/application-reader.properties b/deployment/src/test/resources/application-reader.properties new file mode 100644 index 00000000..15de959a --- /dev/null +++ b/deployment/src/test/resources/application-reader.properties @@ -0,0 +1,2 @@ +quarkus.messaging.nats.jet-stream.streams[0].name=reader-test +quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=reader-data \ No newline at end of file diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index cf040593..d9f61f8a 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 3.15.4 +:project-version: 3.15.5 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/runtime/pom.xml b/runtime/pom.xml index 2b6b185c..87ad344d 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -66,6 +66,11 @@ assertj-core test + + io.quarkus + quarkus-junit5-internal + test + diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java index 4ee6e8f6..caae73c5 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/Connection.java @@ -35,6 +35,8 @@ default void fireEvent(ConnectionEvent event, String message) { Uni> getConsumerNames(String streamName); + Uni deleteConsumer(String streamName, String consumerName); + Uni purgeStream(String streamName); /** diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java index bb5793ad..8ed212e9 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.java @@ -118,6 +118,20 @@ public Uni getConsumer(String stream, String consumerName) { .emitOn(context::runOnContext); } + @Override + public Uni deleteConsumer(String streamName, String consumerName) { + return getJetStreamManagement() + .onItem().transformToUni(jsm -> Uni.createFrom(). emitter(emitter -> { + try { + jsm.deleteConsumer(streamName, consumerName); + emitter.complete(null); + } catch (Throwable failure) { + emitter.fail(new SystemException(failure)); + } + })) + .emitOn(context::runOnContext); + } + @Override public Uni> getStreams() { return getJetStreamManagement() @@ -414,11 +428,10 @@ private Uni getConsumerContext(final FetchConsumerConfigura return Uni.createFrom().item(Unchecked.supplier(() -> { try { final var streamContext = connection.getStreamContext(configuration.stream()); - return streamContext.getConsumerContext(configuration.name() - .orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured"))); + return streamContext.getConsumerContext(configuration.name()); } catch (JetStreamApiException e) { if (e.getApiErrorCode() == 10014) { // consumer not found - throw new ConsumerNotFoundException(configuration.stream(), configuration.name().orElse(null)); + throw new ConsumerNotFoundException(configuration.stream(), configuration.name()); } else { throw new FetchException(e); } @@ -477,7 +490,7 @@ private Headers toJetStreamHeaders(Map> headers) { return result; } - private Uni addOrUpdateConsumer(FetchConsumerConfiguration configuration) { + private Uni addOrUpdateConsumer(ConsumerConfiguration configuration) { return Uni.createFrom().item(Unchecked.supplier(() -> { try { final var factory = new ConsumerConfigurtationFactory(); @@ -486,8 +499,8 @@ private Uni addOrUpdateConsumer(FetchConsumerConfiguration< final var consumerContext = streamContext.createOrUpdateConsumer(consumerConfiguration); connection.flush(Duration.ZERO); return consumerContext; - } catch (IOException | JetStreamApiException e) { - throw new FetchException(e); + } catch (Throwable failure) { + throw new FetchException(failure); } })) .emitOn(context::runOnContext); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java index b8daf4df..e7c89f45 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscribeConnection.java @@ -95,6 +95,11 @@ public Uni getConsumer(String stream, String consumerName) { return delegate.getConsumer(stream, consumerName); } + @Override + public Uni deleteConsumer(String streamName, String consumerName) { + return delegate.deleteConsumer(streamName, consumerName); + } + @Override public Uni> getStreams() { return delegate.getStreams(); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java index 300ce582..9d9b1ecd 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.java @@ -1,5 +1,6 @@ package io.quarkiverse.reactive.messaging.nats.jetstream.client; +import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -9,6 +10,8 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.jboss.logging.Logger; +import io.nats.client.JetStream; +import io.nats.client.JetStreamApiException; import io.nats.client.JetStreamStatusException; import io.nats.client.JetStreamSubscription; import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff; @@ -38,8 +41,7 @@ public class ReaderSubscribeConnection

implements SubscribeConnection

{ try { final var jetStream = delegate.connection().jetStream(); final var optionsFactory = new PullSubscribeOptionsFactory(); - this.subscription = jetStream.subscribe(consumerConfiguration.subject(), - optionsFactory.create(consumerConfiguration)); + this.subscription = createSubscription(jetStream, consumerConfiguration, optionsFactory); this.reader = subscription.reader(consumerConfiguration.maxRequestBatch(), consumerConfiguration.rePullAt()); } catch (Throwable failure) { throw new ConnectionException(failure); @@ -95,6 +97,11 @@ public Uni> getConsumerNames(String streamName) { return delegate.getConsumerNames(streamName); } + @Override + public Uni deleteConsumer(String streamName, String consumerName) { + return delegate.deleteConsumer(streamName, consumerName); + } + @Override public Uni purgeStream(String streamName) { return delegate.purgeStream(streamName); @@ -220,4 +227,22 @@ private Multi> createMult consumerConfiguration.consumerConfiguration().ackTimeout())); } } + + /** + * Creates a subscription. + * If an IllegalArgumentException is thrown the consumer configuration is modified. + */ + private JetStreamSubscription createSubscription(JetStream jetStream, + ReaderConsumerConfiguration

consumerConfiguration, + PullSubscribeOptionsFactory optionsFactory) throws IOException, JetStreamApiException { + try { + return jetStream.subscribe(consumerConfiguration.subject(), + optionsFactory.create(consumerConfiguration)); + } catch (IllegalArgumentException e) { // consumer is modified, Existing consumer cannot be modified + deleteConsumer(consumerConfiguration.consumerConfiguration().stream(), + consumerConfiguration.consumerConfiguration().name()).await().indefinitely(); + return jetStream.subscribe(consumerConfiguration.subject(), + optionsFactory.create(consumerConfiguration)); + } + } } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ConsumerConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ConsumerConfiguration.java index 8095a6bd..a49f67f7 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ConsumerConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/api/ConsumerConfiguration.java @@ -26,7 +26,7 @@ public record ConsumerConfiguration(DeliverPolicy deliverPolicy, Duration maxExpires, Duration inactiveThreshold, Long startSeq, // server side this is unsigned - Integer maxDeliver, + Long maxDeliver, Long rateLimit, // server side this is unsigned Integer maxAckPending, Integer maxPullWaiting, diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java index cf15e0f3..9caeca5b 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfiguration.java @@ -12,7 +12,7 @@ public interface ConsumerConfiguration { - Optional name(); + String name(); String stream(); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfigurtationFactory.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfigurtationFactory.java index 311406a4..07ee6995 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfigurtationFactory.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/configuration/ConsumerConfigurtationFactory.java @@ -34,7 +34,7 @@ private io.nats.client.api.ConsumerConfiguration.Builder builder(final Consu if (!configuration.filterSubjects().isEmpty()) { builder = builder.filterSubjects(configuration.filterSubjects()); } - builder = configuration.name().map(builder::name).orElse(builder); + builder = builder.name(configuration.name()); builder = builder.ackPolicy(AckPolicy.Explicit); builder = configuration.ackWait().map(builder::ackWait).orElse(builder); builder = configuration.deliverPolicy().map(builder::deliverPolicy).orElse(builder.deliverPolicy(DeliverPolicy.All)); diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/ConsumerMapper.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/ConsumerMapper.java index b06d253d..e077843a 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/ConsumerMapper.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/mapper/ConsumerMapper.java @@ -13,6 +13,7 @@ public interface ConsumerMapper { @Mapping(source = "numWaiting", target = "waiting") @Mapping(source = "numAckPending", target = "acknowledgePending") @Mapping(source = "clusterInfo", target = "cluster") + @Mapping(source = "consumerConfiguration", target = "configuration") Consumer of(ConsumerInfo consumerInfo); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java index b3953fc1..df867a8f 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePullPublisherConfiguration.java @@ -60,8 +60,14 @@ public String subject() { public ConsumerConfiguration consumerConfiguration() { return new ConsumerConfiguration<>() { @Override - public Optional name() { - return configuration.getName(); + public String name() { + return configuration.getName() + .orElseGet(() -> durable().orElseGet(() -> String.format("%s-consumer", subject()) + .replace("*", "") + .replace(".", "") + .replace(">", "") + .replace("\\", "") + .replace("/", ""))); } @Override diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java index 9884cf3d..3dec3891 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/DefaultMessagePushPublisherConfiguration.java @@ -75,8 +75,14 @@ public Duration retryBackoff() { public ConsumerConfiguration consumerConfiguration() { return new ConsumerConfiguration<>() { @Override - public Optional name() { - return configuration.getName(); + public String name() { + return configuration.getName() + .orElseGet(() -> durable().orElseGet(() -> String.format("%s-consumer", subject()) + .replace("*", "") + .replace(".", "") + .replace(">", "") + .replace("\\", "") + .replace("/", ""))); } @Override diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherConfiguration.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherConfiguration.java index 2b51e04d..43565daa 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherConfiguration.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherConfiguration.java @@ -9,5 +9,4 @@ public interface MessagePushPublisherConfiguration static MessagePushPublisherConfiguration of(JetStreamConnectorIncomingConfiguration configuration) { return new DefaultMessagePushPublisherConfiguration(configuration); } - } diff --git a/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java b/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java index ef35d981..d6fdd3ba 100644 --- a/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java +++ b/runtime/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessorTest.java @@ -83,8 +83,8 @@ public ConsumerConfiguration consumerConfiguration() { return new ConsumerConfiguration<>() { @Override - public Optional name() { - return Optional.empty(); + public String name() { + return durable; } @Override