From ac6c8fe8a49281892064ce6c0c503dfe012562c2 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Fri, 18 Oct 2024 15:16:22 -0500 Subject: [PATCH] FetchForWriting + Strong Typed Identifiers on the projected aggregate documents. Closes GH-3490 --- ...ntifiers_on_aggregates_must_be_nullable.cs | 13 -- ..._strong_typed_id_for_aggregate_identity.cs | 24 +++ ..._strong_typed_id_for_aggregate_identity.cs | 25 +++ .../Events/EventStore.FetchForWriting.cs | 6 +- .../Events/Fetching/FetchInlinedPlan.cs | 4 +- .../Internal/Storage/IDocumentStorage.cs | 18 +++ src/Marten/Internal/ValueTypeInfo.cs | 151 ++++++++++++++++++ src/Marten/StoreOptions.Identity.cs | 19 +++ 8 files changed, 241 insertions(+), 19 deletions(-) diff --git a/src/EventSourcingTests/Aggregation/strong_typed_identifiers_on_aggregates_must_be_nullable.cs b/src/EventSourcingTests/Aggregation/strong_typed_identifiers_on_aggregates_must_be_nullable.cs index ec1fd086d1..9073b8685e 100644 --- a/src/EventSourcingTests/Aggregation/strong_typed_identifiers_on_aggregates_must_be_nullable.cs +++ b/src/EventSourcingTests/Aggregation/strong_typed_identifiers_on_aggregates_must_be_nullable.cs @@ -9,19 +9,6 @@ namespace EventSourcingTests.Aggregation; -public class strong_typed_identifiers_on_aggregates_must_be_nullable : OneOffConfigurationsContext -{ - [Fact] - public void should_warn_if_the_id_is_not_nullable() - { - Should.Throw(() => - { - StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline)); - }); - - } -} - public class Payment3 { [JsonInclude] public PaymentId Id { get; private set; } diff --git a/src/EventSourcingTests/Aggregation/using_guid_based_strong_typed_id_for_aggregate_identity.cs b/src/EventSourcingTests/Aggregation/using_guid_based_strong_typed_id_for_aggregate_identity.cs index 4a714008b8..7a24b7f8de 100644 --- a/src/EventSourcingTests/Aggregation/using_guid_based_strong_typed_id_for_aggregate_identity.cs +++ b/src/EventSourcingTests/Aggregation/using_guid_based_strong_typed_id_for_aggregate_identity.cs @@ -3,9 +3,11 @@ using System.Text.Json; using System.Text.Json.Serialization; using System.Threading.Tasks; +using EventSourcingTests.Projections; using JasperFx.Core; using JasperFx.Core.Reflection; using Marten.Events; +using Marten.Events.Aggregation; using Marten.Events.Projections; using Marten.Testing.Harness; using Microsoft.Extensions.Logging; @@ -44,6 +46,28 @@ public async Task can_utilize_strong_typed_id_in_aggregate_stream() payment.Id.Value.Value.ShouldBe(id); } + [Theory] + [InlineData(ProjectionLifecycle.Inline)] + [InlineData(ProjectionLifecycle.Async)] + [InlineData(ProjectionLifecycle.Live)] + public async Task use_fetch_for_writing(ProjectionLifecycle lifecycle) + { + StoreOptions(opts => + { + opts.UseSystemTextJsonForSerialization(new JsonSerializerOptions { IncludeFields = true }); + opts.Projections.Add(new SingleStreamProjection(), lifecycle); + }); + + var id = theSession.Events.StartStream(new PaymentCreated(DateTimeOffset.UtcNow), + new PaymentVerified(DateTimeOffset.UtcNow)).Id; + + await theSession.SaveChangesAsync(); + + // This shouldn't blow up + var stream = await theSession.Events.FetchForWriting(id); + stream.Aggregate.Id.Value.Value.ShouldBe(id); + } + [Fact] public async Task can_utilize_strong_typed_id_in_with_inline_aggregations() { diff --git a/src/EventSourcingTests/Aggregation/using_string_based_strong_typed_id_for_aggregate_identity.cs b/src/EventSourcingTests/Aggregation/using_string_based_strong_typed_id_for_aggregate_identity.cs index d1ef36e617..10d36dbe80 100644 --- a/src/EventSourcingTests/Aggregation/using_string_based_strong_typed_id_for_aggregate_identity.cs +++ b/src/EventSourcingTests/Aggregation/using_string_based_strong_typed_id_for_aggregate_identity.cs @@ -6,6 +6,7 @@ using JasperFx.Core; using JasperFx.Core.Reflection; using Marten.Events; +using Marten.Events.Aggregation; using Marten.Events.Projections; using Marten.Testing.Harness; using Microsoft.Extensions.Logging; @@ -47,6 +48,30 @@ public async Task can_utilize_strong_typed_id_in_aggregate_stream() payment.Id.Value.Value.ShouldBe(id); } + [Theory] + [InlineData(ProjectionLifecycle.Inline)] + [InlineData(ProjectionLifecycle.Async)] + [InlineData(ProjectionLifecycle.Live)] + public async Task use_fetch_for_writing(ProjectionLifecycle lifecycle) + { + StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.UseSystemTextJsonForSerialization(new JsonSerializerOptions { IncludeFields = true }); + opts.Projections.Add(new SingleStreamProjection(), lifecycle); + }); + + var id = Guid.NewGuid().ToString(); + theSession.Events.StartStream(id, new PaymentCreated(DateTimeOffset.UtcNow), + new PaymentVerified(DateTimeOffset.UtcNow)); + + await theSession.SaveChangesAsync(); + + // This shouldn't blow up + var stream = await theSession.Events.FetchForWriting(id); + stream.Aggregate.Id.Value.Value.ShouldBe(id); + } + [Fact] public async Task can_utilize_strong_typed_id_in_with_inline_aggregations() { diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs index 2d6c70ce21..f621f3a775 100644 --- a/src/Marten/Events/EventStore.FetchForWriting.cs +++ b/src/Marten/Events/EventStore.FetchForWriting.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using JasperFx.Core; using JasperFx.Core.Reflection; +using Marten.Internal; using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Linq.QueryHandlers; @@ -151,10 +152,7 @@ private IAggregateFetchPlan findFetchPlan() where TDoc : c return (IAggregateFetchPlan)stored; } - // All the IDocumentStorage types are codegen'd - // ReSharper disable once SuspiciousTypeConversion.Global - var documentProvider = _store.Options.Providers.StorageFor(); - var storage = (IDocumentStorage)documentProvider.IdentityMap; + var storage = _store.Options.ResolveCorrectedDocumentStorage(DocumentTracking.IdentityOnly); var plan = determineFetchPlan(storage, _session.Options); diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.cs index 0996b9de45..46b4fda0c6 100644 --- a/src/Marten/Events/Fetching/FetchInlinedPlan.cs +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.cs @@ -27,14 +27,14 @@ public async Task> FetchForWriting(DocumentSessionBase sessio IDocumentStorage storage = null; if (session.Options.Events.UseIdentityMapForInlineAggregates) { - storage = (IDocumentStorage)session.Options.Providers.StorageFor().IdentityMap; + storage = session.Options.ResolveCorrectedDocumentStorage(DocumentTracking.IdentityOnly); // Opt into the identity map mechanics for this aggregate type just in case // you're using a lightweight session session.UseIdentityMapFor(); } else { - storage = session.StorageFor(); + storage = session.Options.ResolveCorrectedDocumentStorage(session.TrackingMode); } await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); diff --git a/src/Marten/Internal/Storage/IDocumentStorage.cs b/src/Marten/Internal/Storage/IDocumentStorage.cs index d66b0b03c9..eae40bd0d4 100644 --- a/src/Marten/Internal/Storage/IDocumentStorage.cs +++ b/src/Marten/Internal/Storage/IDocumentStorage.cs @@ -84,6 +84,24 @@ public DocumentProvider(IBulkLoader? bulkLoader, IDocumentStorage queryOnl public IDocumentStorage Lightweight { get; } public IDocumentStorage IdentityMap { get; } public IDocumentStorage DirtyTracking { get; } + + public IDocumentStorage Select(DocumentTracking tracking) + { + switch (tracking) + { + case DocumentTracking.None: + return Lightweight; + case DocumentTracking.QueryOnly: + return QueryOnly; + case DocumentTracking.DirtyTracking: + return DirtyTracking; + case DocumentTracking.IdentityOnly: + return IdentityMap; + + default: + throw new ArgumentOutOfRangeException(nameof(tracking)); + } + } } public interface IDocumentStorage: IDocumentStorage where T : notnull diff --git a/src/Marten/Internal/ValueTypeInfo.cs b/src/Marten/Internal/ValueTypeInfo.cs index cb3a5219f3..ab5b3562c4 100644 --- a/src/Marten/Internal/ValueTypeInfo.cs +++ b/src/Marten/Internal/ValueTypeInfo.cs @@ -1,11 +1,28 @@ using System; +using System.Collections.Generic; using System.Collections.Immutable; +using System.Linq; using System.Linq.Expressions; using System.Reflection; +using System.Threading; +using System.Threading.Tasks; using FastExpressionCompiler; using JasperFx.Core.Reflection; using Marten.Events; +using Marten.Internal.Operations; +using Marten.Internal.Storage; +using Marten.Linq; +using Marten.Linq.Members; using Marten.Linq.Members.ValueCollections; +using Marten.Linq.QueryHandlers; +using Marten.Linq.Selectors; +using Marten.Linq.SqlGeneration; +using Marten.Services; +using Marten.Storage; +using Npgsql; +using Weasel.Core; +using Weasel.Postgresql; +using Weasel.Postgresql.SqlGeneration; namespace Marten.Internal; @@ -110,3 +127,137 @@ public ValueTypeElementMember(Type declaringType, Type reflectedType) : base(dec { } } + +internal class ValueTypeIdentifiedDocumentStorage: IDocumentStorage +{ + private readonly IDocumentStorage _inner; + private readonly Func _converter; + private readonly Func _unwrapper; + + public ValueTypeIdentifiedDocumentStorage(ValueTypeInfo valueTypeInfo, IDocumentStorage inner) + { + _inner = inner; + + _converter = valueTypeInfo.CreateConverter(); + _unwrapper = valueTypeInfo.ValueAccessor(); + } + + public void Apply(ICommandBuilder builder) => _inner.Apply(builder); + + public string FromObject => _inner.FromObject; + public Type SelectedType => _inner.SelectedType; + public string[] SelectFields() => _inner.SelectFields(); + + public ISelector BuildSelector(IMartenSession session) => _inner.BuildSelector(session); + + public IQueryHandler BuildHandler(IMartenSession session, ISqlFragment topStatement, + ISqlFragment currentStatement) + => _inner.BuildHandler(session, topStatement, currentStatement); + + public ISelectClause UseStatistics(QueryStatistics statistics) + => _inner.UseStatistics(statistics); + + public Type SourceType => _inner.SourceType; + public Type IdType => _inner.IdType; + public bool UseOptimisticConcurrency => _inner.UseOptimisticConcurrency; + public IOperationFragment DeleteFragment => _inner.DeleteFragment; + public IOperationFragment HardDeleteFragment => _inner.HardDeleteFragment; + public IReadOnlyList DuplicatedFields => _inner.DuplicatedFields; + public DbObjectName TableName => _inner.TableName; + public Type DocumentType => _inner.DocumentType; + public TenancyStyle TenancyStyle => _inner.TenancyStyle; + + public Task TruncateDocumentStorageAsync(IMartenDatabase database, CancellationToken ct = default) + => _inner.TruncateDocumentStorageAsync(database, ct); + + public void TruncateDocumentStorage(IMartenDatabase database) => _inner.TruncateDocumentStorage(database); + + public ISqlFragment FilterDocuments(ISqlFragment query, IMartenSession session) + => _inner.FilterDocuments(query, session); + + public ISqlFragment DefaultWhereFragment() + => _inner.DefaultWhereFragment(); + + public IQueryableMemberCollection QueryMembers => _inner.QueryMembers; + public ISelectClause SelectClauseWithDuplicatedFields => _inner.SelectClauseWithDuplicatedFields; + public bool UseNumericRevisions => _inner.UseNumericRevisions; + public object RawIdentityValue(object id) => _inner.RawIdentityValue(id); + + public object IdentityFor(TDoc document) => _inner.IdentityFor(document); + + public Guid? VersionFor(TDoc document, IMartenSession session) => _inner.VersionFor(document, session); + + public void Store(IMartenSession session, TDoc document) => _inner.Store(session, document); + + public void Store(IMartenSession session, TDoc document, Guid? version) => _inner.Store(session, document, version); + + public void Store(IMartenSession session, TDoc document, int revision) => _inner.Store(session, document, revision); + + public void Eject(IMartenSession session, TDoc document) => _inner.Eject(session, document); + + public IStorageOperation Update(TDoc document, IMartenSession session, string tenantId) => + _inner.Update(document, session, tenantId); + + public IStorageOperation Insert(TDoc document, IMartenSession session, string tenantId) + => _inner.Insert(document, session, tenantId); + + public IStorageOperation Upsert(TDoc document, IMartenSession session, string tenantId) + => _inner.Upsert(document, session, tenantId); + + public IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenantId) + => _inner.Overwrite(document, session, tenantId); + + public IDeletion DeleteForDocument(TDoc document, string tenantId) + => _inner.DeleteForDocument(document, tenantId); + + public void EjectById(IMartenSession session, object id) + => _inner.EjectById(session, id); + + public void RemoveDirtyTracker(IMartenSession session, object id) + => _inner.RemoveDirtyTracker(session, id); + + public IDeletion HardDeleteForDocument(TDoc document, string tenantId) + => _inner.HardDeleteForDocument(document, tenantId); + + public void SetIdentityFromString(TDoc document, string identityString) + => _inner.SetIdentityFromString(document, identityString); + + public void SetIdentityFromGuid(TDoc document, Guid identityGuid) + => _inner.SetIdentityFromGuid(document, identityGuid); + + public void SetIdentity(TDoc document, TSimple identity) + => _inner.SetIdentity(document, _converter(identity)); + + public IDeletion DeleteForId(TSimple id, string tenantId) + => _inner.DeleteForId(_converter(id), tenantId); + + public TDoc Load(TSimple id, IMartenSession session) + => _inner.Load(_converter(id), session); + + public Task LoadAsync(TSimple id, IMartenSession session, CancellationToken token) + => _inner.LoadAsync(_converter(id), session, token); + + public IReadOnlyList LoadMany(TSimple[] ids, IMartenSession session) + => _inner.LoadMany(ids.Select(_converter).ToArray(), session); + + public Task> LoadManyAsync(TSimple[] ids, IMartenSession session, CancellationToken token) + => _inner.LoadManyAsync(ids.Select(_converter).ToArray(), session, token); + + public TSimple AssignIdentity(TDoc document, string tenantId, IMartenDatabase database) + => _unwrapper(_inner.AssignIdentity(document, tenantId, database)); + + public TSimple Identity(TDoc document) => _unwrapper(_inner.Identity(document)); + + public ISqlFragment ByIdFilter(TSimple id) => _inner.ByIdFilter(_converter(id)); + + public IDeletion HardDeleteForId(TSimple id, string tenantId) + => _inner.HardDeleteForId(_converter(id), tenantId); + + public NpgsqlCommand BuildLoadCommand(TSimple id, string tenantId) + => _inner.BuildLoadCommand(_converter(id), tenantId); + + public NpgsqlCommand BuildLoadManyCommand(TSimple[] ids, string tenantId) + => _inner.BuildLoadManyCommand(ids.Select(_converter).ToArray(), tenantId); + + public object RawIdentityValue(TSimple id) => id; +} diff --git a/src/Marten/StoreOptions.Identity.cs b/src/Marten/StoreOptions.Identity.cs index 5e5efc1bbc..911caa8ff3 100644 --- a/src/Marten/StoreOptions.Identity.cs +++ b/src/Marten/StoreOptions.Identity.cs @@ -6,6 +6,7 @@ using JasperFx.Core.Reflection; using Marten.Exceptions; using Marten.Internal; +using Marten.Internal.Storage; using Marten.Linq.Members; using Marten.Schema.Identity; using Marten.Schema.Identity.Sequences; @@ -15,6 +16,24 @@ namespace Marten; public partial class StoreOptions { + internal IDocumentStorage ResolveCorrectedDocumentStorage(DocumentTracking tracking) + { + var provider = Providers.StorageFor(); + var raw = provider.Select(tracking); + + if (raw is IDocumentStorage storage) return storage; + + var valueTypeInfo = TryFindValueType(raw.IdType); + if (valueTypeInfo == null) + throw new InvalidOperationException( + $"Invalid identifier type for aggregate {typeof(TDoc).FullNameInCode()}. Id type is {raw.IdType.FullNameInCode()}"); + + return typeof(ValueTypeIdentifiedDocumentStorage<,,>).CloseAndBuildAs>( + valueTypeInfo, raw, typeof(TDoc), typeof(TId), + raw.IdType); + } + + internal IIdGeneration DetermineIdStrategy(Type documentType, MemberInfo idMember) { var idType = idMember.GetMemberType();