Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong session-tenacity used in Custom Aggregation Projections #2764

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Internal.Sessions;
using Marten.Storage;
using Marten.Testing.Harness;
using Shouldly;
Expand Down Expand Up @@ -90,6 +93,43 @@ async Task AssertGlobalProjectionUpdatedForTenant()
resource.TotalResourcesCount.ShouldBe(2);
}
}

[Fact]
public async Task ForEventsAppendedToTenantedSession_CustomProjection()
{
StoreOptions(opts =>
{
opts.Policies.AllDocumentsAreMultiTenanted();
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true;

opts.Schema.For<CompanyLocation>().SoftDeleted();
opts.Projections.Add(new CompanyLocationCustomProjection(), ProjectionLifecycle.Inline);
});

var tenantId = Guid.NewGuid().ToString();
var companyLocationId = Guid.NewGuid();
var companyLocationName = "New York";

CompanyLocationCustomProjection.ExpectedTenant = tenantId;

// theSession is for the default-tenant
// we switch to another tenant, and append events there
// the projected document should also be saved for that other tenant, NOT the default-tenant
theSession.ForTenant(tenantId).Events.StartStream(companyLocationId, new CompanyLocationCreated(companyLocationName));

await theSession.SaveChangesAsync();

var defaultTenantCompanyLocations = await theSession.Query<CompanyLocation>().ToListAsync();
defaultTenantCompanyLocations.ShouldBeEmpty();

var otherTenantCompanyLocations = await theSession.ForTenant(tenantId).Query<CompanyLocation>().ToListAsync();
var singleCompanyLocation = otherTenantCompanyLocations.SingleOrDefault();

singleCompanyLocation.ShouldNotBeNull();
singleCompanyLocation.Id.ShouldBe(companyLocationId);
singleCompanyLocation.Name.ShouldBe(companyLocationName);
}
}

public record Event;
Expand Down Expand Up @@ -150,3 +190,61 @@ public void Apply(ResourceCreatedEvent e, ResourcesGlobalSummary resourceGlobal)
public void Apply(ResourceRemovedEvent e, ResourcesGlobalSummary resourceGlobal) =>
resourceGlobal.TotalResourcesCount--;
}

public record CompanyLocation
{
public Guid Id { get; set; }
public string Name { get; set; }
}
public record CompanyLocationCreated(string Name);
public record CompanyLocationUpdated(string NewName);
public record CompanyLocationDeleted();

public class CompanyLocationCustomProjection : CustomProjection<CompanyLocation, Guid>
{
public static string ExpectedTenant;

public CompanyLocationCustomProjection()
{
this.AggregateByStream();

this.IncludeType<CompanyLocationCreated>();
this.IncludeType<CompanyLocationUpdated>();
this.IncludeType<CompanyLocationDeleted>();
}

public override ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice<CompanyLocation, Guid> slice, CancellationToken cancellation, ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline)
{
var location = slice.Aggregate;

// The session and the slice should be for the same tenant
session.TenantId.ShouldBe(ExpectedTenant);
slice.Tenant.TenantId.ShouldBe(ExpectedTenant);
session.TenantId.ShouldBe(slice.Tenant.TenantId);

foreach (var data in slice.AllData())
{
switch (data)
{
case CompanyLocationCreated c:
location = new CompanyLocation
{
Id = slice.Id,
Name = c.Name,
};
session.Store(location);
break;

case CompanyLocationUpdated u:
location.Name = u.NewName;
break;

case CompanyLocationDeleted d:
session.Delete(location);
break;
}
}

return ValueTask.CompletedTask;
}
}
7 changes: 5 additions & 2 deletions src/Marten/Events/Aggregation/CustomProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
using Marten.Services;
using Marten.Sessions;
using Marten.Storage;

namespace Marten.Events.Aggregation;
Expand Down Expand Up @@ -50,8 +51,10 @@ async Task IProjection.ApplyAsync(IDocumentOperations operations, IReadOnlyList<
var storage = (IDocumentStorage<TDoc, TId>)martenSession.StorageFor<TDoc>();
foreach (var slice in slices)
{
slice.Aggregate = await storage.LoadAsync(slice.Id, martenSession, cancellation).ConfigureAwait(false);
await ApplyChangesAsync(martenSession, slice, cancellation).ConfigureAwait(false);
var tenantedSession = martenSession.UseTenancyBasedOnSliceAndStorage(storage, slice);

slice.Aggregate = await storage.LoadAsync(slice.Id, tenantedSession, cancellation).ConfigureAwait(false);
await ApplyChangesAsync(tenantedSession, slice, cancellation).ConfigureAwait(false);
}
}

Expand Down
Loading