Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non stale querying of aggregate projections running asynchronously #3601

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/events/projections/async-daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,48 @@ using multi-tenancy through a database per tenant. On these spans will be these

There is also a counter metric called `marten.daemon.skipping` or `marten.[database name].daemon.skipping`
that just emits and update every time that Marten has to "skip" stale events.

## Querying for Non Stale Data

There are some potential benefits to running projections asynchronously, namely:

* Avoiding concurrent updates to aggregated documents so that the results are accurate, especially when the aggregation is "multi-stream"
* Putting the work of building aggregates into a background process so you don't take the performance "hit" of doing that work during requests from a client

All that being said, using asynchronous projections means you're going into the realm of [eventual consistency](https://en.wikipedia.org/wiki/Eventual_consistency), and sometimes
that's really inconvenient when your users or clients expect up to date information about the projected aggregate data.

Not to worry though, because Marten will allow you to "wait" for an asynchronous projection to catch up so that you
can query the latest information as all the events captured at the time of the query are processed through the asynchronous
projection like so:

<!-- snippet: sample_using_query_for_non_stale_data -->
<a id='snippet-sample_using_query_for_non_stale_data'></a>
```cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("marten"));
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
}).AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

// DocumentStore() is an extension method in Marten just
// as a convenience method for test automation
await using var session = host.DocumentStore().LightweightSession();

// This query operation will first "wait" for the asynchronous projection building the
// Trip aggregate document to catch up to at least the highest event sequence number assigned
// at the time this method is called
var latest = await session.QueryForNonStaleData<Trip>(5.Seconds())
.OrderByDescending(x => x.Started)
.Take(10)
.ToListAsync();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs#L133-L157' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_query_for_non_stale_data' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Do note that this can time out if the projection just can't catch up to the latest event sequence in time. You may need to
be both cautious with using this in general, and also cautious especially with the timeout setting.
2 changes: 1 addition & 1 deletion docs/scenarios/command_handler_workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ builder.Services.AddMarten(opts =>
// need identity map mechanics in your commands or query handlers
.UseLightweightSessions();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs#L532-L550' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_use_identity_map_for_inline_aggregates' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs#L611-L629' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_use_identity_map_for_inline_aggregates' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization
Expand Down
161 changes: 161 additions & 0 deletions src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Examples;
using EventSourcingTests.FetchForWriting;
using EventSourcingTests.Projections;
using JasperFx.Core;
using Marten;
using Marten.Events;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Aggregation;

public class querying_with_non_stale_data : OneOffConfigurationsContext
{
public querying_with_non_stale_data()
{
StoreOptions(opts =>
{
opts.Projections.Add<LapMultiStreamProjection>(ProjectionLifecycle.Async);
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async);
});
}

[Fact]
public void can_find_the_shards_for_an_aggregate()
{
theStore.Options.Projections.AsyncShardsPublishingType(typeof(Lap))
.Single().Identity.ShouldBe("Lap:All");

theStore.Options.Projections.AsyncShardsPublishingType(typeof(SimpleAggregate))
.Single().Identity.ShouldBe("SimpleAggregate:All");
}

[Fact]
public async Task try_to_fetch_statistics_for_async_shards_smoke_tests()
{
theSession.Events.StartStream(new AEvent(), new BEvent());
theSession.Events.StartStream(new CEvent(), new BEvent());
theSession.Events.StartStream(new DEvent(), new AEvent());
theSession.Events.StartStream(new DEvent(), new CEvent());
await theSession.SaveChangesAsync();

var progressions =
await theStore.Storage.Database.FetchProjectionProgressFor([new ShardName("SimpleAggregate", "All"), ShardName.HighWaterMark]);

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(5.Seconds());

var all = await theStore.Storage.Database.AllProjectionProgress();
all.Count.ShouldBe(3);

progressions =
await theStore.Storage.Database.FetchProjectionProgressFor([new ShardName("SimpleAggregate", "All"), ShardName.HighWaterMark]);


progressions.Count.ShouldBe(2);

foreach (var progression in progressions)
{
progression.Sequence.ShouldBeGreaterThan(0);
}

}

[Fact]
public async Task try_to_use_wait_for_non_stale_data_by_aggregate_type()
{
theSession.Events.StartStream(new AEvent(), new BEvent());
theSession.Events.StartStream(new CEvent(), new BEvent());
theSession.Events.StartStream(new DEvent(), new AEvent());
theSession.Events.StartStream(new DEvent(), new CEvent());
await theSession.SaveChangesAsync();

var waiter = Task.Run(async () =>
await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync(typeof(SimpleAggregate), 5.Seconds(),
CancellationToken.None));

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(5.Seconds());

await waiter;

var all = await theStore.Storage.Database.AllProjectionProgress();
all.Count.ShouldBe(3);

foreach (var progression in all)
{
progression.Sequence.ShouldBeGreaterThan(0);
}

}

[Fact]
public async Task try_to_query_for_non_stale_data_by_aggregate_type()
{
var task = Task.Run(async () =>
{
using var session = theStore.LightweightSession();

for (int i = 0; i < 20; i++)
{
session.Events.StartStream(new AEvent(), new BEvent());
session.Events.StartStream(new CEvent(), new BEvent());
session.Events.StartStream(new DEvent(), new AEvent());
var streamId = session.Events.StartStream(new DEvent(), new CEvent());
await session.SaveChangesAsync();
}
});

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await Task.Delay(1.Seconds());

var items = await theSession.QueryForNonStaleData<SimpleAggregate>(30.Seconds()).ToListAsync();
items.Count.ShouldBeGreaterThan(0);

await task;
}

public static async Task ExampleUsage()
{
#region sample_using_query_for_non_stale_data

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("marten"));
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
}).AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

// DocumentStore() is an extension method in Marten just
// as a convenience method for test automation
await using var session = host.DocumentStore().LightweightSession();

// This query operation will first "wait" for the asynchronous projection building the
// Trip aggregate document to catch up to at least the highest event sequence number assigned
// at the time this method is called
var latest = await session.QueryForNonStaleData<Trip>(5.Seconds())
.OrderByDescending(x => x.Started)
.Take(10)
.ToListAsync();

#endregion
}
}


44 changes: 44 additions & 0 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,48 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
$"The projections timed out before reaching the initial sequence of {initial.EventSequenceNumber}");
}
}

private static bool isComplete(this Dictionary<string, long> tracking, long highWaterMark)
{
return tracking.Values.All(x => x >= highWaterMark);
}

public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase database, Type aggregationType, TimeSpan timeout, CancellationToken token)
{
// Number of active projection shards, plus the high water mark
var shards = database.As<MartenDatabase>().Options.Projections.AsyncShardsPublishingType(aggregationType);
if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");

var tracking = new Dictionary<string, long>();
foreach (var shard in shards)
{
tracking[shard.Identity] = 0;
}

var highWaterMark = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false);

if (tracking.isComplete(highWaterMark)) return;

using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(token);
cancellationSource.CancelAfter(timeout);

while (!cancellationSource.Token.IsCancellationRequested)
{
var current = await database.FetchProjectionProgressFor(shards, cancellationSource.Token).ConfigureAwait(false);
foreach (var state in current)
{
tracking[state.ShardName] = state.Sequence;
}

if (tracking.isComplete(highWaterMark)) return;

await Task.Delay(100.Milliseconds(), cancellationSource.Token).ConfigureAwait(false);
}

if (cancellationSource.IsCancellationRequested)
{
throw new TimeoutException(
$"The projections timed out before reaching the initial sequence of {highWaterMark}");
}
}
}
18 changes: 17 additions & 1 deletion src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Marten.Linq;
using System.Linq;
using Marten.Linq.SqlGeneration;
using Weasel.Postgresql;

Expand All @@ -13,8 +13,17 @@ public ProjectionProgressStatement(EventGraph events)
_events = events;
}

/// <summary>
/// If set, filter the projection results to just this shard
/// </summary>
public ShardName Name { get; set; }


/// <summary>
/// If set, filter the projection results to these shard names
/// </summary>
public ShardName[]? Names { get; set; }

protected override void configure(ICommandBuilder builder)
{
if (_events.UseOptimizedProjectionRebuilds)
Expand All @@ -32,5 +41,12 @@ protected override void configure(ICommandBuilder builder)
builder.Append(" where name = ");
builder.AppendParameter(Name.Identity);
}

if (Names != null)
{
builder.Append(" where name = ANY(");
builder.AppendParameter(Names.Select(x => x.Identity).ToArray());
builder.Append(")");
}
}
}
6 changes: 6 additions & 0 deletions src/Marten/Events/Daemon/ShardName.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ public ShardName(string projectionName, string key, uint version)
Identity = $"{projectionName}:{key}";
}

if (projectionName == ShardState.HighWaterMark)
{
Identity = ShardState.HighWaterMark;
}

}

public ShardName(string projectionName, string key) : this(projectionName, key, 1)
Expand Down Expand Up @@ -50,6 +55,7 @@ public ShardName(string projectionName): this(projectionName, All, 1)
public string Identity { get; }

public uint Version { get; } = 1;
public static ShardName HighWaterMark { get; } = new(ShardState.HighWaterMark);

public override string ToString()
{
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/Projections/ProjectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,12 @@ internal IEnumerable<Type> AllPublishedTypes()
{
return All.Where(x => x.Lifecycle != ProjectionLifecycle.Live).SelectMany(x => x.PublishedTypes()).Distinct();
}

internal ShardName[] AsyncShardsPublishingType(Type aggregationType)
{
var sources = All.Where(x => x.Lifecycle == ProjectionLifecycle.Async && x.PublishedTypes().Contains(aggregationType)).Select(x => x.ProjectionName).ToArray();
return _asyncShards.Value.Values.Where(x => sources.Contains(x.Name.ProjectionName)).Select(x => x.Name).ToArray();
}
}

public class DuplicateSubscriptionNamesException: MartenException
Expand Down
13 changes: 13 additions & 0 deletions src/Marten/IQuerySession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ public interface IQuerySession: IDisposable, IAsyncDisposable

#endregion

/// <summary>
/// If you are querying for data against a projected event aggregation that is updated asynchronously
/// through the async daemon, this method will ensure that you are querying against the latest events appended
/// to the system by waiting for the aggregate to catch up to the current "high water mark" of the event store
/// at the time this query is executed.
/// </summary>
/// <param name="timeout"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
IMartenQueryable<T> QueryForNonStaleData<T>(TimeSpan timeout);

/// <summary>
/// Queries the document storage table for the document type T by supplied SQL. See
/// https://martendb.io/documents/querying/sql.html for more information on usage.
Expand Down Expand Up @@ -763,4 +774,6 @@ Task<DocumentMetadata> MetadataForAsync<T>(T entity,
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<T> QueryByPlanAsync<T>(IQueryPlan<T> plan, CancellationToken token = default);


}
9 changes: 9 additions & 0 deletions src/Marten/Internal/Sessions/QuerySession.Querying.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -22,6 +23,14 @@ public IMartenQueryable<T> Query<T>()
return new MartenLinqQueryable<T>(this);
}

public IMartenQueryable<T> QueryForNonStaleData<T>(TimeSpan timeout)
{
var queryable = new MartenLinqQueryable<T>(this);
queryable.MartenProvider.Waiter = new WaitForAggregate(timeout);

return queryable;
}

public IReadOnlyList<T> Query<T>(string sql, params object[] parameters)
{
assertNotDisposed();
Expand Down
Loading
Loading