From 27fcc964626ac9e3ff44fe4061e34c27ce7715ef Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 23 Jul 2024 14:24:50 -0400 Subject: [PATCH 1/5] [Deno] Example for List subjects for a specific stream --- examples/jetstream/list-subjects/deno/main.js | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 examples/jetstream/list-subjects/deno/main.js diff --git a/examples/jetstream/list-subjects/deno/main.js b/examples/jetstream/list-subjects/deno/main.js new file mode 100644 index 00000000..a603b3e7 --- /dev/null +++ b/examples/jetstream/list-subjects/deno/main.js @@ -0,0 +1,80 @@ +import { + StorageType, + connect +} from "https://deno.land/x/nats@v1.28.0/src/mod.ts"; + +const servers = Deno.env.get("NATS_URL")?.split(","); + +const nc = await connect({ servers }); + +const jsm = await nc.jetstreamManager(); +const js = nc.jetstream(); + +// Create a stream +// (remove the stream first so we have a clean starting point) +try { + await jsm.streams.delete("subjects"); +} catch (err) { + if (err.code != 404) { + console.error(err.message); + } +} + +// Create a stream with a few subjects +let si = await jsm.streams.add({ + name: "subjects", + subjects: ["plain", "greater.>", "star.*"], + storage: StorageType.Memory, +}); + +// ### Get stream info with StreamInfoRequestOptions +// Get the subjects via the streams.info call. +// Since this is "state" there are no subjects in the state unless +// there are messages in the subject. +si = await jsm.streams.info("subjects", {subjects_filter: ">"}); + +const count = si.state.subjects + ? Object.getOwnPropertyNames(si.state.subjects).length + : 0; +console.log(`Before publishing any messages, there are 0 subjects: ${count}`) + +// console.log(`Before publishing any messages, there are 0 subjects: ${si.state.num_subjects}`) + +// Publish a message +await js.publish("plain") + +si = await jsm.streams.info("subjects", {subjects_filter: ">"}); +console.log("After publishing a message to a subject, it appears in state:") + +if (si.state.subjects) { + let subjects = {}; + subjects = Object.assign(subjects, si.state.subjects); + for (const key of Object.keys(subjects)) { + console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + } +} + +// Publish some more messages, this time against wildcard subjects +await js.publish("greater.A", "gtA-1"); +await js.publish("greater.A", "gtA-2"); +await js.publish("greater.A.B", "gtAB-1"); +await js.publish("greater.A.B", "gtAB-2"); +await js.publish("greater.A.B.C", "gtABC"); +await js.publish("greater.B.B.B", "gtBBB"); +await js.publish("star.1", "star1-1"); +await js.publish("star.1", "star1-2"); +await js.publish("star.2", "star2"); + +// Get all subjects +si = await jsm.streams.info("subjects", {subjects_filter: ">"}); +console.log("Wildcard subjects show the actual subject, not the template."); + +if (si.state.subjects) { + let subjects = {}; + subjects = Object.assign(subjects, si.state.subjects); + for (const key of Object.keys(subjects)) { + console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + } +} + +await nc.close(); From 9a0b6bcb2881519276ddc3d3585124b680819a47 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 23 Jul 2024 14:46:46 -0400 Subject: [PATCH 2/5] [Deno] Example for List subjects for a specific stream --- examples/jetstream/list-subjects/deno/main.js | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/examples/jetstream/list-subjects/deno/main.js b/examples/jetstream/list-subjects/deno/main.js index a603b3e7..b57a24a7 100644 --- a/examples/jetstream/list-subjects/deno/main.js +++ b/examples/jetstream/list-subjects/deno/main.js @@ -77,4 +77,28 @@ if (si.state.subjects) { } } +// ### Subject Filtering +// Instead of allSubjects, you can filter for a specific subject +si = await jsm.streams.info("subjects", {subjects_filter: "greater.>"}); +console.log("Filtering the subject returns only matching entries ['greater.>']"); + +if (si.state.subjects) { + let subjects = {}; + subjects = Object.assign(subjects, si.state.subjects); + for (const key of Object.keys(subjects)) { + console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + } +} + +si = await jsm.streams.info("subjects", {subjects_filter: "greater.A.>"}); +console.log("Filtering the subject returns only matching entries ['greater.A.>']"); + +if (si.state.subjects) { + let subjects = {}; + subjects = Object.assign(subjects, si.state.subjects); + for (const key of Object.keys(subjects)) { + console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + } +} + await nc.close(); From 669630c830068ecf0d4290851903b38a9775081f Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 23 Jul 2024 15:58:19 -0400 Subject: [PATCH 3/5] random stream and subject names --- examples/jetstream/list-subjects/deno/main.js | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/examples/jetstream/list-subjects/deno/main.js b/examples/jetstream/list-subjects/deno/main.js index b57a24a7..3d4e16f4 100644 --- a/examples/jetstream/list-subjects/deno/main.js +++ b/examples/jetstream/list-subjects/deno/main.js @@ -1,6 +1,6 @@ import { - StorageType, - connect + connect, + nuid } from "https://deno.land/x/nats@v1.28.0/src/mod.ts"; const servers = Deno.env.get("NATS_URL")?.split(","); @@ -10,40 +10,40 @@ const nc = await connect({ servers }); const jsm = await nc.jetstreamManager(); const js = nc.jetstream(); -// Create a stream -// (remove the stream first so we have a clean starting point) -try { - await jsm.streams.delete("subjects"); -} catch (err) { - if (err.code != 404) { - console.error(err.message); - } -} +const randomName: () => string = function (): string { + return nuid.next().substring(18) +}; + +// Create a stream with a few random subjects +const plainSubj = `PLAIN_${randomName()}`; +const gtSubj = `GT_${randomName()}`; +const starSubj = `STAR_${randomName()}`; +const name = `EVENTS_${randomName()}`; -// Create a stream with a few subjects let si = await jsm.streams.add({ - name: "subjects", - subjects: ["plain", "greater.>", "star.*"], - storage: StorageType.Memory, + name, + subjects: [plainSubj, `${gtSubj}.>`, `${starSubj}.*`] }); +console.log(`Stream: ${si.config.name} has these subjects: '${si.config.subjects}'`) + // ### Get stream info with StreamInfoRequestOptions // Get the subjects via the streams.info call. // Since this is "state" there are no subjects in the state unless // there are messages in the subject. -si = await jsm.streams.info("subjects", {subjects_filter: ">"}); +si = await jsm.streams.info(name, {subjects_filter: ">"}); const count = si.state.subjects ? Object.getOwnPropertyNames(si.state.subjects).length : 0; -console.log(`Before publishing any messages, there are 0 subjects: ${count}`) +console.log(`Before publishing any messages, there should be 0 subjects in the state: ${count}`) // console.log(`Before publishing any messages, there are 0 subjects: ${si.state.num_subjects}`) // Publish a message -await js.publish("plain") +await js.publish(plainSubj) -si = await jsm.streams.info("subjects", {subjects_filter: ">"}); +si = await jsm.streams.info(name, {subjects_filter: ">"}); console.log("After publishing a message to a subject, it appears in state:") if (si.state.subjects) { @@ -55,18 +55,18 @@ if (si.state.subjects) { } // Publish some more messages, this time against wildcard subjects -await js.publish("greater.A", "gtA-1"); -await js.publish("greater.A", "gtA-2"); -await js.publish("greater.A.B", "gtAB-1"); -await js.publish("greater.A.B", "gtAB-2"); -await js.publish("greater.A.B.C", "gtABC"); -await js.publish("greater.B.B.B", "gtBBB"); -await js.publish("star.1", "star1-1"); -await js.publish("star.1", "star1-2"); -await js.publish("star.2", "star2"); +await js.publish(`${gtSubj}.A`, "gtA-1"); +await js.publish(`${gtSubj}.A`, "gtA-2"); +await js.publish(`${gtSubj}.A.B`, "gtAB-1"); +await js.publish(`${gtSubj}.A.B`, "gtAB-2"); +await js.publish(`${gtSubj}.A.B.C`, "gtABC"); +await js.publish(`${gtSubj}.B.B.B`, "gtBBB"); +await js.publish(`${starSubj}.1`, "star1-1"); +await js.publish(`${starSubj}.1`, "star1-2"); +await js.publish(`${starSubj}.2`, "star2"); // Get all subjects -si = await jsm.streams.info("subjects", {subjects_filter: ">"}); +si = await jsm.streams.info(name, {subjects_filter: ">"}); console.log("Wildcard subjects show the actual subject, not the template."); if (si.state.subjects) { @@ -79,8 +79,8 @@ if (si.state.subjects) { // ### Subject Filtering // Instead of allSubjects, you can filter for a specific subject -si = await jsm.streams.info("subjects", {subjects_filter: "greater.>"}); -console.log("Filtering the subject returns only matching entries ['greater.>']"); +si = await jsm.streams.info(name, {subjects_filter: `${gtSubj}.>`}); +console.log(`Filtering the subject returns only matching entries ['${gtSubj}.>']`); if (si.state.subjects) { let subjects = {}; @@ -90,8 +90,8 @@ if (si.state.subjects) { } } -si = await jsm.streams.info("subjects", {subjects_filter: "greater.A.>"}); -console.log("Filtering the subject returns only matching entries ['greater.A.>']"); +si = await jsm.streams.info(name, {subjects_filter: `${gtSubj}.A.>`}); +console.log(`Filtering the subject returns only matching entries ['${gtSubj}.A>']`); if (si.state.subjects) { let subjects = {}; From 45ed742a8b3fffc38675a30628268e248e23cfbe Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 23 Jul 2024 16:07:57 -0400 Subject: [PATCH 4/5] better code --- examples/jetstream/list-subjects/deno/main.js | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/examples/jetstream/list-subjects/deno/main.js b/examples/jetstream/list-subjects/deno/main.js index 3d4e16f4..9f311f23 100644 --- a/examples/jetstream/list-subjects/deno/main.js +++ b/examples/jetstream/list-subjects/deno/main.js @@ -47,10 +47,8 @@ si = await jsm.streams.info(name, {subjects_filter: ">"}); console.log("After publishing a message to a subject, it appears in state:") if (si.state.subjects) { - let subjects = {}; - subjects = Object.assign(subjects, si.state.subjects); - for (const key of Object.keys(subjects)) { - console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + for (const [key, value] of Object.entries(si.state.subjects)) { + console.log(` subject '${key}' has ${value} message(s)`) } } @@ -70,10 +68,8 @@ si = await jsm.streams.info(name, {subjects_filter: ">"}); console.log("Wildcard subjects show the actual subject, not the template."); if (si.state.subjects) { - let subjects = {}; - subjects = Object.assign(subjects, si.state.subjects); - for (const key of Object.keys(subjects)) { - console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + for (const [key, value] of Object.entries(si.state.subjects)) { + console.log(` subject '${key}' has ${value} message(s)`) } } @@ -83,10 +79,8 @@ si = await jsm.streams.info(name, {subjects_filter: `${gtSubj}.>`}); console.log(`Filtering the subject returns only matching entries ['${gtSubj}.>']`); if (si.state.subjects) { - let subjects = {}; - subjects = Object.assign(subjects, si.state.subjects); - for (const key of Object.keys(subjects)) { - console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + for (const [key, value] of Object.entries(si.state.subjects)) { + console.log(` subject '${key}' has ${value} message(s)`) } } @@ -94,10 +88,8 @@ si = await jsm.streams.info(name, {subjects_filter: `${gtSubj}.A.>`}); console.log(`Filtering the subject returns only matching entries ['${gtSubj}.A>']`); if (si.state.subjects) { - let subjects = {}; - subjects = Object.assign(subjects, si.state.subjects); - for (const key of Object.keys(subjects)) { - console.log(` subject '${key}' has ${si.state.subjects[key]} message(s)`) + for (const [key, value] of Object.entries(si.state.subjects)) { + console.log(` subject '${key}' has ${value} message(s)`) } } From ed17871a98a85b649caaf102a9b7cc7429e01cef Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 23 Jul 2024 16:11:56 -0400 Subject: [PATCH 5/5] better code --- examples/jetstream/list-subjects/deno/main.js | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/examples/jetstream/list-subjects/deno/main.js b/examples/jetstream/list-subjects/deno/main.js index 9f311f23..83c0db61 100644 --- a/examples/jetstream/list-subjects/deno/main.js +++ b/examples/jetstream/list-subjects/deno/main.js @@ -10,15 +10,11 @@ const nc = await connect({ servers }); const jsm = await nc.jetstreamManager(); const js = nc.jetstream(); -const randomName: () => string = function (): string { - return nuid.next().substring(18) -}; - // Create a stream with a few random subjects -const plainSubj = `PLAIN_${randomName()}`; -const gtSubj = `GT_${randomName()}`; -const starSubj = `STAR_${randomName()}`; -const name = `EVENTS_${randomName()}`; +const plainSubj = `PLAIN_${nuid.next().substring(18)}`; +const gtSubj = `GT_${nuid.next().substring(18)}`; +const starSubj = `STAR_${nuid.next().substring(18)}`; +const name = `EVENTS_${nuid.next().substring(18)}`; let si = await jsm.streams.add({ name,