From 26d8211a7e78b82a25518f03f1ecb4e6bf3253a2 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 24 Sep 2024 14:00:38 -0500 Subject: [PATCH] Raising side effects even when a known delete event kicks in. Closes GH-3427 --- .../side_effects_in_aggregations.cs | 54 +++++++++++++++++++ .../Events/Aggregation/AggregationRuntime.cs | 3 ++ 2 files changed, 57 insertions(+) diff --git a/src/DaemonTests/side_effects_in_aggregations.cs b/src/DaemonTests/side_effects_in_aggregations.cs index 94a5a0bfce..6730729c45 100644 --- a/src/DaemonTests/side_effects_in_aggregations.cs +++ b/src/DaemonTests/side_effects_in_aggregations.cs @@ -83,6 +83,47 @@ public async Task add_events_single_stream_guid_identifier() state.Version.ShouldBe(6); } + [Fact] + public async Task calls_side_effects_when_there_is_a_delete_event() + { + var outbox = new RecordingMessageOutbox(); + + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Async); + opts.Logger(new TestOutputMartenLogger(_output)); + opts.Events.MessageOutbox = outbox; + }); + + await theStore.Advanced.Clean.DeleteAllDocumentsAsync(); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + + var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), + new AEvent(), new AEvent()); + await theSession.SaveChangesAsync(); + + await daemon.WaitForNonStaleData(30.Seconds()); + + theSession.Events.Append(streamId, new EEvent()); + await theSession.SaveChangesAsync(); + + await daemon.WaitForNonStaleData(30.Seconds()); + + // Should be deleted by now + var version1 = await theSession.LoadAsync(streamId); + version1.ShouldBeNull(); + + outbox + .Batches + .SelectMany(x => x.Messages) + .OfType().Single().Id.ShouldBe(streamId); + + } + [Fact] public async Task add_events_single_stream_guid_identifier_when_starting_a_stream() { @@ -317,6 +358,11 @@ public async Task publishing_messages_in_continuous_mode() public class Projection1: SingleStreamProjection { + public Projection1() + { + DeleteEvent(); + } + public void Apply(SideEffects1 aggregate, AEvent _) { aggregate.A++; @@ -334,7 +380,13 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven slice.AppendEvent(new BEvent()); } + if (slice.Events().OfType>().Any()) + { + slice.PublishMessage(new WasDeleted(slice.Events().First().StreamId)); + } + return new ValueTask(); + } } @@ -348,6 +400,8 @@ public class SideEffects1: IRevisioned public int Version { get; set; } } +public record WasDeleted(Guid Id); + public class Projection2: SingleStreamProjection { public void Apply(SideEffects2 aggregate, AEvent _) diff --git a/src/Marten/Events/Aggregation/AggregationRuntime.cs b/src/Marten/Events/Aggregation/AggregationRuntime.cs index 175430423b..72c6b58cb5 100644 --- a/src/Marten/Events/Aggregation/AggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/AggregationRuntime.cs @@ -66,6 +66,9 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, if (Projection.MatchesAnyDeleteType(slice)) { var operation = Storage.DeleteForId(slice.Id, slice.Tenant.TenantId); + + await processPossibleSideEffects(session, slice).ConfigureAwait(false); + session.QueueOperation(operation); return; }