Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter event types #3415

Merged
merged 2 commits into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ const config: UserConfig<DefaultTheme.Config> = {
text: 'Aggregate Projections', link: '/events/projections/aggregate-projections', items: [
{ text: 'Live Aggregations', link: '/events/projections/live-aggregates' },
{ text: 'Multi-Stream Projections', link: '/events/projections/multi-stream-projections' },
{ text: 'Custom Aggregations', link: '/events/projections/custom-aggregates' },]
{ text: 'Explicit Aggregations', link: '/events/projections/custom-aggregates' },]
},
{ text: 'Event Projections', link: '/events/projections/event-projections' },
{ text: 'Custom Projections', link: '/events/projections/custom' },
Expand Down
23 changes: 19 additions & 4 deletions docs/events/projections/custom-aggregates.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Custom Aggregations
# Explicit Aggregations

Once in awhile users are hitting use cases or desired functionality for aggregation projections that just don't fit in well to our [`SingleStreamProjection<T>`](/events/projections/aggregate-projections) or [`MultiStreamProjection<TDoc, TId>`](/events/projections/multi-stream-projections) models. Not to worry though, because
Marten V5.0 introduces the new `CustomAggregation<T>` base type that will let you define aggregation projections with explicit user code while still taking advantage of some of the parallelization
optimizations that were built for the previous aggregation types running in the [async daemon](/events/projections/async-daemon);
The original concept for Marten projections was the conventional method model (`Apply()` / `Create()` / `ShouldDelete()` methods), but we
quickly found out that the workflow generated from these methods just isn't sufficient for many user needs. At the same time,
other users just prefer explicit code anyway, so Marten provides the `CustomProjection<TDoc, TId>` base class as a way to
configure custom projections that use explicit code for the actual work of building projected, aggregate documents from
raw events.

Alright, let's jump right into an example. Two of the drivers for this feature were for aggregations to document types that were [soft-deleted](/documents/deletes.html#soft-deletes) or aggregations where some events should only apply to the aggregate document if the document already existed. To illustrate this with a contrived example, let's say that we've got these event types:

Expand Down Expand Up @@ -136,3 +138,16 @@ All aggregations in Marten come in two parts:

`CustomAggregate` supports aggregating by the stream identity as shown above. You can also use all the same customizable grouping functionality as
the older [MultiStreamProjection](/events/projections/multi-stream-projections) subclass.

## Simple Workflows <Badge type="tip" text="7.28" />

The base class can be used for strictly live aggregations. If all you're doing is using this
mechanism for `Live` aggregation, or have a simple workflow where the aggregate is always
going to be built strictly from the event data, you can override _only_ the `Apply()` method
as shown below:

snippet: sample_using_simple_explicit_code_for_live_aggregation

Note that this usage is valid for all possible projection lifecycles now (`Live`, `Inline`, and `Async`).


9 changes: 8 additions & 1 deletion docs/events/querying.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void can_query_against_event_type()

You can use any Linq operator that Marten supports to query against event data. We think that this functionality is probably more useful for diagnostics or troubleshooting rather than something you would routinely use to support your application. We recommend that you favor event projection views over querying within the raw event table.

With Marten 1.0, you can issue queries with Marten's full Linq support against the raw event data with this method:
You can issue queries with Marten's full Linq support against the raw event data with this method:

<!-- snippet: sample_example_of_querying_for_event_data -->
<a id='snippet-sample_example_of_querying_for_event_data'></a>
Expand All @@ -309,3 +309,10 @@ public void example_of_querying_for_event_data(IDocumentSession session, Guid st
<!-- endSnippet -->

This mechanism will allow you to query by any property of the `IEvent` interface shown above.

## Filter by Event Types <Badge type="tip" text="7.28" />

You can limit the types of events returned by the LINQ query through the `EventTypesAre()` extension method
in a `Where()` clause as shown below:

snippet: sample_using_event_types_are
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Internal.Sessions;
Expand Down Expand Up @@ -130,8 +133,11 @@ public async Task ForEventsAppendedToTenantedSession_CustomProjection()
singleCompanyLocation.Id.ShouldBe(companyLocationId);
singleCompanyLocation.Name.ShouldBe(companyLocationName);
}

}



public record Event;

public record ResourceCreatedEvent(string Name, Guid OrganisationId): Event;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Internal.Sessions;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Projections;

public class using_explicit_code_for_live_aggregation : OneOffConfigurationsContext
{

[Fact]
public async Task using_a_custom_projection_for_live_aggregation()
{
StoreOptions(opts =>
{
opts.Projections.Add(new ExplicitCounter(), ProjectionLifecycle.Live);
});

var streamId = theSession.Events.StartStream<SimpleAggregate>(new AEvent(), new AEvent(), new BEvent(), new CEvent(), new CEvent(), new CEvent()).Id;
await theSession.SaveChangesAsync();

var aggregate = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
aggregate.ACount.ShouldBe(2);
aggregate.BCount.ShouldBe(1);
aggregate.CCount.ShouldBe(3);
aggregate.Id.ShouldBe(streamId);
}
}

#region sample_using_simple_explicit_code_for_live_aggregation

public class ExplicitCounter: CustomProjection<SimpleAggregate, Guid>
{
public override SimpleAggregate Apply(SimpleAggregate snapshot, IReadOnlyList<IEvent> events)
{
snapshot ??= new SimpleAggregate();
foreach (var e in events.Select(x => x.Data))
{
if (e is AEvent) snapshot.ACount++;
if (e is BEvent) snapshot.BCount++;
if (e is CEvent) snapshot.CCount++;
if (e is DEvent) snapshot.DCount++;
}

// You have to explicitly return the new value
// of the aggregated document no matter what!
return snapshot;
}
}

#endregion
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using Marten;
using Marten.Events;
using Marten.Testing.Harness;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities;
using Shouldly;
using Weasel.Core;
using Xunit;
using Xunit.Abstractions;

namespace EventSourcingTests;

public class query_against_event_documents_Tests: OneOffConfigurationsContext
public class querying_event_data_with_linq: OneOffConfigurationsContext
{
private readonly ITestOutputHelper _output;
private readonly MembersJoined joined1 = new MembersJoined { Members = new string[] { "Rand", "Matt", "Perrin", "Thom" } };
Expand Down Expand Up @@ -228,11 +231,37 @@ public void can_search_by_stream()
.Count(x => x.StreamId == stream1).ShouldBe(2);
}

[Fact]
public async Task can_search_by_event_types()
{
theSession.Events.StartStream<Quest>(joined1, departed1);
theSession.Events.StartStream<Quest>(joined2, departed2);

theSession.Events.StartStream<SimpleAggregate>(new AEvent(), new BEvent(), new CEvent(), new DEvent());
theSession.Events.StartStream<SimpleAggregate>(new AEvent(), new BEvent(), new DEvent(), new DEvent());
theSession.Events.StartStream<SimpleAggregate>(new AEvent(), new CEvent(), new CEvent(), new DEvent());

await theSession.SaveChangesAsync();

#region sample_using_event_types_are

var raw = await theSession.Events.QueryAllRawEvents()
.Where(x => x.EventTypesAre(typeof(CEvent), typeof(DEvent)))
.ToListAsync();

#endregion

foreach (var e in raw)
{
((e.Data is CEvent) || (e.Data is DEvent)).ShouldBeTrue();
}
}

/*
* MORE!!!
* Async everything
*/
public query_against_event_documents_Tests(ITestOutputHelper output)
public querying_event_data_with_linq(ITestOutputHelper output)
{
_output = output;
theStore.Advanced.Clean.DeleteAllEventData();
Expand Down
2 changes: 2 additions & 0 deletions src/Marten/Events/AggregateToExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,6 @@ public static async Task<T> AggregateToAsync<T>(this IQueryable<IEvent> queryabl
{
return await AggregateToAsync(queryable.As<IMartenQueryable<IEvent>>(), state, token).ConfigureAwait(false);
}


}
70 changes: 66 additions & 4 deletions src/Marten/Events/Aggregation/CustomProjection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core.Reflection;
Expand All @@ -18,11 +19,17 @@ namespace Marten.Events.Aggregation;

/// <summary>
/// Helpful as a base class for more custom aggregation projections that are not supported
/// by the Single/MultipleStreamProjections
/// by the Single/MultipleStreamProjections -- or if you'd just prefer to use explicit code
/// </summary>
/// <typeparam name="TDoc"></typeparam>
/// <typeparam name="TId"></typeparam>
public abstract class CustomProjection<TDoc, TId>: ProjectionBase, IAggregationRuntime<TDoc, TId>, IProjectionSource, IAggregateProjection, IAggregateProjectionWithSideEffects<TDoc>
public abstract class CustomProjection<TDoc, TId>:
ProjectionBase,
IAggregationRuntime<TDoc, TId>,
IProjectionSource,
IAggregateProjection,
IAggregateProjectionWithSideEffects<TDoc>,
ILiveAggregator<TDoc>
{
private IDocumentStorage<TDoc, TId> _storage;

Expand Down Expand Up @@ -107,9 +114,44 @@ async ValueTask<EventRangeGroup> IAggregationRuntime.GroupEvents(DocumentStore s
/// <param name="cancellation"></param>
/// <param name="lifecycle"></param>
/// <returns></returns>
public abstract ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice<TDoc, TId> slice,
public virtual async ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice<TDoc, TId> slice,
CancellationToken cancellation,
ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline);
ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline)
{
if (!slice.Events().Any()) return;

var snapshot = slice.Aggregate;
snapshot = await BuildAsync(session, snapshot, slice.Events()).ConfigureAwait(false);
ApplyMetadata(snapshot, slice.Events().Last());

slice.Aggregate = snapshot;
session.Store(snapshot);
}

/// <summary>
/// Override if the aggregation always updates the aggregate from new events, but may
/// require data lookup to update the snapshot
/// </summary>
/// <param name="session"></param>
/// <param name="snapshot"></param>
/// <param name="events"></param>
/// <returns></returns>
public virtual ValueTask<TDoc> BuildAsync(IQuerySession session, TDoc? snapshot, IReadOnlyList<IEvent> events)
{
return new ValueTask<TDoc>(Apply(snapshot, events));
}

/// <summary>
/// Override if the aggregation always updates the aggregate from new events and you
/// don't need to do any other kind of data lookup. Simplest possible way to use this
/// </summary>
/// <param name="snapshot"></param>
/// <param name="events"></param>
/// <returns></returns>
public virtual TDoc Apply(TDoc? snapshot, IReadOnlyList<IEvent> events)
{
throw new NotImplementedException("Did you forget to implement this method?");
}

public IAggregateVersioning Versioning { get; set; }

Expand Down Expand Up @@ -289,5 +331,25 @@ public void ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions stor
mapping.UseVersionFromMatchingStream =
Lifecycle == ProjectionLifecycle.Inline && storeOptions.Events.AppendMode == EventAppendMode.Quick && Slicer is ISingleStreamSlicer;
}

TDoc ILiveAggregator<TDoc>.Build(IReadOnlyList<IEvent> events, IQuerySession session, TDoc snapshot)
{
throw new NotSupportedException("It's not supported to do a synchronous, live aggregation with a custom projection");
}

async ValueTask<TDoc> ILiveAggregator<TDoc>.BuildAsync(IReadOnlyList<IEvent> events, IQuerySession session, TDoc snapshot, CancellationToken cancellation)
{
if (!events.Any()) return default;

var documentSessionBase = session.As<DocumentSessionBase>();

var slice = new EventSlice<TDoc, TId>(default, session, events);
await ApplyChangesAsync(documentSessionBase, slice, cancellation).ConfigureAwait(false);

ApplyMetadata(slice.Aggregate, events.Last());

return slice.Aggregate;
}
}


42 changes: 42 additions & 0 deletions src/Marten/Events/LinqExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using JasperFx.Core.Reflection;
using Marten.Linq.Members;
using Marten.Linq.Parsing;
using Marten.Linq.Parsing.Methods;
using Weasel.Postgresql.SqlGeneration;

namespace Marten.Events;

public static class LinqExtensions
{
/// <summary>
/// LINQ filter to select only a specified set of event types
/// </summary>
/// <param name="e"></param>
/// <param name="types"></param>
/// <returns></returns>
public static bool EventTypesAre(this IEvent e, params Type[] types)
{
return e.Data.GetType().IsOneOf(types);
}
}

internal class EventTypesAreParser: IMethodCallParser
{
public bool Matches(MethodCallExpression expression)
{
return expression.Method.Name == nameof(LinqExtensions.EventTypesAre) && expression.Method.DeclaringType == typeof(LinqExtensions);
}

public ISqlFragment Parse(IQueryableMemberCollection memberCollection, IReadOnlyStoreOptions options,
MethodCallExpression expression)
{
var types = (Type[])expression.Arguments.Last().Value();
var typeNames = types.Select(x => options.Events.As<EventGraph>().EventMappingFor(x).EventTypeName).ToArray();

var queryableMember = memberCollection.MemberFor(nameof(IEvent.EventTypeName));
return new IsOneOfFilter(queryableMember, new CommandParameter(typeNames));
}
}
14 changes: 12 additions & 2 deletions src/Marten/Events/Projections/ProjectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ public void Add(
{
if (lifecycle == ProjectionLifecycle.Live)
{
throw new ArgumentOutOfRangeException(nameof(lifecycle),
$"{nameof(ProjectionLifecycle.Live)} cannot be used for IProjection");
if (!projection.GetType().Closes(typeof(ILiveAggregator<>)))
{
throw new ArgumentOutOfRangeException(nameof(lifecycle),
$"{nameof(ProjectionLifecycle.Live)} cannot be used for IProjection");
}
}

if (projection is ProjectionBase p)
Expand Down Expand Up @@ -417,6 +420,13 @@ internal ILiveAggregator<T> AggregatorFor<T>() where T : class
return (ILiveAggregator<T>)aggregator;
}

aggregator = All.OfType<ILiveAggregator<T>>().FirstOrDefault();
if (aggregator != null)
{
_liveAggregators = _liveAggregators.AddOrUpdate(typeof(T), aggregator);
return (ILiveAggregator<T>)aggregator;
}

var source = tryFindProjectionSourceForAggregateType<T>();
source.AssembleAndAssertValidity();

Expand Down
Loading
Loading