Skip to content

Commit

Permalink
Implemented liveness and readiness health report
Browse files Browse the repository at this point in the history
  • Loading branch information
kjeldpaw committed Oct 8, 2024
1 parent 89107fa commit e5ecb9b
Show file tree
Hide file tree
Showing 27 changed files with 430 additions and 301 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.15.1"
current-version: "3.15.2"
next-version: "3.16.0-SNAPSHOT"

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class ReactiveMesssagingNatsJetstreamDevModeTest {

@RegisterExtension
static QuarkusDevModeTest devModeTest = new QuarkusDevModeTest()
final static QuarkusDevModeTest devModeTest = new QuarkusDevModeTest()
.withApplicationRoot((jar) -> jar
.addClasses(ValueConsumingBean.class, ValueProducingBean.class, ValueResource.class,
TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;
import io.restassured.parsing.Parser;

public class ReactiveMesssagingNatsJetstreamPullTest {
Expand All @@ -34,9 +36,35 @@ public void setup() {
}

@Test
public void health() {
given().get("/q/health/ready").then().statusCode(200);
given().get("/q/health/live").then().statusCode(200);
public void readiness() {
await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

@Test
public void liveness() {
await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;
import io.restassured.parsing.Parser;

public class ReactiveMesssagingNatsJetstreamPushTest {
Expand All @@ -37,9 +38,35 @@ public void setup() {
}

@Test
public void health() {
given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/ready").then().statusCode(200);
given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/live").then().statusCode(200);
public void readiness() {
await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

@Test
public void liveness() {
await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.15.1
:project-version: 3.15.2

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;

@QuarkusTest
public class DataResourceTest {
Expand All @@ -35,17 +37,33 @@ public void data() {

@Test
public void healthLive() {
given()
.when().get("/q/health/live")
.then()
.statusCode(200);
await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

@Test
public void healthReady() {
given()
.when().get("/q/health/ready")
.then()
.statusCode(200);
await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkiverse.reactive.messaging.nats;

import java.time.Duration;
import java.util.Optional;

import io.nats.client.AuthHandler;
Expand Down Expand Up @@ -49,6 +50,16 @@ public interface NatsConfiguration {
*/
Optional<Long> connectionTimeout();

/**
* Back-off delay between to attempt to re-connect to NATS
*/
Optional<Duration> connectionBackoff();

/**
* The maximum number of attempts to attempt to re-connect to NATS
*/
Optional<Long> connectionAttempts();

/**
* The classname for the error listener
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Optional;
import java.util.Set;

import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
Expand All @@ -19,25 +21,6 @@ public interface JetStreamBuildConfiguration {
@WithDefault("true")
Boolean autoConfigure();

/**
* The number of replicas a message must be stored. Default value is 1.
*/
@WithDefault("1")
Integer replicas();

/**
* The storage type for stream data (File or Memory).
*/
@WithDefault("File")
String storageType();

/**
* Declares the retention policy for the stream. @see
* <a href="https://docs.nats.io/jetstream/concepts/streams#retention-policies">Retention Policy</a>
*/
@WithDefault("Interest")
String retentionPolicy();

/**
* If auto-configure is true the streams are created on Nats server.
*/
Expand All @@ -63,7 +46,7 @@ interface KeyValueStore {
* The storage type (File or Memory).
*/
@WithDefault("File")
String storageType();
StorageType storageType();

/**
* The maximum number of bytes for this bucket
Expand Down Expand Up @@ -108,5 +91,24 @@ interface Stream {
* Stream subjects
*/
Set<String> subjects();

/**
* The number of replicas a message must be stored. Default value is 1.
*/
@WithDefault("1")
Integer replicas();

/**
* The storage type for stream data (File or Memory).
*/
@WithDefault("File")
StorageType storageType();

/**
* Declares the retention policy for the stream. @see
* <a href="https://docs.nats.io/jetstream/concepts/streams#retention-policies">Retention Policy</a>
*/
@WithDefault("Interest")
RetentionPolicy retentionPolicy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,21 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {

@Override
public HealthReport getReadiness() {
return getHealth();
final HealthReport.HealthReportBuilder builder = HealthReport.builder();
processors.forEach(client -> builder.add(new HealthReport.ChannelInfo(
client.channel(),
client.readiness().healthy(),
client.readiness().message())));
return builder.build();
}

@Override
public HealthReport getLiveness() {
return getHealth();
}

HealthReport getHealth() {
final HealthReport.HealthReportBuilder builder = HealthReport.builder();
processors.forEach(client -> builder.add(new HealthReport.ChannelInfo(
client.getChannel(),
client.getStatus().healthy(),
client.getStatus().message())));
client.channel(),
client.liveness().healthy(),
client.liveness().message())));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import java.time.Duration;
import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.jboss.logging.Logger;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
Expand All @@ -21,6 +24,11 @@

@ApplicationScoped
public class ConnectionFactory {
private final static Logger logger = Logger.getLogger(ConnectionFactory.class);

private final static Duration DEFAULT_CONNECTION_BACKOFF = Duration.ofMillis(500);
private final static Long DEFAULT_CONNECTION_ATTEMPTS = 10L;

private final ExecutionHolder executionHolder;
private final MessageMapper messageMapper;
private final JetStreamInstrumenter instrumenter;
Expand All @@ -43,7 +51,7 @@ public ConnectionFactory(ExecutionHolder executionHolder,
this.streamStateMapper = streamStateMapper;
}

public <T> Uni<? extends SubscribeConnection> create(ConnectionConfiguration connectionConfiguration,
public <T> Uni<? extends SubscribeConnection<T>> create(ConnectionConfiguration connectionConfiguration,
ConnectionListener connectionListener,
ReaderConsumerConfiguration<T> consumerConfiguration) {
return getContext()
Expand All @@ -52,10 +60,15 @@ public <T> Uni<? extends SubscribeConnection> create(ConnectionConfiguration con
.item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, connectionListener,
context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, instrumenter))))
.onItem().transformToUni(connection -> Uni.createFrom()
.item(Unchecked.supplier(() -> new ReaderSubscribeConnection<>(connection, consumerConfiguration))));
.item(Unchecked.supplier(() -> new ReaderSubscribeConnection<>(connection, consumerConfiguration))))
.onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage()))
.onFailure()
.retry()
.withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF))
.atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS));
}

public <T> Uni<? extends SubscribeConnection> create(ConnectionConfiguration connectionConfiguration,
public <T> Uni<? extends SubscribeConnection<T>> create(ConnectionConfiguration connectionConfiguration,
ConnectionListener connectionListener,
PushConsumerConfiguration<T> consumerConfiguration) {

Expand All @@ -65,7 +78,12 @@ public <T> Uni<? extends SubscribeConnection> create(ConnectionConfiguration con
.item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration, connectionListener,
context, messageMapper, payloadMapper, consumerMapper, streamStateMapper, instrumenter))))
.onItem().transformToUni(connection -> Uni.createFrom()
.item(Unchecked.supplier(() -> new PushSubscribeConnection<>(connection, consumerConfiguration))));
.item(Unchecked.supplier(() -> new PushSubscribeConnection<>(connection, consumerConfiguration))))
.onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage()))
.onFailure()
.retry()
.withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF))
.atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS));
}

public Uni<? extends Connection> create(ConnectionConfiguration connectionConfiguration,
Expand All @@ -74,8 +92,12 @@ public Uni<? extends Connection> create(ConnectionConfiguration connectionConfig
.onItem().transformToUni(
context -> Uni.createFrom().item(Unchecked.supplier(() -> new DefaultConnection(connectionConfiguration,
connectionListener, context, messageMapper, payloadMapper, consumerMapper, streamStateMapper,
instrumenter))));

instrumenter))))
.onFailure().invoke(failure -> logger.errorf(failure, "Failed connecting to NATS: %s", failure.getMessage()))
.onFailure()
.retry()
.withBackOff(connectionConfiguration.connectionBackoff().orElse(DEFAULT_CONNECTION_BACKOFF))
.atMost(connectionConfiguration.connectionAttempts().orElse(DEFAULT_CONNECTION_ATTEMPTS));
}

private Optional<Vertx> getVertx() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration conf
}
})
.emitOn(context::runOnContext)
.onItem().transformToUni(this::acknowledge)
.onFailure().recoverWithUni(throwable -> notAcknowledge(message, throwable));
.onFailure().transform(failure -> new PublishException(failure.getMessage(), failure));
}

@Override
Expand Down Expand Up @@ -397,17 +396,6 @@ private FetchConsumer fetchConsumer(final ConsumerContext consumerContext, final
}
}

private <T> Uni<Message<T>> acknowledge(final Message<T> message) {
return Uni.createFrom().completionStage(message.ack())
.onItem().transform(v -> message);
}

private <T> Uni<Message<T>> notAcknowledge(final Message<T> message, final Throwable throwable) {
return Uni.createFrom().completionStage(message.nack(throwable))
.onItem().invoke(() -> logger.warnf(throwable, "Message not published: %s", throwable.getMessage()))
.onItem().transformToUni(v -> Uni.createFrom().item(message));
}

private <T> Uni<ConsumerContext> getConsumerContext(final FetchConsumerConfiguration<T> configuration) {
return Uni.createFrom().item(Unchecked.supplier(() -> {
try {
Expand Down
Loading

0 comments on commit e5ecb9b

Please sign in to comment.