Skip to content

Commit

Permalink
Boom! Revisioning is working correctly with quick append event capture.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Jul 19, 2024
1 parent 70c352e commit 74174cb
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 26 deletions.
13 changes: 8 additions & 5 deletions src/EventSourcingTests/Aggregation/CustomProjectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ namespace EventSourcingTests.Aggregation;
public class CustomProjectionTests
{
[Theory]
[InlineData(true, EventAppendMode.Quick, true)]
[InlineData(true, EventAppendMode.Rich, false)]
[InlineData(false, EventAppendMode.Rich, false)]
[InlineData(false, EventAppendMode.Quick, false)]
public void configure_mapping(bool isSingleGrouper, EventAppendMode appendMode, bool useVersionFromStream)
[InlineData(true, EventAppendMode.Quick, ProjectionLifecycle.Inline, true)]
[InlineData(true, EventAppendMode.Quick, ProjectionLifecycle.Async, false)]
[InlineData(true, EventAppendMode.Quick, ProjectionLifecycle.Live, false)]
[InlineData(true, EventAppendMode.Rich, ProjectionLifecycle.Inline, false)]
[InlineData(false, EventAppendMode.Rich, ProjectionLifecycle.Inline, false)]
[InlineData(false, EventAppendMode.Quick, ProjectionLifecycle.Inline, false)]
public void configure_mapping(bool isSingleGrouper, EventAppendMode appendMode, ProjectionLifecycle lifecycle, bool useVersionFromStream)
{
var projection = isSingleGrouper ? (IAggregateProjection)new MySingleStreamProjection() : new MyCustomProjection();
var mapping = DocumentMapping.For<MyAggregate>();
mapping.StoreOptions.Events.AppendMode = appendMode;
projection.Lifecycle = lifecycle;

projection.ConfigureAggregateMapping(mapping, mapping.StoreOptions);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Schema;
using Marten.Testing.Documents;
using Shouldly;
Expand All @@ -17,6 +18,7 @@ public void set_mapping_to_UseVersionFromMatchingStream_when_quick_append()
var mapping = DocumentMapping.For<User>();

mapping.StoreOptions.EventGraph.AppendMode = EventAppendMode.Quick;
projection.Lifecycle = ProjectionLifecycle.Inline;

projection.ConfigureAggregateMapping(mapping, mapping.StoreOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,14 @@ public void capture_events_to_an_existing_stream_and_fetch_the_events_back_in_an
}

[Fact]
public void assert_on_max_event_id_on_event_stream_append()
public async Task assert_on_max_event_id_on_event_stream_append()
{
var id = "Sixteen";
var started = new QuestStarted();

using var session = theStore.LightweightSession();
session.Events.StartStream<Quest>(id, started);
session.SaveChanges();
await session.SaveChangesAsync();

var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } };
var departed = new MembersDeparted { Members = new[] { "Thom" } };
Expand All @@ -446,6 +446,6 @@ public void assert_on_max_event_id_on_event_stream_append()
// would be 3 after the append operation.
session.Events.Append(id, 3, joined, departed);

session.SaveChanges();
await session.SaveChangesAsync();
}
}
2 changes: 1 addition & 1 deletion src/Marten/Events/Aggregation/CustomProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public virtual TDoc ApplyMetadata(TDoc aggregate, IEvent lastEvent)
public void ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions storeOptions)
{
mapping.UseVersionFromMatchingStream =
storeOptions.Events.AppendMode == EventAppendMode.Quick && Slicer is ISingleStreamSlicer;
Lifecycle == ProjectionLifecycle.Inline && storeOptions.Events.AppendMode == EventAppendMode.Quick && Slicer is ISingleStreamSlicer;
}
}

3 changes: 2 additions & 1 deletion src/Marten/Events/Aggregation/SingleStreamProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.ComponentModel;
using JasperFx.CodeGeneration;
using JasperFx.Core.Reflection;
using Marten.Events.Projections;
using Marten.Schema;

namespace Marten.Events.Aggregation;
Expand Down Expand Up @@ -40,7 +41,7 @@ protected sealed override object buildEventSlicer(StoreOptions documentMapping)

public override void ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions storeOptions)
{
mapping.UseVersionFromMatchingStream = storeOptions.Events.AppendMode == EventAppendMode.Quick;
mapping.UseVersionFromMatchingStream = Lifecycle == ProjectionLifecycle.Inline && storeOptions.Events.AppendMode == EventAppendMode.Quick;
}

protected sealed override Type baseTypeForAggregationRuntime()
Expand Down
5 changes: 5 additions & 0 deletions src/Marten/Events/Operations/AppendEventOperationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public AppendEventOperationBase(StreamAction stream, IEvent e)
{
Stream = stream;
Event = e;

if (e.Version == 0)
{
throw new ArgumentOutOfRangeException(nameof(e), "Version cannot be 0");
}
}

public StreamAction Stream { get; }
Expand Down
69 changes: 56 additions & 13 deletions src/Marten/Events/StreamAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,26 +279,29 @@ internal void PrepareEvents(long currentVersion, EventGraph graph, Queue<long> s
AggregateTypeName = graph.AggregateAliasFor(AggregateType);
}

var i = currentVersion;

// Augment the events before checking expected versions, this allows the sequence/etc to properly be set on the resulting tombstone events
foreach (var @event in _events)
if (graph.AppendMode == EventAppendMode.Rich)
{
@event.Version = ++i;
if (@event.Id == Guid.Empty)
applyRichMetadata(currentVersion, graph, sequences, session, timestamp);
}
else
{
applyQuickMetadata(graph, session, timestamp);

if (ActionType == StreamActionType.Start)
{
@event.Id = CombGuidIdGeneration.NewGuid();
ExpectedVersionOnServer = 0;
}

if (sequences.TryDequeue(out var sequence))
// In this case, we "know" what the event versions should be, so just set them now
if (ExpectedVersionOnServer.HasValue)
{
@event.Sequence = sequence;
var i = ExpectedVersionOnServer.Value;
foreach (var e in Events)
{
e.Version = ++i;
}
}

@event.TenantId = session.TenantId;
@event.Timestamp = timestamp;

ProcessMetadata(@event, graph, session);
}

if (currentVersion != 0)
Expand Down Expand Up @@ -328,6 +331,46 @@ internal void PrepareEvents(long currentVersion, EventGraph graph, Queue<long> s
Version = Events.Last().Version;
}

private void applyQuickMetadata(EventGraph graph, IMartenSession session, DateTimeOffset timestamp)
{
foreach (var @event in _events)
{
if (@event.Id == Guid.Empty)
{
@event.Id = CombGuidIdGeneration.NewGuid();
}

@event.TenantId = session.TenantId;
@event.Timestamp = timestamp;

ProcessMetadata(@event, graph, session);
}
}

private void applyRichMetadata(long currentVersion, EventGraph graph, Queue<long> sequences, IMartenSession session,
DateTimeOffset timestamp)
{
var i = currentVersion;
foreach (var @event in _events)
{
@event.Version = ++i;
if (@event.Id == Guid.Empty)
{
@event.Id = CombGuidIdGeneration.NewGuid();
}

if (sequences.TryDequeue(out var sequence))
{
@event.Sequence = sequence;
}

@event.TenantId = session.TenantId;
@event.Timestamp = timestamp;

ProcessMetadata(@event, graph, session);
}
}

internal static StreamAction ForReference(Guid streamId, string tenantId)
{
return new StreamAction(streamId, StreamActionType.Append) { TenantId = tenantId };
Expand Down
31 changes: 28 additions & 3 deletions src/Marten/Storage/UpsertFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ internal class UpsertFunction: Function

public readonly IList<UpsertArgument> Arguments = new List<UpsertArgument>();
protected readonly string _primaryKeyFields;
protected readonly string _versionSourceTable;
protected readonly string _versionColumnName;
protected readonly string _tenantVersionWhereClause;
protected readonly string _andTenantVersionWhereClause;

public UpsertFunction(DocumentMapping mapping, DbObjectName identifier = null, bool disableConcurrency = false):
base(identifier ?? mapping.UpsertFunction)
Expand Down Expand Up @@ -82,11 +86,28 @@ public UpsertFunction(DocumentMapping mapping, DbObjectName identifier = null, b
Arguments.Add(new RevisionArgument());
}

if (_mapping.UseVersionFromMatchingStream)
{
_versionSourceTable = $"{_mapping.StoreOptions.Events.DatabaseSchemaName}.mt_streams";
_versionColumnName = "version";
}
else
{
_versionSourceTable = _tableName.QualifiedName;
_versionColumnName = "mt_version";
}

if (mapping.TenancyStyle == TenancyStyle.Conjoined)
{
Arguments.Add(new TenantIdArgument());
_tenantWhereClause = $"{_tableName.QualifiedName}.{TenantIdColumn.Name} = {TenantIdArgument.ArgName}";
_andTenantWhereClause = $" and {_tenantWhereClause}";

if (_mapping.UseVersionFromMatchingStream)
{
_tenantVersionWhereClause = $"{_versionSourceTable}.{TenantIdColumn.Name} = {TenantIdArgument.ArgName}";
_andTenantVersionWhereClause = $" and {_tenantVersionWhereClause}";
}
}

_primaryKeyFields = table.Columns.Where(x => x.IsPrimaryKey).Select(x => x.Name).Join(", ");
Expand Down Expand Up @@ -163,6 +184,10 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s
string inserts,
string valueList, string updates)
{
var revisionModification = _mapping.UseVersionFromMatchingStream
? "revision = current_version;"
: "revision = current_version + 1;";

if (_mapping.Metadata.Revision.Enabled)
{
writer.WriteLine($@"
Expand All @@ -175,9 +200,9 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s
BEGIN
if revision = 0 then
SELECT mt_version FROM {_tableName.QualifiedName} into current_version WHERE id = docId {_andTenantWhereClause};
SELECT {_versionColumnName} into current_version FROM {_versionSourceTable} WHERE id = docId {_andTenantVersionWhereClause};
if current_version is not null then
revision = current_version + 1;
{revisionModification}
else
revision = 1;
end if;
Expand All @@ -187,7 +212,7 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s
ON CONFLICT ({_primaryKeyFields})
DO UPDATE SET {updates};
SELECT mt_version FROM {_tableName.QualifiedName} into final_version WHERE id = docId {_andTenantWhereClause};
SELECT mt_version into final_version FROM {_tableName.QualifiedName} WHERE id = docId {_andTenantWhereClause};
RETURN final_version;
END;
$function$;
Expand Down

0 comments on commit 74174cb

Please sign in to comment.