Skip to content

Commit

Permalink
Delete consumer and re-create subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
kjeldpaw committed Oct 10, 2024
1 parent 5fc9708 commit 45c676b
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.test;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import jakarta.inject.Inject;

import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.nats.client.api.AckPolicy;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.ReplayPolicy;
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkus.test.QuarkusUnitTest;

public class ReaderSubscribeConnectionTest {
private Logger logger = Logger.getLogger(ReaderSubscribeConnectionTest.class);

@RegisterExtension
static QuarkusUnitTest runner = new QuarkusUnitTest()
.withConfigurationResource("application-reader.properties");

@Inject
NatsConfiguration natsConfiguration;

@Inject
ConnectionFactory connectionFactory;

@Test
void createConnectionWithModifiedConfiguration() throws Exception {
var consumerConfiguration = createConsumerConfiguration(List.of(Duration.ofSeconds(10)), 2L);

try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration),
(event, message) -> {
},
consumerConfiguration).await().atMost(Duration.ofSeconds(30))) {
logger.info("Connected to NATS");
final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name())
.await().atMost(Duration.ofSeconds(30));
assertThat(consumer).isNotNull();
assertThat(consumer.configuration().backoff()).isEqualTo(List.of(Duration.ofSeconds(10)));
assertThat(consumer.configuration().maxDeliver()).isEqualTo(2L);
}

consumerConfiguration = createConsumerConfiguration(List.of(Duration.ofSeconds(10), Duration.ofSeconds(30)), 3L);
try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration),
(event, message) -> {
},
consumerConfiguration).await().atMost(Duration.ofSeconds(30))) {
logger.info("Connected to NATS");
final var consumer = connection.getConsumer("reader-test", consumerConfiguration.consumerConfiguration().name())
.await().atMost(Duration.ofSeconds(30));
assertThat(consumer).isNotNull();
assertThat(consumer.configuration().backoff()).isEqualTo(List.of(Duration.ofSeconds(10), Duration.ofSeconds(30)));
assertThat(consumer.configuration().maxDeliver()).isEqualTo(3L);
}

}

private ReaderConsumerConfiguration<Object> createConsumerConfiguration(List<Duration> backoff, Long maxDeliver) {
return new ReaderConsumerConfiguration<>() {
@Override
public String subject() {
return "reader-data";
}

@Override
public Integer rePullAt() {
return 50;
}

@Override
public Integer maxRequestBatch() {
return 100;
}

@Override
public Optional<Integer> maxWaiting() {
return Optional.empty();
}

@Override
public Optional<Duration> maxRequestExpires() {
return Optional.empty();
}

@Override
public ConsumerConfiguration<Object> consumerConfiguration() {
return new ConsumerConfiguration<>() {
@Override
public String name() {
return "reader-data-consumer";
}

@Override
public String stream() {
return "reader-test";
}

@Override
public Optional<String> durable() {
return Optional.of("reader-data-consumer");
}

@Override
public List<String> filterSubjects() {
return List.of();
}

@Override
public Optional<Duration> ackWait() {
return Optional.empty();
}

@Override
public Optional<DeliverPolicy> deliverPolicy() {
return Optional.empty();
}

@Override
public Optional<Long> startSequence() {
return Optional.empty();
}

@Override
public Optional<ZonedDateTime> startTime() {
return Optional.empty();
}

@Override
public Optional<String> description() {
return Optional.empty();
}

@Override
public Optional<Duration> inactiveThreshold() {
return Optional.empty();
}

@Override
public Optional<Long> maxAckPending() {
return Optional.empty();
}

@Override
public Optional<Long> maxDeliver() {
return Optional.of(maxDeliver);
}

@Override
public Optional<ReplayPolicy> replayPolicy() {
return Optional.empty();
}

@Override
public Optional<Integer> replicas() {
return Optional.empty();
}

@Override
public Optional<Boolean> memoryStorage() {
return Optional.empty();
}

@Override
public Optional<String> sampleFrequency() {
return Optional.empty();
}

@Override
public Map<String, String> metadata() {
return Map.of();
}

@Override
public List<Duration> backoff() {
return backoff;
}

@Override
public Optional<AckPolicy> ackPolicy() {
return Optional.empty();
}

@Override
public Optional<ZonedDateTime> pauseUntil() {
return Optional.empty();
}

@Override
public Optional<Class<Object>> payloadType() {
return Optional.of(Object.class);
}

@Override
public boolean traceEnabled() {
return false;
}

@Override
public boolean exponentialBackoff() {
return false;
}

@Override
public Duration exponentialBackoffMaxDuration() {
return null;
}

@Override
public Duration ackTimeout() {
return Duration.ofSeconds(5);
}
};
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public Optional<ZonedDateTime> pauseUntil() {
}

@Override
public Optional<String> name() {
return Optional.of(subject);
public String name() {
return subject;
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions deployment/src/test/resources/application-reader.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
quarkus.messaging.nats.jet-stream.streams[0].name=reader-test
quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=reader-data
5 changes: 5 additions & 0 deletions runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ default void fireEvent(ConnectionEvent event, String message) {

Uni<List<String>> getConsumerNames(String streamName);

Uni<Void> deleteConsumer(String streamName, String consumerName);

Uni<PurgeResult> purgeStream(String streamName);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ public Uni<Consumer> getConsumer(String stream, String consumerName) {
.emitOn(context::runOnContext);
}

@Override
public Uni<Void> deleteConsumer(String streamName, String consumerName) {
return getJetStreamManagement()
.onItem().transformToUni(jsm -> Uni.createFrom().<Void> emitter(emitter -> {
try {
jsm.deleteConsumer(streamName, consumerName);
emitter.complete(null);
} catch (Throwable failure) {
emitter.fail(new SystemException(failure));
}
}))
.emitOn(context::runOnContext);
}

@Override
public Uni<List<String>> getStreams() {
return getJetStreamManagement()
Expand Down Expand Up @@ -414,11 +428,10 @@ private <T> Uni<ConsumerContext> getConsumerContext(final FetchConsumerConfigura
return Uni.createFrom().item(Unchecked.supplier(() -> {
try {
final var streamContext = connection.getStreamContext(configuration.stream());
return streamContext.getConsumerContext(configuration.name()
.orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured")));
return streamContext.getConsumerContext(configuration.name());
} catch (JetStreamApiException e) {
if (e.getApiErrorCode() == 10014) { // consumer not found
throw new ConsumerNotFoundException(configuration.stream(), configuration.name().orElse(null));
throw new ConsumerNotFoundException(configuration.stream(), configuration.name());
} else {
throw new FetchException(e);
}
Expand Down Expand Up @@ -477,7 +490,7 @@ private Headers toJetStreamHeaders(Map<String, List<String>> headers) {
return result;
}

private <T> Uni<ConsumerContext> addOrUpdateConsumer(FetchConsumerConfiguration<T> configuration) {
private <T> Uni<ConsumerContext> addOrUpdateConsumer(ConsumerConfiguration<T> configuration) {
return Uni.createFrom().item(Unchecked.supplier(() -> {
try {
final var factory = new ConsumerConfigurtationFactory();
Expand All @@ -486,8 +499,8 @@ private <T> Uni<ConsumerContext> addOrUpdateConsumer(FetchConsumerConfiguration<
final var consumerContext = streamContext.createOrUpdateConsumer(consumerConfiguration);
connection.flush(Duration.ZERO);
return consumerContext;
} catch (IOException | JetStreamApiException e) {
throw new FetchException(e);
} catch (Throwable failure) {
throw new FetchException(failure);
}
}))
.emitOn(context::runOnContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public Uni<Consumer> getConsumer(String stream, String consumerName) {
return delegate.getConsumer(stream, consumerName);
}

@Override
public Uni<Void> deleteConsumer(String streamName, String consumerName) {
return delegate.deleteConsumer(streamName, consumerName);
}

@Override
public Uni<List<String>> getStreams() {
return delegate.getStreams();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
Expand All @@ -9,6 +10,8 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
Expand Down Expand Up @@ -38,8 +41,7 @@ public class ReaderSubscribeConnection<P> implements SubscribeConnection<P> {
try {
final var jetStream = delegate.connection().jetStream();
final var optionsFactory = new PullSubscribeOptionsFactory();
this.subscription = jetStream.subscribe(consumerConfiguration.subject(),
optionsFactory.create(consumerConfiguration));
this.subscription = createSubscription(jetStream, consumerConfiguration, optionsFactory);
this.reader = subscription.reader(consumerConfiguration.maxRequestBatch(), consumerConfiguration.rePullAt());
} catch (Throwable failure) {
throw new ConnectionException(failure);
Expand Down Expand Up @@ -95,6 +97,11 @@ public Uni<List<String>> getConsumerNames(String streamName) {
return delegate.getConsumerNames(streamName);
}

@Override
public Uni<Void> deleteConsumer(String streamName, String consumerName) {
return delegate.deleteConsumer(streamName, consumerName);
}

@Override
public Uni<PurgeResult> purgeStream(String streamName) {
return delegate.purgeStream(streamName);
Expand Down Expand Up @@ -220,4 +227,22 @@ private Multi<org.eclipse.microprofile.reactive.messaging.Message<P>> createMult
consumerConfiguration.consumerConfiguration().ackTimeout()));
}
}

/**
* Creates a subscription.
* If an IllegalArgumentException is thrown the consumer configuration is modified.
*/
private JetStreamSubscription createSubscription(JetStream jetStream,
ReaderConsumerConfiguration<P> consumerConfiguration,
PullSubscribeOptionsFactory optionsFactory) throws IOException, JetStreamApiException {
try {
return jetStream.subscribe(consumerConfiguration.subject(),
optionsFactory.create(consumerConfiguration));
} catch (IllegalArgumentException e) { // consumer is modified, Existing consumer cannot be modified
deleteConsumer(consumerConfiguration.consumerConfiguration().stream(),
consumerConfiguration.consumerConfiguration().name()).await().indefinitely();
return jetStream.subscribe(consumerConfiguration.subject(),
optionsFactory.create(consumerConfiguration));
}
}
}
Loading

0 comments on commit 45c676b

Please sign in to comment.