From c0b9f341bd425300c9a548ce210f2c489db07e15 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 17 Jul 2024 22:01:03 -0500 Subject: [PATCH] Adds back in the optimization for using the identity map on inline aggregates + FetchForWriting, but this time as an opt in --- docs/scenarios/command_handler_workflow.md | 13 ++++ .../Writing/document_inserts.cs | 9 +-- .../fetching_inline_aggregates_for_writing.cs | 65 +++++++++++++++++++ src/EventSourcingTests/EventGraphTests.cs | 6 ++ .../Events/Aggregation/AggregationRuntime.cs | 11 +++- src/Marten/Events/EventGraph.cs | 1 + .../Events/Fetching/FetchInlinedPlan.cs | 7 ++ src/Marten/Events/IEventStoreOptions.cs | 9 +++ .../Events/IReadOnlyEventStoreOptions.cs | 7 ++ .../Internal/Sessions/DocumentSessionBase.cs | 21 +++--- src/Marten/Internal/Sessions/QuerySession.cs | 25 ++++--- 11 files changed, 144 insertions(+), 30 deletions(-) diff --git a/docs/scenarios/command_handler_workflow.md b/docs/scenarios/command_handler_workflow.md index 4793ac11b0..c4300ce665 100644 --- a/docs/scenarios/command_handler_workflow.md +++ b/docs/scenarios/command_handler_workflow.md @@ -133,6 +133,19 @@ the standard `IDocumentSession.SaveChangesAsync()` method call. At that point, i `Order` stream between our handler calling `FetchForWriting()` and `IDocumentSession.SaveChangesAsync()`, the entire command will fail with a Marten `ConcurrencyException`. +### Inline Optimization + +If you are using and `Inline` single stream projection for the aggregate being targeted by `FetchForWriting()`, you can +make a performance optimization with this setting: + +sample: sample_use_identity_map_for_inline_aggregates + +It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization +-- and you probably should even though that's not a Marten default! -- and _also_ using `FetchForWriting()` with +`Inline` projections, this optimizes your system to make fewer network round trips to the database and reuse the data +you already fetched when applying the `Inline` projection. This is an _opt in_ setting because it can be harmful to +existing code that might be modifying the aggregate document fetched by `FetchForWriting()` outside of Marten itself. + ## Explicit Optimistic Concurrency This time let's explicitly opt into optimistic concurrency checks by telling Marten what the expected starting diff --git a/src/DocumentDbTests/Writing/document_inserts.cs b/src/DocumentDbTests/Writing/document_inserts.cs index eea441451f..c14c0f0506 100644 --- a/src/DocumentDbTests/Writing/document_inserts.cs +++ b/src/DocumentDbTests/Writing/document_inserts.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Threading.Tasks; using Marten.Exceptions; using Marten.Testing.Documents; using Marten.Testing.Harness; @@ -26,20 +27,20 @@ public void can_insert_all_new_documents() } [Fact] - public void can_insert_a_mixed_bag_of_documents() + public async Task can_insert_a_mixed_bag_of_documents() { var docs = new object[] { Target.Random(), Target.Random(), Target.Random(), new User(), new User(), new User(), new User() }; - using (var session = theStore.LightweightSession()) + await using (var session = theStore.LightweightSession()) { session.InsertObjects(docs); - session.SaveChanges(); + await session.SaveChangesAsync(); } - using (var query = theStore.QuerySession()) + await using (var query = theStore.QuerySession()) { query.Query().Count().ShouldBe(3); query.Query().Count().ShouldBe(4); diff --git a/src/EventSourcingTests/Aggregation/fetching_inline_aggregates_for_writing.cs b/src/EventSourcingTests/Aggregation/fetching_inline_aggregates_for_writing.cs index 97d7c3244b..db459ee276 100644 --- a/src/EventSourcingTests/Aggregation/fetching_inline_aggregates_for_writing.cs +++ b/src/EventSourcingTests/Aggregation/fetching_inline_aggregates_for_writing.cs @@ -1,10 +1,12 @@ using System; using System.Threading.Tasks; +using Marten; using Marten.Events; using Marten.Events.Projections; using Marten.Exceptions; using Marten.Storage; using Marten.Testing.Harness; +using Microsoft.Extensions.Hosting; using Xunit; using Shouldly; @@ -457,4 +459,67 @@ await Should.ThrowAsync(async () => }); } + public static void using_identity_map_for_inline_aggregates() + { + #region sample_use_identity_map_for_inline_aggregates + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection("some connection string"); + + // Force Marten to use the identity map for only the aggregate type + // that is the targeted "T" in FetchForWriting() when using + // an Inline projection for the "T". Saves on Marten doing an extra + // database fetch of the same data you already fetched from FetchForWriting() + // when Marten needs to apply the Inline projection as part of SaveChanges() + opts.Events.UseIdentityMapForInlineAggregates = true; + }) + // This is non-trivial performance optimization if you never + // need identity map mechanics in your commands or query handlers + .UseLightweightSessions(); + + #endregion + } + + [Fact] + public async Task silently_turns_on_identity_map_for_inline_aggregates() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + opts.Events.UseIdentityMapForInlineAggregates = true; + }); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + using var session = theStore.LightweightSession(); + var existing = await session.Events.FetchForWriting(streamId); + + // Should already be using the identity map + var loadAgain = await session.LoadAsync(streamId); + loadAgain.ShouldBeTheSameAs(existing.Aggregate); + + // Append to the stream and see that the existing aggregate is changed + existing.AppendOne(new AEvent()); + await session.SaveChangesAsync(); + + // 1 from the original version, another we just appended + existing.Aggregate.ACount.ShouldBe(2); + + using var query = theStore.QuerySession(); + var loadedFresh = await query.LoadAsync(streamId); + loadedFresh.ACount.ShouldBe(2); + } + } diff --git a/src/EventSourcingTests/EventGraphTests.cs b/src/EventSourcingTests/EventGraphTests.cs index 6c3e5e1576..22e56bea51 100644 --- a/src/EventSourcingTests/EventGraphTests.cs +++ b/src/EventSourcingTests/EventGraphTests.cs @@ -106,6 +106,12 @@ public void switch_to_quick_and_back_to_rich() theGraph.EventAppender.ShouldBeOfType(); } + [Fact] + public void use_identity_map_for_inline_aggregates_is_false_by_default() + { + theGraph.UseIdentityMapForInlineAggregates.ShouldBeFalse(); + } + public class HouseRemodeling { public Guid Id { get; set; } diff --git a/src/Marten/Events/Aggregation/AggregationRuntime.cs b/src/Marten/Events/Aggregation/AggregationRuntime.cs index 10f1926305..30c5d57bbe 100644 --- a/src/Marten/Events/Aggregation/AggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/AggregationRuntime.cs @@ -70,7 +70,16 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, // do not load if sliced by stream and the stream does not yet exist if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline && (Slicer is not ISingleStreamSlicer || slice.ActionType != StreamActionType.Start)) { - aggregate = await Storage.LoadAsync(slice.Id, session, cancellation).ConfigureAwait(false); + if (session.Options.Events.UseIdentityMapForInlineAggregates) + { + // It's actually important to go in through the front door and use the session so that + // the identity map can kick in here + aggregate = await session.LoadAsync(slice.Id, cancellation).ConfigureAwait(false); + } + else + { + aggregate = await Storage.LoadAsync(slice.Id, session, cancellation).ConfigureAwait(false); + } } // Does the aggregate already exist before the events are applied? diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 9f4a71c9b7..9a6057d91d 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -105,6 +105,7 @@ public StreamIdentity StreamIdentity public TenancyStyle TenancyStyle { get; set; } = TenancyStyle.Single; public bool EnableGlobalProjectionsForConjoinedTenancy { get; set; } + public bool UseIdentityMapForInlineAggregates { get; set; } /// /// Configure the meta data required to be stored for events. By default meta data fields are disabled diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.cs index 0702da45d3..9a2da680bc 100644 --- a/src/Marten/Events/Fetching/FetchInlinedPlan.cs +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.cs @@ -29,6 +29,13 @@ public async Task> FetchForWriting(DocumentSessionBase sessio await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); + if (session.Options.Events.UseIdentityMapForInlineAggregates) + { + // Opt into the identity map mechanics for this aggregate type just in case + // you're using a lightweight session + session.UseIdentityMapFor(); + } + if (forUpdate) { await session.BeginTransactionAsync(cancellation).ConfigureAwait(false); diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index 47d4aef125..ffc7a1235e 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -30,6 +30,13 @@ public interface IEventStoreOptions /// bool EnableGlobalProjectionsForConjoinedTenancy { get; set; } + /// + /// Opt into a performance optimization that directs Marten to always use the identity map for an + /// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false. + /// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten + /// + bool UseIdentityMapForInlineAggregates { get; set; } + /// /// Override the database schema name for event related tables. By default this /// is the same schema as the document storage @@ -582,4 +589,6 @@ Func> upcastAsync return options.Upcast(GetEventTypeNameWithSchemaVersion(schemaVersion), upcastAsync); } + + } diff --git a/src/Marten/Events/IReadOnlyEventStoreOptions.cs b/src/Marten/Events/IReadOnlyEventStoreOptions.cs index 93f700f7cc..dc873010fe 100644 --- a/src/Marten/Events/IReadOnlyEventStoreOptions.cs +++ b/src/Marten/Events/IReadOnlyEventStoreOptions.cs @@ -49,4 +49,11 @@ public interface IReadOnlyEventStoreOptions IReadOnlyList Projections(); IReadOnlyList AllKnownEventTypes(); + + /// + /// Opt into a performance optimization that directs Marten to always use the identity map for an + /// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false. + /// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten + /// + bool UseIdentityMapForInlineAggregates { get; set; } } diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.cs index 4b7c99b556..a14f9683c3 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.cs @@ -365,22 +365,19 @@ private void store(IEnumerable entities) where T : notnull private void storeEntity(T entity, IDocumentStorage storage) where T : notnull { - if (entity is IVersioned versioned) + switch (entity) { - if (versioned.Version != Guid.Empty) - { + case IVersioned versioned when versioned.Version != Guid.Empty: storage.Store(this, entity, versioned.Version); return; - } - } - else if (entity is IRevisioned revisioned && revisioned.Version != 0) - { - storage.Store(this, entity, revisioned.Version); - return; + case IRevisioned revisioned when revisioned.Version != 0: + storage.Store(this, entity, revisioned.Version); + return; + default: + // Put it in the identity map -- if necessary + storage.Store(this, entity); + break; } - - // Put it in the identity map -- if necessary - storage.Store(this, entity); } public void EjectPatchedTypes(IUnitOfWork changes) diff --git a/src/Marten/Internal/Sessions/QuerySession.cs b/src/Marten/Internal/Sessions/QuerySession.cs index 8fd16b60de..050fd07932 100644 --- a/src/Marten/Internal/Sessions/QuerySession.cs +++ b/src/Marten/Internal/Sessions/QuerySession.cs @@ -84,20 +84,19 @@ public NpgsqlConnection Connection { get { - if (_connection is IAlwaysConnectedLifetime lifetime) + switch (_connection) { - return lifetime.Connection; - } - else if (_connection is ITransactionStarter starter) - { - var l = starter.Start(); - _connection = l; - return l.Connection; - } - else - { - throw new InvalidOperationException( - $"The current lifetime {_connection} is neither a {nameof(IAlwaysConnectedLifetime)} nor a {nameof(ITransactionStarter)}"); + case IAlwaysConnectedLifetime lifetime: + return lifetime.Connection; + case ITransactionStarter starter: + { + var l = starter.Start(); + _connection = l; + return l.Connection; + } + default: + throw new InvalidOperationException( + $"The current lifetime {_connection} is neither a {nameof(IAlwaysConnectedLifetime)} nor a {nameof(ITransactionStarter)}"); } } }