From 1edfaff240bc4796f873bda86fe3553e3cec9280 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 17 Dec 2024 17:09:59 -0600 Subject: [PATCH] docs and final work on querying non stale projection data. Closes GH-3489 --- docs/events/projections/async-daemon.md | 45 +++++++++++++++++++ docs/scenarios/command_handler_workflow.md | 2 +- .../querying_with_non_stale_data.cs | 34 ++++++++++++++ .../AsyncProjectionTestingExtensions.cs | 15 +------ src/Marten/Storage/IMartenDatabase.cs | 7 +++ .../Storage/MartenDatabase.EventStorage.cs | 18 ++++++++ src/Marten/Storage/StandinDatabase.cs | 5 +++ 7 files changed, 111 insertions(+), 15 deletions(-) diff --git a/docs/events/projections/async-daemon.md b/docs/events/projections/async-daemon.md index d2c984e91e..b1b3971538 100644 --- a/docs/events/projections/async-daemon.md +++ b/docs/events/projections/async-daemon.md @@ -483,3 +483,48 @@ using multi-tenancy through a database per tenant. On these spans will be these There is also a counter metric called `marten.daemon.skipping` or `marten.[database name].daemon.skipping` that just emits and update every time that Marten has to "skip" stale events. + +## Querying for Non Stale Data + +There are some potential benefits to running projections asynchronously, namely: + +* Avoiding concurrent updates to aggregated documents so that the results are accurate, especially when the aggregation is "multi-stream" +* Putting the work of building aggregates into a background process so you don't take the performance "hit" of doing that work during requests from a client + +All that being said, using asynchronous projections means you're going into the realm of [eventual consistency](https://en.wikipedia.org/wiki/Eventual_consistency), and sometimes +that's really inconvenient when your users or clients expect up to date information about the projected aggregate data. + +Not to worry though, because Marten will allow you to "wait" for an asynchronous projection to catch up so that you +can query the latest information as all the events captured at the time of the query are processed through the asynchronous +projection like so: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection(builder.Configuration.GetConnectionString("marten")); + opts.Projections.Add(ProjectionLifecycle.Async); +}).AddAsyncDaemon(DaemonMode.HotCold); + +using var host = builder.Build(); +await host.StartAsync(); + +// DocumentStore() is an extension method in Marten just +// as a convenience method for test automation +await using var session = host.DocumentStore().LightweightSession(); + +// This query operation will first "wait" for the asynchronous projection building the +// Trip aggregate document to catch up to at least the highest event sequence number assigned +// at the time this method is called +var latest = await session.QueryForNonStaleData(5.Seconds()) + .OrderByDescending(x => x.Started) + .Take(10) + .ToListAsync(); +``` +snippet source | anchor + + +Do note that this can time out if the projection just can't catch up to the latest event sequence in time. You may need to +be both cautious with using this in general, and also cautious especially with the timeout setting. diff --git a/docs/scenarios/command_handler_workflow.md b/docs/scenarios/command_handler_workflow.md index 3b23d9965f..c9933c01fb 100644 --- a/docs/scenarios/command_handler_workflow.md +++ b/docs/scenarios/command_handler_workflow.md @@ -170,7 +170,7 @@ builder.Services.AddMarten(opts => // need identity map mechanics in your commands or query handlers .UseLightweightSessions(); ``` -snippet source | anchor +snippet source | anchor It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization diff --git a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs index ca76bb38b6..a56bb21840 100644 --- a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs +++ b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs @@ -8,8 +8,11 @@ using Marten; using Marten.Events; using Marten.Events.Daemon; +using Marten.Events.Daemon.Resiliency; using Marten.Events.Projections; using Marten.Testing.Harness; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; using Shouldly; using Xunit; @@ -124,4 +127,35 @@ public async Task try_to_query_for_non_stale_data_by_aggregate_type() await task; } + + public static async Task ExampleUsage() + { + #region sample_using_query_for_non_stale_data + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + opts.Projections.Add(ProjectionLifecycle.Async); + }).AddAsyncDaemon(DaemonMode.HotCold); + + using var host = builder.Build(); + await host.StartAsync(); + + // DocumentStore() is an extension method in Marten just + // as a convenience method for test automation + await using var session = host.DocumentStore().LightweightSession(); + + // This query operation will first "wait" for the asynchronous projection building the + // Trip aggregate document to catch up to at least the highest event sequence number assigned + // at the time this method is called + var latest = await session.QueryForNonStaleData(5.Seconds()) + .OrderByDescending(x => x.Started) + .Take(10) + .ToListAsync(); + + #endregion + } } + + diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs index a1eee8407c..406d5cfa05 100644 --- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs +++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs @@ -142,26 +142,13 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase var shards = database.As().Options.Projections.AsyncShardsPublishingType(aggregationType); if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}"); - var all = shards.Concat([ShardName.HighWaterMark]).ToArray(); var tracking = new Dictionary(); foreach (var shard in shards) { tracking[shard.Identity] = 0; } - long highWaterMark = long.MaxValue; - var initial = await database.FetchProjectionProgressFor(all, token).ConfigureAwait(false); - foreach (var state in initial) - { - if (state.ShardName == ShardState.HighWaterMark) - { - highWaterMark = state.Sequence; - } - else - { - tracking[state.ShardName] = state.Sequence; - } - } + var highWaterMark = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false); if (tracking.isComplete(highWaterMark)) return; diff --git a/src/Marten/Storage/IMartenDatabase.cs b/src/Marten/Storage/IMartenDatabase.cs index 5ac23f40ce..caf2c9c66b 100644 --- a/src/Marten/Storage/IMartenDatabase.cs +++ b/src/Marten/Storage/IMartenDatabase.cs @@ -131,6 +131,13 @@ Task ProjectionProgressFor(ShardName name, /// /// Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token); + + /// + /// Fetch the highest assigned event sequence number in this database + /// + /// + /// + Task FetchHighestEventSequenceNumber(CancellationToken token = default); } public enum ConnectionUsage diff --git a/src/Marten/Storage/MartenDatabase.EventStorage.cs b/src/Marten/Storage/MartenDatabase.EventStorage.cs index 76e20574d6..1b8dcb0caa 100644 --- a/src/Marten/Storage/MartenDatabase.EventStorage.cs +++ b/src/Marten/Storage/MartenDatabase.EventStorage.cs @@ -38,6 +38,24 @@ public partial class MartenDatabase } } + /// + /// Fetch the highest assigned event sequence number in this database + /// + /// + /// + public async Task FetchHighestEventSequenceNumber(CancellationToken token = default) + { + await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); + await using var conn = CreateConnection(); + await conn.OpenAsync(token).ConfigureAwait(false); + var highest = (long)await conn + .CreateCommand($"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;") + .ExecuteScalarAsync(token).ConfigureAwait(false); + + return highest; + } + + /// /// Fetch the current size of the event store tables, including the current value /// of the event sequence number diff --git a/src/Marten/Storage/StandinDatabase.cs b/src/Marten/Storage/StandinDatabase.cs index ced69a0f1e..37ef930391 100644 --- a/src/Marten/Storage/StandinDatabase.cs +++ b/src/Marten/Storage/StandinDatabase.cs @@ -233,6 +233,11 @@ public Task ProjectionProgressFor(ShardName name, CancellationToken token throw new NotImplementedException(); } + public async Task FetchHighestEventSequenceNumber(CancellationToken token = default) + { + throw new NotImplementedException(); + } + public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite) { throw new NotImplementedException();