Skip to content

Commit

Permalink
Raising side effects even when a known delete event kicks in. Closes G…
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Sep 24, 2024
1 parent 7b095fb commit 26d8211
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
54 changes: 54 additions & 0 deletions src/DaemonTests/side_effects_in_aggregations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,47 @@ public async Task add_events_single_stream_guid_identifier()
state.Version.ShouldBe(6);
}

[Fact]
public async Task calls_side_effects_when_there_is_a_delete_event()
{
var outbox = new RecordingMessageOutbox();

StoreOptions(opts =>
{
opts.Projections.Add<Projection1>(ProjectionLifecycle.Async);
opts.Logger(new TestOutputMartenLogger(_output));
opts.Events.MessageOutbox = outbox;
});

await theStore.Advanced.Clean.DeleteAllDocumentsAsync();
await theStore.Advanced.Clean.DeleteAllEventDataAsync();

var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

var streamId = Guid.NewGuid();
theSession.Events.StartStream<SideEffects1>(streamId, new AEvent(),
new AEvent(), new AEvent());
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());

theSession.Events.Append(streamId, new EEvent());
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());

// Should be deleted by now
var version1 = await theSession.LoadAsync<SideEffects1>(streamId);
version1.ShouldBeNull();

outbox
.Batches
.SelectMany(x => x.Messages)
.OfType<WasDeleted>().Single().Id.ShouldBe(streamId);

}

[Fact]
public async Task add_events_single_stream_guid_identifier_when_starting_a_stream()
{
Expand Down Expand Up @@ -317,6 +358,11 @@ public async Task publishing_messages_in_continuous_mode()

public class Projection1: SingleStreamProjection<SideEffects1>
{
public Projection1()
{
DeleteEvent<EEvent>();
}

public void Apply(SideEffects1 aggregate, AEvent _)
{
aggregate.A++;
Expand All @@ -334,7 +380,13 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven
slice.AppendEvent(new BEvent());
}

if (slice.Events().OfType<IEvent<EEvent>>().Any())
{
slice.PublishMessage(new WasDeleted(slice.Events().First().StreamId));
}

return new ValueTask();

}
}

Expand All @@ -348,6 +400,8 @@ public class SideEffects1: IRevisioned
public int Version { get; set; }
}

public record WasDeleted(Guid Id);

public class Projection2: SingleStreamProjection<SideEffects2>
{
public void Apply(SideEffects2 aggregate, AEvent _)
Expand Down
3 changes: 3 additions & 0 deletions src/Marten/Events/Aggregation/AggregationRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
if (Projection.MatchesAnyDeleteType(slice))
{
var operation = Storage.DeleteForId(slice.Id, slice.Tenant.TenantId);

await processPossibleSideEffects(session, slice).ConfigureAwait(false);

session.QueueOperation(operation);
return;
}
Expand Down

0 comments on commit 26d8211

Please sign in to comment.