Skip to content

Commit

Permalink
FetchForWriting + Strong Typed Identifiers on the projected aggregate…
Browse files Browse the repository at this point in the history
… documents. Closes GH-3490
  • Loading branch information
jeremydmiller committed Oct 18, 2024
1 parent 5a46bbb commit 5c40cf3
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,6 @@

namespace EventSourcingTests.Aggregation;

public class strong_typed_identifiers_on_aggregates_must_be_nullable : OneOffConfigurationsContext
{
[Fact]
public void should_warn_if_the_id_is_not_nullable()
{
Should.Throw<InvalidProjectionException>(() =>
{
StoreOptions(opts => opts.Projections.Snapshot<Payment3>(SnapshotLifecycle.Inline));
});

}
}

public class Payment3
{
[JsonInclude] public PaymentId Id { get; private set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using EventSourcingTests.Projections;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -44,6 +46,28 @@ public async Task can_utilize_strong_typed_id_in_aggregate_stream()
payment.Id.Value.Value.ShouldBe(id);
}

[Theory]
[InlineData(ProjectionLifecycle.Inline)]
[InlineData(ProjectionLifecycle.Async)]
[InlineData(ProjectionLifecycle.Live)]
public async Task use_fetch_for_writing(ProjectionLifecycle lifecycle)
{
StoreOptions(opts =>
{
opts.UseSystemTextJsonForSerialization(new JsonSerializerOptions { IncludeFields = true });
opts.Projections.Add(new SingleStreamProjection<Payment>(), lifecycle);
});

var id = theSession.Events.StartStream<Payment>(new PaymentCreated(DateTimeOffset.UtcNow),
new PaymentVerified(DateTimeOffset.UtcNow)).Id;

await theSession.SaveChangesAsync();

// This shouldn't blow up
var stream = await theSession.Events.FetchForWriting<Payment>(id);
stream.Aggregate.Id.Value.Value.ShouldBe(id);
}

[Fact]
public async Task can_utilize_strong_typed_id_in_with_inline_aggregations()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -47,6 +48,30 @@ public async Task can_utilize_strong_typed_id_in_aggregate_stream()
payment.Id.Value.Value.ShouldBe(id);
}

[Theory]
[InlineData(ProjectionLifecycle.Inline)]
[InlineData(ProjectionLifecycle.Async)]
[InlineData(ProjectionLifecycle.Live)]
public async Task use_fetch_for_writing(ProjectionLifecycle lifecycle)
{
StoreOptions(opts =>
{
opts.Events.StreamIdentity = StreamIdentity.AsString;
opts.UseSystemTextJsonForSerialization(new JsonSerializerOptions { IncludeFields = true });
opts.Projections.Add(new SingleStreamProjection<Payment2>(), lifecycle);
});

var id = Guid.NewGuid().ToString();
theSession.Events.StartStream<Payment2>(id, new PaymentCreated(DateTimeOffset.UtcNow),
new PaymentVerified(DateTimeOffset.UtcNow));

await theSession.SaveChangesAsync();

// This shouldn't blow up
var stream = await theSession.Events.FetchForWriting<Payment2>(id);
stream.Aggregate.Id.Value.Value.ShouldBe(id);
}

[Fact]
public async Task can_utilize_strong_typed_id_in_with_inline_aggregations()
{
Expand Down
6 changes: 2 additions & 4 deletions src/Marten/Events/EventStore.FetchForWriting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Internal;
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
using Marten.Linq.QueryHandlers;
Expand Down Expand Up @@ -151,10 +152,7 @@ private IAggregateFetchPlan<TDoc, TId> findFetchPlan<TDoc, TId>() where TDoc : c
return (IAggregateFetchPlan<TDoc, TId>)stored;
}

// All the IDocumentStorage types are codegen'd
// ReSharper disable once SuspiciousTypeConversion.Global
var documentProvider = _store.Options.Providers.StorageFor<TDoc>();
var storage = (IDocumentStorage<TDoc, TId>)documentProvider.IdentityMap;
var storage = _store.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly);

var plan = determineFetchPlan(storage, _session.Options);

Expand Down
4 changes: 2 additions & 2 deletions src/Marten/Events/Fetching/FetchInlinedPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio
IDocumentStorage<TDoc, TId> storage = null;
if (session.Options.Events.UseIdentityMapForInlineAggregates)
{
storage = (IDocumentStorage<TDoc, TId>)session.Options.Providers.StorageFor<TDoc>().IdentityMap;
storage = session.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly);
// Opt into the identity map mechanics for this aggregate type just in case
// you're using a lightweight session
session.UseIdentityMapFor<TDoc>();
}
else
{
storage = session.StorageFor<TDoc, TId>();
storage = session.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(session.TrackingMode);
}

await _identityStrategy.EnsureEventStorageExists<TDoc>(session, cancellation).ConfigureAwait(false);
Expand Down
18 changes: 18 additions & 0 deletions src/Marten/Internal/Storage/IDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ public DocumentProvider(IBulkLoader<T>? bulkLoader, IDocumentStorage<T> queryOnl
public IDocumentStorage<T> Lightweight { get; }
public IDocumentStorage<T> IdentityMap { get; }
public IDocumentStorage<T> DirtyTracking { get; }

public IDocumentStorage<T> Select(DocumentTracking tracking)
{
switch (tracking)
{
case DocumentTracking.None:
return Lightweight;
case DocumentTracking.QueryOnly:
return QueryOnly;
case DocumentTracking.DirtyTracking:
return DirtyTracking;
case DocumentTracking.IdentityOnly:
return IdentityMap;

default:
throw new ArgumentOutOfRangeException(nameof(tracking));
}
}
}

public interface IDocumentStorage<T>: IDocumentStorage where T : notnull
Expand Down
151 changes: 151 additions & 0 deletions src/Marten/Internal/ValueTypeInfo.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FastExpressionCompiler;
using JasperFx.Core.Reflection;
using Marten.Events;
using Marten.Internal.Operations;
using Marten.Internal.Storage;
using Marten.Linq;
using Marten.Linq.Members;
using Marten.Linq.Members.ValueCollections;
using Marten.Linq.QueryHandlers;
using Marten.Linq.Selectors;
using Marten.Linq.SqlGeneration;
using Marten.Services;
using Marten.Storage;
using Npgsql;
using Weasel.Core;
using Weasel.Postgresql;
using Weasel.Postgresql.SqlGeneration;

namespace Marten.Internal;

Expand Down Expand Up @@ -110,3 +127,137 @@ public ValueTypeElementMember(Type declaringType, Type reflectedType) : base(dec
{
}
}

internal class ValueTypeIdentifiedDocumentStorage<TDoc, TSimple, TValueType>: IDocumentStorage<TDoc, TSimple>
{
private readonly IDocumentStorage<TDoc, TValueType> _inner;
private readonly Func<TSimple, TValueType> _converter;
private readonly Func<TValueType,TSimple> _unwrapper;

public ValueTypeIdentifiedDocumentStorage(ValueTypeInfo valueTypeInfo, IDocumentStorage<TDoc, TValueType> inner)
{
_inner = inner;

_converter = valueTypeInfo.CreateConverter<TValueType, TSimple>();
_unwrapper = valueTypeInfo.ValueAccessor<TValueType, TSimple>();
}

public void Apply(ICommandBuilder builder) => _inner.Apply(builder);

public string FromObject => _inner.FromObject;
public Type SelectedType => _inner.SelectedType;
public string[] SelectFields() => _inner.SelectFields();

public ISelector BuildSelector(IMartenSession session) => _inner.BuildSelector(session);

public IQueryHandler<T> BuildHandler<T>(IMartenSession session, ISqlFragment topStatement,
ISqlFragment currentStatement)
=> _inner.BuildHandler<T>(session, topStatement, currentStatement);

public ISelectClause UseStatistics(QueryStatistics statistics)
=> _inner.UseStatistics(statistics);

public Type SourceType => _inner.SourceType;
public Type IdType => _inner.IdType;
public bool UseOptimisticConcurrency => _inner.UseOptimisticConcurrency;
public IOperationFragment DeleteFragment => _inner.DeleteFragment;
public IOperationFragment HardDeleteFragment => _inner.HardDeleteFragment;
public IReadOnlyList<DuplicatedField> DuplicatedFields => _inner.DuplicatedFields;
public DbObjectName TableName => _inner.TableName;
public Type DocumentType => _inner.DocumentType;
public TenancyStyle TenancyStyle => _inner.TenancyStyle;

public Task TruncateDocumentStorageAsync(IMartenDatabase database, CancellationToken ct = default)
=> _inner.TruncateDocumentStorageAsync(database, ct);

public void TruncateDocumentStorage(IMartenDatabase database) => _inner.TruncateDocumentStorage(database);

public ISqlFragment FilterDocuments(ISqlFragment query, IMartenSession session)
=> _inner.FilterDocuments(query, session);

public ISqlFragment DefaultWhereFragment()
=> _inner.DefaultWhereFragment();

public IQueryableMemberCollection QueryMembers => _inner.QueryMembers;
public ISelectClause SelectClauseWithDuplicatedFields => _inner.SelectClauseWithDuplicatedFields;
public bool UseNumericRevisions => _inner.UseNumericRevisions;
public object RawIdentityValue(object id) => _inner.RawIdentityValue(id);

public object IdentityFor(TDoc document) => _inner.IdentityFor(document);

public Guid? VersionFor(TDoc document, IMartenSession session) => _inner.VersionFor(document, session);

public void Store(IMartenSession session, TDoc document) => _inner.Store(session, document);

public void Store(IMartenSession session, TDoc document, Guid? version) => _inner.Store(session, document, version);

public void Store(IMartenSession session, TDoc document, int revision) => _inner.Store(session, document, revision);

public void Eject(IMartenSession session, TDoc document) => _inner.Eject(session, document);

public IStorageOperation Update(TDoc document, IMartenSession session, string tenantId) =>
_inner.Update(document, session, tenantId);

public IStorageOperation Insert(TDoc document, IMartenSession session, string tenantId)
=> _inner.Insert(document, session, tenantId);

public IStorageOperation Upsert(TDoc document, IMartenSession session, string tenantId)
=> _inner.Upsert(document, session, tenantId);

public IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenantId)
=> _inner.Overwrite(document, session, tenantId);

public IDeletion DeleteForDocument(TDoc document, string tenantId)
=> _inner.DeleteForDocument(document, tenantId);

public void EjectById(IMartenSession session, object id)
=> _inner.EjectById(session, id);

public void RemoveDirtyTracker(IMartenSession session, object id)
=> _inner.RemoveDirtyTracker(session, id);

public IDeletion HardDeleteForDocument(TDoc document, string tenantId)
=> _inner.HardDeleteForDocument(document, tenantId);

public void SetIdentityFromString(TDoc document, string identityString)
=> _inner.SetIdentityFromString(document, identityString);

public void SetIdentityFromGuid(TDoc document, Guid identityGuid)
=> _inner.SetIdentityFromGuid(document, identityGuid);

public void SetIdentity(TDoc document, TSimple identity)
=> _inner.SetIdentity(document, _converter(identity));

public IDeletion DeleteForId(TSimple id, string tenantId)
=> _inner.DeleteForId(_converter(id), tenantId);

public TDoc Load(TSimple id, IMartenSession session)
=> _inner.Load(_converter(id), session);

public Task<TDoc> LoadAsync(TSimple id, IMartenSession session, CancellationToken token)
=> _inner.LoadAsync(_converter(id), session, token);

public IReadOnlyList<TDoc> LoadMany(TSimple[] ids, IMartenSession session)
=> _inner.LoadMany(ids.Select(_converter).ToArray(), session);

public Task<IReadOnlyList<TDoc>> LoadManyAsync(TSimple[] ids, IMartenSession session, CancellationToken token)
=> _inner.LoadManyAsync(ids.Select(_converter).ToArray(), session, token);

public TSimple AssignIdentity(TDoc document, string tenantId, IMartenDatabase database)
=> _unwrapper(_inner.AssignIdentity(document, tenantId, database));

public TSimple Identity(TDoc document) => _unwrapper(_inner.Identity(document));

public ISqlFragment ByIdFilter(TSimple id) => _inner.ByIdFilter(_converter(id));

public IDeletion HardDeleteForId(TSimple id, string tenantId)
=> _inner.HardDeleteForId(_converter(id), tenantId);

public NpgsqlCommand BuildLoadCommand(TSimple id, string tenantId)
=> _inner.BuildLoadCommand(_converter(id), tenantId);

public NpgsqlCommand BuildLoadManyCommand(TSimple[] ids, string tenantId)
=> _inner.BuildLoadManyCommand(ids.Select(_converter).ToArray(), tenantId);

public object RawIdentityValue(TSimple id) => id;
}
19 changes: 19 additions & 0 deletions src/Marten/StoreOptions.Identity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using JasperFx.Core.Reflection;
using Marten.Exceptions;
using Marten.Internal;
using Marten.Internal.Storage;
using Marten.Linq.Members;
using Marten.Schema.Identity;
using Marten.Schema.Identity.Sequences;
Expand All @@ -15,6 +16,24 @@ namespace Marten;

public partial class StoreOptions
{
internal IDocumentStorage<TDoc, TId> ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking tracking)
{
var provider = Providers.StorageFor<TDoc>();
var raw = provider.Select(tracking);

if (raw is IDocumentStorage<TDoc, TId> storage) return storage;

var valueTypeInfo = TryFindValueType(raw.IdType);
if (valueTypeInfo == null)
throw new InvalidOperationException(
$"Invalid identifier type for aggregate {typeof(TDoc).FullNameInCode()}. Id type is {raw.IdType.FullNameInCode()}");

return typeof(ValueTypeIdentifiedDocumentStorage<,,>).CloseAndBuildAs<IDocumentStorage<TDoc, TId>>(
valueTypeInfo, raw, typeof(TDoc), typeof(TId),
raw.IdType);
}


internal IIdGeneration DetermineIdStrategy(Type documentType, MemberInfo idMember)
{
var idType = idMember.GetMemberType();
Expand Down

0 comments on commit 5c40cf3

Please sign in to comment.