Skip to content

Commit

Permalink
Initial version of query for non stale aggregate data.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Dec 17, 2024
1 parent ac44ecc commit dc7b005
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 3 deletions.
29 changes: 29 additions & 0 deletions src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using EventSourcingTests.FetchForWriting;
using EventSourcingTests.Projections;
using JasperFx.Core;
using Marten;
using Marten.Events;
using Marten.Events.Daemon;
using Marten.Events.Projections;
Expand Down Expand Up @@ -95,4 +96,32 @@ await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync(typeof(Simple
}

}

[Fact]
public async Task try_to_query_for_non_stale_data_by_aggregate_type()
{
var task = Task.Run(async () =>
{
using var session = theStore.LightweightSession();

for (int i = 0; i < 20; i++)
{
session.Events.StartStream(new AEvent(), new BEvent());
session.Events.StartStream(new CEvent(), new BEvent());
session.Events.StartStream(new DEvent(), new AEvent());
var streamId = session.Events.StartStream(new DEvent(), new CEvent());
await session.SaveChangesAsync();
}
});

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await Task.Delay(1.Seconds());

var items = await theSession.QueryForNonStaleData<SimpleAggregate>(30.Seconds()).ToListAsync();
items.Count.ShouldBeGreaterThan(0);

await task;
}
}
4 changes: 1 addition & 3 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
{
// Number of active projection shards, plus the high water mark
var shards = database.As<MartenDatabase>().Options.Projections.AsyncShardsPublishingType(aggregationType);
if (!shards.Any())
throw new InvalidOperationException(
$"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");
if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");

var all = shards.Concat([ShardName.HighWaterMark]).ToArray();
var tracking = new Dictionary<string, long>();
Expand Down
13 changes: 13 additions & 0 deletions src/Marten/IQuerySession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ public interface IQuerySession: IDisposable, IAsyncDisposable

#endregion

/// <summary>
/// If you are querying for data against a projected event aggregation that is updated asynchronously
/// through the async daemon, this method will ensure that you are querying against the latest events appended
/// to the system by waiting for the aggregate to catch up to the current "high water mark" of the event store
/// at the time this query is executed.
/// </summary>
/// <param name="timeout"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
IMartenQueryable<T> QueryForNonStaleData<T>(TimeSpan timeout);

/// <summary>
/// Queries the document storage table for the document type T by supplied SQL. See
/// https://martendb.io/documents/querying/sql.html for more information on usage.
Expand Down Expand Up @@ -763,4 +774,6 @@ Task<DocumentMetadata> MetadataForAsync<T>(T entity,
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<T> QueryByPlanAsync<T>(IQueryPlan<T> plan, CancellationToken token = default);


}
9 changes: 9 additions & 0 deletions src/Marten/Internal/Sessions/QuerySession.Querying.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -22,6 +23,14 @@ public IMartenQueryable<T> Query<T>()
return new MartenLinqQueryable<T>(this);
}

public IMartenQueryable<T> QueryForNonStaleData<T>(TimeSpan timeout)
{
var queryable = new MartenLinqQueryable<T>(this);
queryable.MartenProvider.Waiter = new WaitForAggregate(timeout);

return queryable;
}

public IReadOnlyList<T> Query<T>(string sql, params object[] parameters)
{
assertNotDisposed();
Expand Down
10 changes: 10 additions & 0 deletions src/Marten/Linq/MartenLinqQueryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Marten.Events;
using Marten.Exceptions;
using Marten.Internal.Sessions;
using Marten.Linq.Parsing;
Expand All @@ -16,6 +17,8 @@

namespace Marten.Linq;

internal record WaitForAggregate(TimeSpan Timeout);

internal class MartenLinqQueryProvider: IQueryProvider
{
private readonly QuerySession _session;
Expand All @@ -28,6 +31,8 @@ public MartenLinqQueryProvider(QuerySession session, Type type)

public Type SourceType { get; }

internal WaitForAggregate? Waiter { get; set; }

internal QueryStatistics Statistics { get; set; } = null!;

public IQueryable CreateQuery(Expression expression)
Expand Down Expand Up @@ -70,6 +75,11 @@ internal async ValueTask EnsureStorageExistsAsync(LinqQueryParser parser,
{
await _session.Database.EnsureStorageExistsAsync(documentType, cancellationToken).ConfigureAwait(false);
}

if (Waiter != null)
{
await _session.Database.WaitForNonStaleProjectionDataAsync(SourceType, Waiter.Timeout, cancellationToken).ConfigureAwait(false);
}
}


Expand Down

0 comments on commit dc7b005

Please sign in to comment.