Skip to content

Commit

Permalink
Merge branch 'JasperFx:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
ericgreenmix authored Oct 28, 2024
2 parents df12919 + 8f8f050 commit 7822459
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project>
<PropertyGroup>
<Version>7.30.3</Version>
<Version>7.31.0</Version>
<LangVersion>12.0</LangVersion>
<Authors>Jeremy D. Miller;Babu Annamalai;Oskar Dudycz;Joona-Pekka Kokko</Authors>
<PackageIconUrl>https://martendb.io/logo.png</PackageIconUrl>
Expand Down
8 changes: 4 additions & 4 deletions docs/documents/multi-tenancy.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ builder.Services.AddMarten(opts =>
opts.Policies.PartitionMultiTenantedDocumentsUsingMartenManagement("tenants");
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L151-L168' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_marten_managed_tenant_partitioning' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L189-L206' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_marten_managed_tenant_partitioning' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The tenant to partition name mapping will be stored in a table created by Marten called `mt_tenant_partitions` with
Expand All @@ -467,7 +467,7 @@ await theStore
// with the named tenant ids
.AddMartenManagedTenantsAsync(CancellationToken.None, "a1", "a2", "a3");
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L57-L65' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_add_managed_tenants_at_runtime' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L58-L66' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_add_managed_tenants_at_runtime' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The API above will try to add any missing table partitions to all known document types. There is also a separate overload
Expand All @@ -492,7 +492,7 @@ public class DocThatShouldBeExempted1
public Guid Id { get; set; }
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L222-L230' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_donotpartitionattribute' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L260-L268' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_donotpartitionattribute' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

or exempt a single document type through the fluent interface:
Expand All @@ -502,7 +502,7 @@ or exempt a single document type through the fluent interface:
```cs
opts.Schema.For<DocThatShouldBeExempted2>().DoNotPartition();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L207-L211' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_exempt_from_partitioning_through_fluent_interface' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L245-L249' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_exempt_from_partitioning_through_fluent_interface' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Implementation Details
Expand Down
32 changes: 28 additions & 4 deletions docs/events/protection.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ builder.Services.AddMarten(opts =>
});
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L367-L395' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_defining_masking_rules' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L460-L488' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_defining_masking_rules' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

That's strictly a configuration time effort. Next, you can apply the masking on demand to any subset of events with
Expand All @@ -61,7 +61,31 @@ public static Task apply_masking_to_streams(IDocumentStore store, Guid streamId,
}, token);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L398-L414' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_masking_to_a_single_stream' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L491-L507' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_masking_to_a_single_stream' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

As a finer grained operation, you can specify an event filter (`Func<IEvent, bool>`) within an event stream to be masked with
this overload:

<!-- snippet: sample_apply_masking_to_a_single_stream_and_filter -->
<a id='snippet-sample_apply_masking_to_a_single_stream_and_filter'></a>
```cs
public static Task apply_masking_to_streams_and_filter(IDocumentStore store, Guid streamId, CancellationToken token)
{
return store
.Advanced
.ApplyEventDataMasking(x =>
{
// Mask selected events within a single stream by a user defined criteria
x.IncludeStream(streamId, e => e.EventTypesAre(typeof(MembersJoined), typeof(MembersDeparted)));

// You can add or modify event metadata headers as well
// BUT, you'll of course need event header tracking to be enabled
x.AddHeader("masked", DateTimeOffset.UtcNow);
}, token);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L509-L526' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_masking_to_a_single_stream_and_filter' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: tip
Expand All @@ -82,7 +106,7 @@ public static Task apply_masking_by_filter(IDocumentStore store, Guid[] streamId
});
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L416-L426' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_masking_by_filter' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L529-L539' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_masking_by_filter' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Finally, if you are using multi-tenancy, you can specify the tenant id as part of the same fluent interface:
Expand All @@ -104,7 +128,7 @@ public static Task apply_masking_by_tenant(IDocumentStore store, string tenantId
});
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L428-L444' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_masking_with_multi_tenancy' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/removing_protected_information.cs#L541-L557' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_masking_with_multi_tenancy' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Here's a couple more facts you might need to know:
Expand Down
49 changes: 49 additions & 0 deletions src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using JasperFx.Core;
using Lamar.IoC.Instances;
using Marten;
Expand All @@ -15,6 +16,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using NSubstitute;
using Shouldly;
using Weasel.Postgresql;
using Xunit;
Expand Down Expand Up @@ -446,6 +448,53 @@ public async Task use_from_service_provider_as_scoped_with_martenStore()

await store.Advanced.Clean.DeleteAllEventDataAsync();
}

[Fact]
public void subscription_wrapper_copies_the_filters_from_subscription_base()
{
var services = new ServiceCollection();
services.AddScoped<FilteredSubscription>();

var provider = services.BuildServiceProvider();

var wrapper = new ScopedSubscriptionServiceWrapper<FilteredSubscription>(provider);

wrapper.IncludeArchivedEvents.ShouldBeTrue();
wrapper.IncludedEventTypes.Count.ShouldBe(2);
wrapper.IncludedEventTypes.ShouldContain(typeof(AEvent));
wrapper.IncludedEventTypes.ShouldContain(typeof(BEvent));
}
}

public class FilteredSubscription: SubscriptionBase, IDisposable
{
public FilteredSubscription()
{
IncludeType<AEvent>();
IncludeType<BEvent>();
StreamType = typeof(SimpleAggregate);
IncludeArchivedEvents = true;
}

public override Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
CancellationToken cancellationToken)
{
return Task.FromResult(Substitute.For<IChangeListener>());
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
// TODO release managed resources here
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}

public class SimpleSubscription: ISubscription
Expand Down
113 changes: 113 additions & 0 deletions src/EventSourcingTests/removing_protected_information.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,100 @@ await theStore.Advanced.ApplyEventDataMasking(x =>
.Single().Name.ShouldBe("*****");
}

[Fact]
public async Task end_to_end_masking_by_guid_identified_stream_and_filter_within_stream()
{
StoreOptions(opts =>
{
opts.Events.AddMaskingRuleForProtectedInformation<QuestStarted>(x => x.Name = "*****");
opts.Events.AddMaskingRuleForProtectedInformation<MembersJoined>(x =>
{
for (int i = 0; i < x.Members.Length; i++)
{
x.Members[i] = "*****";
}
});
opts.Events.MetadataConfig.HeadersEnabled = true;
});

theSession.SetHeader("color", "blue");

var streamId = Guid.NewGuid();
theSession.Events.StartStream<Quest>(streamId, new QuestStarted{Name = "Find Gandalf"}, new MembersJoined(1, "Hobbiton", "Frodo", "Sam"), new MembersJoined(3, "Brandybuck", "Merry", "Pippin"));
await theSession.SaveChangesAsync();

theSession.Events.Append(streamId, new MembersDeparted { Members = new string[] { "Frodo" } });
await theSession.SaveChangesAsync();

await theStore.Advanced.ApplyEventDataMasking(x =>
{
x
.IncludeStream(streamId, e => e.Data is MembersJoined { Location: "Hobbiton" })
.AddHeader("color", "green")
.AddHeader("opid", 1);
});

var events = await theSession.Events.FetchStreamAsync(streamId);

// Should have matched and been masked
var hobbiton = events.OfType<Event<MembersJoined>>().Single(x => x.Data.Location == "Hobbiton");
hobbiton.Headers["color"].ShouldBe("green");
hobbiton.Data.Members.All(x => x == "*****").ShouldBeTrue();


// Should NOT have been matched or masked
var brandybuck = events.OfType<Event<MembersJoined>>().Single(x => x.Data.Location == "Brandybuck");
brandybuck.Headers["color"].ShouldBe("blue");
brandybuck.Data.Members.All(x => x != "*****").ShouldBeTrue();
}

[Fact]
public async Task end_to_end_masking_by_string_identified_stream_and_filter_within_stream()
{
StoreOptions(opts =>
{
opts.Events.StreamIdentity = StreamIdentity.AsString;
opts.Events.AddMaskingRuleForProtectedInformation<QuestStarted>(x => x.Name = "*****");
opts.Events.AddMaskingRuleForProtectedInformation<MembersJoined>(x =>
{
for (int i = 0; i < x.Members.Length; i++)
{
x.Members[i] = "*****";
}
});
opts.Events.MetadataConfig.HeadersEnabled = true;
});

theSession.SetHeader("color", "blue");

var streamId = Guid.NewGuid().ToString();
theSession.Events.StartStream<Quest>(streamId, new QuestStarted{Name = "Find Gandalf"}, new MembersJoined(1, "Hobbiton", "Frodo", "Sam"), new MembersJoined(3, "Brandybuck", "Merry", "Pippin"));
await theSession.SaveChangesAsync();

theSession.Events.Append(streamId, new MembersDeparted { Members = new string[] { "Frodo" } });
await theSession.SaveChangesAsync();

await theStore.Advanced.ApplyEventDataMasking(x =>
{
x
.IncludeStream(streamId, e => e.Data is MembersJoined { Location: "Hobbiton" })
.AddHeader("color", "green")
.AddHeader("opid", 1);
});

var events = await theSession.Events.FetchStreamAsync(streamId);

// Should have matched and been masked
var hobbiton = events.OfType<Event<MembersJoined>>().Single(x => x.Data.Location == "Hobbiton");
hobbiton.Headers["color"].ShouldBe("green");
hobbiton.Data.Members.All(x => x == "*****").ShouldBeTrue();


// Should NOT have been matched or masked
var brandybuck = events.OfType<Event<MembersJoined>>().Single(x => x.Data.Location == "Brandybuck");
brandybuck.Headers["color"].ShouldBe("blue");
brandybuck.Data.Members.All(x => x != "*****").ShouldBeTrue();
}
}


Expand Down Expand Up @@ -413,6 +506,26 @@ public static Task apply_masking_to_streams(IDocumentStore store, Guid streamId,

#endregion

#region sample_apply_masking_to_a_single_stream_and_filter

public static Task apply_masking_to_streams_and_filter(IDocumentStore store, Guid streamId, CancellationToken token)
{
return store
.Advanced
.ApplyEventDataMasking(x =>
{
// Mask selected events within a single stream by a user defined criteria
x.IncludeStream(streamId, e => e.EventTypesAre(typeof(MembersJoined), typeof(MembersDeparted)));

// You can add or modify event metadata headers as well
// BUT, you'll of course need event header tracking to be enabled
x.AddHeader("masked", DateTimeOffset.UtcNow);
}, token);
}

#endregion


#region sample_apply_masking_by_filter

public static Task apply_masking_by_filter(IDocumentStore store, Guid[] streamIds)
Expand Down
38 changes: 38 additions & 0 deletions src/Marten/Events/Protected/EventDataMasking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,28 @@ public IEventDataMasking IncludeStream(string streamKey)
return this;
}

public IEventDataMasking IncludeStream(Guid streamId, Func<IEvent, bool> filter)
{
_sources.Add(async (s, t) =>
{
var raw = await s.Events.FetchStreamAsync(streamId, token: t).ConfigureAwait(false);
return raw.Where(filter).ToList();
});

return this;
}

public IEventDataMasking IncludeStream(string streamKey, Func<IEvent, bool> filter)
{
_sources.Add(async (s, t) =>
{
var raw = await s.Events.FetchStreamAsync(streamKey, token: t).ConfigureAwait(false);
return raw.Where(filter).ToList();
});

return this;
}

public IEventDataMasking IncludeEvents(Expression<Func<IEvent, bool>> filter)
{
_sources.Add((s, t) => s.Events.QueryAllRawEvents().Where(filter).ToListAsync(t));
Expand Down Expand Up @@ -108,6 +130,22 @@ public interface IEventDataMasking
/// <returns></returns>
IEventDataMasking IncludeStream(string streamKey);

/// <summary>
/// Apply data protection masking to this event stream
/// </summary>
/// <param name="streamId"></param>
/// <param name="filter">Further filter events within the stream to more finely target events for masking</param>
/// <returns></returns>
IEventDataMasking IncludeStream(Guid streamId, Func<IEvent, bool> filter);

/// <summary>
/// Apply data protection masking to this event stream
/// </summary>
/// <param name="streamKey"></param>
/// <param name="filter">Further filter events within the stream to more finely target events for masking</param>
/// <returns></returns>
IEventDataMasking IncludeStream(string streamKey, Func<IEvent, bool> filter);

/// <summary>
/// Apply data protection masking to events matching
/// this criteria
Expand Down
14 changes: 14 additions & 0 deletions src/Marten/Subscriptions/SubscriptionWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -33,6 +34,19 @@ public ScopedSubscriptionServiceWrapper(IServiceProvider provider)
{
_provider = provider;
SubscriptionName = typeof(T).Name;

if (typeof(T).CanBeCastTo<SubscriptionBase>())
{
using var scope = _provider.CreateScope();
var sp = scope.ServiceProvider;

var subscription = sp.GetRequiredService<T>().As<SubscriptionBase>();
IncludedEventTypes.AddRange(subscription.IncludedEventTypes);
StreamType = subscription.StreamType;
IncludeArchivedEvents = subscription.IncludeArchivedEvents;
}


}

public override async Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller,
Expand Down

0 comments on commit 7822459

Please sign in to comment.