diff --git a/docs/events/projections/async-daemon.md b/docs/events/projections/async-daemon.md
index d2c984e91e..b1b3971538 100644
--- a/docs/events/projections/async-daemon.md
+++ b/docs/events/projections/async-daemon.md
@@ -483,3 +483,48 @@ using multi-tenancy through a database per tenant. On these spans will be these
There is also a counter metric called `marten.daemon.skipping` or `marten.[database name].daemon.skipping`
that just emits and update every time that Marten has to "skip" stale events.
+
+## Querying for Non Stale Data
+
+There are some potential benefits to running projections asynchronously, namely:
+
+* Avoiding concurrent updates to aggregated documents so that the results are accurate, especially when the aggregation is "multi-stream"
+* Putting the work of building aggregates into a background process so you don't take the performance "hit" of doing that work during requests from a client
+
+All that being said, using asynchronous projections means you're going into the realm of [eventual consistency](https://en.wikipedia.org/wiki/Eventual_consistency), and sometimes
+that's really inconvenient when your users or clients expect up to date information about the projected aggregate data.
+
+Not to worry though, because Marten will allow you to "wait" for an asynchronous projection to catch up so that you
+can query the latest information as all the events captured at the time of the query are processed through the asynchronous
+projection like so:
+
+
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.Services.AddMarten(opts =>
+{
+ opts.Connection(builder.Configuration.GetConnectionString("marten"));
+ opts.Projections.Add(ProjectionLifecycle.Async);
+}).AddAsyncDaemon(DaemonMode.HotCold);
+
+using var host = builder.Build();
+await host.StartAsync();
+
+// DocumentStore() is an extension method in Marten just
+// as a convenience method for test automation
+await using var session = host.DocumentStore().LightweightSession();
+
+// This query operation will first "wait" for the asynchronous projection building the
+// Trip aggregate document to catch up to at least the highest event sequence number assigned
+// at the time this method is called
+var latest = await session.QueryForNonStaleData(5.Seconds())
+ .OrderByDescending(x => x.Started)
+ .Take(10)
+ .ToListAsync();
+```
+snippet source | anchor
+
+
+Do note that this can time out if the projection just can't catch up to the latest event sequence in time. You may need to
+be both cautious with using this in general, and also cautious especially with the timeout setting.
diff --git a/docs/scenarios/command_handler_workflow.md b/docs/scenarios/command_handler_workflow.md
index 3b23d9965f..c9933c01fb 100644
--- a/docs/scenarios/command_handler_workflow.md
+++ b/docs/scenarios/command_handler_workflow.md
@@ -170,7 +170,7 @@ builder.Services.AddMarten(opts =>
// need identity map mechanics in your commands or query handlers
.UseLightweightSessions();
```
-snippet source | anchor
+snippet source | anchor
It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization
diff --git a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs
new file mode 100644
index 0000000000..a56bb21840
--- /dev/null
+++ b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs
@@ -0,0 +1,161 @@
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using EventSourcingTests.Examples;
+using EventSourcingTests.FetchForWriting;
+using EventSourcingTests.Projections;
+using JasperFx.Core;
+using Marten;
+using Marten.Events;
+using Marten.Events.Daemon;
+using Marten.Events.Daemon.Resiliency;
+using Marten.Events.Projections;
+using Marten.Testing.Harness;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Hosting;
+using Shouldly;
+using Xunit;
+
+namespace EventSourcingTests.Aggregation;
+
+public class querying_with_non_stale_data : OneOffConfigurationsContext
+{
+ public querying_with_non_stale_data()
+ {
+ StoreOptions(opts =>
+ {
+ opts.Projections.Add(ProjectionLifecycle.Async);
+ opts.Projections.Snapshot(SnapshotLifecycle.Async);
+ });
+ }
+
+ [Fact]
+ public void can_find_the_shards_for_an_aggregate()
+ {
+ theStore.Options.Projections.AsyncShardsPublishingType(typeof(Lap))
+ .Single().Identity.ShouldBe("Lap:All");
+
+ theStore.Options.Projections.AsyncShardsPublishingType(typeof(SimpleAggregate))
+ .Single().Identity.ShouldBe("SimpleAggregate:All");
+ }
+
+ [Fact]
+ public async Task try_to_fetch_statistics_for_async_shards_smoke_tests()
+ {
+ theSession.Events.StartStream(new AEvent(), new BEvent());
+ theSession.Events.StartStream(new CEvent(), new BEvent());
+ theSession.Events.StartStream(new DEvent(), new AEvent());
+ theSession.Events.StartStream(new DEvent(), new CEvent());
+ await theSession.SaveChangesAsync();
+
+ var progressions =
+ await theStore.Storage.Database.FetchProjectionProgressFor([new ShardName("SimpleAggregate", "All"), ShardName.HighWaterMark]);
+
+ using var daemon = await theStore.BuildProjectionDaemonAsync();
+ await daemon.StartAllAsync();
+ await daemon.WaitForNonStaleData(5.Seconds());
+
+ var all = await theStore.Storage.Database.AllProjectionProgress();
+ all.Count.ShouldBe(3);
+
+ progressions =
+ await theStore.Storage.Database.FetchProjectionProgressFor([new ShardName("SimpleAggregate", "All"), ShardName.HighWaterMark]);
+
+
+ progressions.Count.ShouldBe(2);
+
+ foreach (var progression in progressions)
+ {
+ progression.Sequence.ShouldBeGreaterThan(0);
+ }
+
+ }
+
+ [Fact]
+ public async Task try_to_use_wait_for_non_stale_data_by_aggregate_type()
+ {
+ theSession.Events.StartStream(new AEvent(), new BEvent());
+ theSession.Events.StartStream(new CEvent(), new BEvent());
+ theSession.Events.StartStream(new DEvent(), new AEvent());
+ theSession.Events.StartStream(new DEvent(), new CEvent());
+ await theSession.SaveChangesAsync();
+
+ var waiter = Task.Run(async () =>
+ await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync(typeof(SimpleAggregate), 5.Seconds(),
+ CancellationToken.None));
+
+ using var daemon = await theStore.BuildProjectionDaemonAsync();
+ await daemon.StartAllAsync();
+ await daemon.WaitForNonStaleData(5.Seconds());
+
+ await waiter;
+
+ var all = await theStore.Storage.Database.AllProjectionProgress();
+ all.Count.ShouldBe(3);
+
+ foreach (var progression in all)
+ {
+ progression.Sequence.ShouldBeGreaterThan(0);
+ }
+
+ }
+
+ [Fact]
+ public async Task try_to_query_for_non_stale_data_by_aggregate_type()
+ {
+ var task = Task.Run(async () =>
+ {
+ using var session = theStore.LightweightSession();
+
+ for (int i = 0; i < 20; i++)
+ {
+ session.Events.StartStream(new AEvent(), new BEvent());
+ session.Events.StartStream(new CEvent(), new BEvent());
+ session.Events.StartStream(new DEvent(), new AEvent());
+ var streamId = session.Events.StartStream(new DEvent(), new CEvent());
+ await session.SaveChangesAsync();
+ }
+ });
+
+ using var daemon = await theStore.BuildProjectionDaemonAsync();
+ await daemon.StartAllAsync();
+
+ await Task.Delay(1.Seconds());
+
+ var items = await theSession.QueryForNonStaleData(30.Seconds()).ToListAsync();
+ items.Count.ShouldBeGreaterThan(0);
+
+ await task;
+ }
+
+ public static async Task ExampleUsage()
+ {
+ #region sample_using_query_for_non_stale_data
+
+ var builder = Host.CreateApplicationBuilder();
+ builder.Services.AddMarten(opts =>
+ {
+ opts.Connection(builder.Configuration.GetConnectionString("marten"));
+ opts.Projections.Add(ProjectionLifecycle.Async);
+ }).AddAsyncDaemon(DaemonMode.HotCold);
+
+ using var host = builder.Build();
+ await host.StartAsync();
+
+ // DocumentStore() is an extension method in Marten just
+ // as a convenience method for test automation
+ await using var session = host.DocumentStore().LightweightSession();
+
+ // This query operation will first "wait" for the asynchronous projection building the
+ // Trip aggregate document to catch up to at least the highest event sequence number assigned
+ // at the time this method is called
+ var latest = await session.QueryForNonStaleData(5.Seconds())
+ .OrderByDescending(x => x.Started)
+ .Take(10)
+ .ToListAsync();
+
+ #endregion
+ }
+}
+
+
diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs
index a1fa54b60a..406d5cfa05 100644
--- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs
+++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs
@@ -130,4 +130,48 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
$"The projections timed out before reaching the initial sequence of {initial.EventSequenceNumber}");
}
}
+
+ private static bool isComplete(this Dictionary tracking, long highWaterMark)
+ {
+ return tracking.Values.All(x => x >= highWaterMark);
+ }
+
+ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase database, Type aggregationType, TimeSpan timeout, CancellationToken token)
+ {
+ // Number of active projection shards, plus the high water mark
+ var shards = database.As().Options.Projections.AsyncShardsPublishingType(aggregationType);
+ if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");
+
+ var tracking = new Dictionary();
+ foreach (var shard in shards)
+ {
+ tracking[shard.Identity] = 0;
+ }
+
+ var highWaterMark = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false);
+
+ if (tracking.isComplete(highWaterMark)) return;
+
+ using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(token);
+ cancellationSource.CancelAfter(timeout);
+
+ while (!cancellationSource.Token.IsCancellationRequested)
+ {
+ var current = await database.FetchProjectionProgressFor(shards, cancellationSource.Token).ConfigureAwait(false);
+ foreach (var state in current)
+ {
+ tracking[state.ShardName] = state.Sequence;
+ }
+
+ if (tracking.isComplete(highWaterMark)) return;
+
+ await Task.Delay(100.Milliseconds(), cancellationSource.Token).ConfigureAwait(false);
+ }
+
+ if (cancellationSource.IsCancellationRequested)
+ {
+ throw new TimeoutException(
+ $"The projections timed out before reaching the initial sequence of {highWaterMark}");
+ }
+ }
}
diff --git a/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs b/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs
index 7b4f135753..3f11d8ea22 100644
--- a/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs
+++ b/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs
@@ -1,4 +1,4 @@
-using Marten.Linq;
+using System.Linq;
using Marten.Linq.SqlGeneration;
using Weasel.Postgresql;
@@ -13,8 +13,17 @@ public ProjectionProgressStatement(EventGraph events)
_events = events;
}
+ ///
+ /// If set, filter the projection results to just this shard
+ ///
public ShardName Name { get; set; }
+
+ ///
+ /// If set, filter the projection results to these shard names
+ ///
+ public ShardName[]? Names { get; set; }
+
protected override void configure(ICommandBuilder builder)
{
if (_events.UseOptimizedProjectionRebuilds)
@@ -32,5 +41,12 @@ protected override void configure(ICommandBuilder builder)
builder.Append(" where name = ");
builder.AppendParameter(Name.Identity);
}
+
+ if (Names != null)
+ {
+ builder.Append(" where name = ANY(");
+ builder.AppendParameter(Names.Select(x => x.Identity).ToArray());
+ builder.Append(")");
+ }
}
}
diff --git a/src/Marten/Events/Daemon/ShardName.cs b/src/Marten/Events/Daemon/ShardName.cs
index b2f7bea430..7a1562a82b 100644
--- a/src/Marten/Events/Daemon/ShardName.cs
+++ b/src/Marten/Events/Daemon/ShardName.cs
@@ -22,6 +22,11 @@ public ShardName(string projectionName, string key, uint version)
Identity = $"{projectionName}:{key}";
}
+ if (projectionName == ShardState.HighWaterMark)
+ {
+ Identity = ShardState.HighWaterMark;
+ }
+
}
public ShardName(string projectionName, string key) : this(projectionName, key, 1)
@@ -50,6 +55,7 @@ public ShardName(string projectionName): this(projectionName, All, 1)
public string Identity { get; }
public uint Version { get; } = 1;
+ public static ShardName HighWaterMark { get; } = new(ShardState.HighWaterMark);
public override string ToString()
{
diff --git a/src/Marten/Events/Projections/ProjectionOptions.cs b/src/Marten/Events/Projections/ProjectionOptions.cs
index 71e6124638..eda3d8178d 100644
--- a/src/Marten/Events/Projections/ProjectionOptions.cs
+++ b/src/Marten/Events/Projections/ProjectionOptions.cs
@@ -535,6 +535,12 @@ internal IEnumerable AllPublishedTypes()
{
return All.Where(x => x.Lifecycle != ProjectionLifecycle.Live).SelectMany(x => x.PublishedTypes()).Distinct();
}
+
+ internal ShardName[] AsyncShardsPublishingType(Type aggregationType)
+ {
+ var sources = All.Where(x => x.Lifecycle == ProjectionLifecycle.Async && x.PublishedTypes().Contains(aggregationType)).Select(x => x.ProjectionName).ToArray();
+ return _asyncShards.Value.Values.Where(x => sources.Contains(x.Name.ProjectionName)).Select(x => x.Name).ToArray();
+ }
}
public class DuplicateSubscriptionNamesException: MartenException
diff --git a/src/Marten/IQuerySession.cs b/src/Marten/IQuerySession.cs
index 2275c96af1..9f11775fbc 100644
--- a/src/Marten/IQuerySession.cs
+++ b/src/Marten/IQuerySession.cs
@@ -165,6 +165,17 @@ public interface IQuerySession: IDisposable, IAsyncDisposable
#endregion
+ ///
+ /// If you are querying for data against a projected event aggregation that is updated asynchronously
+ /// through the async daemon, this method will ensure that you are querying against the latest events appended
+ /// to the system by waiting for the aggregate to catch up to the current "high water mark" of the event store
+ /// at the time this query is executed.
+ ///
+ ///
+ ///
+ ///
+ IMartenQueryable QueryForNonStaleData(TimeSpan timeout);
+
///
/// Queries the document storage table for the document type T by supplied SQL. See
/// https://martendb.io/documents/querying/sql.html for more information on usage.
@@ -763,4 +774,6 @@ Task MetadataForAsync(T entity,
///
///
Task QueryByPlanAsync(IQueryPlan plan, CancellationToken token = default);
+
+
}
diff --git a/src/Marten/Internal/Sessions/QuerySession.Querying.cs b/src/Marten/Internal/Sessions/QuerySession.Querying.cs
index 244d822110..cba5919c49 100644
--- a/src/Marten/Internal/Sessions/QuerySession.Querying.cs
+++ b/src/Marten/Internal/Sessions/QuerySession.Querying.cs
@@ -1,4 +1,5 @@
#nullable enable
+using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -22,6 +23,14 @@ public IMartenQueryable Query()
return new MartenLinqQueryable(this);
}
+ public IMartenQueryable QueryForNonStaleData(TimeSpan timeout)
+ {
+ var queryable = new MartenLinqQueryable(this);
+ queryable.MartenProvider.Waiter = new WaitForAggregate(timeout);
+
+ return queryable;
+ }
+
public IReadOnlyList Query(string sql, params object[] parameters)
{
assertNotDisposed();
diff --git a/src/Marten/Linq/MartenLinqQueryProvider.cs b/src/Marten/Linq/MartenLinqQueryProvider.cs
index cedbd42b83..5cc61de885 100644
--- a/src/Marten/Linq/MartenLinqQueryProvider.cs
+++ b/src/Marten/Linq/MartenLinqQueryProvider.cs
@@ -7,6 +7,7 @@
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
+using Marten.Events;
using Marten.Exceptions;
using Marten.Internal.Sessions;
using Marten.Linq.Parsing;
@@ -16,6 +17,8 @@
namespace Marten.Linq;
+internal record WaitForAggregate(TimeSpan Timeout);
+
internal class MartenLinqQueryProvider: IQueryProvider
{
private readonly QuerySession _session;
@@ -28,6 +31,8 @@ public MartenLinqQueryProvider(QuerySession session, Type type)
public Type SourceType { get; }
+ internal WaitForAggregate? Waiter { get; set; }
+
internal QueryStatistics Statistics { get; set; } = null!;
public IQueryable CreateQuery(Expression expression)
@@ -70,6 +75,11 @@ internal async ValueTask EnsureStorageExistsAsync(LinqQueryParser parser,
{
await _session.Database.EnsureStorageExistsAsync(documentType, cancellationToken).ConfigureAwait(false);
}
+
+ if (Waiter != null)
+ {
+ await _session.Database.WaitForNonStaleProjectionDataAsync(SourceType, Waiter.Timeout, cancellationToken).ConfigureAwait(false);
+ }
}
diff --git a/src/Marten/Storage/IMartenDatabase.cs b/src/Marten/Storage/IMartenDatabase.cs
index dbd18f1b70..caf2c9c66b 100644
--- a/src/Marten/Storage/IMartenDatabase.cs
+++ b/src/Marten/Storage/IMartenDatabase.cs
@@ -97,6 +97,18 @@ Task FetchEventStoreStatistics(
Task> AllProjectionProgress(
CancellationToken token = default);
+ ///
+ /// Check the current progress of all asynchronous projections
+ ///
+ ///
+ ///
+ /// Specify the database containing this tenant id. If omitted, this method uses the default
+ /// database
+ ///
+ ///
+ Task> FetchProjectionProgressFor(ShardName[] names,
+ CancellationToken token = default);
+
///
/// Check the current progress of a single projection or projection shard
///
@@ -119,6 +131,13 @@ Task ProjectionProgressFor(ShardName name,
///
///
Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token);
+
+ ///
+ /// Fetch the highest assigned event sequence number in this database
+ ///
+ ///
+ ///
+ Task FetchHighestEventSequenceNumber(CancellationToken token = default);
}
public enum ConnectionUsage
diff --git a/src/Marten/Storage/MartenDatabase.EventStorage.cs b/src/Marten/Storage/MartenDatabase.EventStorage.cs
index 0c6ff0eb45..1b8dcb0caa 100644
--- a/src/Marten/Storage/MartenDatabase.EventStorage.cs
+++ b/src/Marten/Storage/MartenDatabase.EventStorage.cs
@@ -38,6 +38,24 @@ public partial class MartenDatabase
}
}
+ ///
+ /// Fetch the highest assigned event sequence number in this database
+ ///
+ ///
+ ///
+ public async Task FetchHighestEventSequenceNumber(CancellationToken token = default)
+ {
+ await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false);
+ await using var conn = CreateConnection();
+ await conn.OpenAsync(token).ConfigureAwait(false);
+ var highest = (long)await conn
+ .CreateCommand($"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;")
+ .ExecuteScalarAsync(token).ConfigureAwait(false);
+
+ return highest;
+ }
+
+
///
/// Fetch the current size of the event store tables, including the current value
/// of the event sequence number
@@ -118,6 +136,24 @@ public async Task> AllProjectionProgress(
return await handler.HandleAsync(reader, null, token).ConfigureAwait(false);
}
+ public async Task> FetchProjectionProgressFor(ShardName[] names, CancellationToken token = default)
+ {
+ await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false);
+
+ var handler = (IQueryHandler>)new ListQueryHandler(
+ new ProjectionProgressStatement(Options.EventGraph){Names = names},
+ new ShardStateSelector(Options.EventGraph));
+
+ await using var conn = CreateConnection();
+ await conn.OpenAsync(token).ConfigureAwait(false);
+
+ var builder = new CommandBuilder();
+ handler.ConfigureCommand(builder, null);
+
+ await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false);
+ return await handler.HandleAsync(reader, null, token).ConfigureAwait(false);
+ }
+
///
/// Check the current progress of a single projection or projection shard
///
diff --git a/src/Marten/Storage/StandinDatabase.cs b/src/Marten/Storage/StandinDatabase.cs
index dec6507491..37ef930391 100644
--- a/src/Marten/Storage/StandinDatabase.cs
+++ b/src/Marten/Storage/StandinDatabase.cs
@@ -218,6 +218,11 @@ public Task> AllProjectionProgress(CancellationToken t
throw new NotImplementedException();
}
+ public async Task> FetchProjectionProgressFor(ShardName[] names, CancellationToken token = default)
+ {
+ throw new NotImplementedException();
+ }
+
public Task ProjectionProgressFor(ShardName name, CancellationToken token = default)
{
throw new NotImplementedException();
@@ -228,6 +233,11 @@ public Task ProjectionProgressFor(ShardName name, CancellationToken token
throw new NotImplementedException();
}
+ public async Task FetchHighestEventSequenceNumber(CancellationToken token = default)
+ {
+ throw new NotImplementedException();
+ }
+
public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite)
{
throw new NotImplementedException();