Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored connection #197

Merged
merged 7 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.15.0"
current-version: "3.15.1"
next-version: "3.16.0-SNAPSHOT"

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamConnector;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamRecorder;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapperImpl;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapperImpl;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
Expand Down Expand Up @@ -51,7 +54,10 @@ void createNatsConnector(BuildProducer<AdditionalBeanBuildItem> buildProducer) {
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamInstrumenter.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ExecutionHolder.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ConnectionFactory.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(MessageFactory.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(PayloadMapper.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(MessageMapper.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ConsumerMapperImpl.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(StreamStateMapperImpl.class));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.smallrye.mutiny.Uni;

Expand All @@ -26,7 +25,7 @@ public class DeadLetterConsumingBean {
private final static Logger logger = Logger.getLogger(DeadLetterConsumingBean.class);

private final AtomicReference<Data> lastData;
private final AtomicReference<MessageConnection> connection;
private final AtomicReference<Connection> connection;
private final NatsConfiguration natsConfiguration;
private final ConnectionFactory connectionFactory;

Expand Down Expand Up @@ -70,7 +69,7 @@ public void terminate(
}
}

public Uni<Void> deadLetter(MessageConnection connection, Message<Advisory> message) {
public Uni<Void> deadLetter(Connection connection, Message<Advisory> message) {
logger.infof("Received dead letter on dead-letter-consumer channel: %s", message);
final var advisory = message.getPayload();
return connection.<Data> resolve(advisory.getStream(), advisory.getStream_seq())
Expand All @@ -79,12 +78,12 @@ public Uni<Void> deadLetter(MessageConnection connection, Message<Advisory> mess
.onFailure().recoverWithUni(throwable -> Uni.createFrom().completionStage(message.nack(throwable)));
}

private Uni<MessageConnection> getOrEstablishConnection() {
private Uni<Connection> getOrEstablishConnection() {
return Uni.createFrom().item(() -> Optional.ofNullable(connection.get())
.filter(Connection::isConnected)
.orElse(null))
.onItem().ifNull()
.switchTo(() -> connectionFactory.message(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
.switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
}))
.onItem().invoke(this.connection::set);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.smallrye.mutiny.Uni;

Expand All @@ -29,7 +28,7 @@ public class ExponentialBackoffConsumingBean {
private final AtomicReference<List<Integer>> maxDeliveries;
private final NatsConfiguration natsConfiguration;
private final ConnectionFactory connectionFactory;
private final AtomicReference<MessageConnection> connection;
private final AtomicReference<Connection> connection;

@Inject
public ExponentialBackoffConsumingBean(NatsConfiguration natsConfiguration, ConnectionFactory connectionFactory) {
Expand Down Expand Up @@ -78,7 +77,7 @@ public void terminate(
}
}

private Uni<Void> maxDeliveries(MessageConnection connection, Message<Advisory> message) {
private Uni<Void> maxDeliveries(Connection connection, Message<Advisory> message) {
final var advisory = message.getPayload();
return connection.<Integer> resolve(advisory.getStream(), advisory.getStream_seq())
.onItem().invoke(msg -> {
Expand All @@ -89,12 +88,12 @@ private Uni<Void> maxDeliveries(MessageConnection connection, Message<Advisory>
.replaceWithVoid();
}

private Uni<MessageConnection> getOrEstablishConnection() {
private Uni<Connection> getOrEstablishConnection() {
return Uni.createFrom().item(() -> Optional.ofNullable(connection.get())
.filter(Connection::isConnected)
.orElse(null))
.onItem().ifNull().switchTo(() -> connectionFactory
.message(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
.create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
}))
.onItem().invoke(this.connection::set);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.smallrye.mutiny.Uni;

Expand All @@ -25,7 +24,7 @@
public class KeyValueStoreResource {
private final ConnectionFactory connectionFactory;
private final NatsConfiguration natsConfiguration;
private final AtomicReference<MessageConnection> connection;
private final AtomicReference<Connection> connection;

@Inject
public KeyValueStoreResource(ConnectionFactory connectionFactory, NatsConfiguration natsConfiguration) {
Expand Down Expand Up @@ -66,25 +65,26 @@ public void terminate(
}
}

private Uni<Data> getValue(MessageConnection connection, String key) {
private Uni<Data> getValue(Connection connection, String key) {
return connection.getKeyValue("test", key, Data.class)
.onItem().ifNull().failWith(new NotFoundException());
.onItem().ifNull().failWith(new NotFoundException())
.onFailure().transform(failure -> new NotFoundException(failure.getMessage()));
}

public Uni<Void> putValue(MessageConnection keyValueConnection, String key, Data data) {
public Uni<Void> putValue(Connection keyValueConnection, String key, Data data) {
return keyValueConnection.putKeyValue("test", key, data);
}

public Uni<Void> deleteValue(MessageConnection connection, String key) {
public Uni<Void> deleteValue(Connection connection, String key) {
return connection.deleteKeyValue("test", key);
}

private Uni<MessageConnection> getOrEstablishConnection() {
private Uni<Connection> getOrEstablishConnection() {
return Uni.createFrom().item(() -> Optional.ofNullable(connection.get())
.filter(Connection::isConnected)
.orElse(null))
.onItem().ifNull()
.switchTo(() -> connectionFactory.message(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
.switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
}))
.onItem().invoke(this.connection::set);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@
import io.nats.client.api.ReplayPolicy;
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerType;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.smallrye.mutiny.Uni;
Expand All @@ -43,42 +40,40 @@ public class RequestReplyResource {
private final ConnectionFactory connectionFactory;
private final NatsConfiguration natsConfiguration;
private final String streamName;
private final AtomicReference<AdministrationConnection> administrationConnection;
private final AtomicReference<MessageConnection> messageConnection;
private final AtomicReference<Connection> messageConnection;

@Inject
public RequestReplyResource(ConnectionFactory connectionFactory,
NatsConfiguration natsConfiguration) {
this.connectionFactory = connectionFactory;
this.natsConfiguration = natsConfiguration;
this.streamName = "request-reply";
this.administrationConnection = new AtomicReference<>();
this.messageConnection = new AtomicReference<>();
}

@GET
@Path("/streams")
public Uni<List<String>> getStreams() {
return getOrEstablishAdministrationConnection().onItem().transformToUni(AdministrationConnection::getStreams);
return getOrEstablishMessageConnection().onItem().transformToUni(Connection::getStreams);
}

@GET
@Path("/streams/{stream}/consumers")
public Uni<List<String>> getConsumers(@PathParam("stream") String stream) {
return getOrEstablishAdministrationConnection().onItem()
return getOrEstablishMessageConnection().onItem()
.transformToUni(connection -> connection.getConsumerNames(stream));
}

@GET
@Path("/streams/{stream}/subjects")
public Uni<List<String>> getSubjects(@PathParam("stream") String stream) {
return getOrEstablishAdministrationConnection().onItem().transformToUni(connection -> connection.getSubjects(stream));
return getOrEstablishMessageConnection().onItem().transformToUni(connection -> connection.getSubjects(stream));
}

@GET
@Path("/streams/{stream}/state")
public Uni<StreamState> getStreamState(@PathParam("stream") String stream) {
return getOrEstablishAdministrationConnection().onItem()
return getOrEstablishMessageConnection().onItem()
.transformToUni(connection -> connection.getStreamState(stream));
}

Expand All @@ -99,13 +94,6 @@ public Uni<Data> consumeData(@PathParam("subject") String subject) {

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object ignored) {
try {
if (administrationConnection.get() != null) {
administrationConnection.get().close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
if (messageConnection.get() != null) {
messageConnection.get().close();
Expand All @@ -115,27 +103,17 @@ public void terminate(
}
}

private Uni<AdministrationConnection> getOrEstablishAdministrationConnection() {
return Uni.createFrom().item(() -> Optional.ofNullable(administrationConnection.get())
.filter(Connection::isConnected)
.orElse(null))
.onItem().ifNull().switchTo(() -> connectionFactory
.administration(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
}))
.onItem().invoke(this.administrationConnection::set);
}

private Uni<MessageConnection> getOrEstablishMessageConnection() {
private Uni<Connection> getOrEstablishMessageConnection() {
return Uni.createFrom().item(() -> Optional.ofNullable(messageConnection.get())
.filter(Connection::isConnected)
.orElse(null))
.onItem().ifNull()
.switchTo(() -> connectionFactory.message(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
.switchTo(() -> connectionFactory.create(ConnectionConfiguration.of(natsConfiguration), (event, message) -> {
}))
.onItem().invoke(this.messageConnection::set);
}

private Uni<Void> produceData(MessageConnection connection, String subject, String id, String data, String messageId) {
private Uni<Void> produceData(Connection connection, String subject, String id, String data, String messageId) {
return connection.publish(
Message.of(new Data(data, id, messageId), Metadata.of(JetStreamOutgoingMessageMetadata.of(messageId))),
new PublishConfiguration() {
Expand All @@ -157,7 +135,7 @@ public String subject() {
.onItem().transformToUni(m -> Uni.createFrom().voidItem());
}

public Uni<Data> consumeData(MessageConnection connection, String subject) {
public Uni<Data> consumeData(Connection connection, String subject) {
return connection.nextMessage(getConsumerConfiguration(streamName, subject))
.map(message -> {
message.ack();
Expand Down Expand Up @@ -229,7 +207,7 @@ public Optional<ZonedDateTime> startTime() {
}

@Override
public Optional<Integer> maxAckPending() {
public Optional<Long> maxAckPending() {
return Optional.empty();
}

Expand All @@ -238,11 +216,6 @@ public Optional<String> durable() {
return Optional.of(subject);
}

@Override
public ConsumerType type() {
return ConsumerType.Fetch;
}

@Override
public List<String> filterSubjects() {
return List.of("events." + subject);
Expand All @@ -264,7 +237,7 @@ public Optional<Duration> inactiveThreshold() {
}

@Override
public Optional<Integer> maxDeliver() {
public Optional<Long> maxDeliver() {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.SubjectState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubjectState;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.restassured.filter.log.RequestLoggingFilter;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void requestReply() {
.then().statusCode(200).extract().as(StreamState.class);

assertThat(streamState).isNotNull();
assertThat(streamState.subjects().stream().map(SubjectState::name)).contains("events." + subject);
assertThat(streamState.subjectStates().stream().map(SubjectState::name)).contains("events." + subject);

final var result = given()
.filters(new RequestLoggingFilter(), new ResponseLoggingFilter())
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.15.0
:project-version: 3.15.1

:examples-dir: ./../examples/
19 changes: 14 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.quarkiverse</groupId>
Expand All @@ -18,10 +19,11 @@
</modules>
<scm>
<connection>scm:git:[email protected]:quarkiverse/quarkus-reactive-messaging-nats-jetstream.git</connection>
<developerConnection>scm:git:[email protected]:quarkiverse/quarkus-reactive-messaging-nats-jetstream.git</developerConnection>
<developerConnection>scm:git:[email protected]:quarkiverse/quarkus-reactive-messaging-nats-jetstream.git
</developerConnection>
<url>https://github.com/quarkiverse/quarkus-reactive-messaging-nats-jetstream</url>
<tag>HEAD</tag>
</scm>
<tag>HEAD</tag>
</scm>
<properties>
<compiler-plugin.version>3.13.0</compiler-plugin.version>
<maven.compiler.release>17</maven.compiler.release>
Expand All @@ -36,7 +38,9 @@
<microprofile-config-api.version>3.1</microprofile-config-api.version>
<assertj-core.version>3.26.3</assertj-core.version>
<lombok.version>1.18.34</lombok.version>
<opentelemetry-instrumentation-api-semconv.version>1.26.0-alpha</opentelemetry-instrumentation-api-semconv.version>
<opentelemetry-instrumentation-api-semconv.version>1.26.0-alpha
</opentelemetry-instrumentation-api-semconv.version>
<org.mapstruct.version>1.6.2</org.mapstruct.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -72,6 +76,11 @@
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${org.mapstruct.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Loading
Loading