Skip to content

Commit

Permalink
Adding a subscription based on subscription base via IoC copies the e…
Browse files Browse the repository at this point in the history
…vent filters from the inner subscription. Closes GH-3510
  • Loading branch information
jeremydmiller committed Oct 25, 2024
1 parent 9fdfc7d commit 42bb232
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
49 changes: 49 additions & 0 deletions src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using JasperFx.Core;
using Lamar.IoC.Instances;
using Marten;
Expand All @@ -15,6 +16,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using NSubstitute;
using Shouldly;
using Weasel.Postgresql;
using Xunit;
Expand Down Expand Up @@ -446,6 +448,53 @@ public async Task use_from_service_provider_as_scoped_with_martenStore()

await store.Advanced.Clean.DeleteAllEventDataAsync();
}

[Fact]
public void subscription_wrapper_copies_the_filters_from_subscription_base()
{
var services = new ServiceCollection();
services.AddScoped<FilteredSubscription>();

var provider = services.BuildServiceProvider();

var wrapper = new ScopedSubscriptionServiceWrapper<FilteredSubscription>(provider);

wrapper.IncludeArchivedEvents.ShouldBeTrue();
wrapper.IncludedEventTypes.Count.ShouldBe(2);
wrapper.IncludedEventTypes.ShouldContain(typeof(AEvent));
wrapper.IncludedEventTypes.ShouldContain(typeof(BEvent));
}
}

public class FilteredSubscription: SubscriptionBase, IDisposable
{
public FilteredSubscription()
{
IncludeType<AEvent>();
IncludeType<BEvent>();
StreamType = typeof(SimpleAggregate);
IncludeArchivedEvents = true;
}

public override Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
CancellationToken cancellationToken)
{
return Task.FromResult(Substitute.For<IChangeListener>());
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
// TODO release managed resources here
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}

public class SimpleSubscription: ISubscription
Expand Down
14 changes: 14 additions & 0 deletions src/Marten/Subscriptions/SubscriptionWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -33,6 +34,19 @@ public ScopedSubscriptionServiceWrapper(IServiceProvider provider)
{
_provider = provider;
SubscriptionName = typeof(T).Name;

if (typeof(T).CanBeCastTo<SubscriptionBase>())
{
using var scope = _provider.CreateScope();
var sp = scope.ServiceProvider;

var subscription = sp.GetRequiredService<T>().As<SubscriptionBase>();
IncludedEventTypes.AddRange(subscription.IncludedEventTypes);
StreamType = subscription.StreamType;
IncludeArchivedEvents = subscription.IncludeArchivedEvents;
}


}

public override async Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller,
Expand Down

0 comments on commit 42bb232

Please sign in to comment.