From 9ad868095b6e69e66df58bf0ab0790ee64e6c1b4 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Thu, 12 Dec 2024 11:55:59 -0600 Subject: [PATCH] New system "Archived" event for easier usage of event archiving. Closes GH-3578 --- docs/diagnostics.md | 2 +- docs/events/archiving.md | 133 +++++++++++++++++- docs/events/projections/read-aggregates.md | 36 ++--- docs/scenarios/command_handler_workflow.md | 12 +- .../Examples/OptimizedCommandHandling.cs | 29 +++- src/EventSourcingTests/archiving_events.cs | 80 +++++++++++ .../Events/Aggregation/CustomProjection.cs | 5 + src/Marten/Events/Archived.cs | 4 + .../Events/Daemon/AsyncProjectionShard.cs | 4 +- .../Events/Projections/ProjectionOptions.cs | 16 ++- 10 files changed, 289 insertions(+), 32 deletions(-) diff --git a/docs/diagnostics.md b/docs/diagnostics.md index ca940419c2..abab9fc68d 100644 --- a/docs/diagnostics.md +++ b/docs/diagnostics.md @@ -539,7 +539,7 @@ The `IMartenLogger` can be swapped out on any `IQuerySession` or `IDocumentSessi // session to pipe Marten logging to the xUnit.Net output theSession.Logger = new TestOutputMartenLogger(_output); ``` -snippet source | anchor +snippet source | anchor ## Previewing the PostgreSQL Query Plan diff --git a/docs/events/archiving.md b/docs/events/archiving.md index 55b729a81a..343fd93a9e 100644 --- a/docs/events/archiving.md +++ b/docs/events/archiving.md @@ -31,7 +31,7 @@ public async Task SampleArchive(IDocumentSession session, string streamId) await session.SaveChangesAsync(); } ``` -snippet source | anchor +snippet source | anchor As in all cases with an `IDocumentSession`, you need to call `SaveChanges()` to commit the @@ -55,7 +55,7 @@ var events = await theSession.Events .Where(x => x.IsArchived) .ToListAsync(); ``` -snippet source | anchor +snippet source | anchor You can also query for all events both archived and not archived with `MaybeArchived()` @@ -67,7 +67,7 @@ like so: var events = await theSession.Events.QueryAllRawEvents() .Where(x => x.MaybeArchived()).ToListAsync(); ``` -snippet source | anchor +snippet source | anchor ## Hot/Cold Storage Partitioning @@ -112,4 +112,131 @@ table, copy all the existing data from the temp table to the new partitioned tab ## Archived Event +Marten has a built in event named `Archived` that can be appended to any event stream: + + +```cs +namespace Marten.Events; + +/// +/// The presence of this event marks a stream as "archived" when it is processed +/// by a single stream projection of any sort +/// +public record Archived(string Reason); +``` +snippet source | anchor + + +When this event is appended to an event stream *and* that event is processed through any type of single stream projection +for that event stream (snapshot or what we used to call a "self-aggregate", `SingleStreamProjection`, or `CustomProjection` with the `AggregateByStream` option), +Marten will automatically mark that entire event stream as archived as part of processing the projection. This applies for +both `Inline` and `Async` execution of projections. + +Let's try to make this concrete by building a simple order processing system that might include this +aggregate: + + + +```cs +public class Item +{ + public string Name { get; set; } + public bool Ready { get; set; } +} + +public class Order +{ + // This would be the stream id + public Guid Id { get; set; } + + // This is important, by Marten convention this would + // be the + public int Version { get; set; } + + public Order(OrderCreated created) + { + foreach (var item in created.Items) + { + Items[item.Name] = item; + } + } + + public void Apply(IEvent shipped) => Shipped = shipped.Timestamp; + public void Apply(ItemReady ready) => Items[ready.Name].Ready = true; + + public DateTimeOffset? Shipped { get; private set; } + + public Dictionary Items { get; set; } = new(); + + public bool IsReadyToShip() + { + return Shipped == null && Items.Values.All(x => x.Ready); + } +} +``` +snippet source | anchor + + +Next, let's say we're having the `Order` aggregate snapshotted so that it's updated every time new events +are captured like so: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection("some connection string"); + + // The Order aggregate is updated Inline inside the + // same transaction as the events being appended + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + + // Opt into an optimization for the inline aggregates + // used with FetchForWriting() + opts.Projections.UseIdentityMapForAggregates = true; +}) + +// This is also a performance optimization in Marten to disable the +// identity map tracking overall in Marten sessions if you don't +// need that tracking at runtime +.UseLightweightSessions(); +``` +snippet source | anchor + + +Now, let's say as a way to keep our application performing as well as possible, we'd like to be aggressive about archiving +shipped orders to keep the "hot" event storage table small. One way we can do that is to append the `Archived` event +as part of processing a command to ship an order like so: + + + +```cs +public static async Task HandleAsync(ShipOrder command, IDocumentSession session) +{ + var stream = await session.Events.FetchForWriting(command.OrderId); + var order = stream.Aggregate; + + if (!order.Shipped.HasValue) + { + // Mark it as shipped + stream.AppendOne(new OrderShipped()); + + // But also, the order is done, so let's mark it as archived too! + stream.AppendOne(new Archived("Shipped")); + + await session.SaveChangesAsync(); + } +} +``` +snippet source | anchor + + +If an `Order` hasn't already shipped, one of the outcomes of that command handler executing is that the entire event stream +for the `Order` will be marked as archived. + +::: info +This was originally conceived as a way to improve the Wolverine aggregate handler workflow usability while also encouraging +Marten users to take advantage of the event archiving feature. +::: diff --git a/docs/events/projections/read-aggregates.md b/docs/events/projections/read-aggregates.md index 339d71126c..4a18b24365 100644 --- a/docs/events/projections/read-aggregates.md +++ b/docs/events/projections/read-aggregates.md @@ -71,7 +71,7 @@ builder.Services.AddMarten(opts => opts.Projections.LiveStreamAggregation(); }); ``` -snippet source | anchor +snippet source | anchor Then we could use the `AggregateStreamAsync` API to read the current `Invoice` state for any @@ -88,7 +88,7 @@ public static async Task read_live_invoice( .Events.AggregateStreamAsync(invoiceId); } ``` -snippet source | anchor +snippet source | anchor ::: info @@ -110,7 +110,7 @@ builder.Services.AddMarten(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); ``` -snippet source | anchor +snippet source | anchor Then we can just treat the `Invoice` as any old Marten document (because it is) and use @@ -127,7 +127,7 @@ public static async Task read_inline_invoice( .LoadAsync(invoiceId); } ``` -snippet source | anchor +snippet source | anchor And lastly, if we wanted to run the `Invoice` snapshot updates as an asynchronous projection (maybe to take advantage @@ -145,7 +145,7 @@ builder.Services.AddMarten(opts => }) .AddAsyncDaemon(DaemonMode.HotCold); ``` -snippet source | anchor +snippet source | anchor We would still just the same `LoadAsync()` API, but you just hope that @@ -166,7 +166,7 @@ builder.Services.AddMarten(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); ``` -snippet source | anchor +snippet source | anchor ## FetchLatest @@ -196,7 +196,7 @@ public static async Task read_latest( .Events.FetchLatest(invoiceId); } ``` -snippet source | anchor +snippet source | anchor Just to understand how this API works, under the covers, if `Invoice` is registered as: @@ -225,7 +225,7 @@ builder.Services.AddMarten(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); ``` -snippet source | anchor +snippet source | anchor ::: warning @@ -247,10 +247,10 @@ fun, let's say we wrote a helper function like this: ```cs public static class MutationExtensions { - public static async Task Mutate(this IDocumentSession session, Guid id, Func decider, - CancellationToken token = default) where T : class + public static async Task MutateInvoice(this IDocumentSession session, Guid id, Func> decider, + CancellationToken token = default) { - var stream = await session.Events.FetchForWriting(id, token); + var stream = await session.Events.FetchForWriting(id, token); // Decide what new events should be appended based on the current // state of the aggregate and application logic @@ -260,11 +260,11 @@ public static class MutationExtensions // Persist any new events await session.SaveChangesAsync(token); - return await session.Events.FetchLatest(id, token); + return await session.Events.FetchLatest(id, token); } } ``` -snippet source | anchor +snippet source | anchor And used it for a command handler something like this: @@ -272,20 +272,20 @@ And used it for a command handler something like this: ```cs -public static Task Approve(IDocumentSession session, Guid invoiceId) +public static Task Approve(IDocumentSession session, Guid invoiceId) { - return session.Mutate(invoiceId, invoice => + return session.MutateInvoice(invoiceId, invoice => { if (invoice.Status != InvoiceStatus.Approved) { - return new object[] { new InvoiceApproved() }; + return [new InvoiceApproved()]; } - return Array.Empty(); + return []; }); } ``` -snippet source | anchor +snippet source | anchor Okay, so for some context, if using the full fledged `UseIdentityMapForAggregates` + `FetchForWriting`, then `FetchLatest` diff --git a/docs/scenarios/command_handler_workflow.md b/docs/scenarios/command_handler_workflow.md index 3f5db3896d..3b23d9965f 100644 --- a/docs/scenarios/command_handler_workflow.md +++ b/docs/scenarios/command_handler_workflow.md @@ -127,7 +127,7 @@ public async Task Handle1(MarkItemReady command, IDocumentSession session) await session.SaveChangesAsync(); } ``` -snippet source | anchor +snippet source | anchor In this usage, `FetchForWriting()` is finding the current state of the stream based on the stream id we passed in. If the `Order` aggregate @@ -219,7 +219,7 @@ public async Task Handle2(MarkItemReady command, IDocumentSession session) await session.SaveChangesAsync(); } ``` -snippet source | anchor +snippet source | anchor In this case, Marten will throw a `ConcurrencyException` if the expected starting version being passed to `FetchForWriting()` has @@ -265,7 +265,7 @@ public async Task Handle3(MarkItemReady command, IDocumentSession session) await session.SaveChangesAsync(); } ``` -snippet source | anchor +snippet source | anchor Do note that the `FetchForExclusiveWriting()` command can time out if it is unable to achieve a lock in a timely manner. In this case, Marten will throw a `StreamLockedException`. The lock will be released when either `IDocumentSession.SaveChangesAsync()` is called or the `IDocumentSession` is disposed. @@ -303,7 +303,7 @@ public Task Handle4(MarkItemReady command, IDocumentSession session) }); } ``` -snippet source | anchor +snippet source | anchor ## Optimizing FetchForWriting with Inline Aggregates @@ -324,7 +324,7 @@ builder.Services.AddMarten(opts => // Opt into an optimization for the inline aggregates // used with FetchForWriting() - opts.Projections.UseIdentityMapForInlineAggregates = true; + opts.Projections.UseIdentityMapForAggregates = true; }) // This is also a performance optimization in Marten to disable the @@ -332,7 +332,7 @@ builder.Services.AddMarten(opts => // need that tracking at runtime .UseLightweightSessions(); ``` -snippet source | anchor +snippet source | anchor You can potentially gain some significant performance optimization by using the `UseIdentityMapForInlineAggregates` flag shown above. To be clear, this optimization mostly helps when you have the combination in a command handler that: diff --git a/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs b/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs index fcf396e837..37fa923234 100644 --- a/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs +++ b/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs @@ -60,6 +60,8 @@ public bool IsReadyToShip() #endregion + + public record MarkItemReady(Guid OrderId, string ItemName, int Version); public class ShipOrderHandler @@ -217,7 +219,7 @@ public static async Task bootstrap() // Opt into an optimization for the inline aggregates // used with FetchForWriting() - opts.Projections.UseIdentityMapForInlineAggregates = true; + opts.Projections.UseIdentityMapForAggregates = true; }) // This is also a performance optimization in Marten to disable the @@ -227,4 +229,29 @@ public static async Task bootstrap() #endregion } + + #region sample_handling_shiporder_and_emitting_archived_event + + public static async Task HandleAsync(ShipOrder command, IDocumentSession session) + { + var stream = await session.Events.FetchForWriting(command.OrderId); + var order = stream.Aggregate; + + if (!order.Shipped.HasValue) + { + // Mark it as shipped + stream.AppendOne(new OrderShipped()); + + // But also, the order is done, so let's mark it as archived too! + stream.AppendOne(new Archived("Shipped")); + + await session.SaveChangesAsync(); + } + } + + #endregion } + +public record ShipOrder(Guid OrderId); + + diff --git a/src/EventSourcingTests/archiving_events.cs b/src/EventSourcingTests/archiving_events.cs index 6eba5f7572..3952d83b37 100644 --- a/src/EventSourcingTests/archiving_events.cs +++ b/src/EventSourcingTests/archiving_events.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using EventSourcingTests.Aggregation; using EventSourcingTests.FetchForWriting; +using JasperFx.Core; using Marten; using Marten.Events; using Marten.Events.Aggregation; @@ -386,6 +388,55 @@ public async Task capture_archived_event_with_inline_projection_will_archive_the } + [Fact] + public async Task capture_archived_event_with_inline_custom_projection_will_archive_the_stream() + { + StoreOptions(opts => + { + opts.Projections.Add(new SimpleAggregateProjection2(), ProjectionLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + theSession.Events.Append(streamId, new DEvent(), new Archived("Complete")); + await theSession.SaveChangesAsync(); + + // All the events should be archived + var events = await theSession.Events.QueryAllRawEvents() + .Where(x => x.MaybeArchived() && x.StreamId == streamId).ToListAsync(); + + events.All(x => x.IsArchived).ShouldBeTrue(); + } + + + [Fact] + public async Task capture_archived_event_with_async_projection_will_archive_the_stream() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Async)); + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + theSession.Events.Append(streamId, new DEvent(), new Archived("Complete")); + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(5.Seconds()); + + // All the events should be archived + var events = await theSession.Events.QueryAllRawEvents() + .Where(x => x.MaybeArchived() && x.StreamId == streamId).ToListAsync(); + + events.All(x => x.IsArchived).ShouldBeTrue(); + } + + [Fact] public async Task capture_archived_event_with_inline_projection_will_archive_the_stream_string_identified() { @@ -514,6 +565,35 @@ public SimpleAggregateProjection() public bool ShouldDelete(MaybeDeleted e) => e.ShouldDelete; } +public class SimpleAggregateProjection2: CustomProjection +{ + public SimpleAggregateProjection2() + { + AggregateByStream(); + } + + public override SimpleAggregate Apply(SimpleAggregate snapshot, IReadOnlyList events) + { + snapshot ??= new SimpleAggregate(); + + foreach (var @event in events) + { + switch (@event.Data) + { + case AEvent _: + snapshot.ACount++; + break; + + case BEvent _: + snapshot.BCount++; + break; + } + } + + return snapshot; + } +} + public record Deleted; public record MaybeDeleted(bool ShouldDelete); diff --git a/src/Marten/Events/Aggregation/CustomProjection.cs b/src/Marten/Events/Aggregation/CustomProjection.cs index 9d03f67eda..5434135d18 100644 --- a/src/Marten/Events/Aggregation/CustomProjection.cs +++ b/src/Marten/Events/Aggregation/CustomProjection.cs @@ -146,6 +146,11 @@ public virtual async ValueTask ApplyChangesAsync(DocumentSessionBase session, Ev slice.Aggregate = snapshot; session.Store(snapshot); + + if (Slicer is ISingleStreamSlicer singleStreamSlicer && slice.Events().OfType>().Any()) + { + singleStreamSlicer.ArchiveStream(session, slice.Id); + } } /// diff --git a/src/Marten/Events/Archived.cs b/src/Marten/Events/Archived.cs index d12c915898..85312902a7 100644 --- a/src/Marten/Events/Archived.cs +++ b/src/Marten/Events/Archived.cs @@ -1,3 +1,5 @@ +#region sample_Archived_event + namespace Marten.Events; /// @@ -5,3 +7,5 @@ namespace Marten.Events; /// by a single stream projection of any sort /// public record Archived(string Reason); + +#endregion diff --git a/src/Marten/Events/Daemon/AsyncProjectionShard.cs b/src/Marten/Events/Daemon/AsyncProjectionShard.cs index 441c646a90..7c71c059c5 100644 --- a/src/Marten/Events/Daemon/AsyncProjectionShard.cs +++ b/src/Marten/Events/Daemon/AsyncProjectionShard.cs @@ -60,7 +60,9 @@ public IEnumerable BuildFilters(DocumentStore store) { if (EventTypes.Any() && !EventTypes.Any(x => x.IsAbstract || x.IsInterface)) { - yield return new EventTypeFilter(store.Options.EventGraph, EventTypes); + // We want to explicitly add in the archived event + var allTypes = EventTypes.Concat([typeof(Archived)]).ToArray(); + yield return new EventTypeFilter(store.Options.EventGraph, allTypes); } if (StreamType != null) diff --git a/src/Marten/Events/Projections/ProjectionOptions.cs b/src/Marten/Events/Projections/ProjectionOptions.cs index 65151cbb22..71e6124638 100644 --- a/src/Marten/Events/Projections/ProjectionOptions.cs +++ b/src/Marten/Events/Projections/ProjectionOptions.cs @@ -93,10 +93,22 @@ internal IEnumerable allPlanners() /// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false. /// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten /// + [Obsolete("Prefer UseIdentityMapForAggregates")] public bool UseIdentityMapForInlineAggregates { - get => _options.Events.UseIdentityMapForInlineAggregates; - set => _options.Events.UseIdentityMapForInlineAggregates = value; + get => _options.Events.UseIdentityMapForAggregates; + set => _options.Events.UseIdentityMapForAggregates = value; + } + + /// + /// Opt into a performance optimization that directs Marten to always use the identity map for an + /// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false. + /// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten + /// + public bool UseIdentityMapForAggregates + { + get => _options.Events.UseIdentityMapForAggregates; + set => _options.Events.UseIdentityMapForAggregates = value; } internal bool HasAnyAsyncProjections()