diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.cs index 1e3e6a8847..9468587818 100644 --- a/src/Marten/Events/Fetching/FetchAsyncPlan.cs +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.cs @@ -11,6 +11,7 @@ using Marten.Internal.Storage; using Marten.Linq.QueryHandlers; using Marten.Schema; +using Marten.Storage; using Npgsql; using Weasel.Core; using Weasel.Postgresql; @@ -73,8 +74,18 @@ public FetchAsyncPlan(EventGraph events, IEventIdentityStrategy identityStr _storage = storage; _aggregator = _events.Options.Projections.AggregatorFor(); - _versionSelectionSql = - $" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = "; + if (_events.TenancyStyle == TenancyStyle.Single) + { + _versionSelectionSql = + $" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = "; + } + else + { + _versionSelectionSql = + $" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id and d.tenant_id = a.tenant_id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = "; + } + + } public async Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, CancellationToken cancellation = default) @@ -177,6 +188,14 @@ private void writeEventFetchStatement(TId id, builder.Append(_initialSql); builder.Append(_versionSelectionSql); builder.AppendParameter(id); + + // You must do this for performance even if the stream ids were + // magically unique across tenants + if (_events.TenancyStyle == TenancyStyle.Conjoined) + { + builder.Append(" and d.tenant_id = "); + builder.AppendParameter(builder.TenantId); + } } public async Task> FetchForWriting(DocumentSessionBase session, TId id, long expectedStartingVersion,