Skip to content

Commit

Permalink
Using the tenant id as part of FetchForWriting/Latest when using asyn…
Browse files Browse the repository at this point in the history
…c projections
  • Loading branch information
jeremydmiller committed Jan 7, 2025
1 parent 01c4349 commit 1336de9
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions src/Marten/Events/Fetching/FetchAsyncPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Marten.Internal.Storage;
using Marten.Linq.QueryHandlers;
using Marten.Schema;
using Marten.Storage;
using Npgsql;
using Weasel.Core;
using Weasel.Postgresql;
Expand Down Expand Up @@ -73,8 +74,18 @@ public FetchAsyncPlan(EventGraph events, IEventIdentityStrategy<TId> identityStr
_storage = storage;
_aggregator = _events.Options.Projections.AggregatorFor<TDoc>();

_versionSelectionSql =
$" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = ";
if (_events.TenancyStyle == TenancyStyle.Single)
{
_versionSelectionSql =
$" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = ";
}
else
{
_versionSelectionSql =
$" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id and d.tenant_id = a.tenant_id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = ";
}


}

public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, CancellationToken cancellation = default)
Expand Down Expand Up @@ -177,6 +188,14 @@ private void writeEventFetchStatement(TId id,
builder.Append(_initialSql);
builder.Append(_versionSelectionSql);
builder.AppendParameter(id);

// You must do this for performance even if the stream ids were
// magically unique across tenants
if (_events.TenancyStyle == TenancyStyle.Conjoined)
{
builder.Append(" and d.tenant_id = ");
builder.AppendParameter(builder.TenantId);
}
}

public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, long expectedStartingVersion,
Expand Down

0 comments on commit 1336de9

Please sign in to comment.