From ff5dc739001a1fc1ff6cade62880d20228dd18ed Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 1 Jul 2024 13:52:41 -0500 Subject: [PATCH] "Quick" Append Events. Closes GH-3138 Docs on the quick append option Few more tests for metadata on the new quick append mode. More tests on the quick append First end to end runs of the quick append mechanism Roughed in QuickEventAppender and configuration logic for switching append modes First successful pass at code generation for the quick append function Lot more preparatory work for "quick append". Put back the old AppendEventFunction Refactored on the code generation for append event operations in preparation for the quick append Roughed in IncrementStreamVersionBy*** for later --- docs/events/appending.md | 20 + src/EventSourcingTests/EventGraphTests.cs | 24 + .../EventStoreSchemaV3ToV4Tests.cs | 119 --- .../QuickAppend/Examples.cs | 29 + .../QuestPartyWithStringIdentifier.cs | 52 ++ .../StringIdentifiedStreamsCollection.cs | 8 + .../StringIdentifiedStreamsFixture.cs | 19 + .../QuickAppend/quick_append_end_to_end.cs | 45 ++ ...d_event_capture_and_fetching_the_stream.cs | 676 ++++++++++++++++++ ...hing_the_stream_with_string_identifiers.cs | 451 ++++++++++++ ...ents_with_optimistic_or_exclusive_locks.cs | 286 ++++++++ .../quick_appending_events_workflow_specs.cs | 413 +++++++++++ .../Events/Archiving/IsArchivedColumn.cs | 7 +- .../EventDocumentStorageGenerator.cs | 113 ++- src/Marten/Events/EventDocumentStorage.cs | 19 + src/Marten/Events/EventGraph.FeatureSchema.cs | 4 +- src/Marten/Events/EventGraph.Processing.cs | 133 +--- src/Marten/Events/IEventAppender.cs | 15 + src/Marten/Events/IEventStorage.cs | 13 + src/Marten/Events/IEventStoreOptions.cs | 2 + .../Events/IReadOnlyEventStoreOptions.cs | 2 + .../Operations/AppendEventOperationBase.cs | 1 + .../Operations/IncrementStreamVersionById.cs | 46 ++ .../Operations/IncrementStreamVersionByKey.cs | 46 ++ .../QuickAppendEventsOperationBase.cs | 115 +++ ...amOperations.cs => UpdateStreamVersion.cs} | 1 + src/Marten/Events/QuickEventAppender.cs | 73 ++ src/Marten/Events/RichEventAppender.cs | 117 +++ .../Events/Schema/EventJsonDataColumn.cs | 7 +- src/Marten/Events/Schema/EventTableColumn.cs | 16 +- src/Marten/Events/Schema/EventTypeColumn.cs | 7 +- src/Marten/Events/Schema/EventsTable.cs | 6 +- src/Marten/Events/Schema/IEventTableColumn.cs | 11 +- .../Events/Schema/QuickAppendEventFunction.cs | 114 +++ src/Marten/Events/Schema/SequenceColumn.cs | 25 + src/Marten/Events/Schema/StreamIdColumn.cs | 7 +- src/Marten/Events/Schema/VersionColumn.cs | 14 + src/Marten/Events/StreamAction.cs | 6 +- .../DocumentSessionBase.SaveChanges.cs | 10 +- .../Storage/Metadata/CausationIdColumn.cs | 12 +- .../Storage/Metadata/CorrelationIdColumn.cs | 7 +- .../Storage/Metadata/DotNetTypeColumn.cs | 7 +- src/Marten/Storage/Metadata/HeadersColumn.cs | 7 +- src/Marten/Storage/Metadata/TenantIdColumn.cs | 7 +- 44 files changed, 2864 insertions(+), 248 deletions(-) delete mode 100644 src/EventSourcingTests/EventStoreSchemaV3ToV4Tests.cs create mode 100644 src/EventSourcingTests/QuickAppend/Examples.cs create mode 100644 src/EventSourcingTests/QuickAppend/QuestPartyWithStringIdentifier.cs create mode 100644 src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsCollection.cs create mode 100644 src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsFixture.cs create mode 100644 src/EventSourcingTests/QuickAppend/quick_append_end_to_end.cs create mode 100644 src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs create mode 100644 src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs create mode 100644 src/EventSourcingTests/QuickAppend/quick_append_events_with_optimistic_or_exclusive_locks.cs create mode 100644 src/EventSourcingTests/QuickAppend/quick_appending_events_workflow_specs.cs create mode 100644 src/Marten/Events/IEventAppender.cs create mode 100644 src/Marten/Events/Operations/IncrementStreamVersionById.cs create mode 100644 src/Marten/Events/Operations/IncrementStreamVersionByKey.cs create mode 100644 src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs rename src/Marten/Events/Operations/{UpdateStreamOperations.cs => UpdateStreamVersion.cs} (97%) create mode 100644 src/Marten/Events/QuickEventAppender.cs create mode 100644 src/Marten/Events/RichEventAppender.cs create mode 100644 src/Marten/Events/Schema/QuickAppendEventFunction.cs create mode 100644 src/Marten/Events/Schema/SequenceColumn.cs create mode 100644 src/Marten/Events/Schema/VersionColumn.cs diff --git a/docs/events/appending.md b/docs/events/appending.md index 83d3be2b1a..830eeeb54f 100644 --- a/docs/events/appending.md +++ b/docs/events/appending.md @@ -18,6 +18,26 @@ The event data is persisted to two tables: Events can be captured by either starting a new stream or by appending events to an existing stream. In addition, Marten has some tricks up its sleeve for dealing with concurrency issues that may result from multiple transactions trying to simultaneously append events to the same stream. +## "Rich" vs "Quick" Appends + +Before diving into starting new event streams or appending events to existing streams, just know that there are two different +modes of event appending you can use with Marten: + +snippet: sample_configuring_event_append_mode + +The classic `Rich` mode will append events in a two step process where the local session will first determine all possible +metadata for the events about to be appended such that inline projections can use event versions and the global event sequence +numbers at the time that the inline projections are created. + +The newer `Quick` mode eschews version and sequence metadata in favor of performing the event append and stream creation +operations with minimal overhead. The improved performance comes at the cost of not having the `IEvent.Version` and `IEvent.Sequence` +information available at the time that inline projections are executed. + +If using inline projections for a single stream (`SingleStreamProjection` or _snapshots_) and the `Quick` mode, the Marten team +highly recommends using the `IRevisioned` interface on your projected aggregate documents so that Marten can "move" the version +set by the database operations to the version of the projected documents loaded from the database later. Mapping a custom member +to the `Revision` metadata will work as well. + ## Starting a new Stream You can **optionally** start a new event stream against some kind of .Net type that theoretically marks the type of stream you're capturing. diff --git a/src/EventSourcingTests/EventGraphTests.cs b/src/EventSourcingTests/EventGraphTests.cs index acf71ba466..6c3e5e1576 100644 --- a/src/EventSourcingTests/EventGraphTests.cs +++ b/src/EventSourcingTests/EventGraphTests.cs @@ -82,6 +82,30 @@ public void has_any_is_true_with_any_events() theGraph.IsActive(null).ShouldBeTrue(); } + [Fact] + public void default_append_mode_is_rich() + { + theGraph.AppendMode.ShouldBe(EventAppendMode.Rich); + theGraph.EventAppender.ShouldBeOfType(); + } + + [Fact] + public void switch_to_quick() + { + theGraph.AppendMode = EventAppendMode.Quick; + theGraph.EventAppender.ShouldBeOfType(); + theGraph.AppendMode.ShouldBe(EventAppendMode.Quick); + } + + [Fact] + public void switch_to_quick_and_back_to_rich() + { + theGraph.AppendMode = EventAppendMode.Quick; + theGraph.AppendMode = EventAppendMode.Rich; + theGraph.AppendMode.ShouldBe(EventAppendMode.Rich); + theGraph.EventAppender.ShouldBeOfType(); + } + public class HouseRemodeling { public Guid Id { get; set; } diff --git a/src/EventSourcingTests/EventStoreSchemaV3ToV4Tests.cs b/src/EventSourcingTests/EventStoreSchemaV3ToV4Tests.cs deleted file mode 100644 index d08b2c68c4..0000000000 --- a/src/EventSourcingTests/EventStoreSchemaV3ToV4Tests.cs +++ /dev/null @@ -1,119 +0,0 @@ -using System; -using System.Threading.Tasks; -using Marten; -using Marten.Events; -using Marten.Testing; -using Marten.Testing.Harness; -using Npgsql; -using Shouldly; -using Weasel.Core; -using Weasel.Core.Migrations; -using Weasel.Postgresql; -using Xunit; - -namespace EventSourcingTests; - -public class EventStoreSchemaV3ToV4Tests : OneOffConfigurationsContext -{ - [Fact] - public async Task can_create_patch_for_event_store_schema_changes() - { - var store1 = Store(AutoCreate.All); - await store1.EnsureStorageExistsAsync(typeof(StreamAction)); - - SimulateEventStoreV3Schema(); - - // create another store and check if the schema can be be auto updated - using var store2 = Store(AutoCreate.CreateOrUpdate); - - var sql = (await store2.Storage.Database.CreateMigrationAsync()).UpdateSql(); - sql.ShouldContain($"alter table {_schemaName}.mt_events alter column version type bigint", Case.Insensitive); - sql.ShouldContain($"alter table {_schemaName}.mt_streams alter column version type bigint", Case.Insensitive); - sql.ShouldContain($"drop function if exists {_schemaName}.mt_append_event", Case.Insensitive); - } - - [Fact] - public async Task can_auto_update_event_store_schema_changes() - { - using var store1 = Store(AutoCreate.All); - await store1.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); - - SimulateEventStoreV3Schema(); - - // create another store and check if the schema can be be auto updated - using var store2 = Store(AutoCreate.CreateOrUpdate); - - await Should.ThrowAsync(async () => - { - await store2.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); - }); - - await Should.NotThrowAsync(async () => - { - await store2.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); - await store2.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); - }); - } - - [Fact] - public async Task should_not_have_v3_to_v4_patches_on_v4_schema() - { - using var store1 = Store(AutoCreate.All); - await store1.Tenancy.Default.Database.EnsureStorageExistsAsync(typeof(StreamAction)); - - // create another store and check if no v3 to v4 patches are generated - using var store2 = Store(AutoCreate.CreateOrUpdate); - - var sql = (await store2.Storage.Database.CreateMigrationAsync()).UpdateSql(); - sql.ShouldNotContain($"alter table {_schemaName}.mt_events alter column version type bigint", Case.Insensitive); - sql.ShouldNotContain($"alter table {_schemaName}.mt_streams alter column version type bigint", Case.Insensitive); - sql.ShouldNotContain($"drop function if exists {_schemaName}.mt_append_event", Case.Insensitive); - } - - private DocumentStore Store(AutoCreate autoCreate) - { - return DocumentStore.For(_ => - { - _.DatabaseSchemaName = _schemaName; - _.Connection(ConnectionSource.ConnectionString); - _.AutoCreateSchemaObjects = autoCreate; - _.EventGraph.EventMappingFor(); - }); - } - - private void SimulateEventStoreV3Schema() - { - // simulate to event store v3 schema with version fields as int - using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); - try - { - conn.Open(); - // version as integer in mt_events - conn.CreateCommand($"alter table {_schemaName}.mt_events alter column version type int") - .ExecuteNonQuery(); - - conn.CreateCommand($"alter table {_schemaName}.mt_events drop column is_archived") - .ExecuteNonQuery(); - - conn.CreateCommand($"alter table {_schemaName}.mt_streams drop column is_archived") - .ExecuteNonQuery(); - - // version as integer in mt_streams - conn.CreateCommand($"alter table {_schemaName}.mt_streams alter column version type int") - .ExecuteNonQuery(); - conn.CreateCommand($"create function {_schemaName}.mt_append_event(uuid, varchar, varchar, uuid[], varchar[], varchar[], jsonb[]) returns int language plpgsql as $$ begin return 1; end; $$;") - .ExecuteNonQuery(); - } - finally - { - conn.Close(); - } - } - - private readonly string _schemaName; - - public EventStoreSchemaV3ToV4Tests() - { - _schemaName = $"s_{Guid.NewGuid().ToString().Replace("-", "")}"; - } -} \ No newline at end of file diff --git a/src/EventSourcingTests/QuickAppend/Examples.cs b/src/EventSourcingTests/QuickAppend/Examples.cs new file mode 100644 index 0000000000..7ffa405331 --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/Examples.cs @@ -0,0 +1,29 @@ +using System.Threading.Tasks; +using Marten; +using Marten.Events; +using Microsoft.Extensions.Hosting; + +namespace EventSourcingTests.QuickAppend; + +public class Examples +{ + public static async Task configure() + { + #region sample_configuring_event_append_mode + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + // This is the default Marten behavior from 4.0 on + opts.Events.AppendMode = EventAppendMode.Rich; + + // Lighter weight mode that should result in better + // performance, but with a loss of available metadata + // within inline projections + opts.Events.AppendMode = EventAppendMode.Quick; + }) + .UseNpgsqlDataSource(); + + #endregion + } +} diff --git a/src/EventSourcingTests/QuickAppend/QuestPartyWithStringIdentifier.cs b/src/EventSourcingTests/QuickAppend/QuestPartyWithStringIdentifier.cs new file mode 100644 index 0000000000..d379896e04 --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/QuestPartyWithStringIdentifier.cs @@ -0,0 +1,52 @@ +using System.Collections.Generic; +using System.Linq; +using JasperFx.Core; + +namespace EventSourcingTests.QuickAppend; + +public class QuestPartyWithStringIdentifier +{ + private readonly IList _members = new List(); + + public string[] Members + { + get + { + return _members.ToArray(); + } + set + { + _members.Clear(); + _members.AddRange(value); + } + } + + public IList Slayed { get; } = new List(); + + public void Apply(MembersJoined joined) + { + if (joined.Members != null) + _members.Fill(joined.Members); + } + + public void Apply(MembersDeparted departed) + { + _members.RemoveAll(x => departed.Members.Contains(x)); + } + + public void Apply(QuestStarted started) + { + Name = started.Name; + } + + public string Key { get; set; } + + public string Name { get; set; } + + public string Id { get; set; } + + public override string ToString() + { + return $"Quest party '{Name}' is {Members.Join(", ")}"; + } +} \ No newline at end of file diff --git a/src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsCollection.cs b/src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsCollection.cs new file mode 100644 index 0000000000..b8c4046a6c --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsCollection.cs @@ -0,0 +1,8 @@ +using Xunit; + +namespace EventSourcingTests.QuickAppend; + +[CollectionDefinition("quick_string_identified_streams")] +public class StringIdentifiedStreamsCollection: ICollectionFixture +{ +} diff --git a/src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsFixture.cs b/src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsFixture.cs new file mode 100644 index 0000000000..af0cd8e077 --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/StringIdentifiedStreamsFixture.cs @@ -0,0 +1,19 @@ +using Marten.Events; +using Marten.Events.Projections; +using Marten.Testing.Harness; + +namespace EventSourcingTests.QuickAppend; + +public class StringIdentifiedStreamsFixture: StoreFixture +{ + public StringIdentifiedStreamsFixture(): base("quick_string_identified_streams") + { + Options.Events.AppendMode = EventAppendMode.Quick; + Options.Events.StreamIdentity = StreamIdentity.AsString; + Options.Projections.Snapshot(SnapshotLifecycle.Inline); + + Options.Events.AddEventType(typeof(MembersJoined)); + Options.Events.AddEventType(typeof(MembersDeparted)); + Options.Events.AddEventType(typeof(QuestStarted)); + } +} diff --git a/src/EventSourcingTests/QuickAppend/quick_append_end_to_end.cs b/src/EventSourcingTests/QuickAppend/quick_append_end_to_end.cs new file mode 100644 index 0000000000..3e4ee92c4b --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/quick_append_end_to_end.cs @@ -0,0 +1,45 @@ +using System.Threading.Tasks; +using Marten.Events; +using Marten.Testing.Harness; +using Xunit; +using Shouldly; + +namespace EventSourcingTests.QuickAppend; + +public class quick_append_end_to_end : OneOffConfigurationsContext +{ + public quick_append_end_to_end() + { + StoreOptions(opts => + { + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + opts.Events.MetadataConfig.HeadersEnabled = true; + }); + } + + [Fact] + public async Task append_with_metadata_using_function() + { + theSession.CorrelationId = "lotr"; + theSession.CausationId = "fellowship"; + theSession.SetHeader("color", "blue"); + + var streamId = + theSession.Events.StartStream(new QuestStarted(), new MembersJoined(1, "Hobbiton", "Frodo", "Sam")).Id; + await theSession.SaveChangesAsync(); + + var events = await theSession.Events.FetchStreamAsync(streamId); + + foreach (var e in events) + { + e.Sequence.ShouldBeGreaterThan(0); + e.Version.ShouldBeGreaterThan(0); + + e.CorrelationId.ShouldBe(theSession.CorrelationId); + e.CausationId.ShouldBe(theSession.CausationId); + e.Headers["color"].ShouldBe("blue"); + } + } +} diff --git a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs new file mode 100644 index 0000000000..41975a5c00 --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs @@ -0,0 +1,676 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using EventSourcingTests.Projections; +using EventSourcingTests.Utils; +using JasperFx.Core; +using Marten; +using Marten.Events; +using Marten.Events.Projections; +using Marten.Storage; +using Marten.Testing.Harness; +using Shouldly; +using Weasel.Core; +using Xunit; +using Xunit.Abstractions; + +namespace EventSourcingTests.QuickAppend; + +public class quick_append_event_capture_and_fetching_the_stream: OneOffConfigurationsContext +{ + private readonly ITestOutputHelper _output; + private static readonly string[] SameTenants = { "tenant", "tenant" }; + private static readonly string[] DifferentTenants = { "tenant", "differentTenant" }; + private static readonly string[] DefaultTenant = { Tenancy.DefaultTenantId }; + + public quick_append_event_capture_and_fetching_the_stream(ITestOutputHelper output) + { + _output = output; + } + + public static TheoryData SessionParams = new TheoryData + { + { TenancyStyle.Conjoined, SameTenants }, + { TenancyStyle.Conjoined, DifferentTenants }, + { TenancyStyle.Single, DefaultTenant }, + { TenancyStyle.Single, DifferentTenants }, + { TenancyStyle.Single, SameTenants }, + }; + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_a_new_stream_and_fetch_the_events_back(TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + When.CalledForEach(tenants, (tenantId, _) => + { + using var session = store.LightweightSession(tenantId); + session.Logger = new TestOutputMartenLogger(_output); + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = session.Events.StartStream(joined, departed).Id; + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => ShouldBeTestExtensions.ShouldNotBe(e.Timestamp, default(DateTimeOffset))); + }).ShouldSucceed(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public Task capture_events_to_a_new_stream_and_fetch_the_events_back_async(TenancyStyle tenancyStyle, + string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + return When.CalledForEachAsync(tenants, async (tenantId, _) => + { + await using var session = store.LightweightSession(tenantId); + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = session.Events.StartStream(joined, departed).Id; + await session.SaveChangesAsync(); + + var streamEvents = await session.Events.FetchStreamAsync(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => e.Timestamp.ShouldNotBe(default)); + }).ShouldSucceedAsync(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public Task capture_events_to_a_new_stream_and_fetch_the_events_back_async_with_linq(TenancyStyle tenancyStyle, + string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + return When.CalledForEachAsync(tenants, async (tenantId, _) => + { + await using var session = store.LightweightSession(tenantId); + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = session.Events.StartStream(joined, departed).Id; + await session.SaveChangesAsync(); + + var streamEvents = await session.Events.QueryAllRawEvents() + .Where(x => x.StreamId == id).OrderBy(x => x.Version).ToListAsync(); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => e.Timestamp.ShouldNotBe(default(DateTimeOffset))); + }).ShouldSucceedAsync(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_a_new_stream_and_fetch_the_events_back_sync_with_linq(TenancyStyle tenancyStyle, + string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + When.CalledForEach(tenants, (tenantId, _) => + { + using var session = store.LightweightSession(tenantId); + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = session.Events.StartStream(joined, departed).Id; + session.SaveChanges(); + + var streamEvents = session.Events.QueryAllRawEvents() + .Where(x => x.StreamId == id).OrderBy(x => x.Version).ToList(); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => e.Timestamp.ShouldNotBe(default(DateTimeOffset))); + }).ShouldSucceed(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void live_aggregate_equals_inlined_aggregate_without_hidden_contracts(TenancyStyle tenancyStyle, + string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + var questId = Guid.NewGuid(); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + //Note Id = questId, is we remove it from first message then AggregateStream will return party.Id=default(Guid) that is not equals to Load result + var started = new QuestStarted + { + /*Id = questId,*/ + Name = "Destroy the One Ring" + }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + session.SaveChanges(); + } + + using (var session = store.LightweightSession(tenantId)) + { + var liveAggregate = session.Events.AggregateStream(questId); + var inlinedAggregate = session.Load(questId); + liveAggregate.Id.ShouldBe(inlinedAggregate.Id); + inlinedAggregate.ToString().ShouldBe(liveAggregate.ToString()); + } + }).ShouldThrowIf( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void open_persisted_stream_in_new_store_with_same_settings(TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + var questId = Guid.NewGuid(); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + //Note "Id = questId" @see live_aggregate_equals_inlined_aggregate... + var started = new QuestStarted { Id = questId, Name = "Destroy the One Ring" }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + session.SaveChanges(); + } + + // events-aggregate-on-the-fly - works with same store + using (var session = store.LightweightSession(tenantId)) + { + // questId is the id of the stream + var party = session.Events.AggregateStream(questId); + + party.Id.ShouldBe(questId); + party.ShouldNotBeNull(); + + var party_at_version_3 = session.Events + .AggregateStream(questId, 3); + + party_at_version_3.ShouldNotBeNull(); + + var party_yesterday = session.Events + .AggregateStream(questId, timestamp: DateTimeOffset.UtcNow.AddDays(-1)); + party_yesterday.ShouldBeNull(); + } + + using (var session = store.LightweightSession(tenantId)) + { + var party = session.Load(questId); + party.Id.ShouldBe(questId); + } + + var newStore = ConfigureStore(tenancyStyle, false); + + //Inline is working + using (var session = store.LightweightSession(tenantId)) + { + var party = session.Load(questId); + SpecificationExtensions.ShouldNotBeNull(party); + } + + //GetAll + using (var session = store.LightweightSession(tenantId)) + { + var parties = session.Events.QueryRawEventDataOnly().ToArray(); + foreach (var party in parties) + { + SpecificationExtensions.ShouldNotBeNull(party); + } + } + + //This AggregateStream fail with NPE + using (var session = store.LightweightSession(tenantId)) + { + // questId is the id of the stream + var party = session.Events.AggregateStream(questId); //Here we get NPE + party.Id.ShouldBe(questId); + + var party_at_version_3 = session.Events + .AggregateStream(questId, 3); + party_at_version_3.Id.ShouldBe(questId); + + var party_yesterday = session.Events + .AggregateStream(questId, timestamp: DateTimeOffset.UtcNow.AddDays(-1)); + party_yesterday.ShouldBeNull(); + } + }).ShouldThrowIf( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void query_before_saving(TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + var questId = Guid.NewGuid(); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + var parties = session.Query().ToArray(); + parties.Length.ShouldBeLessThanOrEqualTo(index); + } + + //This SaveChanges will fail with missing method (ro collection configured?) + using (var session = store.LightweightSession(tenantId)) + { + var started = new QuestStarted { Name = "Destroy the One Ring" }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + session.SaveChanges(); + + var party = session.Events.AggregateStream(questId); + party.Id.ShouldBe(questId); + } + }).ShouldThrowIf( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public Task aggregate_stream_async_has_the_id(TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + var questId = Guid.NewGuid(); + + return When.CalledForEachAsync(tenants, async (tenantId, index) => + { + await using (var session = store.LightweightSession(tenantId)) + { + var parties = await session.Query().ToListAsync(); + parties.Count.ShouldBeLessThanOrEqualTo(index); + } + + //This SaveChanges will fail with missing method (ro collection configured?) + await using (var session = store.LightweightSession(tenantId)) + { + var started = new QuestStarted { Name = "Destroy the One Ring" }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + await session.SaveChangesAsync(); + + var party = await session.Events.AggregateStreamAsync(questId); + party.Id.ShouldBe(questId); + } + }).ShouldThrowIfAsync( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_a_new_stream_and_fetch_the_events_back_with_stream_id_provided( + TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = Guid.NewGuid(); + session.Events.StartStream(id, joined, departed); + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + }).ShouldSucceed(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_a_non_existing_stream_and_fetch_the_events_back(TenancyStyle tenancyStyle, + string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = Guid.NewGuid(); + session.Events.StartStream(id, joined); + session.Events.Append(id, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + }).ShouldSucceed(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_an_existing_stream_and_fetch_the_events_back(TenancyStyle tenancyStyle, + string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + var id = Guid.NewGuid(); + + When.CalledForEach(tenants, (tenantId, index) => + { + var started = new QuestStarted(); + + using (var session = store.LightweightSession(tenantId)) + { + session.Events.StartStream(id, started); + session.SaveChanges(); + } + + using (var session = store.LightweightSession(tenantId)) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + session.Events.Append(id, joined); + session.Events.Append(id, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(3); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + streamEvents.ElementAt(2).Data.ShouldBeOfType(); + streamEvents.ElementAt(2).Version.ShouldBe(3); + } + }).ShouldThrowIf( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_a_new_stream_and_fetch_the_events_back_in_another_database_schema( + TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = session.Events.StartStream(joined, departed).Id; + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + }).ShouldSucceed(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void + capture_events_to_a_new_stream_and_fetch_the_events_back_with_stream_id_provided_in_another_database_schema( + TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = Guid.NewGuid(); + session.Events.StartStream(id, joined, departed); + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(x => SpecificationExtensions.ShouldBeGreaterThan(x.Sequence, 0L)); + } + }).ShouldSucceed(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_a_non_existing_stream_and_fetch_the_events_back_in_another_database_schema( + TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + When.CalledForEach(tenants, (tenantId, index) => + { + using (var session = store.LightweightSession(tenantId)) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = Guid.NewGuid(); + session.Events.StartStream(id, joined); + session.Events.Append(id, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + }).ShouldSucceed(); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_events_to_an_existing_stream_and_fetch_the_events_back_in_another_database_schema( + TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + var id = Guid.NewGuid(); + + When.CalledForEach(tenants, (tenantId, index) => + { + var started = new QuestStarted(); + + using (var session = store.LightweightSession(tenantId)) + { + session.Events.StartStream(id, started); + session.SaveChanges(); + } + + using (var session = store.LightweightSession(tenantId)) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + session.Events.Append(id, joined, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(3); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + streamEvents.ElementAt(2).Data.ShouldBeOfType(); + streamEvents.ElementAt(2).Version.ShouldBe(3); + } + }).ShouldThrowIf( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void assert_on_max_event_id_on_event_stream_append( + TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + var id = Guid.NewGuid(); + + When.CalledForEach(tenants, (tenantId, index) => + { + var started = new QuestStarted(); + + using (var session = store.LightweightSession(tenantId)) + { + session.Events.StartStream(id, started); + session.SaveChanges(); + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + // Events are appended into the stream only if the maximum event id for the stream + // would be 3 after the append operation. + session.Events.Append(id, 3, joined, departed); + + session.SaveChanges(); + } + }).ShouldThrowIf( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + [Theory] + [MemberData(nameof(SessionParams))] + public void capture_immutable_events(TenancyStyle tenancyStyle, string[] tenants) + { + var store = ConfigureStore(tenancyStyle); + + var id = Guid.NewGuid(); + + When.CalledForEach(tenants, (tenantId, index) => + { + var immutableEvent = new ImmutableEvent(id, "some-name"); + + using (var session = store.LightweightSession(tenantId)) + { + session.Events.Append(id, immutableEvent); + session.SaveChanges(); + } + + using (var session = store.LightweightSession(tenantId)) + { + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count.ShouldBe(1); + var @event = streamEvents.ElementAt(0).Data.ShouldBeOfType(); + + @event.Id.ShouldBe(id); + @event.Name.ShouldBe("some-name"); + } + }).ShouldThrowIf( + (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || + (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) + ); + } + + private DocumentStore ConfigureStore(TenancyStyle tenancyStyle, bool cleanSchema = true) + { + var store = StoreOptions(opts => + { + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.TenancyStyle = tenancyStyle; + + opts.AutoCreateSchemaObjects = AutoCreate.All; + + if (tenancyStyle == TenancyStyle.Conjoined) + opts.Policies.AllDocumentsAreMultiTenanted(); + + opts.Connection(ConnectionSource.ConnectionString); + + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + + opts.Events.AddEventType(typeof(MembersJoined)); + opts.Events.AddEventType(typeof(MembersDeparted)); + opts.Events.AddEventType(typeof(QuestStarted)); + }, cleanSchema); + + + return store; + } +} diff --git a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs new file mode 100644 index 0000000000..bb130e3dab --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs @@ -0,0 +1,451 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Core; +using Marten; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.QuickAppend; + +[Collection("quick_string_identified_streams")] +public class + quick_append_event_capture_and_fetching_the_stream_with_string_identifiers: StoreContext< + StringIdentifiedStreamsFixture> +{ + public quick_append_event_capture_and_fetching_the_stream_with_string_identifiers( + StringIdentifiedStreamsFixture fixture): base(fixture) + { + } + + [Fact] + public void capture_events_to_a_new_stream_and_fetch_the_events_back() + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "First"; + + theSession.Events.StartStream(id, joined, departed); + theSession.SaveChanges(); + + var streamEvents = theSession.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => ShouldBeTestExtensions.ShouldNotBe(e.Timestamp, default(DateTimeOffset))); + } + + [Fact] + public async Task capture_events_to_a_new_stream_and_fetch_the_events_back_async() + { + #region sample_start-stream-with-aggregate-type + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "Second"; + theSession.Events.StartStream(id, joined, departed); + await theSession.SaveChangesAsync(); + + #endregion + + var streamEvents = await theSession.Events.FetchStreamAsync(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => e.Timestamp.ShouldNotBe(default(DateTimeOffset))); + } + + [Fact] + public async Task capture_events_to_a_new_stream_and_fetch_the_events_back_async_with_linq() + { + #region sample_start-stream-with-aggregate-type + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "Third"; + theSession.Events.StartStream(id, joined, departed); + await theSession.SaveChangesAsync(); + + #endregion + + var streamEvents = await theSession.Events.QueryAllRawEvents() + .Where(x => x.StreamKey == id).OrderBy(x => x.Version).ToListAsync(); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => e.Timestamp.ShouldNotBe(default(DateTimeOffset))); + } + + [Fact] + public void capture_events_to_a_new_stream_and_fetch_the_events_back_sync_with_linq() + { + #region sample_start-stream-with-aggregate-type + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "Fourth"; + theSession.Events.StartStream(id, joined, departed); + theSession.SaveChanges(); + + #endregion + + var streamEvents = theSession.Events.QueryAllRawEvents() + .Where(x => x.StreamKey == id).OrderBy(x => x.Version).ToList(); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + + streamEvents.Each(e => e.Timestamp.ShouldNotBe(default(DateTimeOffset))); + } + + [Fact] + public void live_aggregate_equals_inlined_aggregate_without_hidden_contracts() + { + var questId = "Fifth"; + + using (var session = theStore.LightweightSession()) + { + //Note Id = questId, is we remove it from first message then AggregateStream will return party.Id=default(Guid) that is not equals to Load result + var started = new QuestStarted + { + /*Id = questId,*/ + Name = "Destroy the One Ring" + }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + session.SaveChanges(); + } + + using (var session = theStore.LightweightSession()) + { + var liveAggregate = session.Events.AggregateStream(questId); + var inlinedAggregate = session.Load(questId); + liveAggregate.Id.ShouldBe(inlinedAggregate.Id); + inlinedAggregate.ToString().ShouldBe(liveAggregate.ToString()); + } + } + + [Fact] + public void open_persisted_stream_in_new_store_with_same_settings() + { + var questId = "Sixth"; + + using (var session = theStore.LightweightSession()) + { + //Note "Id = questId" @see live_aggregate_equals_inlined_aggregate... + var started = new QuestStarted { Name = "Destroy the One Ring" }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + session.SaveChanges(); + } + + // events-aggregate-on-the-fly - works with same store + using (var session = theStore.LightweightSession()) + { + // questId is the id of the stream + var party = session.Events.AggregateStream(questId); + + party.ShouldNotBeNull(); + + var party_at_version_3 = session.Events + .AggregateStream(questId, 3); + + party_at_version_3.ShouldNotBeNull(); + + var party_yesterday = session.Events + .AggregateStream(questId, timestamp: DateTime.UtcNow.AddDays(-1)); + party_yesterday.ShouldBeNull(); + } + + using (var session = theStore.LightweightSession()) + { + var party = session.Load(questId); + party.ShouldNotBeNull(); + } + + var newStore = new DocumentStore(theStore.Options); + + //Inline is working + using (var session = newStore.LightweightSession()) + { + var party = session.Load(questId); + party.ShouldNotBeNull(); + } + + //GetAll + using (var session = theStore.LightweightSession()) + { + var parties = session.Events.QueryRawEventDataOnly().ToArray(); + foreach (var party in parties) + { + party.ShouldNotBeNull(); + } + } + + //This AggregateStream fail with NPE + using (var session = newStore.LightweightSession()) + { + // questId is the id of the stream + var party = session.Events.AggregateStream(questId); //Here we get NPE + party.ShouldNotBeNull(); + + var party_at_version_3 = session.Events + .AggregateStream(questId, 3); + party_at_version_3.ShouldNotBeNull(); + + var party_yesterday = session.Events + .AggregateStream(questId, timestamp: DateTime.UtcNow.AddDays(-1)); + party_yesterday.ShouldBeNull(); + } + } + + [Fact] + public async Task query_before_saving() + { + var questId = "Seventh"; + + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + await theStore.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(QuestPartyWithStringIdentifier)); + + await using (var session = theStore.LightweightSession()) + { + var parties = await session.Query().CountAsync(); + parties.ShouldBeLessThanOrEqualTo(0); + } + + //This SaveChanges will fail with missing method (ro collection configured?) + await using (var session = theStore.LightweightSession()) + { + var started = new QuestStarted { Name = "Destroy the One Ring" }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + await session.SaveChangesAsync(); + + var party = await session.Events.AggregateStreamAsync(questId); + SpecificationExtensions.ShouldNotBeNull(party); + } + } + + [Fact] + public async Task aggregate_stream_async_has_the_id() + { + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + await theStore.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(QuestPartyWithStringIdentifier)); + + var questId = "Eighth"; + + await using (var session = theStore.LightweightSession()) + { + var parties = await session.Query().ToListAsync(); + parties.Count.ShouldBeLessThanOrEqualTo(0); + } + + //This SaveChanges will fail with missing method (ro collection configured?) + await using (var session = theStore.LightweightSession()) + { + var started = new QuestStarted { Name = "Destroy the One Ring" }; + var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); + + session.Events.StartStream(questId, started, joined1); + await session.SaveChangesAsync(); + + var party = await session.Events.AggregateStreamAsync(questId); + SpecificationExtensions.ShouldNotBeNull(party); + } + } + + [Fact] + public void capture_events_to_a_new_stream_and_fetch_the_events_back_with_stream_id_provided() + { + using var session = theStore.LightweightSession(); + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "Tenth"; + session.Events.StartStream(id, joined, departed); + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + + [Fact] + public void capture_events_to_a_non_existing_stream_and_fetch_the_events_back() + { + using var session = theStore.LightweightSession(); + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "Eleventh"; + session.Events.StartStream(id, joined); + session.Events.Append(id, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + + [Fact] + public void capture_events_to_an_existing_stream_and_fetch_the_events_back() + { + var id = "Twelth"; + var started = new QuestStarted(); + + using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(id, started); + session.SaveChanges(); + } + + using (var session = theStore.LightweightSession()) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + session.Events.Append(id, joined); + session.Events.Append(id, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(3); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + streamEvents.ElementAt(2).Data.ShouldBeOfType(); + streamEvents.ElementAt(2).Version.ShouldBe(3); + } + } + + [Fact] + public void capture_events_to_a_new_stream_and_fetch_the_events_back_in_another_database_schema() + { + using var session = theStore.LightweightSession(); + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "Thirteen"; + session.Events.StartStream(id, joined, departed); + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + + + [Fact] + public void capture_events_to_a_non_existing_stream_and_fetch_the_events_back_in_another_database_schema() + { + using var session = theStore.LightweightSession(); + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + var id = "Fourteen"; + session.Events.StartStream(id, joined); + session.Events.Append(id, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(2); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + } + + [Fact] + public void capture_events_to_an_existing_stream_and_fetch_the_events_back_in_another_database_schema() + { + var id = "Fifteen"; + var started = new QuestStarted(); + + using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(id, started); + session.SaveChanges(); + } + + using (var session = theStore.LightweightSession()) + { + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + session.Events.Append(id, joined, departed); + + session.SaveChanges(); + + var streamEvents = session.Events.FetchStream(id); + + streamEvents.Count().ShouldBe(3); + streamEvents.ElementAt(0).Data.ShouldBeOfType(); + streamEvents.ElementAt(0).Version.ShouldBe(1); + streamEvents.ElementAt(1).Data.ShouldBeOfType(); + streamEvents.ElementAt(1).Version.ShouldBe(2); + streamEvents.ElementAt(2).Data.ShouldBeOfType(); + streamEvents.ElementAt(2).Version.ShouldBe(3); + } + } + + [Fact] + public void assert_on_max_event_id_on_event_stream_append() + { + var id = "Sixteen"; + var started = new QuestStarted(); + + using var session = theStore.LightweightSession(); + session.Events.StartStream(id, started); + session.SaveChanges(); + + var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } }; + var departed = new MembersDeparted { Members = new[] { "Thom" } }; + + // Events are appended into the stream only if the maximum event id for the stream + // would be 3 after the append operation. + session.Events.Append(id, 3, joined, departed); + + session.SaveChanges(); + } +} diff --git a/src/EventSourcingTests/QuickAppend/quick_append_events_with_optimistic_or_exclusive_locks.cs b/src/EventSourcingTests/QuickAppend/quick_append_events_with_optimistic_or_exclusive_locks.cs new file mode 100644 index 0000000000..c2e5c5f846 --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/quick_append_events_with_optimistic_or_exclusive_locks.cs @@ -0,0 +1,286 @@ +using System; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using Marten.Events; +using Marten.Exceptions; +using Marten.Services; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; +using Xunit.Abstractions; + +namespace EventSourcingTests.QuickAppend; + +public class quick_append_events_with_optimistic_or_exclusive_locks +{ + public class quick_append_events_optimistic_or_exclusive_with_guid_identity: OneOffConfigurationsContext, IAsyncLifetime + { + private readonly ITestOutputHelper _output; + + public quick_append_events_optimistic_or_exclusive_with_guid_identity(ITestOutputHelper output) + { + _output = output; + + } + + public async Task InitializeAsync() + { + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + + StoreOptions(opts => opts.Events.AppendMode = EventAppendMode.Quick); + } + + public Task DisposeAsync() + { + return Task.CompletedTask; + } + + [Fact] + public async Task append_optimistic_sad_path_because_the_stream_does_not_already_exist() + { + var streamId = Guid.NewGuid(); + var ex = await Should.ThrowAsync(async () => + { + await theSession.Events.AppendOptimistic(streamId, new AEvent(), new BEvent()); + }); + + ex.Id.ShouldBe(streamId); + } + + [Fact] + public async Task append_optimistic_happy_path() + { + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + await theSession.Events.AppendOptimistic(streamId, new CEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + var state = await theSession.Events.FetchStreamStateAsync(streamId); + state.Version.ShouldBe(4); + } + + [Fact] + public async Task append_optimistic_sad_path_with_concurrency_issue() + { + var streamId = Guid.NewGuid(); + await using var session1 = theStore.LightweightSession(new SessionOptions { Timeout = 1 }); + + session1.Events.StartStream(streamId, new AEvent(), new BEvent()); + await session1.SaveChangesAsync(); + + // Fetch the expected version + await session1.Events.AppendOptimistic(streamId, new CEvent(), new BEvent()); + + await using (var session = theStore.LightweightSession(new SessionOptions{Timeout = 1})) + { + session.Events.Append(streamId, new DEvent()); + await session.SaveChangesAsync(); + } + + // Should fail a concurrency check + await Should.ThrowAsync(async () => + { + await session1.SaveChangesAsync(); + }); + } + + [Fact] + public async Task append_exclusive_sad_path_because_the_stream_does_not_already_exist() + { + var streamId = Guid.NewGuid(); + var ex = await Should.ThrowAsync(async () => + { + await theSession.Events.AppendExclusive(streamId, new AEvent(), new BEvent()); + }); + + ex.Id.ShouldBe(streamId); + } + + [Fact] + public async Task append_exclusive_happy_path() + { + theSession.Logger = new TestOutputMartenLogger(_output); + + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + await theSession.Events.AppendExclusive(streamId, new CEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + var state = await theSession.Events.FetchStreamStateAsync(streamId); + state.Version.ShouldBe(4); + } + + [Fact] + public async Task append_exclusive_sad_path_with_concurrency_issue() + { + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + // Fetch the expected version + await theSession.Events.AppendExclusive(streamId, new CEvent(), new BEvent()); + + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, new DEvent()); + var ex = await Should.ThrowAsync(async () => + { + await session.SaveChangesAsync(); + }); + + ex.Message.ShouldContain(MartenCommandException.MaybeLockedRowsMessage, + StringComparisonOption.Default); + } + } + + [Fact] + public async Task append_exclusive_sad_path_with_concurrency_issue_2() + { + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + // Fetch the expected version + await theSession.Events.AppendExclusive(streamId, new CEvent(), new BEvent()); + + await using (var session = theStore.LightweightSession(new SessionOptions{Timeout = 1})) + { + await Should.ThrowAsync(async () => + { + await session.Events.AppendExclusive(streamId, new DEvent()); + }); + } + } + } + + + public class append_events_optimistic_or_exclusive_with_string_identity: OneOffConfigurationsContext + { + public append_events_optimistic_or_exclusive_with_string_identity() + { + StoreOptions(x => x.Events.StreamIdentity = StreamIdentity.AsString); + theStore.Advanced.Clean.DeleteAllEventData(); + } + + [Fact] + public async Task append_optimistic_sad_path_because_the_stream_does_not_already_exist() + { + var streamId = Guid.NewGuid().ToString(); + var ex = await Should.ThrowAsync(async () => + { + await theSession.Events.AppendOptimistic(streamId, new AEvent(), new BEvent()); + }); + + ex.Id.ShouldBe(streamId); + } + + [Fact] + public async Task append_optimistic_happy_path() + { + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + await theSession.Events.AppendOptimistic(streamId, new CEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + var state = await theSession.Events.FetchStreamStateAsync(streamId); + state.Version.ShouldBe(4); + } + + [Fact] + public async Task append_optimistic_sad_path_with_concurrency_issue() + { + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + // Fetch the expected version + await theSession.Events.AppendOptimistic(streamId, new CEvent(), new BEvent()); + + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, new DEvent()); + await session.SaveChangesAsync(); + } + + // Should fail a concurrency check + await Should.ThrowAsync(async () => + { + await theSession.SaveChangesAsync(); + }); + } + + [Fact] + public async Task append_exclusive_sad_path_because_the_stream_does_not_already_exist() + { + var streamId = Guid.NewGuid().ToString(); + var ex = await Should.ThrowAsync(async () => + { + await theSession.Events.AppendExclusive(streamId, new AEvent(), new BEvent()); + }); + + ex.Id.ShouldBe(streamId); + } + + [Fact] + public async Task append_exclusive_happy_path() + { + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + await theSession.Events.AppendExclusive(streamId, new CEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + var state = await theSession.Events.FetchStreamStateAsync(streamId); + state.Version.ShouldBe(4); + } + + [Fact] + public async Task append_exclusive_sad_path_with_concurrency_issue() + { + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + // Fetch the expected version + await theSession.Events.AppendExclusive(streamId, new CEvent(), new BEvent()); + + await using (var session = theStore.LightweightSession(new SessionOptions{Timeout = 1})) + { + session.Events.Append(streamId, new DEvent()); + var ex = await Should.ThrowAsync(async () => + { + await session.SaveChangesAsync(); + }); + + ex.Message.ShouldContain(MartenCommandException.MaybeLockedRowsMessage, + StringComparisonOption.Default); + } + } + + [Fact] + public async Task append_exclusive_sad_path_with_concurrency_issue_2() + { + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent()); + await theSession.SaveChangesAsync(); + + // Fetch the expected version + await theSession.Events.AppendExclusive(streamId, new CEvent(), new BEvent()); + + await using (var session = theStore.LightweightSession(new SessionOptions{Timeout = 1})) + { + await Should.ThrowAsync(async () => + { + await session.Events.AppendExclusive(streamId, new DEvent()); + }); + } + } + } +} diff --git a/src/EventSourcingTests/QuickAppend/quick_appending_events_workflow_specs.cs b/src/EventSourcingTests/QuickAppend/quick_appending_events_workflow_specs.cs new file mode 100644 index 0000000000..59b351b809 --- /dev/null +++ b/src/EventSourcingTests/QuickAppend/quick_appending_events_workflow_specs.cs @@ -0,0 +1,413 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using JasperFx.Core.Reflection; +using Marten; +using Marten.Events; +using Marten.Events.CodeGeneration; +using Marten.Events.Operations; +using Marten.Exceptions; +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Internal.Sessions; +using Marten.Services; +using Marten.Storage; +using Marten.Testing; +using Marten.Testing.Harness; +using Shouldly; +using Weasel.Core; +using Weasel.Postgresql; +using Xunit; +using Xunit.Abstractions; + +namespace EventSourcingTests.QuickAppend; + +[Collection("v4events")] +public class quick_appending_events_workflow_specs +{ + private readonly ITestOutputHelper _output; + + public quick_appending_events_workflow_specs(ITestOutputHelper output) + { + _output = output; + + } + + public class EventMetadataChecker : DocumentSessionListenerBase + { + public override Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) + { + var events = commit.GetEvents(); + foreach (var @event in events) + { + + @event.TenantId.ShouldNotBeNull(); + @event.Timestamp.ShouldNotBe(DateTime.MinValue); + } + + return Task.CompletedTask; + } + } + + [Theory] + [MemberData(nameof(Data))] + public async Task can_fetch_stream_async(TestCase @case) + { + await @case.Store.Advanced.Clean.CompletelyRemoveAllAsync(); + @case.StartNewStream(new TestOutputMartenLogger(_output)); + await using var query = @case.Store.QuerySession(); + + var builder = EventDocumentStorageGenerator.GenerateStorage(@case.Store.Options); + var handler = builder.QueryForStream(@case.ToEventStream()); + + var state = await query.As().ExecuteHandlerAsync(handler, CancellationToken.None); + state.ShouldNotBeNull(); + } + + [Theory] + [MemberData(nameof(Data))] + public void can_fetch_stream_sync(TestCase @case) + { + @case.Store.Advanced.Clean.CompletelyRemoveAll(); + @case.StartNewStream(); + using var query = @case.Store.QuerySession(); + + var builder = EventDocumentStorageGenerator.GenerateStorage(@case.Store.Options); + var handler = builder.QueryForStream(@case.ToEventStream()); + + var state = query.As().ExecuteHandler(handler); + state.ShouldNotBeNull(); + } + + [Theory] + [MemberData(nameof(Data))] + public async Task can_insert_a_new_stream(TestCase @case) + { + // This is just forcing the store to start the event storage + await @case.Store.Advanced.Clean.CompletelyRemoveAllAsync(); + @case.StartNewStream(); + + var stream = @case.CreateNewStream(); + var builder = EventDocumentStorageGenerator.GenerateStorage(@case.Store.Options); + var op = builder.InsertStream(stream); + + await using var session = @case.Store.LightweightSession(); + session.QueueOperation(op); + + await session.SaveChangesAsync(); + } + + [Theory] + [MemberData(nameof(Data))] + public async Task can_update_the_version_of_an_existing_stream_happy_path(TestCase @case) + { + @case.Store.Advanced.Clean.CompletelyRemoveAll(); + var stream = @case.StartNewStream(new TestOutputMartenLogger(_output)); + + stream.ExpectedVersionOnServer = 4; + stream.Version = 10; + + var builder = EventDocumentStorageGenerator.GenerateStorage(@case.Store.Options); + var op = builder.UpdateStreamVersion(stream); + + await using var session = @case.Store.LightweightSession(); + session.QueueOperation(op); + + session.Logger = new TestOutputMartenLogger(_output); + await session.SaveChangesAsync(); + + var handler = builder.QueryForStream(stream); + var state = session.As().ExecuteHandler(handler); + + state.Version.ShouldBe(10); + } + + [Theory] + [MemberData(nameof(Data))] + public async Task can_update_the_version_of_an_existing_stream_sad_path(TestCase @case) + { + @case.Store.Advanced.Clean.CompletelyRemoveAll(); + var stream = @case.StartNewStream(); + + stream.ExpectedVersionOnServer = 3; // it's actually 4, so this should fail + stream.Version = 10; + + var builder = EventDocumentStorageGenerator.GenerateStorage(@case.Store.Options); + var op = builder.UpdateStreamVersion(stream); + + await using var session = @case.Store.LightweightSession(); + session.QueueOperation(op); + + await Should.ThrowAsync(() => session.SaveChangesAsync()); + } + + [Theory] + [MemberData(nameof(Data))] + public async Task can_establish_the_tombstone_stream_from_scratch(TestCase @case) + { + await @case.Store.Advanced.Clean.CompletelyRemoveAllAsync(); + await @case.Store.EnsureStorageExistsAsync(typeof(IEvent)); + + var operation = new EstablishTombstoneStream(@case.Store.Events, Tenancy.DefaultTenantId); + await using var session = (DocumentSessionBase)@case.Store.LightweightSession(); + + var batch = new UpdateBatch(new []{operation}); + await session.ExecuteBatchAsync(batch, CancellationToken.None); + + if (@case.Store.Events.StreamIdentity == StreamIdentity.AsGuid) + { + (await session.Events.FetchStreamStateAsync(EstablishTombstoneStream.StreamId)).ShouldNotBeNull(); + } + else + { + (await session.Events.FetchStreamStateAsync(EstablishTombstoneStream.StreamKey)).ShouldNotBeNull(); + } + } + + + [Theory] + [MemberData(nameof(Data))] + public async Task can_re_run_the_tombstone_stream(TestCase @case) + { + await @case.Store.Advanced.Clean.CompletelyRemoveAllAsync(); + await @case.Store.EnsureStorageExistsAsync(typeof(IEvent)); + + var operation = new EstablishTombstoneStream(@case.Store.Events, Tenancy.DefaultTenantId); + await using var session = (DocumentSessionBase)@case.Store.LightweightSession(); + + var batch = new UpdateBatch(new []{operation}); + + await session.ExecuteBatchAsync(batch, CancellationToken.None); + await session.ExecuteBatchAsync(batch, CancellationToken.None); + + if (@case.Store.Events.StreamIdentity == StreamIdentity.AsGuid) + { + (await session.Events.FetchStreamStateAsync(EstablishTombstoneStream.StreamId)).ShouldNotBeNull(); + } + else + { + (await session.Events.FetchStreamStateAsync(EstablishTombstoneStream.StreamKey)).ShouldNotBeNull(); + } + } + + [Theory] + [MemberData(nameof(Data))] + public async Task exercise_tombstone_workflow_async(TestCase @case) + { + await @case.Store.Advanced.Clean.CompletelyRemoveAllAsync(); + + await using var session = @case.Store.LightweightSession(); + + if (@case.Store.Events.StreamIdentity == StreamIdentity.AsGuid) + { + session.Events.Append(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent()); + } + else + { + session.Events.Append(Guid.NewGuid().ToString(), new AEvent(), new BEvent(), new CEvent()); + } + + + session.QueueOperation(new FailingOperation()); + + await Should.ThrowAsync(async () => + { + await session.SaveChangesAsync(); + }); + + await using var session2 = @case.Store.LightweightSession(); + + if (@case.Store.Events.StreamIdentity == StreamIdentity.AsGuid) + { + (await session2.Events.FetchStreamStateAsync(EstablishTombstoneStream.StreamId)).ShouldNotBeNull(); + + var events = await session2.Events.FetchStreamAsync(EstablishTombstoneStream.StreamId); + events.Any().ShouldBeTrue(); + foreach (var @event in events) + { + @event.Data.ShouldBeOfType(); + } + } + else + { + (await session2.Events.FetchStreamStateAsync(EstablishTombstoneStream.StreamKey)).ShouldNotBeNull(); + + var events = await session2.Events.FetchStreamAsync(EstablishTombstoneStream.StreamKey); + events.Any().ShouldBeTrue(); + foreach (var @event in events) + { + @event.Data.ShouldBeOfType(); + } + } + } + + + public static IEnumerable Data() + { + return cases().Select(x => new object[] {x}); + } + + private static IEnumerable cases() + { + yield return new TestCase("Streams as Guid, Vanilla", e => e.StreamIdentity = StreamIdentity.AsGuid); + yield return new TestCase("Streams as String, Vanilla", e => e.StreamIdentity = StreamIdentity.AsString); + + yield return new TestCase("Streams as Guid, Multi-tenanted", e => + { + e.StreamIdentity = StreamIdentity.AsGuid; + e.TenancyStyle = TenancyStyle.Conjoined; + }); + + yield return new TestCase("Streams as String, Multi-tenanted", e => + { + e.StreamIdentity = StreamIdentity.AsString; + e.TenancyStyle = TenancyStyle.Conjoined; + }); + } + + public class TestCase : IDisposable + { + private readonly string _description; + private readonly Lazy _store; + + public TestCase(string description, Action config) + { + _description = description; + + _store = new Lazy(() => + { + var store = DocumentStore.For(opts => + { + config(opts.EventGraph); + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "v4events"; + opts.AutoCreateSchemaObjects = AutoCreate.All; + + opts.Events.AppendMode = EventAppendMode.Quick; + }); + + store.Advanced.Clean.CompletelyRemoveAll(); + + return store; + }); + + StreamId = Guid.NewGuid(); + TenantId = "KC"; + } + + internal DocumentStore Store => _store.Value; + + public StreamAction StartNewStream(IMartenSessionLogger logger = null) + { + var events = new object[] {new AEvent(), new BEvent(), new CEvent(), new DEvent()}; + using var session = Store.Events.TenancyStyle == TenancyStyle.Conjoined + ? Store.LightweightSession(TenantId) + : Store.LightweightSession(); + + session.Listeners.Add(new EventMetadataChecker()); + + if (logger != null) + { + session.Logger = logger; + } + + if (Store.Events.StreamIdentity == StreamIdentity.AsGuid) + { + session.Events.StartStream(StreamId, events); + session.SaveChanges(); + + var stream = StreamAction.Append(Store.Events, StreamId); + stream.Version = 4; + stream.TenantId = TenantId; + + return stream; + } + else + { + session.Events.StartStream(StreamId.ToString(), events); + session.SaveChanges(); + + var stream = StreamAction.Start(Store.Events, StreamId.ToString(), new AEvent()); + stream.Version = 4; + stream.TenantId = TenantId; + + return stream; + } + } + + public StreamAction CreateNewStream() + { + var events = new IEvent[] {new Event(new AEvent())}; + var stream = Store.Events.StreamIdentity == StreamIdentity.AsGuid ? StreamAction.Start(Guid.NewGuid(), events) : StreamAction.Start(Guid.NewGuid().ToString(), events); + + stream.TenantId = TenantId; + stream.Version = 1; + + return stream; + } + + public string TenantId { get; set; } + + public Guid StreamId { get; } + + public void Dispose() + { + if (_store.IsValueCreated) + { + _store.Value.Dispose(); + } + } + + public override string ToString() + { + return _description; + } + + public StreamAction ToEventStream() + { + if (Store.Events.StreamIdentity == StreamIdentity.AsGuid) + { + var stream = StreamAction.Start(Store.Events, StreamId, new AEvent()); + stream.TenantId = TenantId; + + return stream; + } + else + { + var stream = StreamAction.Start(Store.Events, StreamId.ToString(), new AEvent()); + stream.TenantId = TenantId; + + return stream; + } + } + } + + public class FailingOperation: IStorageOperation + { + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append("select 1"); + } + + public Type DocumentType => null; + public void Postprocess(DbDataReader reader, IList exceptions) + { + throw new DivideByZeroException("Boom!"); + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + exceptions.Add(new DivideByZeroException("Boom!")); + return Task.CompletedTask; + } + + public OperationRole Role() + { + return OperationRole.Other; + } + } +} diff --git a/src/Marten/Events/Archiving/IsArchivedColumn.cs b/src/Marten/Events/Archiving/IsArchivedColumn.cs index 8f1b16ca35..6cb985354d 100644 --- a/src/Marten/Events/Archiving/IsArchivedColumn.cs +++ b/src/Marten/Events/Archiving/IsArchivedColumn.cs @@ -25,8 +25,13 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, method.AssignMemberFromReaderAsync(null, index, x => x.IsArchived); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { throw new NotSupportedException(); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } diff --git a/src/Marten/Events/CodeGeneration/EventDocumentStorageGenerator.cs b/src/Marten/Events/CodeGeneration/EventDocumentStorageGenerator.cs index 6ac7b2c1c6..3bb9768114 100644 --- a/src/Marten/Events/CodeGeneration/EventDocumentStorageGenerator.cs +++ b/src/Marten/Events/CodeGeneration/EventDocumentStorageGenerator.cs @@ -58,10 +58,7 @@ public static GeneratedType AssembleTypes(StoreOptions options, GeneratedAssembl buildSelectorMethods(options, builderType); - var appendEventOperationType = buildAppendEventOperation(options.EventGraph, assembly); - - builderType.MethodFor(nameof(EventDocumentStorage.AppendEvent)) - .Frames.ReturnNewGeneratedTypeObject(appendEventOperationType, "stream", "e"); + buildAppendEventOperations(options, assembly, builderType); buildInsertStream(builderType, assembly, options.EventGraph); @@ -74,6 +71,23 @@ public static GeneratedType AssembleTypes(StoreOptions options, GeneratedAssembl return builderType; } + private static void buildAppendEventOperations(StoreOptions options, GeneratedAssembly assembly, + GeneratedType builderType) + { + var appendEventOperationType = buildAppendEventOperation(options.EventGraph, assembly, AppendMode.Full); + builderType.MethodFor(nameof(EventDocumentStorage.AppendEvent)) + .Frames.ReturnNewGeneratedTypeObject(appendEventOperationType, "stream", "e"); + + var quickAppendEventGivenVersion = + buildAppendEventOperation(options.EventGraph, assembly, AppendMode.QuickWithVersion); + builderType.MethodFor(nameof(EventDocumentStorage.QuickAppendEventWithVersion)) + .Frames.ReturnNewGeneratedTypeObject(quickAppendEventGivenVersion, "stream", "e"); + + var quickAppend = buildQuickAppendOperation(options.EventGraph, assembly); + builderType.MethodFor(nameof(EventDocumentStorage.QuickAppendEvents)) + .Frames.ReturnNewGeneratedTypeObject(quickAppend, "stream"); + } + private static void buildSelectorMethods(StoreOptions options, GeneratedType builderType) { var sync = builderType.MethodFor(nameof(EventDocumentStorage.ApplyReaderDataToEvent)); @@ -245,9 +259,17 @@ private static void buildConfigureCommandMethodForStreamState(EventGraph graph, } } - private static GeneratedType buildAppendEventOperation(EventGraph graph, GeneratedAssembly assembly) + private static GeneratedType buildAppendEventOperation(EventGraph graph, GeneratedAssembly assembly, + AppendMode mode) { - var operationType = assembly.AddType("AppendEventOperation", typeof(AppendEventOperationBase)); + var typeName = "AppendEventOperation"; + if (mode != AppendMode.Full) + { + typeName += mode.ToString(); + } + + var baseType = typeof(AppendEventOperationBase); + var operationType = assembly.AddType(typeName, baseType); var configure = operationType.MethodFor(nameof(AppendEventOperationBase.ConfigureCommand)); configure.DerivedVariables.Add(new Variable(typeof(IEvent), nameof(AppendEventOperationBase.Event))); @@ -258,8 +280,13 @@ private static GeneratedType buildAppendEventOperation(EventGraph graph, Generat // Hokey, use an explicit model for writeable vs readable columns some day .Where(x => !(x is IsArchivedColumn)).ToList(); + // Hokey, but we need to move Sequence to the end + var sequence = columns.OfType().Single(); + columns.Remove(sequence); + columns.Add(sequence); + var sql = - $"insert into {graph.DatabaseSchemaName}.mt_events ({columns.Select(x => x.Name).Join(", ")}) values ({columns.Select(_ => "?").Join(", ")})"; + $"insert into {graph.DatabaseSchemaName}.mt_events ({columns.Select(x => x.Name).Join(", ")}) values ({columns.Select(c => c.ValueSql(graph, mode)).Join(", ")})"; operationType.AddStringConstant("SQL", sql); @@ -268,7 +295,77 @@ private static GeneratedType buildAppendEventOperation(EventGraph graph, Generat for (var i = 0; i < columns.Count; i++) { - columns[i].GenerateAppendCode(configure, graph, i); + columns[i].GenerateAppendCode(configure, graph, i, mode); + } + + return operationType; + } + + private static GeneratedType buildQuickAppendOperation(EventGraph graph, GeneratedAssembly assembly) + { + var operationType = assembly.AddType("QuickAppendEventsOperation", typeof(QuickAppendEventsOperationBase)); + + var table = new EventsTable(graph); + var parameterList = ""; + + + var index = 6; + int causationIndex = 0; + int correlationIndex = 0; + int headerIndex = 0; + if (table.Columns.OfType().Any()) + { + parameterList += ", ?"; + causationIndex = ++index; + } + + if (table.Columns.OfType().Any()) + { + parameterList += ", ?"; + correlationIndex = ++index; + } + + if (table.Columns.OfType().Any()) + { + parameterList += ", ?"; + headerIndex = ++index; + } + + var sql = + $"select {graph.DatabaseSchemaName}.mt_quick_append_events(?, ?, ?, ?, ?, ?, ?{parameterList})"; + + operationType.AddStringConstant("SQL", sql); + + var configure = operationType.MethodFor(nameof(QuickAppendEventsOperationBase.ConfigureCommand)); + configure.DerivedVariables.Add(new Variable(typeof(StreamAction), nameof(QuickAppendEventsOperationBase.Stream))); + + configure.Frames.Code($"var parameters = {{0}}.{nameof(CommandBuilder.AppendWithParameters)}(SQL);", + Use.Type()); + + if (graph.StreamIdentity == StreamIdentity.AsGuid) + { + configure.Frames.Code("writeId(parameters);"); + } + else + { + configure.Frames.Code("writeKey(parameters);"); + } + + configure.Frames.Code("writeBasicParameters(parameters, session);"); + + if (causationIndex > 0) + { + configure.Frames.Code($"writeCausationIds({causationIndex}, parameters);"); + } + + if (correlationIndex > 0) + { + configure.Frames.Code($"writeCorrelationIds({correlationIndex}, parameters);"); + } + + if (headerIndex > 0) + { + configure.Frames.Code($"writeHeaders({headerIndex}, parameters, session);"); } return operationType; diff --git a/src/Marten/Events/EventDocumentStorage.cs b/src/Marten/Events/EventDocumentStorage.cs index a793490585..971d084eb8 100644 --- a/src/Marten/Events/EventDocumentStorage.cs +++ b/src/Marten/Events/EventDocumentStorage.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using JasperFx.Core; using Marten.Events.Archiving; +using Marten.Events.Operations; using Marten.Events.Schema; using Marten.Exceptions; using Marten.Internal; @@ -222,6 +223,12 @@ public abstract IStorageOperation AppendEvent(EventGraph events, IMartenSession public abstract IStorageOperation InsertStream(StreamAction stream); public abstract IQueryHandler QueryForStream(StreamAction stream); public abstract IStorageOperation UpdateStreamVersion(StreamAction stream); + public IStorageOperation IncrementStreamVersion(StreamAction stream) + { + return Events.StreamIdentity == StreamIdentity.AsGuid + ? new IncrementStreamVersionById(Events, stream) + : new IncrementStreamVersionByKey(Events, stream); + } public IEvent Resolve(DbDataReader reader) { @@ -292,4 +299,16 @@ private EventMapping eventMappingForDotNetTypeName(string dotnetTypeName, string return Events.EventMappingFor(type); } + public virtual IStorageOperation + QuickAppendEventWithVersion(EventGraph events, IMartenSession session, StreamAction stream, IEvent e) + { + throw new NotSupportedException( + "You will have to re-generate the Marten code before the \"quick append events\" feature is available"); + } + + public virtual IStorageOperation QuickAppendEvents(StreamAction stream) + { + throw new NotSupportedException( + "You will have to re-generate the Marten code before the \"quick append events\" feature is available"); + } } diff --git a/src/Marten/Events/EventGraph.FeatureSchema.cs b/src/Marten/Events/EventGraph.FeatureSchema.cs index 57fa96166a..de4128aea8 100644 --- a/src/Marten/Events/EventGraph.FeatureSchema.cs +++ b/src/Marten/Events/EventGraph.FeatureSchema.cs @@ -56,9 +56,11 @@ private IEnumerable createAllSchemaObjects() yield return new EventProgressionTable(DatabaseSchemaName); yield return new SystemFunction(DatabaseSchemaName, "mt_mark_event_progression", "varchar, bigint"); - yield return Function.ForRemoval(new PostgresqlObjectName(DatabaseSchemaName, "mt_append_event")); yield return new ArchiveStreamFunction(this); + yield return new QuickAppendEventFunction(this); + + foreach (var schemaSource in Options.Projections.All.OfType()) { var objects = schemaSource.CreateSchemaObjects(this); diff --git a/src/Marten/Events/EventGraph.Processing.cs b/src/Marten/Events/EventGraph.Processing.cs index c340e295f4..e131b2dc1d 100644 --- a/src/Marten/Events/EventGraph.Processing.cs +++ b/src/Marten/Events/EventGraph.Processing.cs @@ -5,7 +5,6 @@ using System.Threading.Tasks; using Marten.Events.Daemon.Resiliency; using Marten.Events.Operations; -using Marten.Exceptions; using Marten.Internal; using Marten.Internal.Operations; using Marten.Internal.Sessions; @@ -14,16 +13,47 @@ namespace Marten.Events; +public enum EventAppendMode +{ + /// + /// Default behavior that ensures that all inline projections will have full access to all event + /// metadata including intended event sequences, versions, and timestamps + /// + Rich, + + /// + /// Stripped down, more performant mode of appending events that will omit some event metadata within + /// inline projections + /// + Quick +} + public partial class EventGraph { private RetryBlock _tombstones; private async Task executeTombstoneBlock(UpdateBatch batch, CancellationToken cancellationToken) { - await using var session = (DocumentSessionBase)(batch.TenantId.IsEmpty() ? _store.LightweightSession() : _store.LightweightSession(batch.TenantId!)); + await using var session = (DocumentSessionBase)(batch.TenantId.IsEmpty() + ? _store.LightweightSession() + : _store.LightweightSession(batch.TenantId!)); await session.ExecuteBatchAsync(batch, cancellationToken).ConfigureAwait(false); } + internal IEventAppender EventAppender { get; set; } = new RichEventAppender(); + + public EventAppendMode AppendMode + { + get + { + return EventAppender is RichEventAppender ? EventAppendMode.Rich : EventAppendMode.Quick; + } + set + { + EventAppender = value == EventAppendMode.Quick ? new QuickEventAppender() : new RichEventAppender(); + } + } + internal void ProcessEvents(DocumentSessionBase session) { if (!session.WorkTracker.Streams.Any()) @@ -36,50 +66,7 @@ internal void ProcessEvents(DocumentSessionBase session) session.Database.EnsureStorageExists(typeof(IEvent)); } - var storage = session.EventStorage(); - - var fetcher = new EventSequenceFetcher(this, session.WorkTracker.Streams.Sum(x => x.Events.Count)); - var sequences = session.ExecuteHandler(fetcher); - - - foreach (var stream in session.WorkTracker.Streams.Where(x => x.Events.Any())) - { - stream.TenantId ??= session.TenantId; - - if (stream.ActionType == StreamActionType.Start) - { - stream.PrepareEvents(0, this, sequences, session); - session.QueueOperation(storage.InsertStream(stream)); - } - else - { - var handler = storage.QueryForStream(stream); - var state = session.ExecuteHandler(handler); - - if (state == null) - { - stream.PrepareEvents(0, this, sequences, session); - session.QueueOperation(storage.InsertStream(stream)); - } - else - { - if (state.IsArchived) - { - throw new InvalidStreamOperationException( - $"Attempted to append event to archived stream with Id '{state.Id}'."); - } - - stream.PrepareEvents(state.Version, this, sequences, session); - session.QueueOperation(storage.UpdateStreamVersion(stream)); - } - } - - foreach (var @event in stream.Events) - session.QueueOperation(storage.AppendEvent(this, session, stream, @event)); - } - - foreach (var projection in _inlineProjections.Value) - projection.Apply(session, session.WorkTracker.Streams.ToList()); + EventAppender.ProcessEvents(this, session, _inlineProjections.Value); } internal async Task ProcessEventsAsync(DocumentSessionBase session, CancellationToken token) @@ -94,54 +81,7 @@ internal async Task ProcessEventsAsync(DocumentSessionBase session, Cancellation await session.Database.EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); } - var fetcher = new EventSequenceFetcher(this, session.WorkTracker.Streams.Sum(x => x.Events.Count)); - var sequences = await session.ExecuteHandlerAsync(fetcher, token).ConfigureAwait(false); - - - var storage = session.EventStorage(); - - foreach (var stream in session.WorkTracker.Streams.Where(x => x.Events.Any())) - { - stream.TenantId ??= session.TenantId; - - if (stream.ActionType == StreamActionType.Start) - { - stream.PrepareEvents(0, this, sequences, session); - session.QueueOperation(storage.InsertStream(stream)); - } - else - { - var handler = storage.QueryForStream(stream); - var state = await session.ExecuteHandlerAsync(handler, token).ConfigureAwait(false); - - if (state == null) - { - stream.PrepareEvents(0, this, sequences, session); - session.QueueOperation(storage.InsertStream(stream)); - } - else - { - if (state.IsArchived) - { - throw new InvalidStreamOperationException( - $"Attempted to append event to archived stream with Id '{state.Id}'."); - } - - stream.PrepareEvents(state.Version, this, sequences, session); - session.QueueOperation(storage.UpdateStreamVersion(stream)); - } - } - - foreach (var @event in stream.Events) - { - session.QueueOperation(storage.AppendEvent(this, session, stream, @event)); - } - } - - foreach (var projection in _inlineProjections.Value) - { - await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false); - } + await EventAppender.ProcessEventsAsync(this, session, _inlineProjections.Value, token).ConfigureAwait(false); } internal bool TryCreateTombstoneBatch(DocumentSessionBase session, out UpdateBatch batch) @@ -174,10 +114,7 @@ internal bool TryCreateTombstoneBatch(DocumentSessionBase session, out UpdateBat operations.AddRange(tombstones); - batch = new UpdateBatch(operations) - { - TenantId = session.TenantId - }; + batch = new UpdateBatch(operations) { TenantId = session.TenantId }; return true; } diff --git a/src/Marten/Events/IEventAppender.cs b/src/Marten/Events/IEventAppender.cs new file mode 100644 index 0000000000..d9268df446 --- /dev/null +++ b/src/Marten/Events/IEventAppender.cs @@ -0,0 +1,15 @@ +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Projections; +using Marten.Internal.Sessions; + +namespace Marten.Events; + +internal interface IEventAppender +{ + void ProcessEvents(EventGraph eventGraph, DocumentSessionBase session, + IProjection[] inlineProjectionsValue); + + Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase session, + IProjection[] inlineProjections, CancellationToken token); +} diff --git a/src/Marten/Events/IEventStorage.cs b/src/Marten/Events/IEventStorage.cs index 646f196662..80e81dbe75 100644 --- a/src/Marten/Events/IEventStorage.cs +++ b/src/Marten/Events/IEventStorage.cs @@ -42,4 +42,17 @@ public interface IEventStorage: ISelector, IDocumentStorage /// /// IStorageOperation UpdateStreamVersion(StreamAction stream); + + /// + /// Create a storage operation to just increment the existing stream + /// based on the number of the events being appended + /// + /// + /// + IStorageOperation IncrementStreamVersion(StreamAction stream); + + IStorageOperation QuickAppendEvents(StreamAction stream); + + IStorageOperation QuickAppendEventWithVersion(EventGraph events, IMartenSession session, StreamAction stream, + IEvent e); } diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index f48cc34861..47d4aef125 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -51,6 +51,8 @@ public interface IEventStoreOptions /// public bool EnableUniqueIndexOnEventId { get; set; } + public EventAppendMode AppendMode { get; set; } + /// /// Register an event type with Marten. This isn't strictly necessary for normal usage, /// but can help Marten with asynchronous projections where Marten hasn't yet encountered diff --git a/src/Marten/Events/IReadOnlyEventStoreOptions.cs b/src/Marten/Events/IReadOnlyEventStoreOptions.cs index a1eb7898bd..93f700f7cc 100644 --- a/src/Marten/Events/IReadOnlyEventStoreOptions.cs +++ b/src/Marten/Events/IReadOnlyEventStoreOptions.cs @@ -41,6 +41,8 @@ public interface IReadOnlyEventStoreOptions /// bool EnableUniqueIndexOnEventId { get; set; } + EventAppendMode AppendMode { get; set; } + /// /// Configuration for all event store projections /// diff --git a/src/Marten/Events/Operations/AppendEventOperationBase.cs b/src/Marten/Events/Operations/AppendEventOperationBase.cs index bb520fd3f8..d0cf9b7990 100644 --- a/src/Marten/Events/Operations/AppendEventOperationBase.cs +++ b/src/Marten/Events/Operations/AppendEventOperationBase.cs @@ -45,3 +45,4 @@ public override string ToString() return $"Insert Event to Stream {Stream.Key ?? Stream.Id.ToString()}, Version {Event.Version}"; } } + diff --git a/src/Marten/Events/Operations/IncrementStreamVersionById.cs b/src/Marten/Events/Operations/IncrementStreamVersionById.cs new file mode 100644 index 0000000000..713a56ea5a --- /dev/null +++ b/src/Marten/Events/Operations/IncrementStreamVersionById.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal; +using Marten.Internal.Operations; +using Weasel.Postgresql; + +namespace Marten.Events.Operations; + +internal class IncrementStreamVersionById: IStorageOperation +{ + private readonly EventGraph _events; + + public IncrementStreamVersionById(EventGraph events, StreamAction stream) + { + _events = events; + Stream = stream; + } + + public StreamAction Stream { get; } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append("update "); + builder.Append(_events.DatabaseSchemaName); + builder.Append(".mt_events set version = version + "); + builder.Append(Stream.Events.Count.ToString()); + builder.Append(" where id = "); + builder.AppendParameter(Stream.Id); + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList exceptions) + { + + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public OperationRole Role() => OperationRole.Events; +} \ No newline at end of file diff --git a/src/Marten/Events/Operations/IncrementStreamVersionByKey.cs b/src/Marten/Events/Operations/IncrementStreamVersionByKey.cs new file mode 100644 index 0000000000..eb758a7738 --- /dev/null +++ b/src/Marten/Events/Operations/IncrementStreamVersionByKey.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal; +using Marten.Internal.Operations; +using Weasel.Postgresql; + +namespace Marten.Events.Operations; + +internal class IncrementStreamVersionByKey: IStorageOperation +{ + private readonly EventGraph _events; + + public IncrementStreamVersionByKey(EventGraph events, StreamAction stream) + { + _events = events; + Stream = stream; + } + + public StreamAction Stream { get; } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append("update "); + builder.Append(_events.DatabaseSchemaName); + builder.Append(".mt_events set version = version + "); + builder.Append(Stream.Events.Count.ToString()); + builder.Append(" where id = "); + builder.AppendParameter(Stream.Key); + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList exceptions) + { + + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public OperationRole Role() => OperationRole.Events; +} \ No newline at end of file diff --git a/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs new file mode 100644 index 0000000000..7cd1128179 --- /dev/null +++ b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs @@ -0,0 +1,115 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using Marten.Internal; +using Marten.Internal.Operations; +using Npgsql; +using NpgsqlTypes; +using Weasel.Postgresql; + +namespace Marten.Events.Operations; + +public abstract class QuickAppendEventsOperationBase : IStorageOperation +{ + public QuickAppendEventsOperationBase(StreamAction stream) + { + Stream = stream; + } + + public StreamAction Stream { get; } + + public OperationRole Role() + { + return OperationRole.Events; + } + + public Type DocumentType => typeof(IEvent); + + public override string ToString() + { + return $"Append {Stream.Events.Select(x => x.EventTypeName).Join(", ")} to event stream {Stream}"; + } + + public abstract void ConfigureCommand(ICommandBuilder builder, IMartenSession session); + + public void Postprocess(DbDataReader reader, IList exceptions) + { + if (reader.Read()) + { + var values = reader.GetFieldValue(0); + + // Ignore the first value + for (int i = 1; i < values.Length; i++) + { + // Only setting the sequence to aid in tombstone processing + Stream.Events[i - 1].Sequence = values[i]; + } + } + } + + protected void writeId(NpgsqlParameter[] parameters) + { + parameters[0].NpgsqlDbType = NpgsqlDbType.Uuid; + parameters[0].Value = Stream.Id; + } + + protected void writeKey(NpgsqlParameter[] parameters) + { + parameters[0].NpgsqlDbType = NpgsqlDbType.Varchar; + parameters[0].Value = Stream.Key; + } + + protected void writeBasicParameters(NpgsqlParameter[] parameters, IMartenSession session) + { + parameters[1].NpgsqlDbType = NpgsqlDbType.Varchar; + parameters[1].Value = Stream.AggregateTypeName.IsEmpty() ? DBNull.Value : Stream.AggregateTypeName; + parameters[2].NpgsqlDbType = NpgsqlDbType.Varchar; + parameters[2].Value = Stream.TenantId; + parameters[3].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Uuid; + parameters[3].Value = Stream.Events.Select(x => x.Id).ToArray(); + parameters[4].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; + parameters[4].Value = Stream.Events.Select(x => x.EventTypeName).ToArray(); + parameters[5].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; + parameters[5].Value = Stream.Events.Select(x => x.DotNetTypeName).ToArray(); + parameters[6].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb; + parameters[6].Value = Stream.Events.Select(e => session.Serializer.ToJson(e.Data)).ToArray(); + } + + protected void writeCausationIds(int index, NpgsqlParameter[] parameters) + { + parameters[index].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; + parameters[index].Value = Stream.Events.Select(x => x.CausationId).ToArray(); + } + + protected void writeCorrelationIds(int index, NpgsqlParameter[] parameters) + { + parameters[index].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; + parameters[index].Value = Stream.Events.Select(x => x.CorrelationId).ToArray(); + } + + protected void writeHeaders(int index, NpgsqlParameter[] parameters, IMartenSession session) + { + parameters[index].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb; + parameters[index].Value = Stream.Events.Select(x => session.Serializer.ToJson(x.Headers)).ToArray(); + } + + public async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (await reader.ReadAsync(token).ConfigureAwait(false)) + { + var values = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + + // Ignore the first value + for (int i = 1; i < values.Length; i++) + { + // Only setting the sequence to aid in tombstone processing + Stream.Events[i - 1].Sequence = values[i]; + } + } + } +} diff --git a/src/Marten/Events/Operations/UpdateStreamOperations.cs b/src/Marten/Events/Operations/UpdateStreamVersion.cs similarity index 97% rename from src/Marten/Events/Operations/UpdateStreamOperations.cs rename to src/Marten/Events/Operations/UpdateStreamVersion.cs index ead705af21..81abb1445a 100644 --- a/src/Marten/Events/Operations/UpdateStreamOperations.cs +++ b/src/Marten/Events/Operations/UpdateStreamVersion.cs @@ -3,6 +3,7 @@ using System.Data.Common; using System.Threading; using System.Threading.Tasks; +using Marten.Events.Schema; using Marten.Exceptions; using Marten.Internal; using Marten.Internal.Operations; diff --git a/src/Marten/Events/QuickEventAppender.cs b/src/Marten/Events/QuickEventAppender.cs new file mode 100644 index 0000000000..c7372d6b5a --- /dev/null +++ b/src/Marten/Events/QuickEventAppender.cs @@ -0,0 +1,73 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Projections; +using Marten.Internal.Sessions; + +namespace Marten.Events; + +internal class QuickEventAppender: IEventAppender +{ + public void ProcessEvents(EventGraph eventGraph, DocumentSessionBase session, IProjection[] inlineProjections) + { + registerOperationsForStreams(eventGraph, session); + + foreach (var projection in inlineProjections) + { + projection.Apply(session, session.WorkTracker.Streams.ToList()); + } + } + + private static void registerOperationsForStreams(EventGraph eventGraph, DocumentSessionBase session) + { + var storage = session.EventStorage(); + + foreach (var stream in session.WorkTracker.Streams.Where(x => x.Events.Any())) + { + stream.TenantId ??= session.TenantId; + + // Not really using it, just need a stand in + var sequences = new Queue(); + if (stream.ActionType == StreamActionType.Start) + { + stream.PrepareEvents(0, eventGraph, sequences, session); + session.QueueOperation(storage.InsertStream(stream)); + + foreach (var @event in stream.Events) + { + session.QueueOperation(storage.QuickAppendEventWithVersion(eventGraph, session, stream, @event)); + } + } + else + { + if (stream.ExpectedVersionOnServer.HasValue) + { + // We can supply the version to the events going in + stream.PrepareEvents(stream.ExpectedVersionOnServer.Value, eventGraph, sequences, session); + session.QueueOperation(storage.UpdateStreamVersion(stream)); + foreach (var @event in stream.Events) + { + session.QueueOperation(storage.QuickAppendEventWithVersion(eventGraph, session, stream, @event)); + } + } + else + { + stream.PrepareEvents(0, eventGraph, sequences, session); + session.QueueOperation(storage.QuickAppendEvents(stream)); + } + } + } + } + + public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase session, IProjection[] inlineProjections, + CancellationToken token) + { + registerOperationsForStreams(eventGraph, session); + + foreach (var projection in inlineProjections) + { + await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false); + } + } +} diff --git a/src/Marten/Events/RichEventAppender.cs b/src/Marten/Events/RichEventAppender.cs new file mode 100644 index 0000000000..5a132d1bcc --- /dev/null +++ b/src/Marten/Events/RichEventAppender.cs @@ -0,0 +1,117 @@ +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Projections; +using Marten.Exceptions; +using Marten.Internal.Sessions; + +namespace Marten.Events; + +internal class RichEventAppender: IEventAppender +{ + public void ProcessEvents(EventGraph eventGraph, DocumentSessionBase session, + IProjection[] inlineProjections) + { + var storage = session.EventStorage(); + + var fetcher = new EventSequenceFetcher(eventGraph, session.WorkTracker.Streams.Sum(x => x.Events.Count)); + var sequences = session.ExecuteHandler(fetcher); + + + foreach (var stream in session.WorkTracker.Streams.Where(x => x.Events.Any())) + { + stream.TenantId ??= session.TenantId; + + if (stream.ActionType == StreamActionType.Start) + { + stream.PrepareEvents(0, eventGraph, sequences, session); + session.QueueOperation(storage.InsertStream(stream)); + } + else + { + var handler = storage.QueryForStream(stream); + var state = session.ExecuteHandler(handler); + + if (state == null) + { + stream.PrepareEvents(0, eventGraph, sequences, session); + session.QueueOperation(storage.InsertStream(stream)); + } + else + { + if (state.IsArchived) + { + throw new InvalidStreamOperationException( + $"Attempted to append event to archived stream with Id '{state.Id}'."); + } + + stream.PrepareEvents(state.Version, eventGraph, sequences, session); + session.QueueOperation(storage.UpdateStreamVersion(stream)); + } + } + + foreach (var @event in stream.Events) + { + session.QueueOperation(storage.AppendEvent(eventGraph, session, stream, @event)); + } + } + + foreach (var projection in inlineProjections) + { + projection.Apply(session, session.WorkTracker.Streams.ToList()); + } + } + + public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase session, + IProjection[] inlineProjections, CancellationToken token) + { + var fetcher = new EventSequenceFetcher(eventGraph, session.WorkTracker.Streams.Sum(x => x.Events.Count)); + var sequences = await session.ExecuteHandlerAsync(fetcher, token).ConfigureAwait(false); + + + var storage = session.EventStorage(); + + foreach (var stream in session.WorkTracker.Streams.Where(x => x.Events.Any())) + { + stream.TenantId ??= session.TenantId; + + if (stream.ActionType == StreamActionType.Start) + { + stream.PrepareEvents(0, eventGraph, sequences, session); + session.QueueOperation(storage.InsertStream(stream)); + } + else + { + var handler = storage.QueryForStream(stream); + var state = await session.ExecuteHandlerAsync(handler, token).ConfigureAwait(false); + + if (state == null) + { + stream.PrepareEvents(0, eventGraph, sequences, session); + session.QueueOperation(storage.InsertStream(stream)); + } + else + { + if (state.IsArchived) + { + throw new InvalidStreamOperationException( + $"Attempted to append event to archived stream with Id '{state.Id}'."); + } + + stream.PrepareEvents(state.Version, eventGraph, sequences, session); + session.QueueOperation(storage.UpdateStreamVersion(stream)); + } + } + + foreach (var @event in stream.Events) + { + session.QueueOperation(storage.AppendEvent(eventGraph, session, stream, @event)); + } + } + + foreach (var projection in inlineProjections) + { + await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false); + } + } +} diff --git a/src/Marten/Events/Schema/EventJsonDataColumn.cs b/src/Marten/Events/Schema/EventJsonDataColumn.cs index d4a9ecee32..2ae77e93e7 100644 --- a/src/Marten/Events/Schema/EventJsonDataColumn.cs +++ b/src/Marten/Events/Schema/EventJsonDataColumn.cs @@ -24,10 +24,15 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, throw new NotSupportedException(); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.Frames.Code($"parameters[{index}].NpgsqlDbType = {{0}};", NpgsqlDbType.Jsonb); method.Frames.Code($"parameters[{index}].Value = {{0}}.Serializer.ToJson({{1}}.{nameof(IEvent.Data)});", Use.Type(), Use.Type()); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } diff --git a/src/Marten/Events/Schema/EventTableColumn.cs b/src/Marten/Events/Schema/EventTableColumn.cs index 3199366c31..4045b1e3c0 100644 --- a/src/Marten/Events/Schema/EventTableColumn.cs +++ b/src/Marten/Events/Schema/EventTableColumn.cs @@ -18,17 +18,18 @@ namespace Marten.Events.Schema; internal class EventTableColumn: TableColumn, IEventTableColumn { private readonly Expression> _eventMemberExpression; - private readonly MemberInfo _member; public EventTableColumn(string name, Expression> eventMemberExpression): base(name, "varchar") { _eventMemberExpression = eventMemberExpression; - _member = MemberFinder.Determine(eventMemberExpression).Single(); - var memberType = _member.GetMemberType(); + Member = MemberFinder.Determine(eventMemberExpression).Single(); + var memberType = Member.GetMemberType(); Type = PostgresqlProvider.Instance.GetDatabaseType(memberType, EnumStorage.AsInteger); NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(memberType); } + public MemberInfo Member { get; } + public NpgsqlDbType NpgsqlDbType { get; set; } public void GenerateSelectorCodeSync(GeneratedMethod method, EventGraph graph, int index) @@ -47,11 +48,16 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, }); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public virtual void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.Frames.Code($"parameters[{index}].{nameof(NpgsqlParameter.NpgsqlDbType)} = {{0}};", NpgsqlDbType); method.Frames.Code( - $"parameters[{index}].{nameof(NpgsqlParameter.Value)} = {{0}}.{_member.Name};", Use.Type()); + $"parameters[{index}].{nameof(NpgsqlParameter.Value)} = {{0}}.{Member.Name};", Use.Type()); + } + + public virtual string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; } } diff --git a/src/Marten/Events/Schema/EventTypeColumn.cs b/src/Marten/Events/Schema/EventTypeColumn.cs index 8d6bbb2015..e3a0cb4df1 100644 --- a/src/Marten/Events/Schema/EventTypeColumn.cs +++ b/src/Marten/Events/Schema/EventTypeColumn.cs @@ -22,8 +22,13 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, throw new NotSupportedException(); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.SetParameterFromMember(index, x => x.EventTypeName); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } diff --git a/src/Marten/Events/Schema/EventsTable.cs b/src/Marten/Events/Schema/EventsTable.cs index b00bbfe721..c5ee61a60b 100644 --- a/src/Marten/Events/Schema/EventsTable.cs +++ b/src/Marten/Events/Schema/EventsTable.cs @@ -3,7 +3,6 @@ using Marten.Events.Archiving; using Marten.Storage; using Marten.Storage.Metadata; -using Weasel.Core; using Weasel.Postgresql; using Weasel.Postgresql.Tables; @@ -15,11 +14,11 @@ internal class EventsTable: Table { public EventsTable(EventGraph events): base(new PostgresqlObjectName(events.DatabaseSchemaName, "mt_events")) { - AddColumn(new EventTableColumn("seq_id", x => x.Sequence)).AsPrimaryKey(); + AddColumn(new SequenceColumn()).AsPrimaryKey(); AddColumn(new EventTableColumn("id", x => x.Id)).NotNull(); AddColumn(new StreamIdColumn(events)); - AddColumn(new EventTableColumn("version", x => x.Version)).NotNull(); + AddColumn(new VersionColumn()); AddColumn(); AddColumn(); AddColumn(new EventTableColumn("timestamp", x => x.Timestamp)) @@ -33,7 +32,6 @@ public EventsTable(EventGraph events): base(new PostgresqlObjectName(events.Data AddIfActive(events.Metadata.CausationId); AddIfActive(events.Metadata.Headers); - if (events.TenancyStyle == TenancyStyle.Conjoined) { ForeignKeys.Add(new ForeignKey("fkey_mt_events_stream_id_tenant_id") diff --git a/src/Marten/Events/Schema/IEventTableColumn.cs b/src/Marten/Events/Schema/IEventTableColumn.cs index 2475cfe1f9..cfaff04fed 100644 --- a/src/Marten/Events/Schema/IEventTableColumn.cs +++ b/src/Marten/Events/Schema/IEventTableColumn.cs @@ -2,6 +2,12 @@ namespace Marten.Events.Schema; +public enum AppendMode +{ + Full, + QuickWithVersion +} + /// /// This interface is used by the event store code generation to build the IEventStorage /// @@ -35,5 +41,8 @@ internal interface IEventTableColumn /// /// /// - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index); + /// + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full); + + string ValueSql(EventGraph graph, AppendMode mode); } diff --git a/src/Marten/Events/Schema/QuickAppendEventFunction.cs b/src/Marten/Events/Schema/QuickAppendEventFunction.cs new file mode 100644 index 0000000000..9ec64c20af --- /dev/null +++ b/src/Marten/Events/Schema/QuickAppendEventFunction.cs @@ -0,0 +1,114 @@ +using System.IO; +using System.Linq; +using Marten.Schema; +using Marten.Storage; +using Marten.Storage.Metadata; +using Weasel.Core; +using Weasel.Postgresql.Functions; + +namespace Marten.Events.Schema; + + public class QuickAppendEventFunction : Function + { + private readonly EventGraph _events; + + public QuickAppendEventFunction(EventGraph events) : base(new DbObjectName(events.DatabaseSchemaName, "mt_quick_append_events")) + { + _events = events; + } + + public override void WriteCreateStatement(Migrator migrator, TextWriter writer) + { + var streamIdType = _events.GetStreamIdDBType(); + var databaseSchema = _events.DatabaseSchemaName; + + var tenancyStyle = _events.TenancyStyle; + + var streamsWhere = "id = stream"; + + if (tenancyStyle == TenancyStyle.Conjoined) + { + streamsWhere += " AND tenant_id = tenantid"; + } + + var table = new EventsTable(_events); + var metadataColumns = ""; + var metadataParameters = ""; + var metadataValues = ""; + + if (table.Columns.OfType().Any()) + { + metadataColumns += ", " + CausationIdColumn.ColumnName; + metadataParameters += ", causation_ids varchar[]"; + metadataValues += ", causations_ids[index]"; + } + + if (table.Columns.OfType().Any()) + { + metadataColumns += ", " + CorrelationIdColumn.ColumnName; + metadataParameters += ", correlation_ids varchar[]"; + metadataValues += ", correlation_ids[index]"; + } + + if (table.Columns.OfType().Any()) + { + metadataColumns += ", " + HeadersColumn.ColumnName; + metadataParameters += ", headers jsonb[]"; + metadataValues += ", headers[index]"; + } + + + writer.WriteLine($@" +CREATE OR REPLACE FUNCTION {Identifier}(stream {streamIdType}, stream_type varchar, tenantid varchar, event_ids uuid[], event_types varchar[], dotnet_types varchar[], bodies jsonb[]{metadataParameters}) RETURNS int[] AS $$ +DECLARE + event_version int; + event_type varchar; + event_id uuid; + body jsonb; + index int; + seq int; + actual_tenant varchar; + return_value int[]; +BEGIN + select version into event_version from {databaseSchema}.mt_streams where {streamsWhere}; + if event_version IS NULL then + event_version = 0; + insert into {databaseSchema}.mt_streams (id, type, version, timestamp, tenant_id) values (stream, stream_type, 0, now(), tenantid); + else + if tenantid IS NOT NULL then + select tenant_id into actual_tenant from {databaseSchema}.mt_streams where {streamsWhere}; + if actual_tenant != tenantid then + RAISE EXCEPTION 'The tenantid does not match the existing stream'; + end if; + end if; + end if; + + index := 1; + return_value := ARRAY[event_version + array_length(event_ids, 1)]; + + foreach event_id in ARRAY event_ids + loop + seq := nextval('{databaseSchema}.mt_events_sequence'); + return_value := array_append(return_value, seq); + + event_version := event_version + 1; + event_type = event_types[index]; + body = bodies[index]; + + insert into {databaseSchema}.mt_events + (seq_id, id, stream_id, version, data, type, tenant_id, {SchemaConstants.DotNetTypeColumn}, is_archived{metadataColumns}) + values + (seq, event_id, stream, event_version, body, event_type, tenantid, dotnet_types[index], FALSE{metadataValues}); + + index := index + 1; + end loop; + + update {databaseSchema}.mt_streams set version = event_version, timestamp = now() where {streamsWhere}; + + return return_value; +END +$$ LANGUAGE plpgsql; +"); + } + + } diff --git a/src/Marten/Events/Schema/SequenceColumn.cs b/src/Marten/Events/Schema/SequenceColumn.cs new file mode 100644 index 0000000000..c7c97d7fbf --- /dev/null +++ b/src/Marten/Events/Schema/SequenceColumn.cs @@ -0,0 +1,25 @@ +using JasperFx.CodeGeneration; + +namespace Marten.Events.Schema; + +internal class SequenceColumn: EventTableColumn +{ + public SequenceColumn() : base("seq_id", x => x.Sequence) + { + AllowNulls = false; + } + + public override string ValueSql(EventGraph graph, AppendMode mode) + { + return mode == AppendMode.Full ? base.ValueSql(graph, mode) : $"nextval('{graph.DatabaseSchemaName}.mt_events_sequence')"; + } + + + public override void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode mode) + { + if (mode == AppendMode.Full) + { + base.GenerateAppendCode(method, graph, index, mode); + } + } +} diff --git a/src/Marten/Events/Schema/StreamIdColumn.cs b/src/Marten/Events/Schema/StreamIdColumn.cs index 97ffcc377a..255ae3be99 100644 --- a/src/Marten/Events/Schema/StreamIdColumn.cs +++ b/src/Marten/Events/Schema/StreamIdColumn.cs @@ -35,7 +35,7 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, } } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { if (graph.StreamIdentity == StreamIdentity.AsGuid) { @@ -46,4 +46,9 @@ public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int ind method.SetParameterFromMember(index, x => x.Key); } } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } diff --git a/src/Marten/Events/Schema/VersionColumn.cs b/src/Marten/Events/Schema/VersionColumn.cs new file mode 100644 index 0000000000..b11264cd1f --- /dev/null +++ b/src/Marten/Events/Schema/VersionColumn.cs @@ -0,0 +1,14 @@ +using System.Diagnostics.CodeAnalysis; +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using Npgsql; + +namespace Marten.Events.Schema; + +internal class VersionColumn: EventTableColumn +{ + public VersionColumn() : base("version", x => x.Version) + { + AllowNulls = false; + } +} diff --git a/src/Marten/Events/StreamAction.cs b/src/Marten/Events/StreamAction.cs index ea0b4e4409..4e44f52be9 100644 --- a/src/Marten/Events/StreamAction.cs +++ b/src/Marten/Events/StreamAction.cs @@ -290,7 +290,11 @@ internal void PrepareEvents(long currentVersion, EventGraph graph, Queue s @event.Id = CombGuidIdGeneration.NewGuid(); } - @event.Sequence = sequences.Dequeue(); + if (sequences.TryDequeue(out var sequence)) + { + @event.Sequence = sequence; + } + @event.TenantId = session.TenantId; @event.Timestamp = timestamp; diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs index e0251543d6..5c0e981082 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs @@ -41,7 +41,10 @@ public void SaveChanges() foreach (var operationType in operationDocumentTypes()) Database.EnsureStorageExists(operationType); } - foreach (var listener in Listeners) listener.BeforeSaveChanges(this); + foreach (var listener in Listeners) + { + listener.BeforeSaveChanges(this); + } var batch = new UpdateBatch(_workTracker.AllOperations); @@ -52,7 +55,10 @@ public void SaveChanges() EjectPatchedTypes(_workTracker); Logger.RecordSavedChanges(this, _workTracker); - foreach (var listener in Listeners) listener.AfterCommit(this, _workTracker); + foreach (var listener in Listeners) + { + listener.AfterCommit(this, _workTracker); + } // Need to clear the unit of work here _workTracker.Reset(); diff --git a/src/Marten/Storage/Metadata/CausationIdColumn.cs b/src/Marten/Storage/Metadata/CausationIdColumn.cs index 922139c7b2..08b1e371ca 100644 --- a/src/Marten/Storage/Metadata/CausationIdColumn.cs +++ b/src/Marten/Storage/Metadata/CausationIdColumn.cs @@ -40,7 +40,7 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, }); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.SetParameterFromMember(index, x => x.CausationId); } @@ -68,6 +68,11 @@ public override void WriteMetadataInUpdateStatement(ICommandBuilder builder, Doc builder.Append(" = "); builder.AppendParameter(session.CausationId); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } internal class CausationIdArgument: UpsertArgument @@ -111,4 +116,9 @@ public override void GenerateBulkWriterCodeAsync(GeneratedType type, GeneratedMe load.Frames.CodeAsync("await writer.WriteAsync(\"BULK_INSERT\", {0}, {1});", DbType, Use.Type()); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } diff --git a/src/Marten/Storage/Metadata/CorrelationIdColumn.cs b/src/Marten/Storage/Metadata/CorrelationIdColumn.cs index 87f3e8aeb5..2666a4c863 100644 --- a/src/Marten/Storage/Metadata/CorrelationIdColumn.cs +++ b/src/Marten/Storage/Metadata/CorrelationIdColumn.cs @@ -40,7 +40,7 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, }); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.SetParameterFromMember(index, x => x.CorrelationId); } @@ -68,6 +68,11 @@ public override void WriteMetadataInUpdateStatement(ICommandBuilder builder, Doc builder.Append(" = "); builder.AppendParameter(session.CorrelationId); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } internal class CorrelationIdArgument: UpsertArgument diff --git a/src/Marten/Storage/Metadata/DotNetTypeColumn.cs b/src/Marten/Storage/Metadata/DotNetTypeColumn.cs index 07325b62d6..7399e8ae8d 100644 --- a/src/Marten/Storage/Metadata/DotNetTypeColumn.cs +++ b/src/Marten/Storage/Metadata/DotNetTypeColumn.cs @@ -24,8 +24,13 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, throw new NotSupportedException(); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.SetParameterFromMember(index, x => x.DotNetTypeName); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } diff --git a/src/Marten/Storage/Metadata/HeadersColumn.cs b/src/Marten/Storage/Metadata/HeadersColumn.cs index 4d656b4ced..78ab6ab72e 100644 --- a/src/Marten/Storage/Metadata/HeadersColumn.cs +++ b/src/Marten/Storage/Metadata/HeadersColumn.cs @@ -43,7 +43,7 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, }); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.Frames.Code($"parameters[{index}].NpgsqlDbType = {{0}};", NpgsqlDbType.Jsonb); method.Frames.Code($"parameters[{index}].Value = {{0}}.Serializer.ToJson({{1}}.{nameof(IEvent.Headers)});", @@ -83,6 +83,11 @@ internal override UpsertArgument ToArgument() { return new HeadersArgument(); } + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } } internal class HeadersArgument: UpsertArgument diff --git a/src/Marten/Storage/Metadata/TenantIdColumn.cs b/src/Marten/Storage/Metadata/TenantIdColumn.cs index a7a70318ae..68cbaf73b9 100644 --- a/src/Marten/Storage/Metadata/TenantIdColumn.cs +++ b/src/Marten/Storage/Metadata/TenantIdColumn.cs @@ -33,7 +33,7 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph, }); } - public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index) + public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full) { method.SetParameterFromMember(index, x => x.TenantId); } @@ -81,4 +81,9 @@ void IStreamTableColumn.GenerateSelectorCodeSync(GeneratedMethod method, int ind bool IStreamTableColumn.Reads => true; bool IStreamTableColumn.Writes => true; + + public string ValueSql(EventGraph graph, AppendMode mode) + { + return "?"; + } }