diff --git a/src/Marten/Events/Aggregation/AggregationRuntime.cs b/src/Marten/Events/Aggregation/AggregationRuntime.cs index 6d6c2e9307..10f1926305 100644 --- a/src/Marten/Events/Aggregation/AggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/AggregationRuntime.cs @@ -67,7 +67,8 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, } var aggregate = slice.Aggregate; - if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline) + // do not load if sliced by stream and the stream does not yet exist + if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline && (Slicer is not ISingleStreamSlicer || slice.ActionType != StreamActionType.Start)) { aggregate = await Storage.LoadAsync(slice.Id, session, cancellation).ConfigureAwait(false); } diff --git a/src/Marten/Events/Aggregation/ByStreamId.cs b/src/Marten/Events/Aggregation/ByStreamId.cs index 2c96aafa5f..92884e7720 100644 --- a/src/Marten/Events/Aggregation/ByStreamId.cs +++ b/src/Marten/Events/Aggregation/ByStreamId.cs @@ -22,7 +22,7 @@ public ValueTask>> SliceInlineActions(IQuer return new ValueTask>>(streams.Select(s => { var tenant = new Tenant(s.TenantId, querySession.Database); - return new EventSlice(s.Id, tenant, s.Events); + return new EventSlice(s.Id, tenant, s.Events){ActionType = s.ActionType}; }).ToList()); } diff --git a/src/Marten/Events/Aggregation/ByStreamKey.cs b/src/Marten/Events/Aggregation/ByStreamKey.cs index e1a15abb6b..4136f7f768 100644 --- a/src/Marten/Events/Aggregation/ByStreamKey.cs +++ b/src/Marten/Events/Aggregation/ByStreamKey.cs @@ -19,7 +19,7 @@ public ValueTask>> SliceInlineActions(IQu return new ValueTask>>(streams.Select(s => { var tenant = new Tenant(s.TenantId, querySession.Database); - return new EventSlice(s.Key!, tenant, s.Events); + return new EventSlice(s.Key!, tenant, s.Events){ActionType = s.ActionType}; }).ToList()); } diff --git a/src/Marten/Events/Aggregation/CustomProjection.cs b/src/Marten/Events/Aggregation/CustomProjection.cs index 1b81e2cebf..d65dc839ac 100644 --- a/src/Marten/Events/Aggregation/CustomProjection.cs +++ b/src/Marten/Events/Aggregation/CustomProjection.cs @@ -1,9 +1,7 @@ using System; using System.Collections.Generic; -using System.ComponentModel; using System.Threading; using System.Threading.Tasks; -using JasperFx.CodeGeneration; using JasperFx.Core.Reflection; using Marten.Events.Daemon; using Marten.Events.Daemon.Internals; @@ -64,7 +62,11 @@ async Task IProjection.ApplyAsync(IDocumentOperations operations, IReadOnlyList< { var tenantedSession = martenSession.UseTenancyBasedOnSliceAndStorage(storage, slice); - slice.Aggregate = await storage.LoadAsync(slice.Id, tenantedSession, cancellation).ConfigureAwait(false); + // do not load if sliced by stream and the stream does not yet exist + if (Slicer is not ISingleStreamSlicer || slice.ActionType != StreamActionType.Start) + { + slice.Aggregate = await storage.LoadAsync(slice.Id, tenantedSession, cancellation).ConfigureAwait(false); + } await ApplyChangesAsync(tenantedSession, slice, cancellation).ConfigureAwait(false); } } diff --git a/src/Marten/Events/Aggregation/EventSlice.cs b/src/Marten/Events/Aggregation/EventSlice.cs index 5e62c3b5df..25a24f94d2 100644 --- a/src/Marten/Events/Aggregation/EventSlice.cs +++ b/src/Marten/Events/Aggregation/EventSlice.cs @@ -38,12 +38,22 @@ public EventSlice(TId id, IQuerySession querySession, IEnumerable? event { } + private readonly StreamActionType? _actionType; + /// /// Is this action the start of a new stream or appending /// to an existing stream? /// - public StreamActionType ActionType => _events[0].Version == 1 ? StreamActionType.Start : StreamActionType.Append; - + /// + /// Default's to determining from the version of the first event on + /// stream, but can be overridden so that the value works with + /// QuickAppend + /// + public StreamActionType ActionType + { + get => _actionType ?? (_events[0].Version == 1 ? StreamActionType.Start : StreamActionType.Append); + init => _actionType = value; + } /// /// The aggregate identity