diff --git a/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs b/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs index 9193a8794b..ecfda9af4d 100644 --- a/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs +++ b/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs @@ -1,8 +1,11 @@ using System; +using System.Linq; +using System.Threading; using System.Threading.Tasks; using Marten; using Marten.Events.Aggregation; using Marten.Events.Projections; +using Marten.Internal.Sessions; using Marten.Storage; using Marten.Testing.Harness; using Shouldly; @@ -90,6 +93,43 @@ async Task AssertGlobalProjectionUpdatedForTenant() resource.TotalResourcesCount.ShouldBe(2); } } + + [Fact] + public async Task ForEventsAppendedToTenantedSession_CustomProjection() + { + StoreOptions(opts => + { + opts.Policies.AllDocumentsAreMultiTenanted(); + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; + + opts.Schema.For().SoftDeleted(); + opts.Projections.Add(new CompanyLocationCustomProjection(), ProjectionLifecycle.Inline); + }); + + var tenantId = Guid.NewGuid().ToString(); + var companyLocationId = Guid.NewGuid(); + var companyLocationName = "New York"; + + CompanyLocationCustomProjection.ExpectedTenant = tenantId; + + // theSession is for the default-tenant + // we switch to another tenant, and append events there + // the projected document should also be saved for that other tenant, NOT the default-tenant + theSession.ForTenant(tenantId).Events.StartStream(companyLocationId, new CompanyLocationCreated(companyLocationName)); + + await theSession.SaveChangesAsync(); + + var defaultTenantCompanyLocations = await theSession.Query().ToListAsync(); + defaultTenantCompanyLocations.ShouldBeEmpty(); + + var otherTenantCompanyLocations = await theSession.ForTenant(tenantId).Query().ToListAsync(); + var singleCompanyLocation = otherTenantCompanyLocations.SingleOrDefault(); + + singleCompanyLocation.ShouldNotBeNull(); + singleCompanyLocation.Id.ShouldBe(companyLocationId); + singleCompanyLocation.Name.ShouldBe(companyLocationName); + } } public record Event; @@ -150,3 +190,61 @@ public void Apply(ResourceCreatedEvent e, ResourcesGlobalSummary resourceGlobal) public void Apply(ResourceRemovedEvent e, ResourcesGlobalSummary resourceGlobal) => resourceGlobal.TotalResourcesCount--; } + +public record CompanyLocation +{ + public Guid Id { get; set; } + public string Name { get; set; } +} +public record CompanyLocationCreated(string Name); +public record CompanyLocationUpdated(string NewName); +public record CompanyLocationDeleted(); + +public class CompanyLocationCustomProjection : CustomProjection +{ + public static string ExpectedTenant; + + public CompanyLocationCustomProjection() + { + this.AggregateByStream(); + + this.IncludeType(); + this.IncludeType(); + this.IncludeType(); + } + + public override ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice slice, CancellationToken cancellation, ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline) + { + var location = slice.Aggregate; + + // The session and the slice should be for the same tenant + session.TenantId.ShouldBe(ExpectedTenant); + slice.Tenant.TenantId.ShouldBe(ExpectedTenant); + session.TenantId.ShouldBe(slice.Tenant.TenantId); + + foreach (var data in slice.AllData()) + { + switch (data) + { + case CompanyLocationCreated c: + location = new CompanyLocation + { + Id = slice.Id, + Name = c.Name, + }; + session.Store(location); + break; + + case CompanyLocationUpdated u: + location.Name = u.NewName; + break; + + case CompanyLocationDeleted d: + session.Delete(location); + break; + } + } + + return ValueTask.CompletedTask; + } +} diff --git a/src/Marten/Events/Aggregation/CustomProjection.cs b/src/Marten/Events/Aggregation/CustomProjection.cs index ff513b40ac..1bc022d60f 100644 --- a/src/Marten/Events/Aggregation/CustomProjection.cs +++ b/src/Marten/Events/Aggregation/CustomProjection.cs @@ -10,6 +10,7 @@ using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Services; +using Marten.Sessions; using Marten.Storage; namespace Marten.Events.Aggregation; @@ -50,8 +51,10 @@ async Task IProjection.ApplyAsync(IDocumentOperations operations, IReadOnlyList< var storage = (IDocumentStorage)martenSession.StorageFor(); foreach (var slice in slices) { - slice.Aggregate = await storage.LoadAsync(slice.Id, martenSession, cancellation).ConfigureAwait(false); - await ApplyChangesAsync(martenSession, slice, cancellation).ConfigureAwait(false); + var tenantedSession = martenSession.UseTenancyBasedOnSliceAndStorage(storage, slice); + + slice.Aggregate = await storage.LoadAsync(slice.Id, tenantedSession, cancellation).ConfigureAwait(false); + await ApplyChangesAsync(tenantedSession, slice, cancellation).ConfigureAwait(false); } }