Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FetchForWriting + Strong Typed Identifiers on the projected aggregate… #3492

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,6 @@

namespace EventSourcingTests.Aggregation;

public class strong_typed_identifiers_on_aggregates_must_be_nullable : OneOffConfigurationsContext
{
[Fact]
public void should_warn_if_the_id_is_not_nullable()
{
Should.Throw<InvalidProjectionException>(() =>
{
StoreOptions(opts => opts.Projections.Snapshot<Payment3>(SnapshotLifecycle.Inline));
});

}
}

public class Payment3
{
[JsonInclude] public PaymentId Id { get; private set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using EventSourcingTests.Projections;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -44,6 +46,28 @@ public async Task can_utilize_strong_typed_id_in_aggregate_stream()
payment.Id.Value.Value.ShouldBe(id);
}

[Theory]
[InlineData(ProjectionLifecycle.Inline)]
[InlineData(ProjectionLifecycle.Async)]
[InlineData(ProjectionLifecycle.Live)]
public async Task use_fetch_for_writing(ProjectionLifecycle lifecycle)
{
StoreOptions(opts =>
{
opts.UseSystemTextJsonForSerialization(new JsonSerializerOptions { IncludeFields = true });
opts.Projections.Add(new SingleStreamProjection<Payment>(), lifecycle);
});

var id = theSession.Events.StartStream<Payment>(new PaymentCreated(DateTimeOffset.UtcNow),
new PaymentVerified(DateTimeOffset.UtcNow)).Id;

await theSession.SaveChangesAsync();

// This shouldn't blow up
var stream = await theSession.Events.FetchForWriting<Payment>(id);
stream.Aggregate.Id.Value.Value.ShouldBe(id);
}

[Fact]
public async Task can_utilize_strong_typed_id_in_with_inline_aggregations()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -47,6 +48,30 @@ public async Task can_utilize_strong_typed_id_in_aggregate_stream()
payment.Id.Value.Value.ShouldBe(id);
}

[Theory]
[InlineData(ProjectionLifecycle.Inline)]
[InlineData(ProjectionLifecycle.Async)]
[InlineData(ProjectionLifecycle.Live)]
public async Task use_fetch_for_writing(ProjectionLifecycle lifecycle)
{
StoreOptions(opts =>
{
opts.Events.StreamIdentity = StreamIdentity.AsString;
opts.UseSystemTextJsonForSerialization(new JsonSerializerOptions { IncludeFields = true });
opts.Projections.Add(new SingleStreamProjection<Payment2>(), lifecycle);
});

var id = Guid.NewGuid().ToString();
theSession.Events.StartStream<Payment2>(id, new PaymentCreated(DateTimeOffset.UtcNow),
new PaymentVerified(DateTimeOffset.UtcNow));

await theSession.SaveChangesAsync();

// This shouldn't blow up
var stream = await theSession.Events.FetchForWriting<Payment2>(id);
stream.Aggregate.Id.Value.Value.ShouldBe(id);
}

[Fact]
public async Task can_utilize_strong_typed_id_in_with_inline_aggregations()
{
Expand Down
6 changes: 2 additions & 4 deletions src/Marten/Events/EventStore.FetchForWriting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Internal;
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
using Marten.Linq.QueryHandlers;
Expand Down Expand Up @@ -151,10 +152,7 @@ private IAggregateFetchPlan<TDoc, TId> findFetchPlan<TDoc, TId>() where TDoc : c
return (IAggregateFetchPlan<TDoc, TId>)stored;
}

// All the IDocumentStorage types are codegen'd
// ReSharper disable once SuspiciousTypeConversion.Global
var documentProvider = _store.Options.Providers.StorageFor<TDoc>();
var storage = (IDocumentStorage<TDoc, TId>)documentProvider.IdentityMap;
var storage = _store.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly);

var plan = determineFetchPlan(storage, _session.Options);

Expand Down
4 changes: 2 additions & 2 deletions src/Marten/Events/Fetching/FetchInlinedPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio
IDocumentStorage<TDoc, TId> storage = null;
if (session.Options.Events.UseIdentityMapForInlineAggregates)
{
storage = (IDocumentStorage<TDoc, TId>)session.Options.Providers.StorageFor<TDoc>().IdentityMap;
storage = session.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly);
// Opt into the identity map mechanics for this aggregate type just in case
// you're using a lightweight session
session.UseIdentityMapFor<TDoc>();
}
else
{
storage = session.StorageFor<TDoc, TId>();
storage = session.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(session.TrackingMode);
}

await _identityStrategy.EnsureEventStorageExists<TDoc>(session, cancellation).ConfigureAwait(false);
Expand Down
18 changes: 18 additions & 0 deletions src/Marten/Internal/Storage/IDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ public DocumentProvider(IBulkLoader<T>? bulkLoader, IDocumentStorage<T> queryOnl
public IDocumentStorage<T> Lightweight { get; }
public IDocumentStorage<T> IdentityMap { get; }
public IDocumentStorage<T> DirtyTracking { get; }

public IDocumentStorage<T> Select(DocumentTracking tracking)
{
switch (tracking)
{
case DocumentTracking.None:
return Lightweight;
case DocumentTracking.QueryOnly:
return QueryOnly;
case DocumentTracking.DirtyTracking:
return DirtyTracking;
case DocumentTracking.IdentityOnly:
return IdentityMap;

default:
throw new ArgumentOutOfRangeException(nameof(tracking));
}
}
}

public interface IDocumentStorage<T>: IDocumentStorage where T : notnull
Expand Down
151 changes: 151 additions & 0 deletions src/Marten/Internal/ValueTypeInfo.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FastExpressionCompiler;
using JasperFx.Core.Reflection;
using Marten.Events;
using Marten.Internal.Operations;
using Marten.Internal.Storage;
using Marten.Linq;
using Marten.Linq.Members;
using Marten.Linq.Members.ValueCollections;
using Marten.Linq.QueryHandlers;
using Marten.Linq.Selectors;
using Marten.Linq.SqlGeneration;
using Marten.Services;
using Marten.Storage;
using Npgsql;
using Weasel.Core;
using Weasel.Postgresql;
using Weasel.Postgresql.SqlGeneration;

namespace Marten.Internal;

Expand Down Expand Up @@ -110,3 +127,137 @@ public ValueTypeElementMember(Type declaringType, Type reflectedType) : base(dec
{
}
}

internal class ValueTypeIdentifiedDocumentStorage<TDoc, TSimple, TValueType>: IDocumentStorage<TDoc, TSimple>
{
private readonly IDocumentStorage<TDoc, TValueType> _inner;
private readonly Func<TSimple, TValueType> _converter;
private readonly Func<TValueType,TSimple> _unwrapper;

public ValueTypeIdentifiedDocumentStorage(ValueTypeInfo valueTypeInfo, IDocumentStorage<TDoc, TValueType> inner)
{
_inner = inner;

_converter = valueTypeInfo.CreateConverter<TValueType, TSimple>();
_unwrapper = valueTypeInfo.ValueAccessor<TValueType, TSimple>();
}

public void Apply(ICommandBuilder builder) => _inner.Apply(builder);

public string FromObject => _inner.FromObject;
public Type SelectedType => _inner.SelectedType;
public string[] SelectFields() => _inner.SelectFields();

public ISelector BuildSelector(IMartenSession session) => _inner.BuildSelector(session);

public IQueryHandler<T> BuildHandler<T>(IMartenSession session, ISqlFragment topStatement,
ISqlFragment currentStatement)
=> _inner.BuildHandler<T>(session, topStatement, currentStatement);

public ISelectClause UseStatistics(QueryStatistics statistics)
=> _inner.UseStatistics(statistics);

public Type SourceType => _inner.SourceType;
public Type IdType => _inner.IdType;
public bool UseOptimisticConcurrency => _inner.UseOptimisticConcurrency;
public IOperationFragment DeleteFragment => _inner.DeleteFragment;
public IOperationFragment HardDeleteFragment => _inner.HardDeleteFragment;
public IReadOnlyList<DuplicatedField> DuplicatedFields => _inner.DuplicatedFields;
public DbObjectName TableName => _inner.TableName;
public Type DocumentType => _inner.DocumentType;
public TenancyStyle TenancyStyle => _inner.TenancyStyle;

public Task TruncateDocumentStorageAsync(IMartenDatabase database, CancellationToken ct = default)
=> _inner.TruncateDocumentStorageAsync(database, ct);

public void TruncateDocumentStorage(IMartenDatabase database) => _inner.TruncateDocumentStorage(database);

public ISqlFragment FilterDocuments(ISqlFragment query, IMartenSession session)
=> _inner.FilterDocuments(query, session);

public ISqlFragment DefaultWhereFragment()
=> _inner.DefaultWhereFragment();

public IQueryableMemberCollection QueryMembers => _inner.QueryMembers;
public ISelectClause SelectClauseWithDuplicatedFields => _inner.SelectClauseWithDuplicatedFields;
public bool UseNumericRevisions => _inner.UseNumericRevisions;
public object RawIdentityValue(object id) => _inner.RawIdentityValue(id);

public object IdentityFor(TDoc document) => _inner.IdentityFor(document);

public Guid? VersionFor(TDoc document, IMartenSession session) => _inner.VersionFor(document, session);

public void Store(IMartenSession session, TDoc document) => _inner.Store(session, document);

public void Store(IMartenSession session, TDoc document, Guid? version) => _inner.Store(session, document, version);

public void Store(IMartenSession session, TDoc document, int revision) => _inner.Store(session, document, revision);

public void Eject(IMartenSession session, TDoc document) => _inner.Eject(session, document);

public IStorageOperation Update(TDoc document, IMartenSession session, string tenantId) =>
_inner.Update(document, session, tenantId);

public IStorageOperation Insert(TDoc document, IMartenSession session, string tenantId)
=> _inner.Insert(document, session, tenantId);

public IStorageOperation Upsert(TDoc document, IMartenSession session, string tenantId)
=> _inner.Upsert(document, session, tenantId);

public IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenantId)
=> _inner.Overwrite(document, session, tenantId);

public IDeletion DeleteForDocument(TDoc document, string tenantId)
=> _inner.DeleteForDocument(document, tenantId);

public void EjectById(IMartenSession session, object id)
=> _inner.EjectById(session, id);

public void RemoveDirtyTracker(IMartenSession session, object id)
=> _inner.RemoveDirtyTracker(session, id);

public IDeletion HardDeleteForDocument(TDoc document, string tenantId)
=> _inner.HardDeleteForDocument(document, tenantId);

public void SetIdentityFromString(TDoc document, string identityString)
=> _inner.SetIdentityFromString(document, identityString);

public void SetIdentityFromGuid(TDoc document, Guid identityGuid)
=> _inner.SetIdentityFromGuid(document, identityGuid);

public void SetIdentity(TDoc document, TSimple identity)
=> _inner.SetIdentity(document, _converter(identity));

public IDeletion DeleteForId(TSimple id, string tenantId)
=> _inner.DeleteForId(_converter(id), tenantId);

public TDoc Load(TSimple id, IMartenSession session)
=> _inner.Load(_converter(id), session);

public Task<TDoc> LoadAsync(TSimple id, IMartenSession session, CancellationToken token)
=> _inner.LoadAsync(_converter(id), session, token);

public IReadOnlyList<TDoc> LoadMany(TSimple[] ids, IMartenSession session)
=> _inner.LoadMany(ids.Select(_converter).ToArray(), session);

public Task<IReadOnlyList<TDoc>> LoadManyAsync(TSimple[] ids, IMartenSession session, CancellationToken token)
=> _inner.LoadManyAsync(ids.Select(_converter).ToArray(), session, token);

public TSimple AssignIdentity(TDoc document, string tenantId, IMartenDatabase database)
=> _unwrapper(_inner.AssignIdentity(document, tenantId, database));

public TSimple Identity(TDoc document) => _unwrapper(_inner.Identity(document));

public ISqlFragment ByIdFilter(TSimple id) => _inner.ByIdFilter(_converter(id));

public IDeletion HardDeleteForId(TSimple id, string tenantId)
=> _inner.HardDeleteForId(_converter(id), tenantId);

public NpgsqlCommand BuildLoadCommand(TSimple id, string tenantId)
=> _inner.BuildLoadCommand(_converter(id), tenantId);

public NpgsqlCommand BuildLoadManyCommand(TSimple[] ids, string tenantId)
=> _inner.BuildLoadManyCommand(ids.Select(_converter).ToArray(), tenantId);

public object RawIdentityValue(TSimple id) => id;
}
19 changes: 19 additions & 0 deletions src/Marten/StoreOptions.Identity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using JasperFx.Core.Reflection;
using Marten.Exceptions;
using Marten.Internal;
using Marten.Internal.Storage;
using Marten.Linq.Members;
using Marten.Schema.Identity;
using Marten.Schema.Identity.Sequences;
Expand All @@ -15,6 +16,24 @@ namespace Marten;

public partial class StoreOptions
{
internal IDocumentStorage<TDoc, TId> ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking tracking)
{
var provider = Providers.StorageFor<TDoc>();
var raw = provider.Select(tracking);

if (raw is IDocumentStorage<TDoc, TId> storage) return storage;

var valueTypeInfo = TryFindValueType(raw.IdType);
if (valueTypeInfo == null)
throw new InvalidOperationException(
$"Invalid identifier type for aggregate {typeof(TDoc).FullNameInCode()}. Id type is {raw.IdType.FullNameInCode()}");

return typeof(ValueTypeIdentifiedDocumentStorage<,,>).CloseAndBuildAs<IDocumentStorage<TDoc, TId>>(
valueTypeInfo, raw, typeof(TDoc), typeof(TId),
raw.IdType);
}


internal IIdGeneration DetermineIdStrategy(Type documentType, MemberInfo idMember)
{
var idType = idMember.GetMemberType();
Expand Down
Loading