From 8e3cec1dd4a8df1a177e87e9bfac4246f00f7b1c Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Wed, 10 Jul 2024 14:51:57 -0400 Subject: [PATCH] [C# DotNet2] List Subjects (#221) --- examples/jetstream/ack-ack/dotnet2/Main.cs | 59 ++++------- .../jetstream/list-subjects/dotnet2/Main.cs | 98 +++++++++++++++++++ .../dotnet2/list-subjects.csproj | 15 +++ 3 files changed, 134 insertions(+), 38 deletions(-) create mode 100644 examples/jetstream/list-subjects/dotnet2/Main.cs create mode 100644 examples/jetstream/list-subjects/dotnet2/list-subjects.csproj diff --git a/examples/jetstream/ack-ack/dotnet2/Main.cs b/examples/jetstream/ack-ack/dotnet2/Main.cs index 92220fce..861ff256 100644 --- a/examples/jetstream/ack-ack/dotnet2/Main.cs +++ b/examples/jetstream/ack-ack/dotnet2/Main.cs @@ -1,13 +1,7 @@ // Install NuGet packages `NATS.Net` and `Microsoft.Extensions.Logging.Console`. -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Configuration; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.JetStream.Models; -using NATS.Client.KeyValueStore; - -using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole()); -var logger = loggerFactory.CreateLogger("NATS-by-Example"); // `NATS_URL` environment variable can be used to pass the locations of the NATS servers. var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222"; @@ -17,7 +11,6 @@ var opts = new NatsOpts { Url = url, - LoggerFactory = loggerFactory, Name = "NATS-by-Example", }; await using var nats = new NatsConnection(opts); @@ -56,60 +49,50 @@ // The second consumer will AckSync which confirms that ack was handled. // Consumer 1, regular ack +Console.WriteLine("Consumer 1"); var consumer1 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName1)); -logger.LogInformation( - "Consumer 1, Start # pending messages: {}, messages with ack pending: {}", - consumer1.Info.NumPending, - consumer1.Info.NumAckPending); +Console.WriteLine(" Start"); +Console.WriteLine($" pending messages: {consumer1.Info.NumPending}"); +Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}"); var next = await consumer1.NextAsync(); // refresh the consumer to update it's state await consumer1.RefreshAsync(); -logger.LogInformation( - "Consumer 1, After received but before ack # pending messages: {}, messages with ack pending: {}", - consumer1.Info.NumPending, - consumer1.Info.NumAckPending); +Console.WriteLine(" After received but before ack"); +Console.WriteLine($" pending messages: {consumer1.Info.NumPending}"); +Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}"); if (next is { } msg1) { await msg1.AckAsync(); } -// refresh the consumer to update it's state await consumer1.RefreshAsync(); -logger.LogInformation( - "Consumer 1, After ack # pending messages: {}, messages with ack pending: {}", - consumer1.Info.NumPending, - consumer1.Info.NumAckPending); +Console.WriteLine(" After ack"); +Console.WriteLine($" pending messages: {consumer1.Info.NumPending}"); +Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}"); +// Consumer 2 Double Ack var consumer2 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName2)); - -logger.LogInformation( - "Consumer 2, Start # pending messages: {}, messages with ack pending: {}", - consumer2.Info.NumPending, - consumer2.Info.NumAckPending); +Console.WriteLine("Consumer 2"); +Console.WriteLine(" Start"); +Console.WriteLine($" pending messages: {consumer1.Info.NumPending}"); +Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}"); next = await consumer2.NextAsync(); -// refresh the consumer to update it's state await consumer2.RefreshAsync(); -logger.LogInformation( - "Consumer 2, After received but before ack # pending messages: {}, messages with ack pending: {}", - consumer2.Info.NumPending, - consumer2.Info.NumAckPending); +Console.WriteLine(" After received but before ack"); +Console.WriteLine($" pending messages: {consumer2.Info.NumPending}"); +Console.WriteLine($" messages with ack pending: {consumer2.Info.NumAckPending}"); if (next is { } msg2) { await msg2.AckAsync(new AckOpts { DoubleAck = true }); } -// refresh the consumer to update it's state await consumer2.RefreshAsync(); -logger.LogInformation( - "Consumer 2, After ack # pending messages: {}, messages with ack pending: {}", - consumer2.Info.NumPending, - consumer2.Info.NumAckPending); - -// That's it! -logger.LogInformation("Bye!"); +Console.WriteLine(" After ack"); +Console.WriteLine($" pending messages: {consumer2.Info.NumPending}"); +Console.WriteLine($" messages with ack pending: {consumer2.Info.NumAckPending}"); diff --git a/examples/jetstream/list-subjects/dotnet2/Main.cs b/examples/jetstream/list-subjects/dotnet2/Main.cs new file mode 100644 index 00000000..43337e17 --- /dev/null +++ b/examples/jetstream/list-subjects/dotnet2/Main.cs @@ -0,0 +1,98 @@ +// Install NuGet packages `NATS.Net` and `Microsoft.Extensions.Logging.Console`. +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; + +// `NATS_URL` environment variable can be used to pass the locations of the NATS servers. +var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222"; + +// Connect to NATS server. Since connection is disposable at the end of our scope we should flush +// our buffers and close connection cleanly. +var opts = new NatsOpts +{ + Url = url, + Name = "NATS-by-Example", +}; +await using var nats = new NatsConnection(opts); +var js = new NatsJSContext(nats); + +// ### Stream Setup +var stream = "list-subjects"; + +// Remove the stream first!, so we have a clean starting point. +try +{ + await js.DeleteStreamAsync(stream); +} +catch (NatsJSApiException e) when (e is { Error.Code: 404 }) +{ +} + +// Create the stream with a variety of subjects +var streamConfig = new StreamConfig(stream, ["plain", "greater.>", "star.*"]) +{ + Storage = StreamConfigStorage.Memory, +}; +await js.CreateStreamAsync(streamConfig); + +// ### GetStreamAsync with StreamInfoRequest +// Get the subjects via the GetStreamAsync call. +// Since this is "state" there are no subjects in the state unless +// there are messages in the subject. +// To get the subjects map, you must provide a SubjectsFilter +// Use the > to filter for all subjects +var jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = ">" }); +Console.WriteLine($"Before publishing any messages, there are 0 subjects: {jsStream.Info.State.Subjects?.Count}"); + +// Publish a message +await js.PublishAsync("plain", "plain-data"); + +jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = ">" }); +Console.WriteLine("After publishing a message to a subject, it appears in state:"); +if (jsStream.Info.State.Subjects != null) +{ + foreach (var (subject, count) in jsStream.Info.State.Subjects) + { + Console.WriteLine($" Subject '{subject}', Count {count}"); + } +} + +// Publish some more messages, this time against wildcard subjects +await js.PublishAsync("greater.A", "gtA"); +await js.PublishAsync("greater.A.B", "gtAB"); +await js.PublishAsync("greater.A.B.C", "gtABC"); +await js.PublishAsync("greater.B.B.B", "gtBBB"); +await js.PublishAsync("star.1", "star1"); +await js.PublishAsync("star.2", "star2"); + +jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = ">" }); +Console.WriteLine("Wildcard subjects show the actual subject, not the template:"); +if (jsStream.Info.State.Subjects != null) +{ + foreach (var (subject, count) in jsStream.Info.State.Subjects) + { + Console.WriteLine($" Subject '{subject}', Count {count}"); + } +} + +// ### Specific Subject Filtering +// You can filter for a more specific subject +jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = "greater.>" }); +Console.WriteLine("Filtering the subject returns only matching entries ['greater.>']"); +if (jsStream.Info.State.Subjects != null) +{ + foreach (var (subject, count) in jsStream.Info.State.Subjects) + { + Console.WriteLine($" Subject '{subject}', Count {count}"); + } +} + +jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = "greater.A.>" }); +Console.WriteLine("Filtering the subject returns only matching entries ['greater.A.>']"); +if (jsStream.Info.State.Subjects != null) +{ + foreach (var (subject, count) in jsStream.Info.State.Subjects) + { + Console.WriteLine($" Subject '{subject}', Count {count}"); + } +} diff --git a/examples/jetstream/list-subjects/dotnet2/list-subjects.csproj b/examples/jetstream/list-subjects/dotnet2/list-subjects.csproj new file mode 100644 index 00000000..fa179966 --- /dev/null +++ b/examples/jetstream/list-subjects/dotnet2/list-subjects.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + enable + + + + + + + + +