diff --git a/docs/documents/multi-tenancy.md b/docs/documents/multi-tenancy.md
index d6befb8edb..9da861f73f 100644
--- a/docs/documents/multi-tenancy.md
+++ b/docs/documents/multi-tenancy.md
@@ -446,7 +446,7 @@ builder.Services.AddMarten(opts =>
opts.Policies.PartitionMultiTenantedDocumentsUsingMartenManagement("tenants");
});
```
-snippet source | anchor
+snippet source | anchor
The tenant to partition name mapping will be stored in a table created by Marten called `mt_tenant_partitions` with
@@ -467,7 +467,7 @@ await theStore
// with the named tenant ids
.AddMartenManagedTenantsAsync(CancellationToken.None, "a1", "a2", "a3");
```
-snippet source | anchor
+snippet source | anchor
The API above will try to add any missing table partitions to all known document types. There is also a separate overload
@@ -492,7 +492,7 @@ public class DocThatShouldBeExempted1
public Guid Id { get; set; }
}
```
-snippet source | anchor
+snippet source | anchor
or exempt a single document type through the fluent interface:
@@ -502,7 +502,7 @@ or exempt a single document type through the fluent interface:
```cs
opts.Schema.For().DoNotPartition();
```
-snippet source | anchor
+snippet source | anchor
## Implementation Details
diff --git a/docs/events/protection.md b/docs/events/protection.md
index c7d0dae1e5..f75dda35fa 100644
--- a/docs/events/protection.md
+++ b/docs/events/protection.md
@@ -38,7 +38,7 @@ builder.Services.AddMarten(opts =>
});
});
```
-snippet source | anchor
+snippet source | anchor
That's strictly a configuration time effort. Next, you can apply the masking on demand to any subset of events with
@@ -61,7 +61,31 @@ public static Task apply_masking_to_streams(IDocumentStore store, Guid streamId,
}, token);
}
```
-snippet source | anchor
+snippet source | anchor
+
+
+As a finer grained operation, you can specify an event filter (`Func`) within an event stream to be masked with
+this overload:
+
+
+
+```cs
+public static Task apply_masking_to_streams_and_filter(IDocumentStore store, Guid streamId, CancellationToken token)
+{
+ return store
+ .Advanced
+ .ApplyEventDataMasking(x =>
+ {
+ // Mask selected events within a single stream by a user defined criteria
+ x.IncludeStream(streamId, e => e.EventTypesAre(typeof(MembersJoined), typeof(MembersDeparted)));
+
+ // You can add or modify event metadata headers as well
+ // BUT, you'll of course need event header tracking to be enabled
+ x.AddHeader("masked", DateTimeOffset.UtcNow);
+ }, token);
+}
+```
+snippet source | anchor
::: tip
@@ -82,7 +106,7 @@ public static Task apply_masking_by_filter(IDocumentStore store, Guid[] streamId
});
}
```
-snippet source | anchor
+snippet source | anchor
Finally, if you are using multi-tenancy, you can specify the tenant id as part of the same fluent interface:
@@ -104,7 +128,7 @@ public static Task apply_masking_by_tenant(IDocumentStore store, string tenantId
});
}
```
-snippet source | anchor
+snippet source | anchor
Here's a couple more facts you might need to know:
diff --git a/src/EventSourcingTests/removing_protected_information.cs b/src/EventSourcingTests/removing_protected_information.cs
index a1d9241438..4ba6f734c7 100644
--- a/src/EventSourcingTests/removing_protected_information.cs
+++ b/src/EventSourcingTests/removing_protected_information.cs
@@ -345,7 +345,100 @@ await theStore.Advanced.ApplyEventDataMasking(x =>
.Single().Name.ShouldBe("*****");
}
+ [Fact]
+ public async Task end_to_end_masking_by_guid_identified_stream_and_filter_within_stream()
+ {
+ StoreOptions(opts =>
+ {
+ opts.Events.AddMaskingRuleForProtectedInformation(x => x.Name = "*****");
+ opts.Events.AddMaskingRuleForProtectedInformation(x =>
+ {
+ for (int i = 0; i < x.Members.Length; i++)
+ {
+ x.Members[i] = "*****";
+ }
+ });
+ opts.Events.MetadataConfig.HeadersEnabled = true;
+ });
+
+ theSession.SetHeader("color", "blue");
+
+ var streamId = Guid.NewGuid();
+ theSession.Events.StartStream(streamId, new QuestStarted{Name = "Find Gandalf"}, new MembersJoined(1, "Hobbiton", "Frodo", "Sam"), new MembersJoined(3, "Brandybuck", "Merry", "Pippin"));
+ await theSession.SaveChangesAsync();
+
+ theSession.Events.Append(streamId, new MembersDeparted { Members = new string[] { "Frodo" } });
+ await theSession.SaveChangesAsync();
+
+ await theStore.Advanced.ApplyEventDataMasking(x =>
+ {
+ x
+ .IncludeStream(streamId, e => e.Data is MembersJoined { Location: "Hobbiton" })
+ .AddHeader("color", "green")
+ .AddHeader("opid", 1);
+ });
+
+ var events = await theSession.Events.FetchStreamAsync(streamId);
+
+ // Should have matched and been masked
+ var hobbiton = events.OfType>().Single(x => x.Data.Location == "Hobbiton");
+ hobbiton.Headers["color"].ShouldBe("green");
+ hobbiton.Data.Members.All(x => x == "*****").ShouldBeTrue();
+
+
+ // Should NOT have been matched or masked
+ var brandybuck = events.OfType>().Single(x => x.Data.Location == "Brandybuck");
+ brandybuck.Headers["color"].ShouldBe("blue");
+ brandybuck.Data.Members.All(x => x != "*****").ShouldBeTrue();
+ }
+
+ [Fact]
+ public async Task end_to_end_masking_by_string_identified_stream_and_filter_within_stream()
+ {
+ StoreOptions(opts =>
+ {
+ opts.Events.StreamIdentity = StreamIdentity.AsString;
+ opts.Events.AddMaskingRuleForProtectedInformation(x => x.Name = "*****");
+ opts.Events.AddMaskingRuleForProtectedInformation(x =>
+ {
+ for (int i = 0; i < x.Members.Length; i++)
+ {
+ x.Members[i] = "*****";
+ }
+ });
+ opts.Events.MetadataConfig.HeadersEnabled = true;
+ });
+
+ theSession.SetHeader("color", "blue");
+
+ var streamId = Guid.NewGuid().ToString();
+ theSession.Events.StartStream(streamId, new QuestStarted{Name = "Find Gandalf"}, new MembersJoined(1, "Hobbiton", "Frodo", "Sam"), new MembersJoined(3, "Brandybuck", "Merry", "Pippin"));
+ await theSession.SaveChangesAsync();
+
+ theSession.Events.Append(streamId, new MembersDeparted { Members = new string[] { "Frodo" } });
+ await theSession.SaveChangesAsync();
+
+ await theStore.Advanced.ApplyEventDataMasking(x =>
+ {
+ x
+ .IncludeStream(streamId, e => e.Data is MembersJoined { Location: "Hobbiton" })
+ .AddHeader("color", "green")
+ .AddHeader("opid", 1);
+ });
+ var events = await theSession.Events.FetchStreamAsync(streamId);
+
+ // Should have matched and been masked
+ var hobbiton = events.OfType>().Single(x => x.Data.Location == "Hobbiton");
+ hobbiton.Headers["color"].ShouldBe("green");
+ hobbiton.Data.Members.All(x => x == "*****").ShouldBeTrue();
+
+
+ // Should NOT have been matched or masked
+ var brandybuck = events.OfType>().Single(x => x.Data.Location == "Brandybuck");
+ brandybuck.Headers["color"].ShouldBe("blue");
+ brandybuck.Data.Members.All(x => x != "*****").ShouldBeTrue();
+ }
}
@@ -413,6 +506,26 @@ public static Task apply_masking_to_streams(IDocumentStore store, Guid streamId,
#endregion
+ #region sample_apply_masking_to_a_single_stream_and_filter
+
+ public static Task apply_masking_to_streams_and_filter(IDocumentStore store, Guid streamId, CancellationToken token)
+ {
+ return store
+ .Advanced
+ .ApplyEventDataMasking(x =>
+ {
+ // Mask selected events within a single stream by a user defined criteria
+ x.IncludeStream(streamId, e => e.EventTypesAre(typeof(MembersJoined), typeof(MembersDeparted)));
+
+ // You can add or modify event metadata headers as well
+ // BUT, you'll of course need event header tracking to be enabled
+ x.AddHeader("masked", DateTimeOffset.UtcNow);
+ }, token);
+ }
+
+ #endregion
+
+
#region sample_apply_masking_by_filter
public static Task apply_masking_by_filter(IDocumentStore store, Guid[] streamIds)
diff --git a/src/Marten/Events/Protected/EventDataMasking.cs b/src/Marten/Events/Protected/EventDataMasking.cs
index 890d72d160..bfa9247218 100644
--- a/src/Marten/Events/Protected/EventDataMasking.cs
+++ b/src/Marten/Events/Protected/EventDataMasking.cs
@@ -37,6 +37,28 @@ public IEventDataMasking IncludeStream(string streamKey)
return this;
}
+ public IEventDataMasking IncludeStream(Guid streamId, Func filter)
+ {
+ _sources.Add(async (s, t) =>
+ {
+ var raw = await s.Events.FetchStreamAsync(streamId, token: t).ConfigureAwait(false);
+ return raw.Where(filter).ToList();
+ });
+
+ return this;
+ }
+
+ public IEventDataMasking IncludeStream(string streamKey, Func filter)
+ {
+ _sources.Add(async (s, t) =>
+ {
+ var raw = await s.Events.FetchStreamAsync(streamKey, token: t).ConfigureAwait(false);
+ return raw.Where(filter).ToList();
+ });
+
+ return this;
+ }
+
public IEventDataMasking IncludeEvents(Expression> filter)
{
_sources.Add((s, t) => s.Events.QueryAllRawEvents().Where(filter).ToListAsync(t));
@@ -108,6 +130,22 @@ public interface IEventDataMasking
///
IEventDataMasking IncludeStream(string streamKey);
+ ///
+ /// Apply data protection masking to this event stream
+ ///
+ ///
+ /// Further filter events within the stream to more finely target events for masking
+ ///
+ IEventDataMasking IncludeStream(Guid streamId, Func filter);
+
+ ///
+ /// Apply data protection masking to this event stream
+ ///
+ ///
+ /// Further filter events within the stream to more finely target events for masking
+ ///
+ IEventDataMasking IncludeStream(string streamKey, Func filter);
+
///
/// Apply data protection masking to events matching
/// this criteria