Skip to content

Commit

Permalink
Minor fixes and cleanup (#244)
Browse files Browse the repository at this point in the history
* Minor fixes and cleanup
  • Loading branch information
kjeldpaw authored Dec 8, 2024
1 parent a6f9ca1 commit a4d4988
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.17.3"
current-version: "3.17.4"
next-version: "3.18.0-SNAPSHOT"

2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.17.3
:project-version: 3.17.4

:examples-dir: ./../examples/
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ quarkus.messaging.nats.connection-backoff,Back-off delay between to attempt to r
[format="csv",cols="2"]
|======
quarkus.messaging.nats.jet-stream.auto-configure,Autoconfigure stream and subjects based on channel configuration
quarkus.messaging.nats.jet-stream.trace,Enable tracing
quarkus.messaging.nats.jet-stream.streams[i].replicas,The number of replicas a message must be stored. Default value is 1
quarkus.messaging.nats.jet-stream.streams[i].storage-type,The storage type for stream data (File or Memory)
quarkus.messaging.nats.jet-stream.streams[i].retention-policy,Declares the retention policy for the stream (Limits or Interest)
Expand Down Expand Up @@ -151,7 +152,6 @@ mp.messaging.outgoing.[channel-name].trace-enabled,Enable traces for subscriber
mp.messaging.incoming.[channel-name].name,The name of the NATS consumer
mp.messaging.incoming.[channel-name].stream,The stream to publish messages from
mp.messaging.incoming.[channel-name].subject,The subject to publish messages from
mp.messaging.incoming.[channel-name].trace-enabled,Enable traces for publisher
mp.messaging.incoming.[channel-name].publisher-type,"The publisher type (Pull, Push)"
mp.messaging.incoming.[channel-name].payload-type,The class name of the payload type
mp.messaging.incoming.[channel-name].durable,Sets the durable name for the consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
import org.jboss.logging.Logger;

import io.nats.client.*;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.nats.client.api.*;
import io.nats.client.impl.Headers;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.*;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.*;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
Expand Down Expand Up @@ -249,24 +248,19 @@ public Uni<List<PurgeResult>> purgeAllStreams() {
}

@Override
public <T> Uni<Message<T>> publish(final Message<T> message, PublishConfiguration publishConfiguration, Tracer<T> tracer,
Context context) {
public <T> Uni<Message<T>> publish(final Message<T> message, final PublishConfiguration publishConfiguration,
final Tracer<T> tracer, final Context context) {
return Uni.createFrom().voidItem()
.emitOn(context::runOnContext)
.onItem().transformToUni(ignore -> tracer.withTrace(message, publishConfiguration))
.onItem()
.transformToUni(subscribeMessage -> getJetStream().onItem()
.transform(jetStream -> Tuple2.of(jetStream, subscribeMessage)))
.onItem().transformToUni(tuple -> Uni.createFrom().completionStage(
tuple.getItem1().publishAsync(tuple.getItem2().configuration().subject(),
toJetStreamHeaders(tuple.getItem2().headers()),
tuple.getItem2().payload(),
createPublishOptions(tuple.getItem2().messageId(), tuple.getItem2().configuration().stream()))))
.onItem().transform(ack -> {
logger.debugf("Message published to stream: %s with sequence number: %d", ack.getStream(), ack.getSeqno());
return message;
.onItem().transformToUni(this::getJetStream)
.onItem().transformToUni(this::publishMessage)
.onItem().transform(tuple -> {
logger.debugf("Message published to stream: %s with sequence number: %d", tuple.getItem1().getStream(),
tuple.getItem1().getSeqno());
return tuple.getItem2();
})
.onItem().transformToUni(ignore -> acknowledge(message))
.onItem().transformToUni(this::acknowledge)
.onFailure().recoverWithUni(failure -> notAcknowledge(message, failure))
.onFailure().transform(failure -> new PublishException(failure.getMessage(), failure));
}
Expand Down Expand Up @@ -743,4 +737,17 @@ private Uni<Void> deleteStream(final JetStreamManagement jsm, String streamName)
}
});
}

private <T> Uni<Tuple2<JetStream, SubscribeMessage<T>>> getJetStream(SubscribeMessage<T> subscribeMessage) {
return getJetStream().onItem().transform(jetStream -> Tuple2.of(jetStream, subscribeMessage));
}

private <T> Uni<Tuple2<PublishAck, Message<T>>> publishMessage(Tuple2<JetStream, SubscribeMessage<T>> tuple) {
return Uni.createFrom().completionStage(
tuple.getItem1().publishAsync(tuple.getItem2().configuration().subject(),
toJetStreamHeaders(tuple.getItem2().headers()),
tuple.getItem2().payload(),
createPublishOptions(tuple.getItem2().messageId(), tuple.getItem2().configuration().stream())))
.onItem().transform(ack -> Tuple2.of(ack, tuple.getItem2().message()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@

import io.quarkiverse.reactive.messaging.nats.jetstream.client.*;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;

public class MessageSubscriberProcessor implements MessageProcessor, ConnectionListener {
public class MessageSubscriberProcessor<T> implements MessageProcessor, ConnectionListener {
private final static Logger logger = Logger.getLogger(MessageSubscriberProcessor.class);

private final ConnectionConfiguration connectionConfiguration;
private final MessageSubscriberConfiguration configuration;
private final ConnectionFactory connectionFactory;
private final AtomicReference<Status> status;
private final AtomicReference<Connection> connection;
private final TracerFactory tracerFactory;
private final Tracer<T> tracer;
private final Context context;

public MessageSubscriberProcessor(
Expand All @@ -38,15 +39,15 @@ public MessageSubscriberProcessor(
this.configuration = configuration;
this.status = new AtomicReference<>(new Status(true, "Subscriber processor inactive", ConnectionEvent.Closed));
this.connection = new AtomicReference<>();
this.tracerFactory = tracerFactory;
this.tracer = tracerFactory.create();
this.context = context;
}

public <T> Flow.Subscriber<Message<T>> subscriber() {
public Flow.Subscriber<Message<T>> subscriber() {
return MultiUtils.via(this::subscribe);
}

private <T> Multi<Message<T>> subscribe(Multi<Message<T>> subscription) {
private Multi<Message<T>> subscribe(Multi<Message<T>> subscription) {
return subscription.onItem().transformToUniAndConcatenate(this::publish);
}

Expand Down Expand Up @@ -83,17 +84,17 @@ public void onEvent(ConnectionEvent event, String message) {
this.status.set(Status.builder().healthy(true).message(message).event(event).build());
}

private <T> Uni<Message<T>> publish(final Message<T> message) {
private Uni<Message<T>> publish(final Message<T> message) {
return getOrEstablishConnection()
.onItem()
.transformToUni(connection -> context
.withContext(ctx -> connection.publish(message, configuration, tracerFactory.create(), ctx)))
.withContext(ctx -> connection.publish(message, configuration, tracer, ctx)))
.onFailure()
.invoke(failure -> logger.errorf(failure, "Failed to publish with message: %s", failure.getMessage()))
.onFailure().recoverWithUni(() -> recover(message));
}

private <T> Uni<Message<T>> recover(final Message<T> message) {
private Uni<Message<T>> recover(final Message<T> message) {
return Uni.createFrom().<Void> item(() -> {
close();
return null;
Expand Down

0 comments on commit a4d4988

Please sign in to comment.