Skip to content

Commit

Permalink
Merge pull request #203 from quarkiverse/feature/failure-fix
Browse files Browse the repository at this point in the history
Fixed acknowledge of published messages
  • Loading branch information
kjeldpaw authored Oct 9, 2024
2 parents 0207320 + 3031d95 commit 1c08681
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.15.3"
current-version: "3.15.4"
next-version: "3.16.0-SNAPSHOT"

2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.15.3
:project-version: 3.15.4

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration conf
}
})
.emitOn(context::runOnContext)
.onItem().transformToUni(this::acknowledge)
.onFailure().recoverWithUni(failure -> notAcknowledge(message, failure))
.onFailure().transform(failure -> new PublishException(failure.getMessage(), failure));
}

Expand Down Expand Up @@ -397,6 +399,17 @@ private FetchConsumer fetchConsumer(final ConsumerContext consumerContext, final
}
}

private <T> Uni<Message<T>> acknowledge(final Message<T> message) {
return Uni.createFrom().completionStage(message.ack())
.onItem().transform(v -> message);
}

private <T> Uni<Message<T>> notAcknowledge(final Message<T> message, final Throwable throwable) {
return Uni.createFrom().completionStage(message.nack(throwable))
.onItem().invoke(() -> logger.warnf(throwable, "Message not acknowledged: %s", throwable.getMessage()))
.onItem().transformToUni(v -> Uni.createFrom().item(message));
}

private <T> Uni<ConsumerContext> getConsumerContext(final FetchConsumerConfiguration<T> configuration) {
return Uni.createFrom().item(Unchecked.supplier(() -> {
try {
Expand Down

0 comments on commit 1c08681

Please sign in to comment.