Skip to content

Commit

Permalink
Fixed failure handling subscribing to streams
Browse files Browse the repository at this point in the history
  • Loading branch information
kjeldpaw committed Oct 9, 2024
1 parent 34cf647 commit 231c011
Show file tree
Hide file tree
Showing 23 changed files with 158 additions and 227 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.15.2"
current-version: "3.15.3"
next-version: "3.16.0-SNAPSHOT"

Original file line number Diff line number Diff line change
@@ -1,70 +1,10 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.test;

public class Advisory {
private String type;
private String id;
private String timestamp;
private String stream;
private String consumer;
private long stream_seq;
private long deliveries;

public Advisory() {
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getTimestamp() {
return timestamp;
}

public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}

public String getStream() {
return stream;
}

public void setStream(String stream) {
this.stream = stream;
}

public String getConsumer() {
return consumer;
}

public void setConsumer(String consumer) {
this.consumer = consumer;
}

public long getStream_seq() {
return stream_seq;
}

public void setStream_seq(long stream_seq) {
this.stream_seq = stream_seq;
}

public long getDeliveries() {
return deliveries;
}

public void setDeliveries(long deliveries) {
this.deliveries = deliveries;
}
public record Advisory(String type,
String id,
String timestamp,
String stream,
String consumer,
long stream_seq,
long deliveries) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void terminate(
public Uni<Void> deadLetter(Connection connection, Message<Advisory> message) {
logger.infof("Received dead letter on dead-letter-consumer channel: %s", message);
final var advisory = message.getPayload();
return connection.<Data> resolve(advisory.getStream(), advisory.getStream_seq())
return connection.<Data> resolve(advisory.stream(), advisory.stream_seq())
.onItem().invoke(dataMessage -> lastData.set(dataMessage.getPayload()))
.onItem().transformToUni(m -> Uni.createFrom().completionStage(message.ack()))
.onFailure().recoverWithUni(throwable -> Uni.createFrom().completionStage(message.nack(throwable)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void terminate(

private Uni<Void> maxDeliveries(Connection connection, Message<Advisory> message) {
final var advisory = message.getPayload();
return connection.<Integer> resolve(advisory.getStream(), advisory.getStream_seq())
return connection.<Integer> resolve(advisory.stream(), advisory.stream_seq())
.onItem().invoke(msg -> {
maxDeliveries.get().add(msg.getPayload());
message.ack();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;
import io.restassured.parsing.Parser;

public class ReactiveMesssagingNatsJetstreamPullTest {
Expand All @@ -33,6 +35,24 @@ public void setup() {
defaultParser = Parser.JSON;
}

@Test
public void healthLive() {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
}

@Test
public void healthReady() {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
}

@Test
public void metadata() {
final var messageId = "4dc58197-8cfb-4099-a211-25d5c2d04f4b";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import java.util.stream.Collectors;

import jakarta.inject.Inject;

Expand Down Expand Up @@ -48,7 +47,7 @@ public void tracing() {
assertThat(spans).isNotEmpty();

List<SpanData> parentSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid()))
.collect(Collectors.toList());
.toList();
assertEquals(1, parentSpans.size());

for (var parentSpan : parentSpans) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;
import io.restassured.parsing.Parser;

public class ReactiveMesssagingNatsJetstreamPushTest {
Expand All @@ -35,6 +37,24 @@ public void setup() {
defaultParser = Parser.JSON;
}

@Test
public void healthLive() {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
}

@Test
public void healthReady() {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
}

@Test
public void metadata() {
final var messageId = "4e54818a-c624-495a-81c8-0145ad4c9925";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import java.util.stream.Collectors;

import jakarta.inject.Inject;

Expand Down Expand Up @@ -48,7 +47,7 @@ public void tracing() {
assertThat(spans).isNotEmpty();

List<SpanData> parentSpans = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid()))
.collect(Collectors.toList());
.toList();
assertEquals(1, parentSpans.size());

for (var parentSpan : parentSpans) {
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.15.2
:project-version: 3.15.3

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,19 @@ public void data() {

@Test
public void healthLive() {
await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
}

@Test
public void healthReady() {
await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,19 @@ public HealthReport getLiveness() {

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object ignored) {
this.processors.forEach(MessageProcessor::close);
this.processors.forEach(processor -> processor.close().await().indefinitely());
}

private MessagePublisherProcessor createMessagePublisherProcessor(JetStreamConnectorIncomingConfiguration configuration) {
private MessagePublisherProcessor<?> createMessagePublisherProcessor(
JetStreamConnectorIncomingConfiguration configuration) {
final var connectionConfiguration = ConnectionConfiguration.of(natsConfiguration);
final var type = ConsumerType.valueOf(configuration.getPublisherType());
if (ConsumerType.Pull.equals(type)) {
return new MessagePullPublisherProcessor(connectionFactory,
return new MessagePullPublisherProcessor<>(connectionFactory,
connectionConfiguration,
MessagePullPublisherConfiguration.of(configuration));
} else {
return new MessagePushPublisherProcessor(connectionFactory,
return new MessagePushPublisherProcessor<>(connectionFactory,
connectionConfiguration,
MessagePushPublisherConfiguration.of(configuration));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ public enum ConnectionEvent {
Connected,
Closed,
Reconnected,
CommunicationFailed
CommunicationFailed,
SubscriptionInactive,
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ class InternalConnectionListener implements io.nats.client.ConnectionListener {
public void connectionEvent(io.nats.client.Connection connection, Events type) {
switch (type) {
case CONNECTED -> this.connection.fireEvent(ConnectionEvent.Connected, "Connection established");
case RECONNECTED ->
case RECONNECTED, RESUBSCRIBED ->
this.connection.fireEvent(ConnectionEvent.Reconnected, "Connection reestablished to server");
case CLOSED -> this.connection.fireEvent(ConnectionEvent.Closed, "Connection closed");
case DISCONNECTED -> this.connection.fireEvent(ConnectionEvent.Disconnected, "Connection disconnected");
case RESUBSCRIBED ->
this.connection.fireEvent(ConnectionEvent.Reconnected, "Connection reestablished to server");
case LAME_DUCK -> this.connection.fireEvent(ConnectionEvent.CommunicationFailed, "Lame duck mode");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@ public class ReaderException extends RuntimeException {
public ReaderException(Throwable cause) {
super(cause);
}

public ReaderException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
Expand Down Expand Up @@ -49,11 +52,12 @@ public Multi<Message<P>> subscribe() {
Class<P> payloadType = consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
return Multi.createBy().repeating()
.supplier(this::nextMessage)
.until(message -> !subscription.isActive())
.uni(this::readNextMessage)
.whilst(message -> isConnected() && subscription.isActive())
.runSubscriptionOn(pullExecutor)
.emitOn(runable -> delegate.context().runOnContext(runable))
.flatMap(message -> createMulti(message.orElse(null), traceEnabled, payloadType, delegate.context()));
.flatMap(message -> createMulti(message.orElse(null), traceEnabled, payloadType, delegate.context()))
.onCompletion().invoke(() -> fireEvent(ConnectionEvent.SubscriptionInactive, "Subscription became inactive"));
}

@Override
Expand Down Expand Up @@ -181,25 +185,25 @@ public void close() {
delegate.close();
}

private Optional<io.nats.client.Message> nextMessage() {
try {
return Optional.ofNullable(reader.nextMessage(consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO)));
} catch (JetStreamStatusException e) {
logger.debugf(e, e.getMessage());
return Optional.empty();
} catch (IllegalStateException e) {
logger.debugf(e, "The subscription became inactive for stream: %s",
consumerConfiguration.consumerConfiguration().stream());
return Optional.empty();
} catch (InterruptedException e) {
logger.debugf(e, "The reader was interrupted for stream: %s",
consumerConfiguration.consumerConfiguration().stream());
return Optional.empty();
} catch (Throwable throwable) {
logger.warnf(throwable, "Error reading next message from stream: %s",
consumerConfiguration.consumerConfiguration().stream());
return Optional.empty();
}
private Uni<Optional<io.nats.client.Message>> readNextMessage() {
return Uni.createFrom().emitter(emitter -> {
try {
emitter.complete(Optional
.ofNullable(reader.nextMessage(consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO))));
} catch (JetStreamStatusException e) {
emitter.fail(new ReaderException(e));
} catch (IllegalStateException e) {
logger.warnf("The subscription became inactive for stream: %s",
consumerConfiguration.consumerConfiguration().stream());
emitter.complete(Optional.empty());
} catch (InterruptedException e) {
emitter.fail(new ReaderException(String.format("The reader was interrupted for stream: %s",
consumerConfiguration.consumerConfiguration().stream()), e));
} catch (Throwable throwable) {
emitter.fail(new ReaderException(String.format("Error reading next message from stream: %s",
consumerConfiguration.consumerConfiguration().stream()), throwable));
}
});
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,4 @@ public SetupException(String message, Throwable cause) {
super(message, cause);
}

public SetupException(Throwable cause) {
super(cause);
}
}

This file was deleted.

Loading

0 comments on commit 231c011

Please sign in to comment.