Skip to content

Commit

Permalink
Ability to target IEvent<T> in multi-stream grouping when an event ma…
Browse files Browse the repository at this point in the history
…y apply to multiple aggregates. Closes GH-3424
  • Loading branch information
jeremydmiller committed Sep 24, 2024
1 parent 214fe47 commit 7b095fb
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Shouldly;
Expand All @@ -15,7 +16,9 @@ public class UserGroupsAssignmentProjection: MultiStreamProjection<UserGroupsAss
public UserGroupsAssignmentProjection()
{
Identity<UserRegistered>(x => x.UserId);
Identities<MultipleUsersAssignedToGroup>(x => x.UserIds);

// You can now use IEvent<T> as well as declaring this against the core event type
Identities<IEvent<MultipleUsersAssignedToGroup>>(x => x.Data.UserIds);
}

public void Apply(UserRegistered @event, UserGroupsAssignment view)
Expand Down
10 changes: 10 additions & 0 deletions src/Marten/Events/Aggregation/EventSlicer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ public EventSlicer<TDoc, TId> Identity<TEvent>(Func<TEvent, TId> identityFunc)

public EventSlicer<TDoc, TId> Identities<TEvent>(Func<TEvent, IReadOnlyList<TId>> identitiesFunc)
{
var eventType = typeof(TEvent);
// Check if we are actually dealing with an IEvent<EventType>
if (eventType.IsGenericType && eventType.GetGenericTypeDefinition() == typeof(IEvent<>))
{
var actualEventType = eventType.GetGenericArguments().First();
var eventGrouperType = typeof(MultiStreamGrouperWithMetadata<,>).MakeGenericType( typeof(TId), actualEventType);
_groupers.Add((IGrouper<TId>) Activator.CreateInstance(eventGrouperType, identitiesFunc));
return this;
}

_groupers.Add(new MultiStreamGrouper<TId, TEvent>(identitiesFunc));
return this;
}
Expand Down
8 changes: 8 additions & 0 deletions src/Marten/Events/Aggregation/IEventGrouping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,12 @@ public interface IEventGrouping<TId>
/// <typeparam name="TSource"></typeparam>
/// <typeparam name="TChild"></typeparam>
void FanOutOnEach<TSource, TChild>(Func<TSource, IEnumerable<TChild>> fanOutFunc);

/// <summary>
/// Add events to the grouping based on the outer IEvent<TEvent> envelope type
/// </summary>
/// <param name="singleIdSource"></param>
/// <param name="events"></param>
/// <typeparam name="TEvent"></typeparam>
void AddEventsWithMetadata<TEvent>(Func<IEvent<TEvent>, IEnumerable<TId>> multipleIdSource, IEnumerable<IEvent> events);
}
23 changes: 15 additions & 8 deletions src/Marten/Events/Aggregation/TenantSliceGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Marten.Events.Aggregation;

public interface ITenantSliceGroup<TId>: IEventGrouping<TId>, IDisposable
{

}

/// <summary>
Expand Down Expand Up @@ -50,12 +51,7 @@ public TenantSliceGroup(Tenant tenant, IEnumerable<EventSlice<TDoc, TId>> slices

public void AddEvents<TEvent>(Func<TEvent, TId> singleIdSource, IEnumerable<IEvent> events)
{
var matching = events.Where(x => x.Data is TEvent);
foreach (var @event in matching)
{
var id = singleIdSource((TEvent)@event.Data);
AddEvent(id, @event);
}
AddEventsWithMetadata<TEvent>(e => singleIdSource(e.Data), events);
}

/// <summary>
Expand All @@ -81,8 +77,19 @@ public void FanOutOnEach<TSource, TChild>(Func<TSource, IEnumerable<TChild>> fan

public void AddEvents<TEvent>(Func<TEvent, IEnumerable<TId>> multipleIdSource, IEnumerable<IEvent> events)
{
var matching = events.Where(x => x.Data is TEvent)
.SelectMany(@event => multipleIdSource(@event.Data.As<TEvent>()).Select(id => (id, @event)));
AddEventsWithMetadata<TEvent>(e => multipleIdSource(e.Data), events);
}

/// <summary>
/// Add events to the grouping based on the outer IEvent<TEvent> envelope type
/// </summary>
/// <param name="singleIdSource"></param>
/// <param name="events"></param>
/// <typeparam name="TEvent"></typeparam>
public void AddEventsWithMetadata<TEvent>(Func<IEvent<TEvent>, IEnumerable<TId>> multipleIdSource, IEnumerable<IEvent> events)
{
var matching = events.OfType<IEvent<TEvent>>()
.SelectMany(e => multipleIdSource(e).Select(id => (id, @event: e)));

var groups = matching.GroupBy(x => x.id);
foreach (var group in groups) AddEvents(group.Key, group.Select(x => x.@event));
Expand Down
21 changes: 21 additions & 0 deletions src/Marten/Events/Projections/MultiStreamGrouper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,24 @@ public void Apply(IEnumerable<IEvent> events, ITenantSliceGroup<TId> grouping)
grouping.AddEvents(_func, events);
}
}

/// <summary>
/// This type of grouper potentially sorts one event into multiple aggregates
/// </summary>
/// <typeparam name="TId"></typeparam>
/// <typeparam name="TEvent"></typeparam>
internal class MultiStreamGrouperWithMetadata<TId, TEvent>: IGrouper<TId>
{
private readonly Func<IEvent<TEvent>, IReadOnlyList<TId>> _func;

public MultiStreamGrouperWithMetadata(Func<IEvent<TEvent>, IReadOnlyList<TId>> expression)
{
_func = expression;
}

public void Apply(IEnumerable<IEvent> events, ITenantSliceGroup<TId> grouping)
{
grouping.AddEventsWithMetadata(_func, events);
}
}

0 comments on commit 7b095fb

Please sign in to comment.