diff --git a/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs b/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs index e994dfcb09..46582cecc1 100644 --- a/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs +++ b/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs @@ -19,15 +19,18 @@ namespace EventSourcingTests.Aggregation; public class CustomProjectionTests { [Theory] - [InlineData(true, EventAppendMode.Quick, true)] - [InlineData(true, EventAppendMode.Rich, false)] - [InlineData(false, EventAppendMode.Rich, false)] - [InlineData(false, EventAppendMode.Quick, false)] - public void configure_mapping(bool isSingleGrouper, EventAppendMode appendMode, bool useVersionFromStream) + [InlineData(true, EventAppendMode.Quick, ProjectionLifecycle.Inline, true)] + [InlineData(true, EventAppendMode.Quick, ProjectionLifecycle.Async, false)] + [InlineData(true, EventAppendMode.Quick, ProjectionLifecycle.Live, false)] + [InlineData(true, EventAppendMode.Rich, ProjectionLifecycle.Inline, false)] + [InlineData(false, EventAppendMode.Rich, ProjectionLifecycle.Inline, false)] + [InlineData(false, EventAppendMode.Quick, ProjectionLifecycle.Inline, false)] + public void configure_mapping(bool isSingleGrouper, EventAppendMode appendMode, ProjectionLifecycle lifecycle, bool useVersionFromStream) { var projection = isSingleGrouper ? (IAggregateProjection)new MySingleStreamProjection() : new MyCustomProjection(); var mapping = DocumentMapping.For(); mapping.StoreOptions.Events.AppendMode = appendMode; + projection.Lifecycle = lifecycle; projection.ConfigureAggregateMapping(mapping, mapping.StoreOptions); diff --git a/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs b/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs index 7859c5cdd9..415797cd6e 100644 --- a/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs +++ b/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs @@ -1,6 +1,7 @@ using Marten; using Marten.Events; using Marten.Events.Aggregation; +using Marten.Events.Projections; using Marten.Schema; using Marten.Testing.Documents; using Shouldly; @@ -17,6 +18,7 @@ public void set_mapping_to_UseVersionFromMatchingStream_when_quick_append() var mapping = DocumentMapping.For(); mapping.StoreOptions.EventGraph.AppendMode = EventAppendMode.Quick; + projection.Lifecycle = ProjectionLifecycle.Inline; projection.ConfigureAggregateMapping(mapping, mapping.StoreOptions); diff --git a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs index bb130e3dab..891d858dad 100644 --- a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs +++ b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs @@ -430,14 +430,14 @@ public void capture_events_to_an_existing_stream_and_fetch_the_events_back_in_an } [Fact] - public void assert_on_max_event_id_on_event_stream_append() + public async Task assert_on_max_event_id_on_event_stream_append() { var id = "Sixteen"; var started = new QuestStarted(); using var session = theStore.LightweightSession(); session.Events.StartStream(id, started); - session.SaveChanges(); + await session.SaveChangesAsync(); var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; var departed = new MembersDeparted { Members = new[] { "Thom" } }; @@ -446,6 +446,6 @@ public void assert_on_max_event_id_on_event_stream_append() // would be 3 after the append operation. session.Events.Append(id, 3, joined, departed); - session.SaveChanges(); + await session.SaveChangesAsync(); } } diff --git a/src/Marten/Events/Aggregation/CustomProjection.cs b/src/Marten/Events/Aggregation/CustomProjection.cs index 09642eee34..8b2288250a 100644 --- a/src/Marten/Events/Aggregation/CustomProjection.cs +++ b/src/Marten/Events/Aggregation/CustomProjection.cs @@ -270,7 +270,7 @@ public virtual TDoc ApplyMetadata(TDoc aggregate, IEvent lastEvent) public void ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions storeOptions) { mapping.UseVersionFromMatchingStream = - storeOptions.Events.AppendMode == EventAppendMode.Quick && Slicer is ISingleStreamSlicer; + Lifecycle == ProjectionLifecycle.Inline && storeOptions.Events.AppendMode == EventAppendMode.Quick && Slicer is ISingleStreamSlicer; } } diff --git a/src/Marten/Events/Aggregation/SingleStreamProjection.cs b/src/Marten/Events/Aggregation/SingleStreamProjection.cs index 7013e4492f..ce9b189418 100644 --- a/src/Marten/Events/Aggregation/SingleStreamProjection.cs +++ b/src/Marten/Events/Aggregation/SingleStreamProjection.cs @@ -4,6 +4,7 @@ using System.ComponentModel; using JasperFx.CodeGeneration; using JasperFx.Core.Reflection; +using Marten.Events.Projections; using Marten.Schema; namespace Marten.Events.Aggregation; @@ -40,7 +41,7 @@ protected sealed override object buildEventSlicer(StoreOptions documentMapping) public override void ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions storeOptions) { - mapping.UseVersionFromMatchingStream = storeOptions.Events.AppendMode == EventAppendMode.Quick; + mapping.UseVersionFromMatchingStream = Lifecycle == ProjectionLifecycle.Inline && storeOptions.Events.AppendMode == EventAppendMode.Quick; } protected sealed override Type baseTypeForAggregationRuntime() diff --git a/src/Marten/Events/Operations/AppendEventOperationBase.cs b/src/Marten/Events/Operations/AppendEventOperationBase.cs index d0cf9b7990..e6e9efbdf9 100644 --- a/src/Marten/Events/Operations/AppendEventOperationBase.cs +++ b/src/Marten/Events/Operations/AppendEventOperationBase.cs @@ -16,6 +16,11 @@ public AppendEventOperationBase(StreamAction stream, IEvent e) { Stream = stream; Event = e; + + if (e.Version == 0) + { + throw new ArgumentOutOfRangeException(nameof(e), "Version cannot be 0"); + } } public StreamAction Stream { get; } diff --git a/src/Marten/Events/StreamAction.cs b/src/Marten/Events/StreamAction.cs index 4e44f52be9..d651e18600 100644 --- a/src/Marten/Events/StreamAction.cs +++ b/src/Marten/Events/StreamAction.cs @@ -279,26 +279,29 @@ internal void PrepareEvents(long currentVersion, EventGraph graph, Queue s AggregateTypeName = graph.AggregateAliasFor(AggregateType); } - var i = currentVersion; - // Augment the events before checking expected versions, this allows the sequence/etc to properly be set on the resulting tombstone events - foreach (var @event in _events) + if (graph.AppendMode == EventAppendMode.Rich) { - @event.Version = ++i; - if (@event.Id == Guid.Empty) + applyRichMetadata(currentVersion, graph, sequences, session, timestamp); + } + else + { + applyQuickMetadata(graph, session, timestamp); + + if (ActionType == StreamActionType.Start) { - @event.Id = CombGuidIdGeneration.NewGuid(); + ExpectedVersionOnServer = 0; } - if (sequences.TryDequeue(out var sequence)) + // In this case, we "know" what the event versions should be, so just set them now + if (ExpectedVersionOnServer.HasValue) { - @event.Sequence = sequence; + var i = ExpectedVersionOnServer.Value; + foreach (var e in Events) + { + e.Version = ++i; + } } - - @event.TenantId = session.TenantId; - @event.Timestamp = timestamp; - - ProcessMetadata(@event, graph, session); } if (currentVersion != 0) @@ -328,6 +331,46 @@ internal void PrepareEvents(long currentVersion, EventGraph graph, Queue s Version = Events.Last().Version; } + private void applyQuickMetadata(EventGraph graph, IMartenSession session, DateTimeOffset timestamp) + { + foreach (var @event in _events) + { + if (@event.Id == Guid.Empty) + { + @event.Id = CombGuidIdGeneration.NewGuid(); + } + + @event.TenantId = session.TenantId; + @event.Timestamp = timestamp; + + ProcessMetadata(@event, graph, session); + } + } + + private void applyRichMetadata(long currentVersion, EventGraph graph, Queue sequences, IMartenSession session, + DateTimeOffset timestamp) + { + var i = currentVersion; + foreach (var @event in _events) + { + @event.Version = ++i; + if (@event.Id == Guid.Empty) + { + @event.Id = CombGuidIdGeneration.NewGuid(); + } + + if (sequences.TryDequeue(out var sequence)) + { + @event.Sequence = sequence; + } + + @event.TenantId = session.TenantId; + @event.Timestamp = timestamp; + + ProcessMetadata(@event, graph, session); + } + } + internal static StreamAction ForReference(Guid streamId, string tenantId) { return new StreamAction(streamId, StreamActionType.Append) { TenantId = tenantId }; diff --git a/src/Marten/Storage/UpsertFunction.cs b/src/Marten/Storage/UpsertFunction.cs index 65cf1827fd..40f5fb9999 100644 --- a/src/Marten/Storage/UpsertFunction.cs +++ b/src/Marten/Storage/UpsertFunction.cs @@ -24,6 +24,10 @@ internal class UpsertFunction: Function public readonly IList Arguments = new List(); protected readonly string _primaryKeyFields; + protected readonly string _versionSourceTable; + protected readonly string _versionColumnName; + protected readonly string _tenantVersionWhereClause; + protected readonly string _andTenantVersionWhereClause; public UpsertFunction(DocumentMapping mapping, DbObjectName identifier = null, bool disableConcurrency = false): base(identifier ?? mapping.UpsertFunction) @@ -82,11 +86,28 @@ public UpsertFunction(DocumentMapping mapping, DbObjectName identifier = null, b Arguments.Add(new RevisionArgument()); } + if (_mapping.UseVersionFromMatchingStream) + { + _versionSourceTable = $"{_mapping.StoreOptions.Events.DatabaseSchemaName}.mt_streams"; + _versionColumnName = "version"; + } + else + { + _versionSourceTable = _tableName.QualifiedName; + _versionColumnName = "mt_version"; + } + if (mapping.TenancyStyle == TenancyStyle.Conjoined) { Arguments.Add(new TenantIdArgument()); _tenantWhereClause = $"{_tableName.QualifiedName}.{TenantIdColumn.Name} = {TenantIdArgument.ArgName}"; _andTenantWhereClause = $" and {_tenantWhereClause}"; + + if (_mapping.UseVersionFromMatchingStream) + { + _tenantVersionWhereClause = $"{_versionSourceTable}.{TenantIdColumn.Name} = {TenantIdArgument.ArgName}"; + _andTenantVersionWhereClause = $" and {_tenantVersionWhereClause}"; + } } _primaryKeyFields = table.Columns.Where(x => x.IsPrimaryKey).Select(x => x.Name).Join(", "); @@ -163,6 +184,10 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s string inserts, string valueList, string updates) { + var revisionModification = _mapping.UseVersionFromMatchingStream + ? "revision = current_version;" + : "revision = current_version + 1;"; + if (_mapping.Metadata.Revision.Enabled) { writer.WriteLine($@" @@ -175,9 +200,9 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s BEGIN if revision = 0 then - SELECT mt_version FROM {_tableName.QualifiedName} into current_version WHERE id = docId {_andTenantWhereClause}; + SELECT {_versionColumnName} into current_version FROM {_versionSourceTable} WHERE id = docId {_andTenantVersionWhereClause}; if current_version is not null then - revision = current_version + 1; + {revisionModification} else revision = 1; end if; @@ -187,7 +212,7 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s ON CONFLICT ({_primaryKeyFields}) DO UPDATE SET {updates}; - SELECT mt_version FROM {_tableName.QualifiedName} into final_version WHERE id = docId {_andTenantWhereClause}; + SELECT mt_version into final_version FROM {_tableName.QualifiedName} WHERE id = docId {_andTenantWhereClause}; RETURN final_version; END; $function$;