diff --git a/docs/events/projections/async-daemon.md b/docs/events/projections/async-daemon.md index d2c984e91e..b1b3971538 100644 --- a/docs/events/projections/async-daemon.md +++ b/docs/events/projections/async-daemon.md @@ -483,3 +483,48 @@ using multi-tenancy through a database per tenant. On these spans will be these There is also a counter metric called `marten.daemon.skipping` or `marten.[database name].daemon.skipping` that just emits and update every time that Marten has to "skip" stale events. + +## Querying for Non Stale Data + +There are some potential benefits to running projections asynchronously, namely: + +* Avoiding concurrent updates to aggregated documents so that the results are accurate, especially when the aggregation is "multi-stream" +* Putting the work of building aggregates into a background process so you don't take the performance "hit" of doing that work during requests from a client + +All that being said, using asynchronous projections means you're going into the realm of [eventual consistency](https://en.wikipedia.org/wiki/Eventual_consistency), and sometimes +that's really inconvenient when your users or clients expect up to date information about the projected aggregate data. + +Not to worry though, because Marten will allow you to "wait" for an asynchronous projection to catch up so that you +can query the latest information as all the events captured at the time of the query are processed through the asynchronous +projection like so: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection(builder.Configuration.GetConnectionString("marten")); + opts.Projections.Add(ProjectionLifecycle.Async); +}).AddAsyncDaemon(DaemonMode.HotCold); + +using var host = builder.Build(); +await host.StartAsync(); + +// DocumentStore() is an extension method in Marten just +// as a convenience method for test automation +await using var session = host.DocumentStore().LightweightSession(); + +// This query operation will first "wait" for the asynchronous projection building the +// Trip aggregate document to catch up to at least the highest event sequence number assigned +// at the time this method is called +var latest = await session.QueryForNonStaleData(5.Seconds()) + .OrderByDescending(x => x.Started) + .Take(10) + .ToListAsync(); +``` +snippet source | anchor + + +Do note that this can time out if the projection just can't catch up to the latest event sequence in time. You may need to +be both cautious with using this in general, and also cautious especially with the timeout setting. diff --git a/docs/scenarios/command_handler_workflow.md b/docs/scenarios/command_handler_workflow.md index 3b23d9965f..c9933c01fb 100644 --- a/docs/scenarios/command_handler_workflow.md +++ b/docs/scenarios/command_handler_workflow.md @@ -170,7 +170,7 @@ builder.Services.AddMarten(opts => // need identity map mechanics in your commands or query handlers .UseLightweightSessions(); ``` -snippet source | anchor +snippet source | anchor It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization diff --git a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs new file mode 100644 index 0000000000..a56bb21840 --- /dev/null +++ b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs @@ -0,0 +1,161 @@ +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventSourcingTests.Examples; +using EventSourcingTests.FetchForWriting; +using EventSourcingTests.Projections; +using JasperFx.Core; +using Marten; +using Marten.Events; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Resiliency; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Aggregation; + +public class querying_with_non_stale_data : OneOffConfigurationsContext +{ + public querying_with_non_stale_data() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Async); + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + } + + [Fact] + public void can_find_the_shards_for_an_aggregate() + { + theStore.Options.Projections.AsyncShardsPublishingType(typeof(Lap)) + .Single().Identity.ShouldBe("Lap:All"); + + theStore.Options.Projections.AsyncShardsPublishingType(typeof(SimpleAggregate)) + .Single().Identity.ShouldBe("SimpleAggregate:All"); + } + + [Fact] + public async Task try_to_fetch_statistics_for_async_shards_smoke_tests() + { + theSession.Events.StartStream(new AEvent(), new BEvent()); + theSession.Events.StartStream(new CEvent(), new BEvent()); + theSession.Events.StartStream(new DEvent(), new AEvent()); + theSession.Events.StartStream(new DEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var progressions = + await theStore.Storage.Database.FetchProjectionProgressFor([new ShardName("SimpleAggregate", "All"), ShardName.HighWaterMark]); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(5.Seconds()); + + var all = await theStore.Storage.Database.AllProjectionProgress(); + all.Count.ShouldBe(3); + + progressions = + await theStore.Storage.Database.FetchProjectionProgressFor([new ShardName("SimpleAggregate", "All"), ShardName.HighWaterMark]); + + + progressions.Count.ShouldBe(2); + + foreach (var progression in progressions) + { + progression.Sequence.ShouldBeGreaterThan(0); + } + + } + + [Fact] + public async Task try_to_use_wait_for_non_stale_data_by_aggregate_type() + { + theSession.Events.StartStream(new AEvent(), new BEvent()); + theSession.Events.StartStream(new CEvent(), new BEvent()); + theSession.Events.StartStream(new DEvent(), new AEvent()); + theSession.Events.StartStream(new DEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var waiter = Task.Run(async () => + await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync(typeof(SimpleAggregate), 5.Seconds(), + CancellationToken.None)); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(5.Seconds()); + + await waiter; + + var all = await theStore.Storage.Database.AllProjectionProgress(); + all.Count.ShouldBe(3); + + foreach (var progression in all) + { + progression.Sequence.ShouldBeGreaterThan(0); + } + + } + + [Fact] + public async Task try_to_query_for_non_stale_data_by_aggregate_type() + { + var task = Task.Run(async () => + { + using var session = theStore.LightweightSession(); + + for (int i = 0; i < 20; i++) + { + session.Events.StartStream(new AEvent(), new BEvent()); + session.Events.StartStream(new CEvent(), new BEvent()); + session.Events.StartStream(new DEvent(), new AEvent()); + var streamId = session.Events.StartStream(new DEvent(), new CEvent()); + await session.SaveChangesAsync(); + } + }); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + + await Task.Delay(1.Seconds()); + + var items = await theSession.QueryForNonStaleData(30.Seconds()).ToListAsync(); + items.Count.ShouldBeGreaterThan(0); + + await task; + } + + public static async Task ExampleUsage() + { + #region sample_using_query_for_non_stale_data + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + opts.Projections.Add(ProjectionLifecycle.Async); + }).AddAsyncDaemon(DaemonMode.HotCold); + + using var host = builder.Build(); + await host.StartAsync(); + + // DocumentStore() is an extension method in Marten just + // as a convenience method for test automation + await using var session = host.DocumentStore().LightweightSession(); + + // This query operation will first "wait" for the asynchronous projection building the + // Trip aggregate document to catch up to at least the highest event sequence number assigned + // at the time this method is called + var latest = await session.QueryForNonStaleData(5.Seconds()) + .OrderByDescending(x => x.Started) + .Take(10) + .ToListAsync(); + + #endregion + } +} + + diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs index a1fa54b60a..406d5cfa05 100644 --- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs +++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs @@ -130,4 +130,48 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase $"The projections timed out before reaching the initial sequence of {initial.EventSequenceNumber}"); } } + + private static bool isComplete(this Dictionary tracking, long highWaterMark) + { + return tracking.Values.All(x => x >= highWaterMark); + } + + public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase database, Type aggregationType, TimeSpan timeout, CancellationToken token) + { + // Number of active projection shards, plus the high water mark + var shards = database.As().Options.Projections.AsyncShardsPublishingType(aggregationType); + if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}"); + + var tracking = new Dictionary(); + foreach (var shard in shards) + { + tracking[shard.Identity] = 0; + } + + var highWaterMark = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false); + + if (tracking.isComplete(highWaterMark)) return; + + using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(token); + cancellationSource.CancelAfter(timeout); + + while (!cancellationSource.Token.IsCancellationRequested) + { + var current = await database.FetchProjectionProgressFor(shards, cancellationSource.Token).ConfigureAwait(false); + foreach (var state in current) + { + tracking[state.ShardName] = state.Sequence; + } + + if (tracking.isComplete(highWaterMark)) return; + + await Task.Delay(100.Milliseconds(), cancellationSource.Token).ConfigureAwait(false); + } + + if (cancellationSource.IsCancellationRequested) + { + throw new TimeoutException( + $"The projections timed out before reaching the initial sequence of {highWaterMark}"); + } + } } diff --git a/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs b/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs index 7b4f135753..3f11d8ea22 100644 --- a/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs +++ b/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs @@ -1,4 +1,4 @@ -using Marten.Linq; +using System.Linq; using Marten.Linq.SqlGeneration; using Weasel.Postgresql; @@ -13,8 +13,17 @@ public ProjectionProgressStatement(EventGraph events) _events = events; } + /// + /// If set, filter the projection results to just this shard + /// public ShardName Name { get; set; } + + /// + /// If set, filter the projection results to these shard names + /// + public ShardName[]? Names { get; set; } + protected override void configure(ICommandBuilder builder) { if (_events.UseOptimizedProjectionRebuilds) @@ -32,5 +41,12 @@ protected override void configure(ICommandBuilder builder) builder.Append(" where name = "); builder.AppendParameter(Name.Identity); } + + if (Names != null) + { + builder.Append(" where name = ANY("); + builder.AppendParameter(Names.Select(x => x.Identity).ToArray()); + builder.Append(")"); + } } } diff --git a/src/Marten/Events/Daemon/ShardName.cs b/src/Marten/Events/Daemon/ShardName.cs index b2f7bea430..7a1562a82b 100644 --- a/src/Marten/Events/Daemon/ShardName.cs +++ b/src/Marten/Events/Daemon/ShardName.cs @@ -22,6 +22,11 @@ public ShardName(string projectionName, string key, uint version) Identity = $"{projectionName}:{key}"; } + if (projectionName == ShardState.HighWaterMark) + { + Identity = ShardState.HighWaterMark; + } + } public ShardName(string projectionName, string key) : this(projectionName, key, 1) @@ -50,6 +55,7 @@ public ShardName(string projectionName): this(projectionName, All, 1) public string Identity { get; } public uint Version { get; } = 1; + public static ShardName HighWaterMark { get; } = new(ShardState.HighWaterMark); public override string ToString() { diff --git a/src/Marten/Events/Projections/ProjectionOptions.cs b/src/Marten/Events/Projections/ProjectionOptions.cs index 71e6124638..eda3d8178d 100644 --- a/src/Marten/Events/Projections/ProjectionOptions.cs +++ b/src/Marten/Events/Projections/ProjectionOptions.cs @@ -535,6 +535,12 @@ internal IEnumerable AllPublishedTypes() { return All.Where(x => x.Lifecycle != ProjectionLifecycle.Live).SelectMany(x => x.PublishedTypes()).Distinct(); } + + internal ShardName[] AsyncShardsPublishingType(Type aggregationType) + { + var sources = All.Where(x => x.Lifecycle == ProjectionLifecycle.Async && x.PublishedTypes().Contains(aggregationType)).Select(x => x.ProjectionName).ToArray(); + return _asyncShards.Value.Values.Where(x => sources.Contains(x.Name.ProjectionName)).Select(x => x.Name).ToArray(); + } } public class DuplicateSubscriptionNamesException: MartenException diff --git a/src/Marten/IQuerySession.cs b/src/Marten/IQuerySession.cs index 2275c96af1..9f11775fbc 100644 --- a/src/Marten/IQuerySession.cs +++ b/src/Marten/IQuerySession.cs @@ -165,6 +165,17 @@ public interface IQuerySession: IDisposable, IAsyncDisposable #endregion + /// + /// If you are querying for data against a projected event aggregation that is updated asynchronously + /// through the async daemon, this method will ensure that you are querying against the latest events appended + /// to the system by waiting for the aggregate to catch up to the current "high water mark" of the event store + /// at the time this query is executed. + /// + /// + /// + /// + IMartenQueryable QueryForNonStaleData(TimeSpan timeout); + /// /// Queries the document storage table for the document type T by supplied SQL. See /// https://martendb.io/documents/querying/sql.html for more information on usage. @@ -763,4 +774,6 @@ Task MetadataForAsync(T entity, /// /// Task QueryByPlanAsync(IQueryPlan plan, CancellationToken token = default); + + } diff --git a/src/Marten/Internal/Sessions/QuerySession.Querying.cs b/src/Marten/Internal/Sessions/QuerySession.Querying.cs index 244d822110..cba5919c49 100644 --- a/src/Marten/Internal/Sessions/QuerySession.Querying.cs +++ b/src/Marten/Internal/Sessions/QuerySession.Querying.cs @@ -1,4 +1,5 @@ #nullable enable +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -22,6 +23,14 @@ public IMartenQueryable Query() return new MartenLinqQueryable(this); } + public IMartenQueryable QueryForNonStaleData(TimeSpan timeout) + { + var queryable = new MartenLinqQueryable(this); + queryable.MartenProvider.Waiter = new WaitForAggregate(timeout); + + return queryable; + } + public IReadOnlyList Query(string sql, params object[] parameters) { assertNotDisposed(); diff --git a/src/Marten/Linq/MartenLinqQueryProvider.cs b/src/Marten/Linq/MartenLinqQueryProvider.cs index cedbd42b83..5cc61de885 100644 --- a/src/Marten/Linq/MartenLinqQueryProvider.cs +++ b/src/Marten/Linq/MartenLinqQueryProvider.cs @@ -7,6 +7,7 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using Marten.Events; using Marten.Exceptions; using Marten.Internal.Sessions; using Marten.Linq.Parsing; @@ -16,6 +17,8 @@ namespace Marten.Linq; +internal record WaitForAggregate(TimeSpan Timeout); + internal class MartenLinqQueryProvider: IQueryProvider { private readonly QuerySession _session; @@ -28,6 +31,8 @@ public MartenLinqQueryProvider(QuerySession session, Type type) public Type SourceType { get; } + internal WaitForAggregate? Waiter { get; set; } + internal QueryStatistics Statistics { get; set; } = null!; public IQueryable CreateQuery(Expression expression) @@ -70,6 +75,11 @@ internal async ValueTask EnsureStorageExistsAsync(LinqQueryParser parser, { await _session.Database.EnsureStorageExistsAsync(documentType, cancellationToken).ConfigureAwait(false); } + + if (Waiter != null) + { + await _session.Database.WaitForNonStaleProjectionDataAsync(SourceType, Waiter.Timeout, cancellationToken).ConfigureAwait(false); + } } diff --git a/src/Marten/Storage/IMartenDatabase.cs b/src/Marten/Storage/IMartenDatabase.cs index dbd18f1b70..caf2c9c66b 100644 --- a/src/Marten/Storage/IMartenDatabase.cs +++ b/src/Marten/Storage/IMartenDatabase.cs @@ -97,6 +97,18 @@ Task FetchEventStoreStatistics( Task> AllProjectionProgress( CancellationToken token = default); + /// + /// Check the current progress of all asynchronous projections + /// + /// + /// + /// Specify the database containing this tenant id. If omitted, this method uses the default + /// database + /// + /// + Task> FetchProjectionProgressFor(ShardName[] names, + CancellationToken token = default); + /// /// Check the current progress of a single projection or projection shard /// @@ -119,6 +131,13 @@ Task ProjectionProgressFor(ShardName name, /// /// Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token); + + /// + /// Fetch the highest assigned event sequence number in this database + /// + /// + /// + Task FetchHighestEventSequenceNumber(CancellationToken token = default); } public enum ConnectionUsage diff --git a/src/Marten/Storage/MartenDatabase.EventStorage.cs b/src/Marten/Storage/MartenDatabase.EventStorage.cs index 0c6ff0eb45..1b8dcb0caa 100644 --- a/src/Marten/Storage/MartenDatabase.EventStorage.cs +++ b/src/Marten/Storage/MartenDatabase.EventStorage.cs @@ -38,6 +38,24 @@ public partial class MartenDatabase } } + /// + /// Fetch the highest assigned event sequence number in this database + /// + /// + /// + public async Task FetchHighestEventSequenceNumber(CancellationToken token = default) + { + await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); + await using var conn = CreateConnection(); + await conn.OpenAsync(token).ConfigureAwait(false); + var highest = (long)await conn + .CreateCommand($"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;") + .ExecuteScalarAsync(token).ConfigureAwait(false); + + return highest; + } + + /// /// Fetch the current size of the event store tables, including the current value /// of the event sequence number @@ -118,6 +136,24 @@ public async Task> AllProjectionProgress( return await handler.HandleAsync(reader, null, token).ConfigureAwait(false); } + public async Task> FetchProjectionProgressFor(ShardName[] names, CancellationToken token = default) + { + await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); + + var handler = (IQueryHandler>)new ListQueryHandler( + new ProjectionProgressStatement(Options.EventGraph){Names = names}, + new ShardStateSelector(Options.EventGraph)); + + await using var conn = CreateConnection(); + await conn.OpenAsync(token).ConfigureAwait(false); + + var builder = new CommandBuilder(); + handler.ConfigureCommand(builder, null); + + await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false); + return await handler.HandleAsync(reader, null, token).ConfigureAwait(false); + } + /// /// Check the current progress of a single projection or projection shard /// diff --git a/src/Marten/Storage/StandinDatabase.cs b/src/Marten/Storage/StandinDatabase.cs index dec6507491..37ef930391 100644 --- a/src/Marten/Storage/StandinDatabase.cs +++ b/src/Marten/Storage/StandinDatabase.cs @@ -218,6 +218,11 @@ public Task> AllProjectionProgress(CancellationToken t throw new NotImplementedException(); } + public async Task> FetchProjectionProgressFor(ShardName[] names, CancellationToken token = default) + { + throw new NotImplementedException(); + } + public Task ProjectionProgressFor(ShardName name, CancellationToken token = default) { throw new NotImplementedException(); @@ -228,6 +233,11 @@ public Task ProjectionProgressFor(ShardName name, CancellationToken token throw new NotImplementedException(); } + public async Task FetchHighestEventSequenceNumber(CancellationToken token = default) + { + throw new NotImplementedException(); + } + public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite) { throw new NotImplementedException();