Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AddProjectionWithServices and AddSubscriptionWithServices to .AddMartenStore #3252

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,52 @@ public async Task use_from_service_provider_as_singleton()
await store.Advanced.Clean.DeleteAllEventDataAsync();
}

[Fact]
public async Task use_from_service_provider_as_singleton_with_martenStore()
{
await rewindState();

using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddMartenStore<ICustomStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc";
}).AddAsyncDaemon(DaemonMode.Solo).AddSubscriptionWithServices<SimpleSubscription>(ServiceLifetime.Singleton,
o => o.SubscriptionName = "Simple2");
}).StartAsync();

var store = (DocumentStore)host.Services.GetRequiredService<ICustomStore>();


store.Options.Projections.AllShards().Select(x => x.Name.Identity)
.ShouldContain("Simple2:All");

var events1 = new object[] { new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events2 = new object[] { new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events3 = new object[] { new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events4 = new object[] { new EventSourcingTests.Aggregation.EEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };

await using var session = store.LightweightSession();

session.Events.StartStream(Guid.NewGuid(), events1);
session.Events.StartStream(Guid.NewGuid(), events2);
session.Events.StartStream(Guid.NewGuid(), events3);
session.Events.StartStream(Guid.NewGuid(), events4);

await session.SaveChangesAsync();

await store.WaitForNonStaleProjectionDataAsync(60.Seconds());

SimpleSubscription.EventsEncountered[1].Count.ShouldBeGreaterThanOrEqualTo(16);

var progress = await store.Advanced.ProjectionProgressFor(new ShardName("Simple2", "All"));
progress.ShouldBeGreaterThanOrEqualTo(16);

await store.Advanced.Clean.DeleteAllEventDataAsync();
}

private async Task rewindState()
{
SimpleSubscription.Clear();
Expand Down Expand Up @@ -325,6 +371,52 @@ public async Task use_from_service_provider_as_scoped()
var store = (DocumentStore)host.Services.GetRequiredService<IDocumentStore>();


store.Options.Projections.AllShards().Select(x => x.Name.Identity)
.ShouldContain("Simple2:All");

var events1 = new object[] { new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events2 = new object[] { new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events3 = new object[] { new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events4 = new object[] { new EventSourcingTests.Aggregation.EEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };

await using var session = store.LightweightSession();

session.Events.StartStream(Guid.NewGuid(), events1);
session.Events.StartStream(Guid.NewGuid(), events2);
session.Events.StartStream(Guid.NewGuid(), events3);
session.Events.StartStream(Guid.NewGuid(), events4);

await session.SaveChangesAsync();

await store.WaitForNonStaleProjectionDataAsync(60.Seconds());

SimpleSubscription.EventsEncountered.Sum(x => x.Count).ShouldBeGreaterThanOrEqualTo(16);

var progress = await store.Advanced.ProjectionProgressFor(new ShardName("Simple2", "All"));
progress.ShouldBeGreaterThanOrEqualTo(16);

await store.Advanced.Clean.DeleteAllEventDataAsync();
}

[Fact]
public async Task use_from_service_provider_as_scoped_with_martenStore()
{
await rewindState();

using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddMartenStore<ICustomStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc";
}).AddAsyncDaemon(DaemonMode.Solo).AddSubscriptionWithServices<SimpleSubscription>(ServiceLifetime.Scoped,
o => o.SubscriptionName = "Simple2");
}).StartAsync();

var store = (DocumentStore)host.Services.GetRequiredService<ICustomStore>();


store.Options.Projections.AllShards().Select(x => x.Name.Identity)
.ShouldContain("Simple2:All");

Expand Down Expand Up @@ -385,3 +477,7 @@ public Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionCo
return Task.FromResult((IChangeListener)NullChangeListener.Instance);
}
}

public interface ICustomStore: IDocumentStore
{
}
174 changes: 174 additions & 0 deletions src/EventSourcingTests/Projections/projections_with_IoC_services.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,176 @@ public async Task get_async_shards_with_custom_name()
projectionSource.AsyncProjectionShards((DocumentStore)store).Single().Name.Identity.ShouldBe("MyProjection:All");


}

[Fact]
public async Task can_apply_database_changes_at_runtime_with_projection_with_services_on_martenStore()
{
await using (var conn = new NpgsqlConnection(ConnectionSource.ConnectionString))
{
await conn.OpenAsync();
await conn.DropSchemaAsync("ioc");
await conn.CloseAsync();
}

using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<IPriceLookup, PriceLookup>();

var mse = services.AddMartenStore<ICustomStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc";
});

mse.AddProjectionWithServices<ProductProjection>(
ProjectionLifecycle.Inline,
ServiceLifetime.Scoped
);

mse.ApplyAllDatabaseChangesOnStartup();
})
.StartAsync();

var store = host.Services.GetRequiredService<ICustomStore>();
var tables = store.Storage.AllObjects().OfType<DocumentTable>();

tables.Any(x => x.DocumentType == typeof(Product)).ShouldBeTrue();

var existing = await store.Storage.Database.ExistingTableFor(typeof(Product));
existing.ShouldNotBeNull();
}

[Fact]
public async Task use_projection_as_singleton_and_inline_on_martenStore()
{
#region sample_registering_projection_built_by_services

using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<IPriceLookup, PriceLookup>();

services.AddMartenStore<ICustomStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc";
})
// Note that this is chained after the call to AddMartenStore()
.AddProjectionWithServices<ProductProjection>(
ProjectionLifecycle.Inline,
ServiceLifetime.Singleton
).ApplyAllDatabaseChangesOnStartup();
})
.StartAsync();

#endregion

var store = host.Services.GetRequiredService<ICustomStore>();

await using var session = store.LightweightSession();
var streamId = session.Events.StartStream<Product>(
new ProductRegistered("Ankle Socks", "Socks")
).Id;
await session.SaveChangesAsync();

var product = await session.LoadAsync<Product>(streamId);
product.Price.ShouldBeGreaterThan(0);
product.Name.ShouldBe("Ankle Socks");
}

[Fact]
public async Task use_projection_as_singleton_and_async_on_martenStore()
{
using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<IPriceLookup, PriceLookup>();

services.AddMartenStore<ICustomStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc";
opts.Projections.DaemonLockId = 99123;
})
.AddProjectionWithServices<ProductProjection>(ProjectionLifecycle.Async, ServiceLifetime.Singleton)
.ApplyAllDatabaseChangesOnStartup();
}).StartAsync();

var store = host.Services.GetRequiredService<ICustomStore>();
await store.Advanced.Clean.CompletelyRemoveAllAsync();

await using var session = store.LightweightSession();
var streamId = session.Events.StartStream<Product>(new ProductRegistered("Ankle Socks", "Socks")).Id;
await session.SaveChangesAsync();

using var daemon = await store.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await daemon.Tracker.WaitForShardState("Product:All", 1);

var product = await session.LoadAsync<Product>(streamId);
product.ShouldNotBeNull();
product.Price.ShouldBeGreaterThan(0);
product.Name.ShouldBe("Ankle Socks");
}

[Fact]
public async Task use_projection_as_scoped_and_inline_on_martenStore()
{
using var host = await Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<IPriceLookup, PriceLookup>();

services.AddMartenStore<ICustomStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc";
}).AddProjectionWithServices<ProductProjection>(ProjectionLifecycle.Inline, ServiceLifetime.Scoped, "MyProjection")
.ApplyAllDatabaseChangesOnStartup();
}).StartAsync();

var store = host.Services.GetRequiredService<ICustomStore>();

store.Options.As<StoreOptions>().Projections.All.Single()
.ShouldBeOfType<ScopedProjectionWrapper<ProductProjection>>().ProjectionName.ShouldBe("MyProjection");

await using var session = store.LightweightSession();
var streamId = session.Events.StartStream<Product>(new ProductRegistered("Ankle Socks", "Socks")).Id;
await session.SaveChangesAsync();

var product = await session.LoadAsync<Product>(streamId);
product.Price.ShouldBeGreaterThan(0);
product.Name.ShouldBe("Ankle Socks");
}

[Fact]
public async Task get_async_shards_with_custom_name_on_martenStore()
{
using var host = await Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<IPriceLookup, PriceLookup>();

services.AddMartenStore<ICustomStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc";
}).AddProjectionWithServices<ProductProjection>(ProjectionLifecycle.Async, ServiceLifetime.Scoped, "MyProjection")
.ApplyAllDatabaseChangesOnStartup();
}).StartAsync();

var store = host.Services.GetRequiredService<ICustomStore>();

var projectionSource = store.Options.As<StoreOptions>().Projections.All.Single();
projectionSource
.ShouldBeOfType<ScopedProjectionWrapper<ProductProjection>>().ProjectionName.ShouldBe("MyProjection");

projectionSource.AsyncProjectionShards((DocumentStore)store).Single().Name.Identity.ShouldBe("MyProjection:All");


}
}

Expand Down Expand Up @@ -255,3 +425,7 @@ public override ValueTask ApplyChangesAsync(
}

#endregion

public interface ICustomStore: IDocumentStore
{
}
Loading
Loading