diff --git a/.github/project.yml b/.github/project.yml index 859817b6..3034edb6 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "1.13.3" - next-version: "1.14.0-SNAPSHOT" + current-version: "1.14.0" + next-version: "1.15.0-SNAPSHOT" diff --git a/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamDevServicesProcessor.java b/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamDevServicesProcessor.java index b7f7fb3d..fced83f0 100644 --- a/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamDevServicesProcessor.java +++ b/deployment/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/deployment/ReactiveMesssagingNatsJetstreamDevServicesProcessor.java @@ -138,7 +138,7 @@ private RunningDevService startJetStreamContainer(DockerStatusBuildItem dockerSt return null; } - if (!dockerStatusBuildItem.isDockerAvailable()) { + if (!dockerStatusBuildItem.isContainerRuntimeAvailable()) { logger.warn("Docker isn't working, please configure the NATS JetStream broker location."); return null; } diff --git a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java index 2b2ac24a..905e26b4 100644 --- a/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java +++ b/deployment/src/test/java/io/quarkiverse/reactive/messaging/nats/jetstream/test/ReactiveMesssagingNatsJetstreamPushTest.java @@ -14,6 +14,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.test.QuarkusUnitTest; +import io.restassured.filter.log.RequestLoggingFilter; import io.restassured.parsing.Parser; public class ReactiveMesssagingNatsJetstreamPushTest { @@ -37,8 +38,8 @@ public void setup() { @Test public void health() { - given().get("/q/health/ready").then().statusCode(200); - given().get("/q/health/live").then().statusCode(200); + given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/ready").then().statusCode(200); + given().filters(new RequestLoggingFilter(), new RequestLoggingFilter()).get("/q/health/live").then().statusCode(200); } @Test diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index 7d605e20..f78c3f83 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 1.13.3 +:project-version: 1.14.0 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3c54dff5..9616315f 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 17 UTF-8 UTF-8 - 3.14.3 + 3.15.1 2.20.2 0.3.0 diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java index 445f839a..01d24610 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.java @@ -234,21 +234,21 @@ private Uni handleConsumerContextFailure(final FetchConsume private Uni nextMessage(final ConsumerContext consumerContext, final Duration timeout) { - return Uni.createFrom().item(Unchecked.supplier(() -> { + return Uni.createFrom(). emitter(emitter -> { try { try (final var fetchConsumer = fetchConsumer(consumerContext, timeout)) { final var message = fetchConsumer.nextMessage(); if (message != null) { - return message; + emitter.complete(message); } else { - throw new MessageNotFoundException(); + emitter.fail(new MessageNotFoundException()); } } } catch (Throwable failure) { logger.errorf(failure, "Failed to fetch message: %s", failure.getMessage()); - throw new FetchException(failure); + emitter.fail(new FetchException(failure)); } - })) + }) .emitOn(context::runOnContext); } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java index fdf8bd03..d9603b60 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.java @@ -28,7 +28,7 @@ public MessagePullPublisherProcessor(final ConnectionFactory connectionFactory, final MessagePullPublisherConfiguration configuration) { this.configuration = configuration; this.connectionFactory = connectionFactory; - this.status = new AtomicReference<>(new Status(false, "Not connected", ConnectionEvent.Closed)); + this.status = new AtomicReference<>(new Status(true, "Not connected", ConnectionEvent.Closed)); this.connection = new AtomicReference<>(); this.connectionConfiguration = connectionConfiguration; } diff --git a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java index 1803e0a2..a474e482 100644 --- a/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java +++ b/runtime/src/main/java/io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.java @@ -32,7 +32,7 @@ public MessagePushPublisherProcessor(final ConnectionFactory connectionFactory, this.connectionConfiguration = connectionConfiguration; this.configuration = configuration; this.connectionFactory = connectionFactory; - this.status = new AtomicReference<>(new Status(false, "Not connected", ConnectionEvent.Closed)); + this.status = new AtomicReference<>(new Status(true, "Not connected", ConnectionEvent.Closed)); this.optionsFactory = new PushSubscribeOptionsFactory(); this.connection = new AtomicReference<>(); }