Skip to content

Commit

Permalink
[C# DotNet2] List Subjects (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 10, 2024
1 parent b49bae6 commit 8e3cec1
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 38 deletions.
59 changes: 21 additions & 38 deletions examples/jetstream/ack-ack/dotnet2/Main.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
// Install NuGet packages `NATS.Net` and `Microsoft.Extensions.Logging.Console`.
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Configuration;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.KeyValueStore;

using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

// `NATS_URL` environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Expand All @@ -17,7 +11,6 @@
var opts = new NatsOpts
{
Url = url,
LoggerFactory = loggerFactory,
Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
Expand Down Expand Up @@ -56,60 +49,50 @@
// The second consumer will AckSync which confirms that ack was handled.

// Consumer 1, regular ack
Console.WriteLine("Consumer 1");
var consumer1 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName1));
logger.LogInformation(
"Consumer 1, Start # pending messages: {}, messages with ack pending: {}",
consumer1.Info.NumPending,
consumer1.Info.NumAckPending);
Console.WriteLine(" Start");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");

var next = await consumer1.NextAsync<string>();

// refresh the consumer to update it's state
await consumer1.RefreshAsync();
logger.LogInformation(
"Consumer 1, After received but before ack # pending messages: {}, messages with ack pending: {}",
consumer1.Info.NumPending,
consumer1.Info.NumAckPending);
Console.WriteLine(" After received but before ack");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");

if (next is { } msg1)
{
await msg1.AckAsync();
}

// refresh the consumer to update it's state
await consumer1.RefreshAsync();
logger.LogInformation(
"Consumer 1, After ack # pending messages: {}, messages with ack pending: {}",
consumer1.Info.NumPending,
consumer1.Info.NumAckPending);
Console.WriteLine(" After ack");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");

// Consumer 2 Double Ack
var consumer2 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName2));

logger.LogInformation(
"Consumer 2, Start # pending messages: {}, messages with ack pending: {}",
consumer2.Info.NumPending,
consumer2.Info.NumAckPending);
Console.WriteLine("Consumer 2");
Console.WriteLine(" Start");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");

next = await consumer2.NextAsync<string>();

// refresh the consumer to update it's state
await consumer2.RefreshAsync();
logger.LogInformation(
"Consumer 2, After received but before ack # pending messages: {}, messages with ack pending: {}",
consumer2.Info.NumPending,
consumer2.Info.NumAckPending);
Console.WriteLine(" After received but before ack");
Console.WriteLine($" pending messages: {consumer2.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer2.Info.NumAckPending}");

if (next is { } msg2)
{
await msg2.AckAsync(new AckOpts { DoubleAck = true });
}

// refresh the consumer to update it's state
await consumer2.RefreshAsync();
logger.LogInformation(
"Consumer 2, After ack # pending messages: {}, messages with ack pending: {}",
consumer2.Info.NumPending,
consumer2.Info.NumAckPending);

// That's it!
logger.LogInformation("Bye!");
Console.WriteLine(" After ack");
Console.WriteLine($" pending messages: {consumer2.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer2.Info.NumAckPending}");
98 changes: 98 additions & 0 deletions examples/jetstream/list-subjects/dotnet2/Main.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Install NuGet packages `NATS.Net` and `Microsoft.Extensions.Logging.Console`.
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

// `NATS_URL` environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

// Connect to NATS server. Since connection is disposable at the end of our scope we should flush
// our buffers and close connection cleanly.
var opts = new NatsOpts
{
Url = url,
Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);

// ### Stream Setup
var stream = "list-subjects";

// Remove the stream first!, so we have a clean starting point.
try
{
await js.DeleteStreamAsync(stream);
}
catch (NatsJSApiException e) when (e is { Error.Code: 404 })
{
}

// Create the stream with a variety of subjects
var streamConfig = new StreamConfig(stream, ["plain", "greater.>", "star.*"])
{
Storage = StreamConfigStorage.Memory,
};
await js.CreateStreamAsync(streamConfig);

// ### GetStreamAsync with StreamInfoRequest
// Get the subjects via the GetStreamAsync call.
// Since this is "state" there are no subjects in the state unless
// there are messages in the subject.
// To get the subjects map, you must provide a SubjectsFilter
// Use the &gt; to filter for all subjects
var jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = ">" });
Console.WriteLine($"Before publishing any messages, there are 0 subjects: {jsStream.Info.State.Subjects?.Count}");

// Publish a message
await js.PublishAsync("plain", "plain-data");

jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = ">" });
Console.WriteLine("After publishing a message to a subject, it appears in state:");
if (jsStream.Info.State.Subjects != null)
{
foreach (var (subject, count) in jsStream.Info.State.Subjects)
{
Console.WriteLine($" Subject '{subject}', Count {count}");
}
}

// Publish some more messages, this time against wildcard subjects
await js.PublishAsync("greater.A", "gtA");
await js.PublishAsync("greater.A.B", "gtAB");
await js.PublishAsync("greater.A.B.C", "gtABC");
await js.PublishAsync("greater.B.B.B", "gtBBB");
await js.PublishAsync("star.1", "star1");
await js.PublishAsync("star.2", "star2");

jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = ">" });
Console.WriteLine("Wildcard subjects show the actual subject, not the template:");
if (jsStream.Info.State.Subjects != null)
{
foreach (var (subject, count) in jsStream.Info.State.Subjects)
{
Console.WriteLine($" Subject '{subject}', Count {count}");
}
}

// ### Specific Subject Filtering
// You can filter for a more specific subject
jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = "greater.>" });
Console.WriteLine("Filtering the subject returns only matching entries ['greater.>']");
if (jsStream.Info.State.Subjects != null)
{
foreach (var (subject, count) in jsStream.Info.State.Subjects)
{
Console.WriteLine($" Subject '{subject}', Count {count}");
}
}

jsStream = await js.GetStreamAsync(stream, new StreamInfoRequest() { SubjectsFilter = "greater.A.>" });
Console.WriteLine("Filtering the subject returns only matching entries ['greater.A.>']");
if (jsStream.Info.State.Subjects != null)
{
foreach (var (subject, count) in jsStream.Info.State.Subjects)
{
Console.WriteLine($" Subject '{subject}', Count {count}");
}
}
15 changes: 15 additions & 0 deletions examples/jetstream/list-subjects/dotnet2/list-subjects.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NATS.Net" Version="2.3.1"/>
<PackageReference Include="NATS.Client.Serializers.Json" Version="2.0.0"/>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
</ItemGroup>

</Project>

1 comment on commit 8e3cec1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for nats-by-example ready!

✅ Preview
https://nats-by-example-qnqplp8hd-connecteverything.vercel.app

Built with commit 8e3cec1.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.