-
Notifications
You must be signed in to change notification settings - Fork 40
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
111 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
// import the library - in node.js `import {connect, etc} from "nats";` | ||
// or if not doing a module, `const {connect, etc} = require("nats");` | ||
import { | ||
AckPolicy, | ||
connect, | ||
} from "https://deno.land/x/[email protected]/src/mod.ts"; | ||
import {StorageType} from "../../jetstream/jsapi_types.ts"; | ||
|
||
// Get the passed NATS_URL or fallback to the default. This can be | ||
// a comma-separated string. | ||
const servers = Deno.env.get("NATS_URL") || "nats://localhost:4222"; | ||
|
||
// Create a client connection to an available NATS server. | ||
|
||
const nc = await connect({ servers }); | ||
|
||
// create a stream with a random name with some messages and a consumer | ||
const stream = "confirmAckStream"; | ||
const subject = "confirmAckSubject"; | ||
|
||
const jsm = await nc.jetstreamManager(); | ||
const js = nc.jetstream(); | ||
|
||
// Create a stream | ||
// (remove the stream first so we have a clean starting point) | ||
try { | ||
await jsm.streams.delete(stream); | ||
} catch (err) { | ||
if (err.code != 404) { | ||
console.error(err.message); | ||
} | ||
} | ||
|
||
const si = await jsm.streams.add({ | ||
name: stream, | ||
subjects: [subject], | ||
storage: StorageType.Memory, | ||
}); | ||
|
||
// Publish a couple messages so we can look at the state | ||
await js.publish(subject) | ||
await js.publish(subject) | ||
|
||
// Consume a message with 2 different consumers | ||
// The first consumer will (regular) ack without confirmation | ||
// The second consumer will ackSync which confirms that ack was handled. | ||
|
||
// Consumer 1 will use ack() | ||
let ci1 = await jsm.consumers.add(stream, { | ||
name: "consumer1", | ||
filter_subject: subject, | ||
ack_policy: AckPolicy.Explicit | ||
}); | ||
console.log("Consumer 1"); | ||
console.log(" Start"); | ||
console.log(` pending messages: ${ci1.num_pending}`); | ||
console.log(` messages with ack pending: ${ci1.num_ack_pending}`); | ||
|
||
const consumer1 = await js.consumers.get(stream, "consumer1"); | ||
|
||
try { | ||
const m = await consumer1.next(); | ||
if (m) { | ||
let ci1 = await consumer1.info(false); | ||
console.log(" After received but before ack"); | ||
console.log(` pending messages: ${ci1.num_pending}`); | ||
console.log(` messages with ack pending: ${ci1.num_ack_pending}`); | ||
|
||
m.ack() | ||
ci1 = await consumer1.info(false); | ||
console.log(" After ack"); | ||
console.log(` pending messages: ${ci1.num_pending}`); | ||
console.log(` messages with ack pending: ${ci1.num_ack_pending}`); | ||
} | ||
} catch (err) { | ||
console.log(`consume failed: ${err.message}`); | ||
} | ||
|
||
|
||
// Consumer 2 will use ackAck() | ||
let ci2 = await jsm.consumers.add(stream, { | ||
name: "consumer2", | ||
filter_subject: subject, | ||
ack_policy: AckPolicy.Explicit | ||
}); | ||
console.log("Consumer 2"); | ||
console.log(" Start"); | ||
console.log(` pending messages: ${ci2.num_pending}`); | ||
console.log(` messages with ack pending: ${ci2.num_ack_pending}`); | ||
|
||
const consumer2 = await js.consumers.get(stream, "consumer2"); | ||
|
||
try { | ||
const m = await consumer2.next(); | ||
if (m) { | ||
let ci2 = await consumer2.info(false); | ||
console.log(" After received but before ack"); | ||
console.log(` pending messages: ${ci2.num_pending}`); | ||
console.log(` messages with ack pending: ${ci2.num_ack_pending}`); | ||
|
||
await m.ackAck() | ||
ci2 = await consumer2.info(false); | ||
console.log(" After ack"); | ||
console.log(` pending messages: ${ci2.num_pending}`); | ||
console.log(` messages with ack pending: ${ci2.num_ack_pending}`); | ||
} | ||
} catch (err) { | ||
console.log(`consume failed: ${err.message}`); | ||
} | ||
|
||
await nc.close(); |