Skip to content

Commit

Permalink
Adds back in the optimization for using the identity map on inline ag…
Browse files Browse the repository at this point in the history
…gregates + FetchForWriting, but this time as an opt in
  • Loading branch information
jeremydmiller committed Jul 18, 2024
1 parent c183b72 commit c0b9f34
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 30 deletions.
13 changes: 13 additions & 0 deletions docs/scenarios/command_handler_workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ the standard `IDocumentSession.SaveChangesAsync()` method call. At that point, i
`Order` stream between our handler calling `FetchForWriting()` and `IDocumentSession.SaveChangesAsync()`, the entire command will fail with a Marten
`ConcurrencyException`.

### Inline Optimization <Badge type="tip" text="7.25" />

If you are using and `Inline` single stream projection for the aggregate being targeted by `FetchForWriting()`, you can
make a performance optimization with this setting:

sample: sample_use_identity_map_for_inline_aggregates

It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization
-- and you probably should even though that's not a Marten default! -- and _also_ using `FetchForWriting<T>()` with
`Inline` projections, this optimizes your system to make fewer network round trips to the database and reuse the data
you already fetched when applying the `Inline` projection. This is an _opt in_ setting because it can be harmful to
existing code that might be modifying the aggregate document fetched by `FetchForWriting()` outside of Marten itself.

## Explicit Optimistic Concurrency

This time let's explicitly opt into optimistic concurrency checks by telling Marten what the expected starting
Expand Down
9 changes: 5 additions & 4 deletions src/DocumentDbTests/Writing/document_inserts.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Marten.Exceptions;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
Expand All @@ -26,20 +27,20 @@ public void can_insert_all_new_documents()
}

[Fact]
public void can_insert_a_mixed_bag_of_documents()
public async Task can_insert_a_mixed_bag_of_documents()
{
var docs = new object[]
{
Target.Random(), Target.Random(), Target.Random(), new User(), new User(), new User(), new User()
};

using (var session = theStore.LightweightSession())
await using (var session = theStore.LightweightSession())
{
session.InsertObjects(docs);
session.SaveChanges();
await session.SaveChangesAsync();
}

using (var query = theStore.QuerySession())
await using (var query = theStore.QuerySession())
{
query.Query<Target>().Count().ShouldBe(3);
query.Query<User>().Count().ShouldBe(4);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Threading.Tasks;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Exceptions;
using Marten.Storage;
using Marten.Testing.Harness;
using Microsoft.Extensions.Hosting;
using Xunit;
using Shouldly;

Expand Down Expand Up @@ -457,4 +459,67 @@ await Should.ThrowAsync<ConcurrencyException>(async () =>
});
}

public static void using_identity_map_for_inline_aggregates()
{
#region sample_use_identity_map_for_inline_aggregates

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection("some connection string");

// Force Marten to use the identity map for only the aggregate type
// that is the targeted "T" in FetchForWriting<T>() when using
// an Inline projection for the "T". Saves on Marten doing an extra
// database fetch of the same data you already fetched from FetchForWriting()
// when Marten needs to apply the Inline projection as part of SaveChanges()
opts.Events.UseIdentityMapForInlineAggregates = true;
})
// This is non-trivial performance optimization if you never
// need identity map mechanics in your commands or query handlers
.UseLightweightSessions();

#endregion
}

[Fact]
public async Task silently_turns_on_identity_map_for_inline_aggregates()
{
StoreOptions(opts =>
{
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Inline);
opts.Events.UseIdentityMapForInlineAggregates = true;
});

var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.Aggregate.ShouldBeNull();
stream.CurrentVersion.ShouldBe(0);

stream.AppendOne(new AEvent());
stream.AppendMany(new BEvent(), new BEvent(), new BEvent());
stream.AppendMany(new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

using var session = theStore.LightweightSession();
var existing = await session.Events.FetchForWriting<SimpleAggregate>(streamId);

// Should already be using the identity map
var loadAgain = await session.LoadAsync<SimpleAggregate>(streamId);
loadAgain.ShouldBeTheSameAs(existing.Aggregate);

// Append to the stream and see that the existing aggregate is changed
existing.AppendOne(new AEvent());
await session.SaveChangesAsync();

// 1 from the original version, another we just appended
existing.Aggregate.ACount.ShouldBe(2);

using var query = theStore.QuerySession();
var loadedFresh = await query.LoadAsync<SimpleAggregate>(streamId);
loadedFresh.ACount.ShouldBe(2);
}

}
6 changes: 6 additions & 0 deletions src/EventSourcingTests/EventGraphTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public void switch_to_quick_and_back_to_rich()
theGraph.EventAppender.ShouldBeOfType<RichEventAppender>();
}

[Fact]
public void use_identity_map_for_inline_aggregates_is_false_by_default()
{
theGraph.UseIdentityMapForInlineAggregates.ShouldBeFalse();
}

public class HouseRemodeling
{
public Guid Id { get; set; }
Expand Down
11 changes: 10 additions & 1 deletion src/Marten/Events/Aggregation/AggregationRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,16 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
// do not load if sliced by stream and the stream does not yet exist
if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline && (Slicer is not ISingleStreamSlicer || slice.ActionType != StreamActionType.Start))
{
aggregate = await Storage.LoadAsync(slice.Id, session, cancellation).ConfigureAwait(false);
if (session.Options.Events.UseIdentityMapForInlineAggregates)
{
// It's actually important to go in through the front door and use the session so that
// the identity map can kick in here
aggregate = await session.LoadAsync<TDoc>(slice.Id, cancellation).ConfigureAwait(false);
}
else
{
aggregate = await Storage.LoadAsync(slice.Id, session, cancellation).ConfigureAwait(false);
}
}

// Does the aggregate already exist before the events are applied?
Expand Down
1 change: 1 addition & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public StreamIdentity StreamIdentity
public TenancyStyle TenancyStyle { get; set; } = TenancyStyle.Single;

public bool EnableGlobalProjectionsForConjoinedTenancy { get; set; }
public bool UseIdentityMapForInlineAggregates { get; set; }

/// <summary>
/// Configure the meta data required to be stored for events. By default meta data fields are disabled
Expand Down
7 changes: 7 additions & 0 deletions src/Marten/Events/Fetching/FetchInlinedPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio
await _identityStrategy.EnsureEventStorageExists<TDoc>(session, cancellation).ConfigureAwait(false);
await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false);

if (session.Options.Events.UseIdentityMapForInlineAggregates)
{
// Opt into the identity map mechanics for this aggregate type just in case
// you're using a lightweight session
session.UseIdentityMapFor<TDoc>();
}

if (forUpdate)
{
await session.BeginTransactionAsync(cancellation).ConfigureAwait(false);
Expand Down
9 changes: 9 additions & 0 deletions src/Marten/Events/IEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public interface IEventStoreOptions
/// </summary>
bool EnableGlobalProjectionsForConjoinedTenancy { get; set; }

/// <summary>
/// Opt into a performance optimization that directs Marten to always use the identity map for an
/// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false.
/// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten
/// </summary>
bool UseIdentityMapForInlineAggregates { get; set; }

/// <summary>
/// Override the database schema name for event related tables. By default this
/// is the same schema as the document storage
Expand Down Expand Up @@ -582,4 +589,6 @@ Func<TOldEvent, CancellationToken, Task<TEvent>> upcastAsync
return options.Upcast(GetEventTypeNameWithSchemaVersion<TOldEvent>(schemaVersion), upcastAsync);
}



}
7 changes: 7 additions & 0 deletions src/Marten/Events/IReadOnlyEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ public interface IReadOnlyEventStoreOptions
IReadOnlyList<IReadOnlyProjectionData> Projections();

IReadOnlyList<IEventType> AllKnownEventTypes();

/// <summary>
/// Opt into a performance optimization that directs Marten to always use the identity map for an
/// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false.
/// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten
/// </summary>
bool UseIdentityMapForInlineAggregates { get; set; }
}
21 changes: 9 additions & 12 deletions src/Marten/Internal/Sessions/DocumentSessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,22 +365,19 @@ private void store<T>(IEnumerable<T> entities) where T : notnull

private void storeEntity<T>(T entity, IDocumentStorage<T> storage) where T : notnull
{
if (entity is IVersioned versioned)
switch (entity)
{
if (versioned.Version != Guid.Empty)
{
case IVersioned versioned when versioned.Version != Guid.Empty:
storage.Store(this, entity, versioned.Version);
return;
}
}
else if (entity is IRevisioned revisioned && revisioned.Version != 0)
{
storage.Store(this, entity, revisioned.Version);
return;
case IRevisioned revisioned when revisioned.Version != 0:
storage.Store(this, entity, revisioned.Version);
return;
default:
// Put it in the identity map -- if necessary
storage.Store(this, entity);
break;
}

// Put it in the identity map -- if necessary
storage.Store(this, entity);
}

public void EjectPatchedTypes(IUnitOfWork changes)
Expand Down
25 changes: 12 additions & 13 deletions src/Marten/Internal/Sessions/QuerySession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,19 @@ public NpgsqlConnection Connection
{
get
{
if (_connection is IAlwaysConnectedLifetime lifetime)
switch (_connection)
{
return lifetime.Connection;
}
else if (_connection is ITransactionStarter starter)
{
var l = starter.Start();
_connection = l;
return l.Connection;
}
else
{
throw new InvalidOperationException(
$"The current lifetime {_connection} is neither a {nameof(IAlwaysConnectedLifetime)} nor a {nameof(ITransactionStarter)}");
case IAlwaysConnectedLifetime lifetime:
return lifetime.Connection;
case ITransactionStarter starter:
{
var l = starter.Start();
_connection = l;
return l.Connection;
}
default:
throw new InvalidOperationException(
$"The current lifetime {_connection} is neither a {nameof(IAlwaysConnectedLifetime)} nor a {nameof(ITransactionStarter)}");
}
}
}
Expand Down

0 comments on commit c0b9f34

Please sign in to comment.