Skip to content

Commit

Permalink
[DENO] Ack Ack (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 23, 2024
1 parent 9ce8fce commit d7b1483
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions examples/jetstream/ack-ack/deno/main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import {
AckPolicy,
StorageType,
connect
} from "https://deno.land/x/[email protected]/src/mod.ts";

const servers = Deno.env.get("NATS_URL")?.split(",");

const nc = await connect({ servers });

// create a stream with a random name with some messages and a consumer
const stream = "confirmAckStream";
const subject = "confirmAckSubject";

const jsm = await nc.jetstreamManager();
const js = nc.jetstream();

// Create a stream
// (remove the stream first so we have a clean starting point)
try {
await jsm.streams.delete(stream);
} catch (err) {
if (err.code != 404) {
console.error(err.message);
}
}

await jsm.streams.add({
name: stream,
subjects: [subject],
storage: StorageType.Memory,
});

// Publish a couple messages so we can look at the state
await js.publish(subject)
await js.publish(subject)

// Consume a message with 2 different consumers
// The first consumer will (regular) ack without confirmation
// The second consumer will ackSync which confirms that ack was handled.

// Consumer 1 will use ack()
const ci1 = await jsm.consumers.add(stream, {
name: "consumer1",
filter_subject: subject,
ack_policy: AckPolicy.Explicit
});
console.log("Consumer 1");
console.log(" Start");
console.log(` pending messages: ${ci1.num_pending}`);
console.log(` messages with ack pending: ${ci1.num_ack_pending}`);

const consumer1 = await js.consumers.get(stream, "consumer1");

try {
const m = await consumer1.next();
if (m) {
let ci1= await consumer1.info(false);
console.log(" After received but before ack");
console.log(` pending messages: ${ci1.num_pending}`);
console.log(` messages with ack pending: ${ci1.num_ack_pending}`);

m.ack()
ci1 = await consumer1.info(false);
console.log(" After ack");
console.log(` pending messages: ${ci1.num_pending}`);
console.log(` messages with ack pending: ${ci1.num_ack_pending}`);
}
} catch (err) {
console.log(`consume failed: ${err.message}`);
}


// Consumer 2 will use ackAck()
const ci2 = await jsm.consumers.add(stream, {
name: "consumer2",
filter_subject: subject,
ack_policy: AckPolicy.Explicit
});
console.log("Consumer 2");
console.log(" Start");
console.log(` pending messages: ${ci2.num_pending}`);
console.log(` messages with ack pending: ${ci2.num_ack_pending}`);

const consumer2 = await js.consumers.get(stream, "consumer2");

try {
const m = await consumer2.next();
if (m) {
let ci2= await consumer2.info(false);
console.log(" After received but before ack");
console.log(` pending messages: ${ci2.num_pending}`);
console.log(` messages with ack pending: ${ci2.num_ack_pending}`);

await m.ackAck()
ci2 = await consumer2.info(false);
console.log(" After ack");
console.log(` pending messages: ${ci2.num_pending}`);
console.log(` messages with ack pending: ${ci2.num_ack_pending}`);
}
} catch (err) {
console.log(`consume failed: ${err.message}`);
}

await nc.close();

1 comment on commit d7b1483

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for nats-by-example ready!

✅ Preview
https://nats-by-example-29cnefpxm-connecteverything.vercel.app

Built with commit d7b1483.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.