Skip to content

Commit

Permalink
Merge pull request #199 from quarkiverse/feature/health
Browse files Browse the repository at this point in the history
Implemented liveness and readiness health report
  • Loading branch information
kjeldpaw authored Oct 8, 2024
2 parents 89107fa + 95e44b5 commit 4a323d4
Show file tree
Hide file tree
Showing 30 changed files with 439 additions and 309 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.15.1"
current-version: "3.15.2"
next-version: "3.16.0-SNAPSHOT"

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.test;

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;

public class HealthTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class))
.withConfigurationResource("application-health.properties");

@Test
public void readiness() {
await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

@Test
public void liveness() {
await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class ReactiveMesssagingNatsJetstreamDevModeTest {

@RegisterExtension
static QuarkusDevModeTest devModeTest = new QuarkusDevModeTest()
final static QuarkusDevModeTest devModeTest = new QuarkusDevModeTest()
.withApplicationRoot((jar) -> jar
.addClasses(ValueConsumingBean.class, ValueProducingBean.class, ValueResource.class,
TestSpanExporter.class, Data.class, DataResource.class, DataConsumingBean.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public void setup() {
defaultParser = Parser.JSON;
}

@Test
public void health() {
given().get("/q/health/ready").then().statusCode(200);
given().get("/q/health/live").then().statusCode(200);
}

@Test
public void metadata() {
final var messageId = "4dc58197-8cfb-4099-a211-25d5c2d04f4b";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.parsing.Parser;

public class ReactiveMesssagingNatsJetstreamPushTest {
Expand All @@ -36,12 +35,6 @@ public void setup() {
defaultParser = Parser.JSON;
}

@Test
public void health() {
given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/ready").then().statusCode(200);
given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/live").then().statusCode(200);
}

@Test
public void metadata() {
final var messageId = "4e54818a-c624-495a-81c8-0145ad4c9925";
Expand Down
10 changes: 10 additions & 0 deletions deployment/src/test/resources/application-health.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
quarkus.messaging.nats.jet-stream.streams[0].name=test
quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=data

mp.messaging.outgoing.data.connector=quarkus-jetstream
mp.messaging.outgoing.data.stream=test
mp.messaging.outgoing.data.subject=data

mp.messaging.incoming.data-consumer.connector=quarkus-jetstream
mp.messaging.incoming.data-consumer.stream=test
mp.messaging.incoming.data-consumer.subject=data
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.15.1
:project-version: 3.15.2

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;

@QuarkusTest
public class DataResourceTest {
Expand All @@ -35,17 +37,33 @@ public void data() {

@Test
public void healthLive() {
given()
.when().get("/q/health/live")
.then()
.statusCode(200);
await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/live")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}

@Test
public void healthReady() {
given()
.when().get("/q/health/ready")
.then()
.statusCode(200);
await().atMost(60, TimeUnit.SECONDS).pollInterval(5, TimeUnit.SECONDS).until(() -> {
try {
given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
.when().get("/q/health/ready")
.then()
.statusCode(200);
return true;
} catch (AssertionError e) {
return false;
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkiverse.reactive.messaging.nats;

import java.time.Duration;
import java.util.Optional;

import io.nats.client.AuthHandler;
Expand Down Expand Up @@ -49,6 +50,16 @@ public interface NatsConfiguration {
*/
Optional<Long> connectionTimeout();

/**
* Back-off delay between to attempt to re-connect to NATS
*/
Optional<Duration> connectionBackoff();

/**
* The maximum number of attempts to attempt to re-connect to NATS
*/
Optional<Long> connectionAttempts();

/**
* The classname for the error listener
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Optional;
import java.util.Set;

import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
Expand All @@ -19,25 +21,6 @@ public interface JetStreamBuildConfiguration {
@WithDefault("true")
Boolean autoConfigure();

/**
* The number of replicas a message must be stored. Default value is 1.
*/
@WithDefault("1")
Integer replicas();

/**
* The storage type for stream data (File or Memory).
*/
@WithDefault("File")
String storageType();

/**
* Declares the retention policy for the stream. @see
* <a href="https://docs.nats.io/jetstream/concepts/streams#retention-policies">Retention Policy</a>
*/
@WithDefault("Interest")
String retentionPolicy();

/**
* If auto-configure is true the streams are created on Nats server.
*/
Expand All @@ -63,7 +46,7 @@ interface KeyValueStore {
* The storage type (File or Memory).
*/
@WithDefault("File")
String storageType();
StorageType storageType();

/**
* The maximum number of bytes for this bucket
Expand Down Expand Up @@ -108,5 +91,24 @@ interface Stream {
* Stream subjects
*/
Set<String> subjects();

/**
* The number of replicas a message must be stored. Default value is 1.
*/
@WithDefault("1")
Integer replicas();

/**
* The storage type for stream data (File or Memory).
*/
@WithDefault("File")
StorageType storageType();

/**
* Declares the retention policy for the stream. @see
* <a href="https://docs.nats.io/jetstream/concepts/streams#retention-policies">Retention Policy</a>
*/
@WithDefault("Interest")
RetentionPolicy retentionPolicy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,21 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {

@Override
public HealthReport getReadiness() {
return getHealth();
final HealthReport.HealthReportBuilder builder = HealthReport.builder();
processors.forEach(client -> builder.add(new HealthReport.ChannelInfo(
client.channel(),
client.readiness().healthy(),
client.readiness().message())));
return builder.build();
}

@Override
public HealthReport getLiveness() {
return getHealth();
}

HealthReport getHealth() {
final HealthReport.HealthReportBuilder builder = HealthReport.builder();
processors.forEach(client -> builder.add(new HealthReport.ChannelInfo(
client.getChannel(),
client.getStatus().healthy(),
client.getStatus().message())));
client.channel(),
client.liveness().healthy(),
client.liveness().message())));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult;
Expand All @@ -14,6 +15,7 @@
import io.smallrye.mutiny.Uni;

public interface Connection extends AutoCloseable {
Logger logger = Logger.getLogger(Connection.class);

boolean isConnected();

Expand All @@ -24,6 +26,7 @@ public interface Connection extends AutoCloseable {
void addListener(ConnectionListener listener);

default void fireEvent(ConnectionEvent event, String message) {
logger.infof("Event: %s, message: %s", event, message);
listeners().forEach(listener -> listener.onEvent(event, message));
}

Expand Down
Loading

0 comments on commit 4a323d4

Please sign in to comment.