diff --git a/src/EventSourcingTests/Bugs/document_and_event_operations_within_page.cs b/src/EventSourcingTests/Bugs/document_and_event_operations_within_page.cs new file mode 100644 index 0000000000..c9d5527bff --- /dev/null +++ b/src/EventSourcingTests/Bugs/document_and_event_operations_within_page.cs @@ -0,0 +1,131 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using EventSourcingTests.Projections; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Storage; +using Marten.Testing.Harness; +using Xunit; + +namespace EventSourcingTests.Bugs; + +public class document_and_event_operations_within_page : BugIntegrationContext +{ + [Fact] + public async Task failure_due_to_ordering_change() + { + theStore.Options.Events.MetadataConfig.HeadersEnabled = true; + theStore.Options.Events.TenancyStyle = TenancyStyle.Conjoined; + theStore.Options.Events.StreamIdentity = StreamIdentity.AsGuid; + + theStore.Options.Projections.Add(ProjectionLifecycle.Inline); + theStore.Options.Projections.Add(ProjectionLifecycle.Inline); + theStore.Options.Projections.Add(ProjectionLifecycle.Inline); + + using var session = theStore.LightweightSession("tenant"); + var mrCreated = new SamplesRolledUpCreated(Guid.NewGuid()); + session.Events.StartStream(mrCreated.Id, mrCreated); + await session.SaveChangesAsync(); + + for (var count = 1; count <= 2; count++) + { + var sampleAdded = new SampleAdded(mrCreated.Id, 23); + session.Events.Append(sampleAdded.Id, sampleAdded); + await session.SaveChangesAsync(); + } + + session.Events.Append(mrCreated.Id, new SamplesRolledUpPublished(mrCreated.Id)); + await session.SaveChangesAsync(); + } +} + + +public record SamplesRolledUp(Guid Id, List Samples, bool Published); + + +public record SamplesRolledUpCreated(Guid Id); + +public record SamplesRolledUpPublished(Guid Id); + +public class SamplesRolledUpProjection: SingleStreamProjection +{ + public SamplesRolledUpProjection() + { + CreateEvent(x => new SamplesRolledUp(x.Id, new List(), false)); + ProjectEvent((x, y) => + { + var existing = x.Samples; + existing.Add(y.Id); + return x with { Samples = existing }; + }); + ProjectEvent(x => x with { Published = true }); + } +} + + +public record SampleView(Guid Id, int Value, bool Published); + +public record SampleAdded(Guid Id, int Value); + +public class SampleProjection : MultiStreamProjection +{ + public SampleProjection() + { + CustomGrouping(new SampleGrouper()); + Identity(x => x.Id); + + CreateEvent(x=> new SampleView(x.Id, x.Value, false)); + + ProjectEvent(x=> x with { Published = true }); + + } +} + +public sealed class SampleGrouper: IAggregateGrouper +{ + public async Task Group(IQuerySession session, IEnumerable events, ITenantSliceGroup grouping) + { + var publishedEvents = events.OfType>().ToArray(); + + foreach (var published in publishedEvents) + { + var sample = + await session.Events.AggregateStreamAsync(published.Data.Id, published.Version); + + foreach (var sampleEvent in sample!.Samples) + { + grouping.AddEvents(sampleEvent, publishedEvents); + } + } + } +} + +public record SampleEventView(Guid Id); + +public class SampleEventProjection: EventProjection +{ + public SampleEventProjection() + { + Project(((added, operations) => + { + operations.Store(new SampleEventView(added.Id)); + })); + + ProjectAsync(((async (published, operations) => + { + var rolledUp = await operations.Events.AggregateStreamAsync(published.Id); + foreach (var rolledUpSample in rolledUp.Samples) + { + operations.Store(new SampleEventView(rolledUpSample)); + } + }))); + } +} + + + +