Skip to content

Commit

Permalink
Ack Ack category and java
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Jun 17, 2024
1 parent 372eb05 commit 19c39d0
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 0 deletions.
76 changes: 76 additions & 0 deletions examples/jetstream/ack-ack/java/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package example;

import io.nats.client.*;
import io.nats.client.api.*;

import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class Main {
public static void main(String[] args) {
String natsURL = System.getenv("NATS_URL");
if (natsURL == null) {
natsURL = "nats://127.0.0.1:4222";
}

try (Connection nc = Nats.connect(natsURL)) {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = jsm.jetStream();

// remove the stream so we have a clean starting point
try {
jsm.deleteStream("verifyAckStream");
}
catch (JetStreamApiException e) {
// means does not exist
}

// Create a stream with a few subjects
jsm.addStream(StreamConfiguration.builder()
.name("verifyAckStream")
.subjects("verifyAckSubject")
.storageType(StorageType.Memory)
.build());

// Publish a message
js.publish("verifyAckSubject", "A".getBytes());
js.publish("verifyAckSubject", "B".getBytes());

// Consume the message
StreamContext sc = nc.getStreamContext("verifyAckStream");
ConsumerContext cc1 = sc.createOrUpdateConsumer(ConsumerConfiguration.builder().filterSubject("verifyAckSubject").build());
ConsumerContext cc2 = sc.createOrUpdateConsumer(ConsumerConfiguration.builder().filterSubject("verifyAckSubject").build());

ConsumerInfo ci = cc1.getConsumerInfo();
System.out.println("Consumer 1");
System.out.println(" Start\n # pending messages: " + ci.getNumPending() + "\n # messages with ack pending: " + ci.getNumAckPending());

Message m = cc1.next();
ci = cc1.getConsumerInfo();
System.out.println(" After received but before ack\n # pending messages: " + ci.getNumPending() + "\n # messages with ack pending: " + ci.getNumAckPending());

m.ack();
Thread.sleep(100); // to give time for the ack to be completed on the server
ci = cc1.getConsumerInfo();
System.out.println(" After ack\n # pending messages: " + ci.getNumPending() + "\n # messages with ack pending: " + ci.getNumAckPending());


ci = cc2.getConsumerInfo();
System.out.println("Consumer 2");
System.out.println(" Start\n # pending messages: " + ci.getNumPending() + "\n # messages with ack pending: " + ci.getNumAckPending());

m = cc2.next();
ci = cc2.getConsumerInfo();
System.out.println(" After received but before ack\n # pending messages: " + ci.getNumPending() + "\n # messages with ack pending: " + ci.getNumAckPending());

m.ackSync(Duration.ofMillis(500));
ci = cc2.getConsumerInfo();
System.out.println(" After ack\n # pending messages: " + ci.getNumPending() + "\n # messages with ack pending: " + ci.getNumAckPending());
} catch (InterruptedException | IOException | JetStreamApiException | JetStreamStatusCheckedException | TimeoutException e) {
e.printStackTrace();
}
}
}
10 changes: 10 additions & 0 deletions examples/jetstream/ack-ack/meta.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: Confirmed message ack
description: |-
A confirmed message ack means that the client waits for an ack from the server to ensure that the ack was received and processed.
The functionality can be found in various clients under the following:
<table>
<tr><th>Name</th><th>Clients</th></tr>
<tr><td>ack ack</th><th>Javascript</th></tr>
<tr><td>double ack</th><th>Rust, C# .NET V2</th></tr>
<tr><td>ack ack</th><th>Go, Python, Java, C</th></tr>
</table>
1 change: 1 addition & 0 deletions examples/jetstream/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ examples:
- consumer-fetch-messages
- partitions
- list-subjects
- ack-ack

0 comments on commit 19c39d0

Please sign in to comment.