Skip to content

Commit

Permalink
JasperFx#3078 Providing a version for aggregating that is too high sh…
Browse files Browse the repository at this point in the history
…ould aggregate to null
  • Loading branch information
erdtsieck committed Mar 21, 2024
1 parent 61de7fc commit 6f6e18d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
62 changes: 62 additions & 0 deletions src/EventSourcingTests/Bugs/Bug_3078_providing_version.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Threading.Tasks;
using Marten;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Bugs;

public class Bug_3078_providing_version
{
[Fact]
public async Task too_high_should_aggregate_to_null()
{
// Given
var streamId = Guid.NewGuid();
var options = new StoreOptions();
options.Connection(ConnectionSource.ConnectionString);
options.Projections.LiveStreamAggregation<PolledAggregate>();

// When
var store = new DocumentStore(options);
var session = store.LightweightSession();
session.Events.Append(streamId, new PollingShouldFail(streamId));
await session.SaveChangesAsync();

// Then
var aggregate = await session.Events.AggregateStreamAsync<PolledAggregate>(streamId, 2);
aggregate.ShouldBeNull();
}

[Fact]
public async Task exactly_right_should_aggregate_just_fine()
{
// Given
var streamId = Guid.NewGuid();
var options = new StoreOptions();
options.Connection(ConnectionSource.ConnectionString);
options.Projections.LiveStreamAggregation<PolledAggregate>();

// When
var store = new DocumentStore(options);
var session = store.LightweightSession();
session.Events.Append(streamId, new PollingShouldFail(streamId), new PollingShouldNotFail());
await session.SaveChangesAsync();

// Then
var aggregate = await session.Events.AggregateStreamAsync<PolledAggregate>(streamId, 2);
aggregate.ShouldNotBeNull();
}
}

public record PollingShouldFail(Guid AggregateId);

public record PollingShouldNotFail();

public record PolledAggregate(Guid Id)
{
public static PolledAggregate Create(PollingShouldFail @event) => new(@event.AggregateId);

public PolledAggregate Apply(PollingShouldNotFail _) => this;
}
2 changes: 1 addition & 1 deletion src/Marten/Events/QueryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public async Task<IReadOnlyList<IEvent>> FetchStreamAsync(string streamKey, long
T? state = null, long fromVersion = 0, CancellationToken token = default) where T : class
{
var events = await FetchStreamAsync(streamId, version, timestamp, fromVersion, token).ConfigureAwait(false);
if (!events.Any())
if (!events.Any() || events.Last().Version < version)
{
return state;
}
Expand Down

0 comments on commit 6f6e18d

Please sign in to comment.