Skip to content

Commit

Permalink
Fix for quick append + mandatory stream type behavior. Closes GH-3518
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Oct 29, 2024
1 parent dcf019f commit 2369d37
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 2 deletions.
33 changes: 33 additions & 0 deletions src/EventSourcingTests/mandatory_stream_type_behavior.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using EventSourcingTests.FetchForWriting;
using JasperFx.Core;
using Marten;
using Marten.Events;
Expand Down Expand Up @@ -88,6 +89,38 @@ await Should.ThrowAsync<NonExistentStreamException>(async () =>
});
}

[Theory]
[InlineData(EventAppendMode.Rich, StreamIdentity.AsGuid)]
[InlineData(EventAppendMode.Rich, StreamIdentity.AsString)]
[InlineData(EventAppendMode.Quick, StreamIdentity.AsGuid)]
[InlineData(EventAppendMode.Quick, StreamIdentity.AsString)]
public async Task happy_path_append_event_after_the_stream_exists(EventAppendMode mode, StreamIdentity identity)
{
StoreOptions(opts =>
{
opts.Events.AppendMode = mode;
opts.Events.StreamIdentity = identity;
opts.Events.UseMandatoryStreamTypeDeclaration = true;
});

if (identity == StreamIdentity.AsGuid)
{
var streamId = Guid.NewGuid();
theSession.Events.StartStream<SimpleAggregate>(streamId, new BEvent());
await theSession.SaveChangesAsync();
theSession.Events.Append(streamId, new AEvent());
await theSession.SaveChangesAsync();
}
else
{
var streamId = Guid.NewGuid().ToString();
theSession.Events.StartStream<SimpleAggregate>(streamId, new BEvent());
await theSession.SaveChangesAsync();
theSession.Events.Append(streamId, new AEvent());
await theSession.SaveChangesAsync();
}
}

[Fact]
public async Task happy_path_usage_with_guid()
{
Expand Down
18 changes: 16 additions & 2 deletions src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,21 @@ public void Postprocess(DbDataReader reader, IList<Exception> exceptions)
{
var values = reader.GetFieldValue<long[]>(0);

var finalVersion = values[0];
foreach (var e in Stream.Events.Reverse())
{
e.Version = finalVersion;
finalVersion--;
}

// Ignore the first value
for (int i = 1; i < values.Length; i++)
{
// Only setting the sequence to aid in tombstone processing
Stream.Events[i - 1].Sequence = values[i];
}

if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 0)
if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 1)
{
throw new NonExistentStreamException(Events.StreamIdentity == StreamIdentity.AsGuid
? Stream.Id
Expand Down Expand Up @@ -119,14 +126,21 @@ public async Task PostprocessAsync(DbDataReader reader, IList<Exception> excepti
{
var values = await reader.GetFieldValueAsync<long[]>(0, token).ConfigureAwait(false);

var finalVersion = values[0];
foreach (var e in Stream.Events.Reverse())
{
e.Version = finalVersion;
finalVersion--;
}

// Ignore the first value
for (int i = 1; i < values.Length; i++)
{
// Only setting the sequence to aid in tombstone processing
Stream.Events[i - 1].Sequence = values[i];
}

if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 0)
if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 1)
{
throw new NonExistentStreamException(Events.StreamIdentity == StreamIdentity.AsGuid
? Stream.Id
Expand Down

0 comments on commit 2369d37

Please sign in to comment.