From 9c4b3114d516afa40e53bf061fb7eec96d0094e7 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 1 Oct 2024 11:43:09 -0500 Subject: [PATCH 1/5] Improvements to async projection performance and optimized aggregation rebuilds. Closes GH-3461 Closes GH-3460 Closes GH-3434 Closes GH-3426 --- docs/configuration/prebuilding.md | 9 +- docs/documents/multi-tenancy.md | 8 +- docs/events/optimizing.md | 83 ++- .../projections/aggregate-projections.md | 14 +- docs/events/projections/custom-aggregates.md | 6 +- .../projections/multi-stream-projections.md | 13 +- docs/events/projections/rebuilding.md | 46 ++ src/CommandLineRunner/Program.cs | 7 +- src/CoreTests/Util/RecentlyUsedCacheTests.cs | 86 +++ src/DaemonTests/DaemonTests.csproj | 1 + src/DaemonTests/EventRangeTests.cs | 47 +- .../TestingSupport/DaemonContext.cs | 98 +++- src/DaemonTests/TestingSupport/Trip.cs | 5 +- .../TripProjectionWithCustomName.cs | 1 + src/DaemonTests/ViewProjectionTests.cs | 11 + src/DaemonTests/build_aggregate_projection.cs | 77 +++ .../custom_aggregation_in_async_daemon.cs | 46 +- src/DaemonTests/optimized_rebuilds.cs | 492 ++++++++++++++++++ .../Aggregation/CustomProjectionTests.cs | 62 +++ src/EventSourcingTests/EventGraphTests.cs | 6 + .../Examples/Optimizations.cs | 7 + .../SingleStreamProjectionTests.cs | 4 + .../Commands/Projection/ProjectionHost.cs | 4 + src/Marten/AdvancedOperations.cs | 1 - .../Events/Aggregation/AggregationRuntime.cs | 74 +++ .../Events/Aggregation/CustomProjection.cs | 54 +- .../GeneratedAggregateProjectionBase.cs | 8 + .../Aggregation/IAggregateProjection.cs | 9 +- .../Events/Aggregation/IAggregationRuntime.cs | 14 +- .../Aggregation/Rebuilds/AggregateIdentity.cs | 5 + .../Rebuilds/AggregatePageHandler.cs | 210 ++++++++ .../Rebuilds/AggregateRebuildTable.cs | 29 ++ .../Rebuilds/BackfillStreamTypeOperation.cs | 70 +++ .../DequeuePendingAggregateRebuilds.cs | 45 ++ .../Rebuilds/MarkShardModeAsContinuous.cs | 54 ++ .../Rebuilds/MarkShardModeAsRebuilding.cs | 55 
++ .../Rebuilds/QueryForNextAggregateIds.cs | 77 +++ .../Aggregation/Rebuilds/QueryForTenantIds.cs | 51 ++ .../Rebuilds/SeedAggregateRebuildTable.cs | 48 ++ .../Rebuilds/SingleStreamRebuilder.cs | 226 ++++++++ .../StringIdentifiedSingleStreamRebuilder.cs | 22 + .../Events/Aggregation/TenantSliceGroup.cs | 14 +- src/Marten/Events/Daemon/DaemonSettings.cs | 2 + .../Events/Daemon/HighWater/HighWaterAgent.cs | 12 + src/Marten/Events/Daemon/IProjectionDaemon.cs | 2 +- .../Events/Daemon/ISubscriptionAgent.cs | 5 +- .../Events/Daemon/ISubscriptionExecution.cs | 13 + .../Events/Daemon/Internals/EventLoader.cs | 12 +- .../Events/Daemon/Internals/EventPage.cs | 2 + .../Events/Daemon/Internals/EventRange.cs | 8 + .../Internals/GroupedProjectionExecution.cs | 21 +- .../Daemon/Internals/IDeadLetterQueue.cs | 1 + .../Daemon/Internals/NulloDaemonRuntime.cs | 7 + .../Daemon/Internals/ProjectionUpdateBatch.cs | 24 +- .../Daemon/Internals/SubscriptionAgent.cs | 36 +- .../Progress/ProjectionProgressStatement.cs | 11 +- .../Daemon/Progress/ShardStateSelector.cs | 27 +- .../Daemon/ProjectionDaemon.Rebuilding.cs | 1 - src/Marten/Events/Daemon/ProjectionDaemon.cs | 14 +- .../Events/Daemon/ShardExecutionMode.cs | 3 +- src/Marten/Events/Daemon/ShardState.cs | 13 + src/Marten/Events/EventDocumentStorage.cs | 10 + src/Marten/Events/EventGraph.FeatureSchema.cs | 8 +- src/Marten/Events/EventGraph.cs | 7 + src/Marten/Events/EventMapping.cs | 10 + src/Marten/Events/IEventStoreOptions.cs | 7 + .../Events/IReadOnlyEventStoreOptions.cs | 7 + src/Marten/Events/Projections/EventSlice.cs | 2 +- .../Events/Projections/GeneratedProjection.cs | 15 + .../Events/Projections/IProjectionSource.cs | 4 + .../Events/Projections/ProjectionWrapper.cs | 6 + .../Projections/ScopedProjectionWrapper.cs | 7 + src/Marten/Events/QueryEventStore.cs | 20 +- .../Events/Schema/EventProgressionTable.cs | 13 +- .../Internal/Storage/DocumentStorage.cs | 36 ++ .../Internal/Storage/IDocumentStorage.cs | 2 + 
.../Storage/SubClassDocumentStorage.cs | 10 + src/Marten/Internal/ValueTypeInfo.cs | 39 +- .../Storage/MartenDatabase.EventStorage.cs | 4 +- src/Marten/Storage/Tenant2.cs | 32 ++ src/Marten/StoreOptions.Identity.cs | 6 + .../Subscriptions/SubscriptionExecution.cs | 16 +- src/Marten/Util/RecentlyUsedCache.cs | 82 +++ 83 files changed, 2650 insertions(+), 106 deletions(-) create mode 100644 src/CoreTests/Util/RecentlyUsedCacheTests.cs create mode 100644 src/DaemonTests/optimized_rebuilds.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/AggregateIdentity.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/AggregateRebuildTable.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/BackfillStreamTypeOperation.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/DequeuePendingAggregateRebuilds.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsContinuous.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsRebuilding.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/QueryForNextAggregateIds.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/QueryForTenantIds.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/SeedAggregateRebuildTable.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/SingleStreamRebuilder.cs create mode 100644 src/Marten/Events/Aggregation/Rebuilds/StringIdentifiedSingleStreamRebuilder.cs create mode 100644 src/Marten/Util/RecentlyUsedCache.cs diff --git a/docs/configuration/prebuilding.md b/docs/configuration/prebuilding.md index 6d97ef93d7..f68b4b1215 100644 --- a/docs/configuration/prebuilding.md +++ b/docs/configuration/prebuilding.md @@ -152,6 +152,9 @@ public static class Program { opts.AutoCreateSchemaObjects = AutoCreate.All; opts.DatabaseSchemaName = "cli"; + opts.DisableNpgsqlLogging = true; + + opts.Events.UseOptimizedProjectionRebuilds = 
true; opts.MultiTenantedWithSingleServer( ConnectionSource.ConnectionString, @@ -162,7 +165,7 @@ public static class Program // *try* to use pre-generated code at runtime opts.GeneratedCodeMode = TypeLoadMode.Auto; - opts.Schema.For().AddSubClass(); + //opts.Schema.For().AddSubClass(); // You have to register all persisted document types ahead of time // RegisterDocumentType() is the equivalent of saying Schema.For() @@ -188,13 +191,13 @@ public static class Program .Add(new SimpleProjection(), ProjectionLifecycle.Inline); // This is actually important to register "live" aggregations too for the code generation - opts.Projections.LiveStreamAggregation(); + //opts.Projections.LiveStreamAggregation(); }).AddAsyncDaemon(DaemonMode.Solo); }); } } ``` -snippet source | anchor +snippet source | anchor Okay, after all that, there should be a new command line option called `codegen` for your project. Assuming diff --git a/docs/documents/multi-tenancy.md b/docs/documents/multi-tenancy.md index feb5f58e9d..9dc8ab7711 100644 --- a/docs/documents/multi-tenancy.md +++ b/docs/documents/multi-tenancy.md @@ -446,7 +446,7 @@ builder.Services.AddMarten(opts => opts.Policies.PartitionMultiTenantedDocumentsUsingMartenManagement("tenants"); }); ``` -snippet source | anchor +snippet source | anchor The tenant to partition name mapping will be stored in a table created by Marten called `mt_tenant_partitions` with @@ -467,7 +467,7 @@ await theStore // with the named tenant ids .AddMartenManagedTenantsAsync(CancellationToken.None, "a1", "a2", "a3"); ``` -snippet source | anchor +snippet source | anchor The API above will try to add any missing table partitions to all known document types. 
There is also a separate overload @@ -492,7 +492,7 @@ public class DocThatShouldBeExempted1 public Guid Id { get; set; } } ``` -snippet source | anchor +snippet source | anchor or exempt a single document type through the fluent interface: @@ -502,7 +502,7 @@ or exempt a single document type through the fluent interface: ```cs opts.Schema.For().DoNotPartition(); ``` -snippet source | anchor +snippet source | anchor ## Implementation Details diff --git a/docs/events/optimizing.md b/docs/events/optimizing.md index 7b57f36a0b..f532e8da06 100644 --- a/docs/events/optimizing.md +++ b/docs/events/optimizing.md @@ -33,9 +33,14 @@ builder.Services.AddMarten(opts => // during command handling with some significant // caveats opts.Events.UseIdentityMapForInlineAggregates = true; + + // Opts into a mode where Marten is able to rebuild single + // stream projections faster by building one stream at a time + // Does require new table migrations for Marten 7 users though + opts.Events.UseOptimizedProjectionRebuilds = true; }); ``` -snippet source | anchor +snippet source | anchor The archived stream option is further described in the section on [Hot/Cold Storage Partitioning](/events/archiving.html#hot-cold-storage-partitioning). @@ -45,3 +50,79 @@ applicability and drawbacks of the "Quick" event appending. Lastly, see [Optimizing FetchForWriting with Inline Aggregates](/scenarios/command_handler_workflow.html#optimizing-fetchforwriting-with-inline-aggregates) for more information about the `UseIdentityMapForInlineAggregates` option. + +## Caching for Asynchronous Projections + +You may be able to wring out more throughput for aggregated projections (`SingleStreamProjection`, `MultiStreamProjection`, `CustomProjection`) +by opting into 2nd level caching of the aggregated projected documents during asynchronous projection building. 
You can +do that by setting a greater than zero value for `CacheLimitPerTenant` directly inside of the aforementioned projection types +like so: + + + +```cs +public class DayProjection: MultiStreamProjection +{ + public DayProjection() + { + // Tell the projection how to group the events + // by Day document + Identity(x => x.Day); + + // This just lets the projection work independently + // on each Movement child of the Travel event + // as if it were its own event + FanOut(x => x.Movements); + + // You can also access Event data + FanOut(x => x.Data.Stops); + + ProjectionName = "Day"; + + // Opt into 2nd level caching of up to 1000 + // most recently encountered aggregates as a + // performance optimization + CacheLimitPerTenant = 1000; + + // With large event stores of relatively small + // event objects, moving this number up from the + // default can greatly improve throughput and especially + // improve projection rebuild times + Options.BatchSize = 5000; + } + + public void Apply(Day day, TripStarted e) => day.Started++; + public void Apply(Day day, TripEnded e) => day.Ended++; + + public void Apply(Day day, Movement e) + { + switch (e.Direction) + { + case Direction.East: + day.East += e.Distance; + break; + case Direction.North: + day.North += e.Distance; + break; + case Direction.South: + day.South += e.Distance; + break; + case Direction.West: + day.West += e.Distance; + break; + + default: + throw new ArgumentOutOfRangeException(); + } + } + + public void Apply(Day day, Stop e) => day.Stops++; +} +``` +snippet source | anchor + + +Marten is using a most recently used cache for the projected documents that are being built by an aggregation projection +so that updates from new events can be directly applied to the in memory documents instead of having to constantly +load those documents over and over again from the database as new events trickle in. 
This is of course much more effective +when your projection is constantly updating a relatively small number of different aggregates. diff --git a/docs/events/projections/aggregate-projections.md b/docs/events/projections/aggregate-projections.md index 77b6c5fe37..e5451e8932 100644 --- a/docs/events/projections/aggregate-projections.md +++ b/docs/events/projections/aggregate-projections.md @@ -99,7 +99,7 @@ public class TripProjection: SingleStreamProjection } } ``` -snippet source | anchor +snippet source | anchor And register that projection like this: @@ -236,7 +236,7 @@ public class Trip internal bool ShouldDelete(VacationOver e) => Traveled > 1000; } ``` -snippet source | anchor +snippet source | anchor Or finally, you can use a method named `Create()` on a projection type as shown in this sample: @@ -272,7 +272,7 @@ public class TripProjection: SingleStreamProjection } } ``` -snippet source | anchor +snippet source | anchor The `Create()` method has to return either the aggregate document type or `Task` where `T` is the aggregate document type. There must be an argument for the specific event type or `IEvent` where `T` is the event type if you need access to event metadata. You can also take in an `IQuerySession` if you need to look up additional data as part of the transformation or `IEvent` in addition to the exact event type just to get at event metadata. @@ -312,7 +312,7 @@ public class TripProjection: SingleStreamProjection } } ``` -snippet source | anchor +snippet source | anchor I'm not personally that wild about using lots of inline Lambdas like the example above, and to that end, Marten now supports the `Apply()` method convention. 
Here's the same `TripProjection`, but this time using methods to mutate the `Trip` document: @@ -348,7 +348,7 @@ public class TripProjection: SingleStreamProjection } } ``` -snippet source | anchor +snippet source | anchor The `Apply()` methods can accept any combination of these arguments: @@ -517,7 +517,7 @@ public class Trip internal bool ShouldDelete(VacationOver e) => Traveled > 1000; } ``` -snippet source | anchor +snippet source | anchor Here's an example of using the various ways of doing `Trip` stream aggregation: @@ -551,7 +551,7 @@ internal async Task use_a_stream_aggregation() var trip = await session.Events.AggregateStreamAsync(tripId); } ``` -snippet source | anchor +snippet source | anchor ## Aggregate Versioning diff --git a/docs/events/projections/custom-aggregates.md b/docs/events/projections/custom-aggregates.md index 77d3efd70a..0265a8ee9f 100644 --- a/docs/events/projections/custom-aggregates.md +++ b/docs/events/projections/custom-aggregates.md @@ -27,7 +27,7 @@ public class Increment { } ``` -snippet source | anchor +snippet source | anchor And a simple aggregate document type like this: @@ -51,7 +51,7 @@ public class StartAndStopAggregate: ISoftDeleted } } ``` -snippet source | anchor +snippet source | anchor As you can see, `StartAndStopAggregate` as a `Guid` as its identity and is also [soft-deleted](/documents/deletes.html#soft-deletes) when stored by @@ -126,7 +126,7 @@ public class StartAndStopProjection: CustomProjectionsnippet source | anchor +snippet source | anchor ## Custom Grouping diff --git a/docs/events/projections/multi-stream-projections.md b/docs/events/projections/multi-stream-projections.md index 1dbc1513c7..ffbc477f85 100644 --- a/docs/events/projections/multi-stream-projections.md +++ b/docs/events/projections/multi-stream-projections.md @@ -501,6 +501,17 @@ public class DayProjection: MultiStreamProjection FanOut(x => x.Data.Stops); ProjectionName = "Day"; + + // Opt into 2nd level caching of up to 1000 + // most 
recently encountered aggregates as a + // performance optimization + CacheLimitPerTenant = 1000; + + // With large event stores of relatively small + // event objects, moving this number up from the + // default can greatly improve throughput and especially + // improve projection rebuild times + Options.BatchSize = 5000; } public void Apply(Day day, TripStarted e) => day.Started++; @@ -531,7 +542,7 @@ public class DayProjection: MultiStreamProjection public void Apply(Day day, Stop e) => day.Stops++; } ``` -snippet source | anchor +snippet source | anchor ## Using Custom Grouper with Fan Out Feature for Event Projections diff --git a/docs/events/projections/rebuilding.md b/docs/events/projections/rebuilding.md index ea5689decb..4451505fca 100644 --- a/docs/events/projections/rebuilding.md +++ b/docs/events/projections/rebuilding.md @@ -39,6 +39,52 @@ await agent.RebuildProjectionAsync("Distance", CancellationToken.None); snippet source | anchor +## Optimized Projection Rebuilds + +::: tip +This optimization will be turned on by default in Marten 8, but we didn't want to force anyone using Marten 7 to have +to upgrade their database without the explicit opt in configuration. 
+::: + +Marten can optimize the projection rebuilds of single stream projections by opting into this flag in your configuration: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection("some connection string"); + + // Turn on the PostgreSQL table partitioning for + // hot/cold storage on archived events + opts.Events.UseArchivedStreamPartitioning = true; + + // Use the *much* faster workflow for appending events + // at the cost of *some* loss of metadata usage for + // inline projections + opts.Events.AppendMode = EventAppendMode.Quick; + + // Little more involved, but this can reduce the number + // of database queries necessary to process inline projections + // during command handling with some significant + // caveats + opts.Events.UseIdentityMapForInlineAggregates = true; + + // Opts into a mode where Marten is able to rebuild single + // stream projections faster by building one stream at a time + // Does require new table migrations for Marten 7 users though + opts.Events.UseOptimizedProjectionRebuilds = true; +}); +``` +snippet source | anchor + + +In this mode, Marten will rebuild single stream projection documents stream by stream in the reverse order that the +streams were last modified. This was conceived of as being combined with the [`FetchForWriting()`](/scenarios/command_handler_workflow.html#fetchforwriting) usage with asynchronous +single stream projections for zero downtime deployments while trying to create less load on the database than the original +"left fold" / "from zero" rebuild would be. 
+ ## Rebuilding a Single Stream A long standing request has been to be able to rebuild only a single stream or subset of streams diff --git a/src/CommandLineRunner/Program.cs b/src/CommandLineRunner/Program.cs index 6b9eeb22ea..47338d2aa3 100644 --- a/src/CommandLineRunner/Program.cs +++ b/src/CommandLineRunner/Program.cs @@ -50,6 +50,9 @@ public static IHostBuilder CreateHostBuilder(string[] args) { opts.AutoCreateSchemaObjects = AutoCreate.All; opts.DatabaseSchemaName = "cli"; + opts.DisableNpgsqlLogging = true; + + opts.Events.UseOptimizedProjectionRebuilds = true; opts.MultiTenantedWithSingleServer( ConnectionSource.ConnectionString, @@ -60,7 +63,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) // *try* to use pre-generated code at runtime opts.GeneratedCodeMode = TypeLoadMode.Auto; - opts.Schema.For().AddSubClass(); + //opts.Schema.For().AddSubClass(); // You have to register all persisted document types ahead of time // RegisterDocumentType() is the equivalent of saying Schema.For() @@ -86,7 +89,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) .Add(new SimpleProjection(), ProjectionLifecycle.Inline); // This is actually important to register "live" aggregations too for the code generation - opts.Projections.LiveStreamAggregation(); + //opts.Projections.LiveStreamAggregation(); }).AddAsyncDaemon(DaemonMode.Solo); }); } diff --git a/src/CoreTests/Util/RecentlyUsedCacheTests.cs b/src/CoreTests/Util/RecentlyUsedCacheTests.cs new file mode 100644 index 0000000000..d22a2f7123 --- /dev/null +++ b/src/CoreTests/Util/RecentlyUsedCacheTests.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using Marten.Testing.Harness; +using Marten.Util; +using Shouldly; +using Xunit; + +namespace CoreTests.Util; + +public class RecentlyUsedCacheTests +{ + private readonly RecentlyUsedCache theCache = new (){Limit = 100}; + + [Fact] + public void get_the_same_value_back() + { + var items = new List(); + for (int i = 0; i < 10; i++) 
+ { + var item = new Item(Guid.NewGuid()); + theCache.Store(item.Id, item); + items.Add(item); + } + + foreach (var item in items) + { + theCache.TryFind(item.Id, out var found).ShouldBeTrue(); + found.ShouldBeTheSameAs(item); + } + } + + [Fact] + public void compact_moves_off_the_first_ones() + { + var items = new List(); + for (int i = 0; i < 110; i++) + { + var item = new Item(Guid.NewGuid()); + theCache.Store(item.Id, item); + items.Add(item); + } + + theCache.CompactIfNecessary(); + + // Each of the first 10 (oldest) items should have been removed + for (int i = 0; i < 10; i++) + { + theCache.TryFind(items[i].Id, out var _).ShouldBeFalse(); + } + + theCache.Count.ShouldBe(theCache.Limit); + } + + [Fact] + public void request_item_resets() + { + var items = new List(); + for (int i = 0; i < 110; i++) + { + var item = new Item(Guid.NewGuid()); + theCache.Store(item.Id, item); + items.Add(item); + } + + theCache.TryFind(items[0].Id, out var _).ShouldBeTrue(); + theCache.TryFind(items[2].Id, out var _).ShouldBeTrue(); + theCache.TryFind(items[4].Id, out var _).ShouldBeTrue(); + theCache.TryFind(items[8].Id, out var _).ShouldBeTrue(); + + theCache.CompactIfNecessary(); + theCache.Count.ShouldBe(theCache.Limit); + + theCache.TryFind(items[0].Id, out var _).ShouldBeTrue(); + theCache.TryFind(items[2].Id, out var _).ShouldBeTrue(); + theCache.TryFind(items[4].Id, out var _).ShouldBeTrue(); + theCache.TryFind(items[8].Id, out var _).ShouldBeTrue(); + + theCache.TryFind(items[1].Id, out var _).ShouldBeFalse(); + theCache.TryFind(items[3].Id, out var _).ShouldBeFalse(); + theCache.TryFind(items[5].Id, out var _).ShouldBeFalse(); + theCache.TryFind(items[7].Id, out var _).ShouldBeFalse(); + + } +} + +public record Item(Guid Id); diff --git a/src/DaemonTests/DaemonTests.csproj b/src/DaemonTests/DaemonTests.csproj index dcb6bfe382..0cd78f0f66 100644 --- a/src/DaemonTests/DaemonTests.csproj +++ b/src/DaemonTests/DaemonTests.csproj @@ -18,6 +18,7 @@ runtime; build; native; contentfiles; analyzers + diff 
--git a/src/DaemonTests/EventRangeTests.cs b/src/DaemonTests/EventRangeTests.cs index 0e0352ce62..db6d8daf55 100644 --- a/src/DaemonTests/EventRangeTests.cs +++ b/src/DaemonTests/EventRangeTests.cs @@ -60,4 +60,49 @@ public void skip_event_sequence() range.Events.Count.ShouldBe(4); } -} \ No newline at end of file + + [Fact] + public void combine_shallow() + { + var range1 = new EventRange(new ShardName("name"), 0, 100) + { + Events = new List + { + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + } + }; + + var range2 = new EventRange(new ShardName("name"), 100, 200) + { + Events = new List + { + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + } + }; + + var range3 = new EventRange(new ShardName("name"), 200, 300) + { + Events = new List + { + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + new Event(new AEvent()), + } + }; + + var combined = EventRange.CombineShallow(range1, range2, range3); + combined.SequenceFloor.ShouldBe(0); + combined.SequenceCeiling.ShouldBe(300); + combined.ShardName.ShouldBe(range1.ShardName); + } +} diff --git a/src/DaemonTests/TestingSupport/DaemonContext.cs b/src/DaemonTests/TestingSupport/DaemonContext.cs index 2f6654c7cf..93d38b6a9d 100644 --- a/src/DaemonTests/TestingSupport/DaemonContext.cs +++ b/src/DaemonTests/TestingSupport/DaemonContext.cs @@ -17,7 +17,9 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Npgsql; using Shouldly; +using Weasel.Postgresql; using Xunit; using Xunit.Abstractions; @@ -41,6 +43,15 @@ protected DaemonContext(ITestOutputHelper output) _output = output; } + internal async Task wipeAllStreamTypeMarkers() + { + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await 
conn.OpenAsync(); + await conn.CreateCommand("update daemon.mt_streams set type = null").ExecuteNonQueryAsync(); + await conn.CreateCommand("delete from daemon.mt_event_progression where name != 'HighWaterMark'") + .ExecuteNonQueryAsync(); + } + public ILogger Logger { get; } internal async Task StartDaemon() @@ -169,6 +180,7 @@ protected StreamAction[] ToStreamActions() return _streams.Select(x => x.ToAction(theStore.Events)).ToArray(); } + // START HERE, NEEDS TO BE GENERALIZED protected async Task CheckAllExpectedAggregatesAgainstActuals() { var actuals = await LoadAllAggregatesFromDatabase(); @@ -195,6 +207,58 @@ protected async Task CheckAllExpectedAggregatesAgainstActuals() } } + protected async Task CheckAllExpectedGuidCentricAggregatesAgainstActuals(Func identifier) where TDoc : class + { + var actuals = await LoadAllAggregatesFromDatabase(identifier); + + foreach (var stream in _streams) + { + var expected = await theSession.Events.AggregateStreamAsync(stream.StreamId); + + if (expected == null) + { + actuals.ContainsKey(stream.StreamId).ShouldBeFalse(); + } + else + { + if (actuals.TryGetValue(stream.StreamId, out var actual)) + { + expected.ShouldBe(actual); + } + else + { + throw new Exception("Missing expected aggregate"); + } + } + } + } + + protected async Task CheckAllExpectedStringCentricAggregatesAgainstActuals(Func identifier) where TDoc : class + { + var actuals = await LoadAllAggregatesFromDatabase(identifier); + + foreach (var stream in _streams) + { + var expected = await theSession.Events.AggregateStreamAsync(stream.StreamId.ToString()); + + if (expected == null) + { + actuals.ContainsKey(stream.StreamId.ToString()).ShouldBeFalse(); + } + else + { + if (actuals.TryGetValue(stream.StreamId.ToString(), out var actual)) + { + expected.ShouldBe(actual); + } + else + { + throw new Exception("Missing expected aggregate"); + } + } + } + } + protected async Task CheckAllExpectedAggregatesAgainstActuals(string tenantId) { var actuals = await 
LoadAllAggregatesFromDatabase(tenantId); @@ -221,24 +285,34 @@ protected async Task CheckAllExpectedAggregatesAgainstActuals(string tenantId) } } - protected async Task> LoadAllAggregatesFromDatabase(string tenantId = null) + protected Task> LoadAllAggregatesFromDatabase(string tenantId = null) + { + return LoadAllAggregatesFromDatabase(x => x.Id, tenantId); + } + + protected async Task> LoadAllAggregatesFromDatabase(Func identifier,string tenantId = null) { if (string.IsNullOrEmpty(tenantId)) { - var data = await theSession.Query().ToListAsync(); - var dict = data.ToDictionary(x => x.Id); + var data = await theSession.Query().ToListAsync(); + var dict = data.ToDictionary(x => identifier(x)); return dict; } else { await using var session = theStore.LightweightSession(tenantId); - var data = await session.Query().ToListAsync(); - var dict = data.ToDictionary(x => x.Id); + var data = await session.Query().ToListAsync(); + var dict = data.ToDictionary(x => identifier(x)); return dict; } } protected async Task PublishSingleThreaded() + { + await PublishSingleThreaded(); + } + + protected async Task PublishSingleThreaded() where T : class { var groups = _streams.GroupBy(x => x.TenantId).ToArray(); if (groups.Length > 1 || groups.Single().Key != Tenancy.DefaultTenantId) @@ -248,7 +322,17 @@ protected async Task PublishSingleThreaded() foreach (var stream in @group) { await using var session = theStore.LightweightSession(group.Key); - session.Events.StartStream(stream.StreamId, stream.Events); + + if (theStore.Options.EventGraph.StreamIdentity == StreamIdentity.AsGuid) + { + session.Events.StartStream(stream.StreamId, stream.Events); + } + else + { + session.Events.StartStream(stream.StreamId.ToString(), stream.Events); + } + + await session.SaveChangesAsync(); } } @@ -258,7 +342,7 @@ protected async Task PublishSingleThreaded() foreach (var stream in _streams) { await using var session = theStore.LightweightSession(); - session.Events.StartStream(stream.StreamId, 
stream.Events); + session.Events.StartStream(stream.StreamId, stream.Events); await session.SaveChangesAsync(); } } diff --git a/src/DaemonTests/TestingSupport/Trip.cs b/src/DaemonTests/TestingSupport/Trip.cs index d00a75be00..1e071fcea3 100644 --- a/src/DaemonTests/TestingSupport/Trip.cs +++ b/src/DaemonTests/TestingSupport/Trip.cs @@ -42,4 +42,7 @@ public override string ToString() { return $"{nameof(Id)}: {Id}, {nameof(EndedOn)}: {EndedOn}, {nameof(Traveled)}: {Traveled}, {nameof(State)}: {State}, {nameof(Active)}: {Active}, {nameof(StartedOn)}: {StartedOn}"; } -} \ No newline at end of file +} + + + diff --git a/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs b/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs index 8dfb6a67af..42cc04e017 100644 --- a/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs +++ b/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs @@ -36,6 +36,7 @@ public TripProjectionWithCustomName() { ProjectionName = "TripCustomName"; TeardownDataOnRebuild = true; + Options.BatchSize = 5000; } } diff --git a/src/DaemonTests/ViewProjectionTests.cs b/src/DaemonTests/ViewProjectionTests.cs index a400738363..45a441d188 100644 --- a/src/DaemonTests/ViewProjectionTests.cs +++ b/src/DaemonTests/ViewProjectionTests.cs @@ -148,6 +148,17 @@ public DayProjection() FanOut(x => x.Data.Stops); ProjectionName = "Day"; + + // Opt into 2nd level caching of up to 1000 + // most recently encountered aggregates as a + // performance optimization + CacheLimitPerTenant = 1000; + + // With large event stores of relatively small + // event objects, moving this number up from the + // default can greatly improve throughput and especially + // improve projection rebuild times + Options.BatchSize = 5000; } public void Apply(Day day, TripStarted e) => day.Started++; diff --git a/src/DaemonTests/build_aggregate_projection.cs b/src/DaemonTests/build_aggregate_projection.cs index 7480164fcb..3100b3565f 100644 --- 
a/src/DaemonTests/build_aggregate_projection.cs +++ b/src/DaemonTests/build_aggregate_projection.cs @@ -72,6 +72,33 @@ public async Task end_to_end_with_events_already_published() await CheckAllExpectedAggregatesAgainstActuals(); } + [Fact] + public async Task end_to_end_with_events_already_published_with_caching() + { + NumberOfStreams = 10; + + Logger.LogDebug("The expected number of events is {NumberOfEvents}", NumberOfEvents); + + StoreOptions(x => + { + x.Projections.Add(new TripProjectionWithCustomName(){CacheLimitPerTenant = 100}, ProjectionLifecycle.Async); + x.Logger(new TestOutputMartenLogger(_output)); + }, true); + + var agent = await StartDaemon(); + + await PublishSingleThreaded(); + + var shard = theStore.Options.Projections.AllShards().Single(); + var waiter = agent.Tracker.WaitForShardState(new ShardState(shard, NumberOfEvents), 60.Seconds()); + + await agent.StartAgentAsync(shard.Name.Identity, CancellationToken.None); + + await waiter; + + await CheckAllExpectedAggregatesAgainstActuals(); + } + [Fact] public async Task build_with_multi_tenancy() { @@ -99,6 +126,33 @@ public async Task build_with_multi_tenancy() await CheckAllExpectedAggregatesAgainstActuals("b"); } + [Fact] + public async Task build_with_multi_tenancy_with_caching() + { + StoreOptions(x => + { + x.Events.TenancyStyle = TenancyStyle.Conjoined; + x.Projections.Add(new TripProjectionWithCustomName(){CacheLimitPerTenant = 100}, ProjectionLifecycle.Async); + x.Schema.For().MultiTenanted(); + }, true); + + UseMixOfTenants(5); + + Logger.LogDebug("The expected number of events is {NumberOfEvents}", NumberOfEvents); + + var agent = await StartDaemon(); + + var shard = theStore.Options.Projections.AllShards().Single(); + var waiter = agent.Tracker.WaitForShardState(new ShardState(shard, NumberOfEvents), 60.Seconds()); + + await PublishSingleThreaded(); + + await waiter; + + await CheckAllExpectedAggregatesAgainstActuals("a"); + await CheckAllExpectedAggregatesAgainstActuals("b"); + } + 
[Fact] public async Task rebuild_the_projection() { @@ -121,6 +175,29 @@ public async Task rebuild_the_projection() await CheckAllExpectedAggregatesAgainstActuals(); } + [Fact] + public async Task rebuild_the_projection_with_caching() + { + NumberOfStreams = 15; + + Logger.LogDebug("The expected number of events is {NumberOfEvents}", NumberOfEvents); + + StoreOptions(x => x.Projections.Add(new TripProjectionWithCustomName(){CacheLimitPerTenant = 100}, ProjectionLifecycle.Async), true); + + var agent = await StartDaemon(); + + // Gotta do this to have it mixed up + await PublishMultiThreaded(3); + + var waiter = agent.Tracker.WaitForShardState(new ShardState("TripCustomName:All", NumberOfEvents), 120.Seconds()); + + await waiter; + Logger.LogDebug("About to rebuild TripCustomName:All"); + await agent.RebuildProjectionAsync("TripCustomName", CancellationToken.None); + Logger.LogDebug("Done rebuilding TripCustomName:All"); + await CheckAllExpectedAggregatesAgainstActuals(); + } + [Fact] public async Task rebuild_the_projection_clears_state() { diff --git a/src/DaemonTests/custom_aggregation_in_async_daemon.cs b/src/DaemonTests/custom_aggregation_in_async_daemon.cs index 3e0bae9ec6..c8fda3eb0a 100644 --- a/src/DaemonTests/custom_aggregation_in_async_daemon.cs +++ b/src/DaemonTests/custom_aggregation_in_async_daemon.cs @@ -73,6 +73,50 @@ public async Task run_end_to_end() .ShouldBe(new CustomAggregate{Id = 3, ACount = 0, BCount = 1, CCount = 0, DCount = 1}); } + + [Fact] + public async Task run_end_to_end_with_caching() + { + StoreOptions(opts => + { + var myCustomAggregation = new MyCustomProjection(){CacheLimitPerTenant = 100}; + opts.Projections.Add(myCustomAggregation, ProjectionLifecycle.Async); + opts.Logger(new TestOutputMartenLogger(_output)); + }); + + await theStore.Advanced.Clean.DeleteAllDocumentsAsync(); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + + appendCustomEvent(1, 'a'); + appendCustomEvent(1, 'a'); + appendCustomEvent(1, 'b'); + 
appendCustomEvent(1, 'c'); + appendCustomEvent(1, 'd'); + appendCustomEvent(2, 'a'); + appendCustomEvent(2, 'a'); + appendCustomEvent(3, 'b'); + appendCustomEvent(3, 'd'); + appendCustomEvent(1, 'a'); + appendCustomEvent(1, 'a'); + + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(logger:new TestLogger(_output)); + await daemon.StartAllAsync(); + + await daemon.Tracker.WaitForShardState("Custom:All", 11); + + var agg1 = await theSession.LoadAsync(1); + agg1 + .ShouldBe(new CustomAggregate{Id = 1, ACount = 4, BCount = 1, CCount = 1, DCount = 1}); + + (await theSession.LoadAsync(2)) + .ShouldBe(new CustomAggregate{Id = 2, ACount = 2, BCount = 0, CCount = 0, DCount = 0}); + + (await theSession.LoadAsync(3)) + .ShouldBe(new CustomAggregate{Id = 3, ACount = 0, BCount = 1, CCount = 0, DCount = 1}); + + } } @@ -183,4 +227,4 @@ public override string ToString() { return $"{nameof(Id)}: {Id}, {nameof(ACount)}: {ACount}, {nameof(BCount)}: {BCount}, {nameof(CCount)}: {CCount}, {nameof(DCount)}: {DCount}"; } -} \ No newline at end of file +} diff --git a/src/DaemonTests/optimized_rebuilds.cs b/src/DaemonTests/optimized_rebuilds.cs new file mode 100644 index 0000000000..d6f93359c7 --- /dev/null +++ b/src/DaemonTests/optimized_rebuilds.cs @@ -0,0 +1,492 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DaemonTests.TestingSupport; +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Aggregation.Rebuilds; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; +using Marten.Events.Projections; +using Marten.Events.Schema; +using Marten.Internal; +using Marten.Internal.Sessions; +using Marten.Storage; +using Marten.Testing.Harness; +using Npgsql; +using NSubstitute; +using Shouldly; +using Vogen; +using Weasel.Core; +using Weasel.Postgresql.Tables; +using Xunit; +using Xunit.Abstractions; 
+ +namespace DaemonTests; + +public class optimized_rebuilds : DaemonContext +{ + public optimized_rebuilds(ITestOutputHelper output): base(output) + { + + } + + [Fact] + public async Task add_extra_columns_to_progression_table() + { + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + }); + + await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent)); + + var table = await findExistingTable(EventProgressionTable.Name); + + table.HasColumn("mode").ShouldBeTrue(); + table.HasColumn("rebuild_threshold").ShouldBeTrue(); + table.HasColumn("assigned_node").ShouldBeTrue(); + } + + private async Task findExistingTable(string tableName) + { + using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + + return await new Table(new DbObjectName(theStore.Events.DatabaseSchemaName, tableName)).FetchExistingAsync(conn); + } + + [Fact] + public async Task adds_the_aggregate_rebuild_table() + { + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + }); + + await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent)); + + var table = await findExistingTable(AggregateRebuildTable.Name); + + table.ShouldNotBeNull(); + } + + [Fact] + public async Task use_guid_identified_rebuilder_from_scratch() + { + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + NumberOfStreams = 10; + await PublishSingleThreaded(); + + await wipeAllStreamTypeMarkers(); + + theStore.Options.Projections.TryFindProjection("Trip", out var source).ShouldBeTrue(); + + var projection = source.Build(theStore); + + var builder = new SingleStreamRebuilder(theStore, theStore.Storage.Database, (IAggregationRuntime)projection); + + var daemon = Substitute.For(); + daemon.HighWaterMark().Returns(NumberOfEvents); + + await builder.RebuildAllAsync(daemon, new ShardName("Trip", "All"), projection.As().Projection, 
CancellationToken.None); + + await CheckAllExpectedAggregatesAgainstActuals(); + + var states = await theStore.Advanced.AllProjectionProgress(); + var state = states.Single(x => x.ShardName == "Trip:All"); + state.Mode.ShouldBe(ShardMode.continuous); + state.Sequence.ShouldBe(NumberOfEvents); + } + + [Fact] + public async Task optimize_build_kicks_in_at_startup() + { + // Honestly, walk through this with the debugger to see that it's doing the right thing + + // Publish with NO projection + NumberOfStreams = 10; + await PublishSingleThreaded(); + + await wipeAllStreamTypeMarkers(); + + // Add the projection and restart the store + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }, cleanAll:false); + + using var daemon = await StartDaemon(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(5.Seconds()); + + await CheckAllExpectedAggregatesAgainstActuals(); + } + + [Fact] + public async Task use_guid_identified_rebuilder_from_daemon() + { + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + NumberOfStreams = 10; + await PublishSingleThreaded(); + + await wipeAllStreamTypeMarkers(); + + using var daemon = await StartDaemon(); + await daemon.RebuildProjectionAsync(10.Seconds(), CancellationToken.None); + + await CheckAllExpectedAggregatesAgainstActuals(); + + var states = await theStore.Advanced.AllProjectionProgress(); + var state = states.Single(x => x.ShardName == "Trip:All"); + state.Mode.ShouldBe(ShardMode.continuous); + state.Sequence.ShouldBe(NumberOfEvents); + } + + [Fact] + public async Task rebuild_with_conjoined_tenancy_against_guid_based_ids() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + opts.Schema.For().MultiTenanted(); + }, 
true); + + NumberOfStreams = 10; + UseMixOfTenants(5); + + await PublishSingleThreaded(); + + await wipeAllStreamTypeMarkers(); + + theStore.Options.Projections.TryFindProjection("Trip", out var source).ShouldBeTrue(); + + var projection = source.Build(theStore); + + var builder = new SingleStreamRebuilder(theStore, theStore.Storage.Database, (IAggregationRuntime)projection); + + var asyncOptions = new AsyncOptions(); + asyncOptions.DeleteViewTypeOnTeardown(typeof(Trip)); + + var daemon = Substitute.For(); + daemon.HighWaterMark().Returns(NumberOfEvents); + + await builder.RebuildAllAsync(daemon, new ShardName("Trip", "All"), projection.As().Projection, CancellationToken.None); + + await CheckAllExpectedAggregatesAgainstActuals("a"); + await CheckAllExpectedAggregatesAgainstActuals("b"); + + var states = await theStore.Advanced.AllProjectionProgress(); + var state = states.Single(x => x.ShardName == "Trip:All"); + state.Mode.ShouldBe(ShardMode.continuous); + state.Sequence.ShouldBe(NumberOfEvents); + + } + + [Fact] + public async Task SeedAggregateRebuildTable_on_Guid_identified_streams() + { + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + await theStore.Advanced.Clean.DeleteAllDocumentsAsync(); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + + NumberOfStreams = 10; + await PublishSingleThreaded(); + + var operation = new SeedAggregateRebuildTable(theStore.Options, typeof(Trip)); + + theSession.QueueOperation(operation); + await theSession.SaveChangesAsync(); + + using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + + var count = (long)await conn.CreateCommand( + $"select count(*) from {SchemaName}.{AggregateRebuildTable.Name} where stream_type = 'trip'") + .ExecuteScalarAsync(); + + count.ShouldBe(NumberOfStreams); + } + + [Fact] + public async Task QueryForNextAggregateIds() + { + StoreOptions(opts => + { + 
opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + NumberOfStreams = 10; + await PublishSingleThreaded(); + + var operation = new SeedAggregateRebuildTable(theStore.Options, typeof(Trip)); + + theSession.QueueOperation(operation); + await theSession.SaveChangesAsync(); + + var handler = new QueryForNextAggregateIds(theStore.Options, typeof(Trip)); + var ids = await theSession.As().ExecuteHandlerAsync(handler, CancellationToken.None); + + ids.Count.ShouldBe(NumberOfStreams); + } + + [Fact] + public async Task DequeuePendingAggregateRebuilds() + { + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + NumberOfStreams = 10; + await PublishSingleThreaded(); + + await wipeAllStreamTypeMarkers(); + + var operation = new SeedAggregateRebuildTable(theStore.Options, typeof(Trip)); + + theSession.QueueOperation(operation); + await theSession.SaveChangesAsync(); + + var handler = new QueryForNextAggregateIds(theStore.Options, typeof(Trip)); + var ids = await theSession.As().ExecuteHandlerAsync(handler, CancellationToken.None); + + var dequeue = new DequeuePendingAggregateRebuilds(theStore.Options, ids.Select(x => x.Number)); + theSession.QueueOperation(dequeue); + await theSession.SaveChangesAsync(); + + ids = await theSession.As().ExecuteHandlerAsync(handler, CancellationToken.None); + ids.Count.ShouldBe(0); + + + } + + + [Fact] + public async Task use_guid_identified_rebuilder_from_scratch_using_strong_typed_id() + { + StoreOptions(opts => + { + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + NumberOfStreams = 10; + await PublishSingleThreaded(); + + await wipeAllStreamTypeMarkers(); + + theStore.Options.Projections.TryFindProjection("StrongTrip", out var source).ShouldBeTrue(); + + var projection = source.Build(theStore); + + var builder = new 
SingleStreamRebuilder(theStore, theStore.Storage.Database, (IAggregationRuntime)projection); + + var daemon = Substitute.For(); + daemon.HighWaterMark().Returns(NumberOfEvents); + + await builder.RebuildAllAsync(daemon, new ShardName("StrongTrip", "All"), projection.As().Projection, CancellationToken.None); + + await CheckAllExpectedGuidCentricAggregatesAgainstActuals(x => x.Id.Value.Value); + + var states = await theStore.Advanced.AllProjectionProgress(); + var state = states.Single(x => x.ShardName == "StrongTrip:All"); + state.Mode.ShouldBe(ShardMode.continuous); + state.Sequence.ShouldBe(NumberOfEvents); + } + + [Fact] + public async Task use_string_identified_rebuilder_from_scratch() + { + StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.UseOptimizedProjectionRebuilds = true; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + NumberOfStreams = 10; + await PublishSingleThreaded(); + + theStore.Options.Projections.TryFindProjection("StringTrip", out var source).ShouldBeTrue(); + + var projection = source.Build(theStore); + + var builder = new SingleStreamRebuilder(theStore, theStore.Storage.Database, (IAggregationRuntime)projection); + + var daemon = Substitute.For(); + daemon.HighWaterMark().Returns(NumberOfEvents); + + await builder.RebuildAllAsync(daemon, new ShardName("StringTrip", "All"), projection.As().Projection, CancellationToken.None); + + await CheckAllExpectedStringCentricAggregatesAgainstActuals(doc => doc.Id); + + var states = await theStore.Advanced.AllProjectionProgress(); + var state = states.Single(x => x.ShardName == "StringTrip:All"); + state.Mode.ShouldBe(ShardMode.continuous); + state.Sequence.ShouldBe(NumberOfEvents); + } + +} + + +[ValueObject] +public partial struct TripId; + +public class StrongTrip +{ + public TripId? 
Id { get; set; } + + public int EndedOn { get; set; } + + public double Traveled { get; set; } + + public string State { get; set; } + + public bool Active { get; set; } + + public int StartedOn { get; set; } + public Guid? RepairShopId { get; set; } + + protected bool Equals(StrongTrip other) + { + return Id.Value.Equals(other.Id.Value) && EndedOn == other.EndedOn && Traveled.Equals(other.Traveled) && State == other.State && Active == other.Active && StartedOn == other.StartedOn; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((StrongTrip) obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(Id, EndedOn, Traveled, State, Active, StartedOn); + } + + public override string ToString() + { + return $"{nameof(Id)}: {Id}, {nameof(EndedOn)}: {EndedOn}, {nameof(Traveled)}: {Traveled}, {nameof(State)}: {State}, {nameof(Active)}: {Active}, {nameof(StartedOn)}: {StartedOn}"; + } +} + +public class StrongTripProjection: SingleStreamProjection +{ + public StrongTripProjection() + { + DeleteEvent(); + + DeleteEvent(x => x.IsCritical); + + DeleteEvent((trip, _) => trip.Traveled > 1000); + } + + // These methods can be either public, internal, or private but there's + // a small performance gain to making them public + public void Apply(Arrival e, StrongTrip trip) => trip.State = e.State; + public void Apply(Travel e, StrongTrip trip) => trip.Traveled += e.TotalDistance(); + + public void Apply(TripEnded e, StrongTrip trip) + { + trip.Active = false; + trip.EndedOn = e.Day; + } + + public StrongTrip Create(TripStarted started) + { + return new StrongTrip { StartedOn = started.Day, Active = true }; + } +} + + +public class StringTrip +{ + public string Id { get; set; } + + public int EndedOn { get; set; } + + public double Traveled { get; set; } + + public string State { get; set; } + + 
public bool Active { get; set; } + + public int StartedOn { get; set; } + public Guid? RepairShopId { get; set; } + + protected bool Equals(StringTrip other) + { + return Id.Equals(other.Id) && EndedOn == other.EndedOn && Traveled.Equals(other.Traveled) && State == other.State && Active == other.Active && StartedOn == other.StartedOn; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((StringTrip) obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(Id, EndedOn, Traveled, State, Active, StartedOn); + } + + public override string ToString() + { + return $"{nameof(Id)}: {Id}, {nameof(EndedOn)}: {EndedOn}, {nameof(Traveled)}: {Traveled}, {nameof(State)}: {State}, {nameof(Active)}: {Active}, {nameof(StartedOn)}: {StartedOn}"; + } +} + +public class StringTripProjection: SingleStreamProjection +{ + public StringTripProjection() + { + DeleteEvent(); + + DeleteEvent(x => x.IsCritical); + + DeleteEvent((trip, _) => trip.Traveled > 1000); + } + + // These methods can be either public, internal, or private but there's + // a small performance gain to making them public + public void Apply(Arrival e, StringTrip trip) => trip.State = e.State; + public void Apply(Travel e, StringTrip trip) => trip.Traveled += e.TotalDistance(); + + public void Apply(TripEnded e, StringTrip trip) + { + trip.Active = false; + trip.EndedOn = e.Day; + } + + public StringTrip Create(TripStarted started) + { + return new StringTrip { StartedOn = started.Day, Active = true }; + } +} diff --git a/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs b/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs index 46582cecc1..e737330df5 100644 --- a/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs +++ b/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs @@ -9,8 +9,11 @@ using 
Marten.Internal.Sessions; using Marten.Metadata; using Marten.Schema; +using Marten.Storage; using Marten.Testing.Documents; using Marten.Testing.Harness; +using Marten.Util; +using NSubstitute; using Shouldly; using Xunit; @@ -37,6 +40,65 @@ public void configure_mapping(bool isSingleGrouper, EventAppendMode appendMode, mapping.UseVersionFromMatchingStream.ShouldBe(useVersionFromStream); } + [Fact] + public void caches_aggregate_caches_correctly() + { + var projection = new MyCustomProjection(); + var db = Substitute.For(); + db.Identifier.Returns("main"); + var tenant1 = new Tenant("one", db); + var tenant2 = new Tenant("two", db); + var tenant3 = new Tenant("three", db); + + var cache1 = projection.CacheFor(tenant1); + var cache2 = projection.CacheFor(tenant2); + var cache3 = projection.CacheFor(tenant3); + + projection.CacheFor(tenant1).ShouldBeTheSameAs(cache1); + projection.CacheFor(tenant2).ShouldBeTheSameAs(cache2); + projection.CacheFor(tenant3).ShouldBeTheSameAs(cache3); + + cache1.ShouldNotBeTheSameAs(cache2); + cache1.ShouldNotBeTheSameAs(cache3); + cache2.ShouldNotBeTheSameAs(cache3); + } + + [Fact] + public void build_nullo_cache_with_no_limit() + { + var projection = new MyCustomProjection(); + projection.CacheLimitPerTenant = 0; + + var db = Substitute.For(); + db.Identifier.Returns("main"); + var tenant1 = new Tenant("one", db); + var tenant2 = new Tenant("two", db); + var tenant3 = new Tenant("three", db); + + projection.CacheFor(tenant1).ShouldBeOfType>(); + projection.CacheFor(tenant2).ShouldBeOfType>(); + projection.CacheFor(tenant3).ShouldBeOfType>(); + + } + + [Fact] + public void build_real_cache_with_limit() + { + var projection = new MyCustomProjection(); + projection.CacheLimitPerTenant = 1000; + + var db = Substitute.For(); + db.Identifier.Returns("main"); + var tenant1 = new Tenant("one", db); + var tenant2 = new Tenant("two", db); + var tenant3 = new Tenant("three", db); + + 
projection.CacheFor(tenant1).ShouldBeOfType>().Limit.ShouldBe(projection.CacheLimitPerTenant); + projection.CacheFor(tenant2).ShouldBeOfType>().Limit.ShouldBe(projection.CacheLimitPerTenant); + projection.CacheFor(tenant3).ShouldBeOfType>().Limit.ShouldBe(projection.CacheLimitPerTenant); + + } + [Fact] public void default_projection_name_is_type_name() { diff --git a/src/EventSourcingTests/EventGraphTests.cs b/src/EventSourcingTests/EventGraphTests.cs index 406a3bbdc1..640f876ca1 100644 --- a/src/EventSourcingTests/EventGraphTests.cs +++ b/src/EventSourcingTests/EventGraphTests.cs @@ -13,6 +13,12 @@ public class EventGraphTests { private readonly EventGraph theGraph = new EventGraph(new StoreOptions()); + [Fact] + public void use_optimized_projection_rebuilds_is_false_by_default() + { + theGraph.UseOptimizedProjectionRebuilds.ShouldBeFalse(); + } + [Fact] public void build_event() { diff --git a/src/EventSourcingTests/Examples/Optimizations.cs b/src/EventSourcingTests/Examples/Optimizations.cs index e1349c0d76..3d0cd5df59 100644 --- a/src/EventSourcingTests/Examples/Optimizations.cs +++ b/src/EventSourcingTests/Examples/Optimizations.cs @@ -49,8 +49,15 @@ public static async Task use_optimizations() // during command handling with some significant // caveats opts.Events.UseIdentityMapForInlineAggregates = true; + + + // Opts into a mode where Marten is able to rebuild single + // stream projections faster by building one stream at a time + // Does require new table migrations for Marten 7 users though + opts.Events.UseOptimizedProjectionRebuilds = true; }); #endregion } + } diff --git a/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs b/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs index 415797cd6e..c66116e449 100644 --- a/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs +++ b/src/EventSourcingTests/Projections/SingleStreamProjectionTests.cs @@ -1,9 +1,13 @@ +using EventSourcingTests.Aggregation; using Marten; 
using Marten.Events; using Marten.Events.Aggregation; using Marten.Events.Projections; using Marten.Schema; +using Marten.Storage; using Marten.Testing.Documents; +using Marten.Util; +using NSubstitute; using Shouldly; using Xunit; diff --git a/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs b/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs index 8ed0ec741d..12ace83b2d 100644 --- a/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs +++ b/src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Reflection; using System.Runtime.Loader; @@ -87,7 +88,10 @@ await Parallel.ForEachAsync(projectionNames, _cancellation.Token, try { + var stopwatch = new Stopwatch(); + stopwatch.Start(); await daemon.RebuildProjectionAsync(projectionName, shardTimeout.Value, token).ConfigureAwait(false); + AnsiConsole.MarkupLine($"[green]Finished rebuilding {projectionName} in {stopwatch.ElapsedMilliseconds} ms[/]"); } catch (Exception e) { diff --git a/src/Marten/AdvancedOperations.cs b/src/Marten/AdvancedOperations.cs index 613c4d2b47..99e9beed37 100644 --- a/src/Marten/AdvancedOperations.cs +++ b/src/Marten/AdvancedOperations.cs @@ -13,7 +13,6 @@ using Marten.Storage; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using Microsoft.Extensions.Options; using Weasel.Postgresql; using Weasel.Postgresql.Tables.Partitioning; diff --git a/src/Marten/Events/Aggregation/AggregationRuntime.cs b/src/Marten/Events/Aggregation/AggregationRuntime.cs index 72c6b58cb5..90edc8c234 100644 --- a/src/Marten/Events/Aggregation/AggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/AggregationRuntime.cs @@ -4,7 +4,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using JasperFx.Core; using JasperFx.Core.Reflection; +using Marten.Events.Aggregation.Rebuilds; using 
Marten.Events.Daemon; using Marten.Events.Daemon.Internals; using Marten.Events.Projections; @@ -15,6 +17,7 @@ using Marten.Services; using Marten.Sessions; using Marten.Storage; +using Marten.Util; using Microsoft.Extensions.Options; using Npgsql; @@ -42,6 +45,8 @@ public override bool IsNew(EventSlice slice) public abstract class AggregationRuntime: IAggregationRuntime where TDoc : notnull where TId : notnull { + private readonly Func _identitySource; + public AggregationRuntime(IDocumentStore store, IAggregateProjection projection, IEventSlicer slicer, IDocumentStorage storage) { @@ -49,6 +54,29 @@ public AggregationRuntime(IDocumentStore store, IAggregateProjection projection, Slicer = slicer; Storage = storage; Options = store.As().Options; + CacheLimitPerTenant = projection.CacheLimitPerTenant; + + if (Slicer is ISingleStreamSlicer) + { + if (typeof(TId) == typeof(Guid)) + { + _identitySource = e => e.StreamId.As(); + } + else if (typeof(TId) == typeof(string)) + { + _identitySource = e => e.StreamKey!.As(); + } + else + { + // Strong typed identifiers. Users love them, I hate them + var valueType = store.Options.As().FindOrCreateValueType(typeof(TId)); + _identitySource = valueType.CreateAggregateIdentitySource(); + } + } + else + { + _identitySource = e => throw new NotSupportedException("Not supported for multi-stream projections"); + } } internal StoreOptions Options { get; } @@ -241,4 +269,50 @@ public TDoc CreateDefault(IEvent @event) throw new System.InvalidOperationException($"There is no default constructor for {typeof(TDoc).FullNameInCode()} or Create method for {@event.DotNetTypeName} event type.Check more about the create method convention in documentation: https://martendb.io/events/projections/event-projections.html#create-method-convention. If you're using Upcasting, check if {@event.DotNetTypeName} is an old event type. If it is, make sure to define transformation for it to new event type. 
Read more in Upcasting docs: https://martendb.io/events/versioning.html#upcasting-advanced-payload-transformations."); } } + + // TODO -- duplicated with CustomProjection + /// + /// If more than 0 (the default), this is the maximum number of aggregates + /// that will be cached in a 2nd level, most recently used cache during async + /// projection. Use this to potentially improve async projection throughput + /// + public int CacheLimitPerTenant { get; set; } = 0; + + private ImHashMap> _caches = ImHashMap>.Empty; + private readonly object _cacheLock = new(); + + public IAggregateCache CacheFor(Tenant tenant) + { + if (_caches.TryFind(tenant, out var cache)) return cache; + + lock (_cacheLock) + { + if (_caches.TryFind(tenant, out cache)) return cache; + + cache = CacheLimitPerTenant == 0 + ? new NulloAggregateCache() + : new RecentlyUsedCache { Limit = CacheLimitPerTenant }; + + _caches = _caches.AddOrUpdate(tenant, cache); + + return cache; + } + } + + public TId IdentityFromEvent(IEvent e) + { + return _identitySource(e); + } + + public bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor) + { + if (Slicer is ISingleStreamSlicer) + { + executor = new SingleStreamRebuilder(store, database, this); + return true; + } + + executor = default; + return false; + } } diff --git a/src/Marten/Events/Aggregation/CustomProjection.cs b/src/Marten/Events/Aggregation/CustomProjection.cs index 34f531a19e..4fee3e0727 100644 --- a/src/Marten/Events/Aggregation/CustomProjection.cs +++ b/src/Marten/Events/Aggregation/CustomProjection.cs @@ -3,7 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using JasperFx.Core; using JasperFx.Core.Reflection; +using Marten.Events.Aggregation.Rebuilds; using Marten.Events.Daemon; using Marten.Events.Daemon.Internals; using Marten.Events.Projections; @@ -14,6 +16,7 @@ using Marten.Services; using Marten.Sessions; using Marten.Storage; +using Marten.Util; namespace 
Marten.Events.Aggregation; @@ -47,6 +50,8 @@ protected CustomProjection() } } + public IAggregateProjection Projection => this; + public bool IsSingleStream() { return Slicer is ISingleStreamSlicer; @@ -106,6 +111,18 @@ async ValueTask IAggregationRuntime.GroupEvents(DocumentStore s return new TenantSliceRange(store, this, range, groups, cancellationToken); } + public bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor) + { + if (Slicer is ISingleStreamSlicer) + { + executor = new SingleStreamRebuilder(store, database, this); + return true; + } + + executor = default; + return false; + } + /// /// Apply any document changes based on the incoming slice of events to the underlying aggregate document /// @@ -178,7 +195,6 @@ public virtual bool IsNew(EventSlice slice) /// /// /// - /// public ValueTask>> GroupEventRange(DocumentStore store, IMartenDatabase database, EventRange range, CancellationToken cancellation) @@ -350,6 +366,42 @@ async ValueTask ILiveAggregator.BuildAsync(IReadOnlyList eve return slice.Aggregate; } + + // TODO -- duplicated with AggregationRuntime, and that's an ick. + /// + /// If more than 0 (the default), this is the maximum number of aggregates + /// that will be cached in a 2nd level, most recently used cache during async + /// projection. Use this to potentially improve async projection throughput + /// + public int CacheLimitPerTenant { get; set; } = 0; + + private ImHashMap> _caches = ImHashMap>.Empty; + private readonly object _cacheLock = new(); + + public IAggregateCache CacheFor(Tenant tenant) + { + if (_caches.TryFind(tenant, out var cache)) return cache; + + lock (_cacheLock) + { + if (_caches.TryFind(tenant, out cache)) return cache; + + cache = CacheLimitPerTenant == 0 + ? 
new NulloAggregateCache() + : new RecentlyUsedCache { Limit = CacheLimitPerTenant }; + + _caches = _caches.AddOrUpdate(tenant, cache); + + return cache; + } + } + + public TId IdentityFromEvent(IEvent e) + { + // TODO -- come back here. + throw new NotImplementedException(); + } } + diff --git a/src/Marten/Events/Aggregation/GeneratedAggregateProjectionBase.cs b/src/Marten/Events/Aggregation/GeneratedAggregateProjectionBase.cs index 0f5b112b24..7492e1625a 100644 --- a/src/Marten/Events/Aggregation/GeneratedAggregateProjectionBase.cs +++ b/src/Marten/Events/Aggregation/GeneratedAggregateProjectionBase.cs @@ -61,6 +61,14 @@ protected GeneratedAggregateProjectionBase(AggregationScope scope): base(typeof( } } + // TODO -- duplicated with AggregationRuntime, and that's an ick. + /// + /// If more than 0 (the default), this is the maximum number of aggregates + /// that will be cached in a 2nd level, most recently used cache during async + /// projection. Use this to potentially improve async projection throughput + /// + public int CacheLimitPerTenant { get; set; } = 0; + /// /// Use to create "side effects" when running an aggregation (single stream, custom projection, multi-stream) /// asynchronously in a continuous mode (i.e., not in rebuilds) diff --git a/src/Marten/Events/Aggregation/IAggregateProjection.cs b/src/Marten/Events/Aggregation/IAggregateProjection.cs index 65d5cb385f..3f80819758 100644 --- a/src/Marten/Events/Aggregation/IAggregateProjection.cs +++ b/src/Marten/Events/Aggregation/IAggregateProjection.cs @@ -50,7 +50,6 @@ public interface IAggregateProjection // THIS NEEDS TO REMAIN PUBLIC AsyncOptions Options { get; } - /// /// Specify that this projection is a non 1 version of the original projection definition to opt /// into Marten's parallel blue/green deployment of this projection. 
@@ -66,4 +65,12 @@ public interface IAggregateProjection // THIS NEEDS TO REMAIN PUBLIC /// /// void ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions storeOptions); + + // TODO -- duplicated with AggregationRuntime, and that's an ick. + /// + /// If more than 0 (the default), this is the maximum number of aggregates + /// that will be cached in a 2nd level, most recently used cache during async + /// projection. Use this to potentially improve async projection throughput + /// + public int CacheLimitPerTenant { get; set; } } diff --git a/src/Marten/Events/Aggregation/IAggregationRuntime.cs b/src/Marten/Events/Aggregation/IAggregationRuntime.cs index 1c7d07a50a..99518e70f3 100644 --- a/src/Marten/Events/Aggregation/IAggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/IAggregationRuntime.cs @@ -1,5 +1,3 @@ -using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Marten.Events.Daemon; @@ -8,6 +6,7 @@ using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Storage; +using Marten.Util; namespace Marten.Events.Aggregation; @@ -20,17 +19,26 @@ public interface IAggregationRuntime: IProjection ValueTask GroupEvents(DocumentStore store, IMartenDatabase database, EventRange range, CancellationToken cancellationToken); + + bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor); + + IAggregateProjection Projection { get; } } public interface IAggregationRuntime: IAggregationRuntime where TDoc : notnull where TId : notnull { IDocumentStorage Storage { get; } + IEventSlicer Slicer { get; } + + ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice slice, CancellationToken cancellation, ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline); bool IsNew(EventSlice slice); - IEventSlicer Slicer { get; } + IAggregateCache CacheFor(Tenant tenant); + + TId IdentityFromEvent(IEvent e); } diff --git 
a/src/Marten/Events/Aggregation/Rebuilds/AggregateIdentity.cs b/src/Marten/Events/Aggregation/Rebuilds/AggregateIdentity.cs new file mode 100644 index 0000000000..611e285ab2 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/AggregateIdentity.cs @@ -0,0 +1,5 @@ +using System; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal record AggregateIdentity(long Number, Guid Id, string Key); diff --git a/src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs b/src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs new file mode 100644 index 0000000000..19221a2545 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs @@ -0,0 +1,210 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using JasperFx.Core; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; +using Marten.Events.Projections; +using Marten.Exceptions; +using Marten.Internal.Sessions; +using Marten.Services; +using Marten.Storage; +using Npgsql; +using NpgsqlTypes; +using Weasel.Postgresql; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class AggregatePageHandler +{ + private readonly int _aggregateIndex; + private readonly NpgsqlCommand _command; + private readonly IMartenDatabase _database; + private readonly IReadOnlyList _ids; + private readonly IAggregationRuntime _runtime; + private readonly DocumentSessionBase _session; + private readonly IEventStorage _storage; + private readonly DocumentStore _store; + + // If conjoined, the session will hold the tenant id + public AggregatePageHandler(long ceiling, DocumentStore store, DocumentSessionBase session, + IAggregationRuntime runtime, IReadOnlyList ids) + { + _store = store; + _database = session.Database; + _runtime = runtime; + _ids = ids; + + _session = session; + _storage = 
(IEventStorage)store.Options.Providers.StorageFor().QueryOnly; + _aggregateIndex = _storage.SelectFields().Length; + + _command = buildFetchCommand(ceiling, store, session, ids); + } + + private NpgsqlCommand buildFetchCommand(long ceiling, DocumentStore store, DocumentSessionBase session, + IReadOnlyList ids) + { + var schemaName = store.Options.Events.DatabaseSchemaName; + + var builder = new CommandBuilder(); + builder.Append($"select {_storage.SelectFields().Select(x => "d." + x).Join(", ")}, s.type as stream_type"); + builder.Append( + $" from {schemaName}.mt_events as d inner join {schemaName}.mt_streams as s on d.stream_id = s.id"); + + if (_store.Options.Events.TenancyStyle == TenancyStyle.Conjoined) + { + builder.Append(" and d.tenant_id = s.tenant_id"); + } + + builder.Append(" where d.stream_id = Any("); + + if (_store.Options.EventGraph.StreamIdentity == StreamIdentity.AsGuid) + { + builder.AppendParameter(ids.Select(x => x.Id).ToArray(), NpgsqlDbType.Array | NpgsqlDbType.Uuid); + } + else + { + builder.AppendParameter(ids.Select(x => x.Key).ToArray(), NpgsqlDbType.Array | NpgsqlDbType.Text); + } + + builder.Append($") and d.seq_id <= {ceiling}"); + + if (_store.Options.Events.TenancyStyle == TenancyStyle.Conjoined) + { + builder.Append(" and s.tenant_id = "); + builder.AppendParameter(session.TenantId); + builder.Append(" and d.tenant_id = "); + builder.AppendParameter(session.TenantId); + } + + builder.Append(" order by d.stream_id, d.version"); + + var command = builder.Compile(); + return command; + } + + // SAME no matter what + internal async IAsyncEnumerable ReadEventsAsync(IDaemonRuntime runtime, ShardName shardName, + ErrorHandlingOptions errorOptions, [EnumeratorCancellation] CancellationToken token) + { + var sessionOptions = SessionOptions.ForDatabase(_session.TenantId, _database); + + await using var session = (QuerySession)_store.QuerySession(sessionOptions); + await using var reader = await session.ExecuteReaderAsync(_command, 
token).ConfigureAwait(false); + while (await reader.ReadAsync(token).ConfigureAwait(false)) + { + IEvent @event = null; + + try + { + // as a decorator + @event = await _storage.ResolveAsync(reader, token).ConfigureAwait(false); + + if (!await reader.IsDBNullAsync(_aggregateIndex, token).ConfigureAwait(false)) + { + @event.AggregateTypeName = + await reader.GetFieldValueAsync(_aggregateIndex, token).ConfigureAwait(false); + } + } + catch (UnknownEventTypeException e) + { + if (errorOptions.SkipUnknownEvents) + { + runtime.Logger.EventUnknown(e.EventTypeName); + } + else + { + // Let any other exception throw + throw; + } + } + catch (EventDeserializationFailureException e) + { + if (errorOptions.SkipSerializationErrors) + { + runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence); + runtime.Logger.EventDeserializationExceptionDebug(e); + await runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(shardName)).ConfigureAwait(false); + } + else + { + // Let any other exception throw + throw; + } + } + + if (@event != null) + { + yield return @event; + } + } + } + + public async Task ProcessPageAsync(IDaemonRuntime runtime, ShardName shardName, ErrorHandlingOptions errorOptions, + CancellationToken token) + { + var batch = new ProjectionUpdateBatch(_store.Options.Projections, _session, ShardExecutionMode.Rebuild, token) + { + ShouldApplyListeners = false + }; + + // Gotta use the current tenant if using conjoined tenancy + var sessionOptions = SessionOptions.ForDatabase(_session.TenantId, _session.Database); + + await using var session = + new ProjectionDocumentSession(_store, batch, sessionOptions, ShardExecutionMode.Rebuild); + + var events = ReadEventsAsync(runtime, shardName, errorOptions, token); + + ITargetBlock> block = new ActionBlock>(async s => + { + // ReSharper disable once AccessToDisposedClosure + await _runtime.ApplyChangesAsync(session, s, token, ProjectionLifecycle.Async).ConfigureAwait(false); + }); + + await 
collateAndPostSlices(events, block).ConfigureAwait(false); + + session.QueueOperation( + new DequeuePendingAggregateRebuilds(_store.Options, _ids.Select(x => x.Number).ToArray())); + + // Wait for all the SQL to be built out to write all operations + await waitForBatchOperations(block, batch).ConfigureAwait(false); + + // Polly is already around the basic retry here, so anything that gets past this + // probably deserves a full circuit break + await session.ExecuteBatchAsync(batch, token).ConfigureAwait(false); + } + + private static async Task waitForBatchOperations(ITargetBlock> block, ProjectionUpdateBatch batch) + { + block.Complete(); + await block.Completion.ConfigureAwait(false); + await batch.WaitForCompletion().ConfigureAwait(false); + } + + // Groups the incoming events into one EventSlice per aggregate identity and posts each + // finished slice to the block; a slice is closed as soon as an event with a different identity arrives + private async Task collateAndPostSlices(IAsyncEnumerable events, ITargetBlock> block) + { + EventSlice slice = null; + await foreach (var e in events) + { + var aggregateId = _runtime.IdentityFromEvent(e); + slice ??= new EventSlice(aggregateId, _session); + + if (!slice.Id.Equals(aggregateId)) + { + block.Post(slice); + slice = new EventSlice(aggregateId, _session); + } + + slice.AddEvent(e); + } + + // Get the last one. The slice is still null when no events were read at all, + // so skip the post rather than hand a null slice to ApplyChangesAsync + if (slice != null) + { + block.Post(slice); + } + } +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/AggregateRebuildTable.cs b/src/Marten/Events/Aggregation/Rebuilds/AggregateRebuildTable.cs new file mode 100644 index 0000000000..52ecca86ee --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/AggregateRebuildTable.cs @@ -0,0 +1,29 @@ +using System; +using Marten.Storage.Metadata; +using Weasel.Core; +using Weasel.Postgresql.Tables; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class AggregateRebuildTable: Table +{ + public const string Name = "mt_aggregate_rebuild"; + + public AggregateRebuildTable(EventGraph events) : base(new DbObjectName(events.DatabaseSchemaName, Name)) + { + AddColumn("number", "serial").AsPrimaryKey(); + + if (events.StreamIdentity == StreamIdentity.AsGuid) + { +
AddColumn("id").NotNull(); + } + else + { + AddColumn("id").NotNull(); + } + + AddColumn("stream_type").NotNull(); + AddColumn(TenantIdColumn.Name).NotNull(); + AddColumn("completed"); + } +} \ No newline at end of file diff --git a/src/Marten/Events/Aggregation/Rebuilds/BackfillStreamTypeOperation.cs b/src/Marten/Events/Aggregation/Rebuilds/BackfillStreamTypeOperation.cs new file mode 100644 index 0000000000..0fd4e09bc8 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/BackfillStreamTypeOperation.cs @@ -0,0 +1,70 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core.Reflection; +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Storage; +using Microsoft.Extensions.Logging; +using NpgsqlTypes; +using Weasel.Postgresql; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class BackfillStreamTypeOperation: IStorageOperation +{ + private readonly ILogger _logger; + private readonly long _floor; + private readonly long _ceiling; + private readonly string _streamType; + private readonly string[] _eventTypeNames; + private readonly string _schemaName; + private readonly TenancyStyle _tenancy; + + public BackfillStreamTypeOperation(ILogger logger, long floor, long ceiling, EventGraph events, IAggregateProjection projection) + { + _tenancy = events.TenancyStyle; + _schemaName = events.DatabaseSchemaName; + _logger = logger; + _floor = floor; + _ceiling = ceiling; + _streamType = events.AggregateAliasFor(projection.AggregateType); + _eventTypeNames = projection.AllEventTypes.Select(x => events.EventMappingFor((Type)x).EventTypeName).ToArray(); + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append($"update {_schemaName}.mt_streams s set type = "); + builder.AppendParameter(_streamType); + builder.Append($" from {_schemaName}.mt_events e where s.id = 
e.stream_id"); + + if (_tenancy == TenancyStyle.Conjoined) + { + builder.Append($" and s.tenant_id = e.tenant_id"); + } + + builder.Append($" and s.type is NULL and e.type = ANY("); + builder.AppendParameter(_eventTypeNames, NpgsqlDbType.Array | NpgsqlDbType.Text); + builder.Append(") and s.is_archived = FALSE and e.seq_id > "); + builder.AppendParameter(_floor); + builder.Append(" and e.seq_id <= "); + builder.AppendParameter(_ceiling); + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList exceptions) + { + // Nothing + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + // TODO -- log something here + return Task.CompletedTask; + } + + public OperationRole Role() => OperationRole.Other; +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/DequeuePendingAggregateRebuilds.cs b/src/Marten/Events/Aggregation/Rebuilds/DequeuePendingAggregateRebuilds.cs new file mode 100644 index 0000000000..f9a8544df4 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/DequeuePendingAggregateRebuilds.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Services; +using NpgsqlTypes; +using Weasel.Postgresql; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class DequeuePendingAggregateRebuilds: IStorageOperation, NoDataReturnedCall +{ + private readonly long[] _numbers; + private readonly string _schemaName; + + public DequeuePendingAggregateRebuilds(StoreOptions options, IEnumerable numbers) + { + _numbers = numbers.ToArray(); + _schemaName = options.Events.DatabaseSchemaName; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append($"delete from {_schemaName}.{AggregateRebuildTable.Name} where number = ANY("); + 
builder.AppendParameter(_numbers, NpgsqlDbType.Array | NpgsqlDbType.Bigint); + builder.Append(")"); + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList exceptions) + { + // Nothing + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public OperationRole Role() => OperationRole.Other; +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsContinuous.cs b/src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsContinuous.cs new file mode 100644 index 0000000000..0cd2310400 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsContinuous.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Daemon; +using Marten.Internal; +using Marten.Internal.Operations; +using NpgsqlTypes; +using Weasel.Postgresql; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class MarkShardModeAsContinuous : IStorageOperation +{ + private readonly ShardName _shardName; + private readonly EventGraph _events; + private readonly long _lastSequenceId; + + public MarkShardModeAsContinuous(ShardName shardName, EventGraph events, long lastSequenceId) + { + _shardName = shardName; + _events = events; + _lastSequenceId = lastSequenceId; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = + builder.AppendWithParameters($"update {_events.ProgressionTable} set mode = '{ShardMode.continuous}', last_seq_id = ?, rebuild_threshold = 0 where name = ?"); + + parameters[0].Value = _lastSequenceId; + parameters[0].NpgsqlDbType = NpgsqlDbType.Bigint; + + parameters[1].Value = _shardName.Identity; + parameters[1].NpgsqlDbType = NpgsqlDbType.Varchar; + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList 
exceptions) + { + throw new NotSupportedException(); + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public OperationRole Role() + { + return OperationRole.Other; + } +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsRebuilding.cs b/src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsRebuilding.cs new file mode 100644 index 0000000000..be9a74369d --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/MarkShardModeAsRebuilding.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Daemon; +using Marten.Internal; +using Marten.Internal.Operations; +using NpgsqlTypes; +using Weasel.Postgresql; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class MarkShardModeAsRebuilding : IStorageOperation +{ + private readonly ShardName _shardName; + private readonly EventGraph _events; + private readonly long _rebuildThreshold; + + public MarkShardModeAsRebuilding(ShardName shardName, EventGraph events, long rebuildThreshold) + { + _shardName = shardName; + _events = events; + _rebuildThreshold = rebuildThreshold; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = + builder.AppendWithParameters($"insert into {_events.ProgressionTable} (name, last_seq_id, mode, rebuild_threshold) values (?, 0, '{ShardMode.rebuilding}', ?) 
on conflict (name) do update set mode = '{ShardMode.rebuilding}', last_seq_id = 0, rebuild_threshold = ?"); + + parameters[0].Value = _shardName.Identity; + parameters[0].NpgsqlDbType = NpgsqlDbType.Varchar; + parameters[1].Value = _rebuildThreshold; + parameters[1].NpgsqlDbType = NpgsqlDbType.Bigint; + parameters[2].Value = _rebuildThreshold; + parameters[2].NpgsqlDbType = NpgsqlDbType.Bigint; + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList exceptions) + { + throw new NotSupportedException(); + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public OperationRole Role() + { + return OperationRole.Other; + } +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/QueryForNextAggregateIds.cs b/src/Marten/Events/Aggregation/Rebuilds/QueryForNextAggregateIds.cs new file mode 100644 index 0000000000..26f0a08ff0 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/QueryForNextAggregateIds.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Daemon; +using Marten.Internal; +using Marten.Linq.QueryHandlers; +using Marten.Storage; +using Marten.Storage.Metadata; +using Weasel.Postgresql; +using NotSupportedException = System.NotSupportedException; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class QueryForNextAggregateIds: IQueryHandler> +{ + private readonly string _streamAlias; + private readonly string _schemaName; + private readonly TenancyStyle _tenancy; + private readonly StreamIdentity _streamIdentity; + + public QueryForNextAggregateIds(StoreOptions options, Type aggregateType) + { + _streamAlias = options.EventGraph.AggregateAliasFor(aggregateType); + _schemaName = options.Events.DatabaseSchemaName; + _tenancy = options.Events.TenancyStyle; + _streamIdentity = 
options.EventGraph.StreamIdentity; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append($"select number, id from {_schemaName}.{AggregateRebuildTable.Name} where stream_type = "); + builder.AppendParameter(_streamAlias); + if (_tenancy == TenancyStyle.Conjoined) + { + builder.Append($" and {TenantIdColumn.Name} = "); + builder.AppendParameter(session.TenantId); + } + + builder.Append(" order by number limit "); + builder.AppendParameter(DaemonSettings.RebuildBatchSize); + } + + public IReadOnlyList Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + + public async Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + var list = new List(); + + while (await reader.ReadAsync(token).ConfigureAwait(false)) + { + var number = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + if (_streamIdentity == StreamIdentity.AsGuid) + { + var id = await reader.GetFieldValueAsync(1, token).ConfigureAwait(false); + list.Add(new AggregateIdentity(number, id, null)); + } + else + { + var key = await reader.GetFieldValueAsync(1, token).ConfigureAwait(false); + list.Add(new AggregateIdentity(number, Guid.Empty, key)); + } + } + + return list; + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/QueryForTenantIds.cs b/src/Marten/Events/Aggregation/Rebuilds/QueryForTenantIds.cs new file mode 100644 index 0000000000..ad876eda90 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/QueryForTenantIds.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal; +using Marten.Linq.QueryHandlers; +using Weasel.Postgresql; + +namespace 
Marten.Events.Aggregation.Rebuilds; + +// Query handler that lists the distinct tenant ids that have rows queued in the +// aggregate rebuild table for one stream type +internal class QueryForTenantIds: IQueryHandler> +{ + private readonly string _streamAlias; + private readonly string _schemaName; + + public QueryForTenantIds(StoreOptions options, Type aggregateType) + { + // Must be the same alias that SeedAggregateRebuildTable writes into stream_type, + // otherwise this query matches no rows whenever the document alias differs + _streamAlias = options.EventGraph.AggregateAliasFor(aggregateType); + _schemaName = options.Events.DatabaseSchemaName; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + // TODO -- what about an extreme number of tenants? + builder.Append($"select distinct(tenant_id) from {_schemaName}.{AggregateRebuildTable.Name} where stream_type = "); + builder.AppendParameter(_streamAlias); + } + + public IReadOnlyList Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + + public async Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + var list = new List(); + while (await reader.ReadAsync(token).ConfigureAwait(false)) + { + list.Add(await reader.GetFieldValueAsync(0, token).ConfigureAwait(false)); + } + + return list; + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/SeedAggregateRebuildTable.cs b/src/Marten/Events/Aggregation/Rebuilds/SeedAggregateRebuildTable.cs new file mode 100644 index 0000000000..e3fb78ce1f --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/SeedAggregateRebuildTable.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Schema; +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Storage.Metadata; +using Weasel.Postgresql; + +namespace Marten.Events.Aggregation.Rebuilds; + +internal class SeedAggregateRebuildTable: IStorageOperation +{ + private readonly string _streamAlias; + private readonly string
_schemaName; + + public SeedAggregateRebuildTable(StoreOptions options, Type aggregateType) + { + _streamAlias = options.EventGraph.AggregateAliasFor(aggregateType); + _schemaName = options.Events.DatabaseSchemaName; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append($"delete from {_schemaName}.{AggregateRebuildTable.Name} where stream_type = "); + builder.AppendParameter(_streamAlias); + builder.StartNewCommand(); + builder.Append($"insert into {_schemaName}.{AggregateRebuildTable.Name} (id, stream_type, {TenantIdColumn.Name}, completed) select id, '{_streamAlias}', {TenantIdColumn.Name}, false from {_schemaName}.{StreamsTable.TableName} where type = '{_streamAlias}' and is_archived = false order by timestamp desc"); + } + + public Type DocumentType => typeof(IEvent); + public void Postprocess(DbDataReader reader, IList exceptions) + { + // Nothing + } + + public Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public OperationRole Role() + { + return OperationRole.Other; + } +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/SingleStreamRebuilder.cs b/src/Marten/Events/Aggregation/Rebuilds/SingleStreamRebuilder.cs new file mode 100644 index 0000000000..c260342b99 --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/SingleStreamRebuilder.cs @@ -0,0 +1,226 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core.Reflection; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; +using Marten.Events.Daemon.Progress; +using Marten.Internal.Sessions; +using Marten.Linq.QueryHandlers; +using Marten.Services; +using Marten.Storage; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Marten.Events.Aggregation.Rebuilds; + +public class SingleStreamRebuilder: IReplayExecutor +{ + 
public const string Backfill = "Backfill"; + private readonly IMartenDatabase _database; + private readonly IAggregationRuntime _runtime; + private readonly DocumentStore _store; + private long _ceiling; + private readonly ILogger _logger; + + public SingleStreamRebuilder(DocumentStore store, IMartenDatabase database, + IAggregationRuntime runtime) + { + _store = store; + _database = database; + _runtime = runtime; + _logger = store.Options.LogFactory?.CreateLogger>() ?? store.Options.DotNetLogger ?? NullLogger>.Instance; + } + + public Task StartAsync(SubscriptionExecutionRequest request, ISubscriptionController controller, + CancellationToken cancellation) + { + return RebuildAllAsync(request.Runtime, controller.Name, _runtime.Projection, cancellation); + } + + public async Task RebuildAllAsync(IDaemonRuntime runtime, ShardName shardName, IAggregateProjection projection, + CancellationToken token) + { + _ceiling = runtime.HighWaterMark(); + + await using var session = await initializeAsync(runtime, projection, shardName, projection.Options, token).ConfigureAwait(false); + + if (_store.Options.EventGraph.TenancyStyle == TenancyStyle.Conjoined) + { + var tenantIds = await session.ExecuteHandlerAsync(new QueryForTenantIds(_store.Options, typeof(TDoc)), token).ConfigureAwait(false); + + // TODO -- could parallelize this maybe? + foreach (var tenantId in tenantIds) + { + _logger.LogInformation("Starting optimized rebuild for {ShardName} at tenant {TenantId}", shardName.Identity, tenantId); + await processSpecificTenant(runtime, shardName, token, tenantId).ConfigureAwait(false); + _logger.LogInformation("Finished optimized rebuild for {ShardName} at tenant {TenantId}", shardName.Identity, tenantId); + } + } + else + { + // Single tenancy, just go + await processSingleTenant(runtime, shardName, token, session).ConfigureAwait(false); + } + + // Mark as running continuously -- come back to this! 
+ session.QueueOperation(new MarkShardModeAsContinuous(shardName, _store.Options.EventGraph, _ceiling)); + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + + private async Task processSpecificTenant(IDaemonRuntime runtime, ShardName shardName, CancellationToken token, + string tenantId) + { + var options = SessionOptions.ForDatabase(tenantId, _database); + + await using var tenantSession = (DocumentSessionBase)_store.LightweightSession(options); + await processSingleTenant(runtime, shardName, token, tenantSession).ConfigureAwait(false); + } + + private async Task processSingleTenant(IDaemonRuntime runtime, ShardName shardName, CancellationToken token, + DocumentSessionBase session) + { + while (!token.IsCancellationRequested) + { + var shouldContinue = await executeNextBatch(runtime, shardName, token, session).ConfigureAwait(false); + if (!shouldContinue) + { + break; + } + } + } + + private async Task executeNextBatch(IDaemonRuntime runtime, ShardName shardName, CancellationToken token, + DocumentSessionBase session) + { + // QueryForNextAggregateIds accounts for tenancy by pulling it from the session + var ids = await session + .ExecuteHandlerAsync(new QueryForNextAggregateIds(_store.Options, typeof(TDoc)), token) + .ConfigureAwait(false); + + if (!ids.Any()) + { + return false; + } + + var pageHandler = new AggregatePageHandler(_ceiling, _store, session, _runtime, ids); + await pageHandler.ProcessPageAsync(runtime, shardName, _store.Options.Projections.RebuildErrors, token) + .ConfigureAwait(false); + + return true; + } + + private async Task initializeAsync(IDaemonRuntime runtime, IAggregateProjection projection, + ShardName shardName, + AsyncOptions asyncOptions, + CancellationToken token) + { + var options = SessionOptions.ForDatabase(_database); + options.AllowAnyTenant = true; + + // *This* session's lifecycle will be managed outside of this method + var session = (DocumentSessionBase)_store.LightweightSession(options); + try + { + await 
_database.EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); + + // TODO -- what if you have two single stream projections for one aggregate? + await backfillStreamTypeAliases(runtime.HighWaterMark(), projection, token).ConfigureAwait(false); + + await _database.EnsureStorageExistsAsync(typeof(TDoc), token).ConfigureAwait(false); + + var state = await tryFindExistingState(shardName, session, token).ConfigureAwait(false); + if (state == null || state.Mode != ShardMode.rebuilding) + { + asyncOptions.Teardown(session); + session.QueueOperation(new SeedAggregateRebuildTable(_store.Options, typeof(TDoc))); + session.QueueOperation(new MarkShardModeAsRebuilding(shardName, _store.Events, _ceiling)); + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + else + { + _ceiling = state.RebuildThreshold; + } + + return session; + } + catch + { + if (session != null) + { + await session.DisposeAsync().ConfigureAwait(false); + } + + throw; + } + } + + private async Task tryFindExistingState(ShardName shardName, DocumentSessionBase session, + CancellationToken token) + { + var handler = (IQueryHandler>)new ListQueryHandler( + new ProjectionProgressStatement(_store.Options.EventGraph) { Name = shardName }, + new ShardStateSelector(_store.Options.EventGraph)); + + var states = await session.ExecuteHandlerAsync(handler, token).ConfigureAwait(false); + return states.SingleOrDefault(); + } + + private async Task backfillStreamTypeAliases(long highWaterMark, IAggregateProjection projection, + CancellationToken token) + { + var backfillName = new ShardName(_store.Options.EventGraph.AggregateAliasFor(projection.AggregateType), Backfill); + + var batchSize = 50000L; + var floor = 0L; + var ceiling = batchSize; + + // By default, let's do this up to the high water mark + var backfillThreshold = highWaterMark; + + var options = SessionOptions.ForDatabase(_database); + options.AllowAnyTenant = true; + + await using var session = 
(DocumentSessionBase)_store.LightweightSession(options); + var state = await tryFindExistingState(backfillName, session, token).ConfigureAwait(false); + if (state != null) + { + floor = state.Sequence; + if (state.Sequence == backfillThreshold) return; // Unnecessary to do anything here + } + else + { + // Mark this shard as rebuilding + session.QueueOperation(new MarkShardModeAsRebuilding(backfillName, _store.Events, highWaterMark)); + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + + _logger.LogInformation("Starting to try to back fill stream aliases for {Aggregate} at Sequence {Sequence}", projection.AggregateType.FullNameInCode(), floor); + + while (!token.IsCancellationRequested && floor < backfillThreshold) + { + try + { + var op = new BackfillStreamTypeOperation(_logger, floor, ceiling, _store.Options.EventGraph, projection); + session.QueueOperation(op); + session.QueueOperation(new UpdateProjectionProgress(_store.Events, new EventRange(backfillName, ceiling))); + await session.SaveChangesAsync(token).ConfigureAwait(false); + + ceiling += batchSize; + floor += batchSize; + } + catch (TaskCanceledException) + { + // Just canceled, get out of here + return; + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to backfill stream type names"); + throw; + } + } + } +} diff --git a/src/Marten/Events/Aggregation/Rebuilds/StringIdentifiedSingleStreamRebuilder.cs b/src/Marten/Events/Aggregation/Rebuilds/StringIdentifiedSingleStreamRebuilder.cs new file mode 100644 index 0000000000..b4c91be47b --- /dev/null +++ b/src/Marten/Events/Aggregation/Rebuilds/StringIdentifiedSingleStreamRebuilder.cs @@ -0,0 +1,22 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; +using Marten.Storage; + +namespace Marten.Events.Aggregation.Rebuilds; + +public class StringIdentifiedSingleStreamRebuilder : IReplayExecutor +{ + public 
StringIdentifiedSingleStreamRebuilder(DocumentStore store, IMartenDatabase database) + { + throw new NotImplementedException(); + } + + public async Task StartAsync(SubscriptionExecutionRequest request, ISubscriptionController controller, + CancellationToken cancellation) + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/src/Marten/Events/Aggregation/TenantSliceGroup.cs b/src/Marten/Events/Aggregation/TenantSliceGroup.cs index 3b36c72b7c..363be01429 100644 --- a/src/Marten/Events/Aggregation/TenantSliceGroup.cs +++ b/src/Marten/Events/Aggregation/TenantSliceGroup.cs @@ -5,7 +5,6 @@ using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using JasperFx.Core; -using JasperFx.Core.Reflection; using Marten.Events.Daemon; using Marten.Events.Daemon.Internals; using Marten.Events.Projections; @@ -151,6 +150,7 @@ await runtime.ApplyChangesAsync(_session, slice, parent.Cancellation, Projection private async Task processEventSlices(IAggregationRuntime runtime, IDocumentStore store, CancellationToken token) { + var cache = runtime.CacheFor(Tenant); var beingFetched = new List>(); foreach (var slice in Slices) { @@ -167,14 +167,23 @@ private async Task processEventSlices(IAggregationRuntime runtime, // Don't use it any farther, it's ready to do its thing Slices.Remove(slice.Id); } + else if (cache.TryFind(slice.Id, out var aggregate)) + { + slice.Aggregate = aggregate; + _builder.Post(slice); + + // Don't use it any farther, it's ready to do its thing + Slices.Remove(slice.Id); + } else { beingFetched.Add(slice); } } - if (token.IsCancellationRequested) + if (token.IsCancellationRequested || !beingFetched.Any()) { + cache.CompactIfNecessary(); return; } @@ -204,6 +213,7 @@ private async Task processEventSlices(IAggregationRuntime runtime, if (dict.TryGetValue(slice.Id, out var aggregate)) { slice.Aggregate = aggregate; + cache.Store(slice.Id, aggregate); } _builder?.Post(slice); diff --git 
a/src/Marten/Events/Daemon/DaemonSettings.cs b/src/Marten/Events/Daemon/DaemonSettings.cs index d506f8e3b7..9ecda0ecc7 100644 --- a/src/Marten/Events/Daemon/DaemonSettings.cs +++ b/src/Marten/Events/Daemon/DaemonSettings.cs @@ -45,6 +45,8 @@ public interface IReadOnlyDaemonSettings public class DaemonSettings: IReadOnlyDaemonSettings { + public const int RebuildBatchSize = 1000; + /// /// Register session listeners that will ONLY be applied within the asynchronous daemon updates. /// diff --git a/src/Marten/Events/Daemon/HighWater/HighWaterAgent.cs b/src/Marten/Events/Daemon/HighWater/HighWaterAgent.cs index eee1693e78..0c91aec07f 100644 --- a/src/Marten/Events/Daemon/HighWater/HighWaterAgent.cs +++ b/src/Marten/Events/Daemon/HighWater/HighWaterAgent.cs @@ -95,6 +95,18 @@ private async Task detectChanges() { statistics = await _detector.Detect(_token).ConfigureAwait(false); } + catch (ObjectDisposedException ex) + { + if (ex.ObjectName.EqualsIgnoreCase("Npgsql.PoolingDataSource") && _token.IsCancellationRequested) + { + return; + } + + _logger.LogError(ex, "Failed while trying to detect high water statistics for database {Name}", _detector.DatabaseName); + await Task.Delay(_settings.SlowPollingTime, _token).ConfigureAwait(false); + continue; + + } catch (Exception e) { _logger.LogError(e, "Failed while trying to detect high water statistics for database {Name}", _detector.DatabaseName); diff --git a/src/Marten/Events/Daemon/IProjectionDaemon.cs b/src/Marten/Events/Daemon/IProjectionDaemon.cs index dc157a3f37..42a405a800 100644 --- a/src/Marten/Events/Daemon/IProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/IProjectionDaemon.cs @@ -113,7 +113,7 @@ public interface IProjectionDaemon: IDisposable /// Task WaitForNonStaleData(TimeSpan timeout); - long HighWaterMark(); + public long HighWaterMark(); AgentStatus StatusFor(string shardName); /// diff --git a/src/Marten/Events/Daemon/ISubscriptionAgent.cs b/src/Marten/Events/Daemon/ISubscriptionAgent.cs index 
d815b36660..dbfcb52a6a 100644 --- a/src/Marten/Events/Daemon/ISubscriptionAgent.cs +++ b/src/Marten/Events/Daemon/ISubscriptionAgent.cs @@ -21,6 +21,9 @@ public interface ISubscriptionController /// ErrorHandlingOptions ErrorOptions { get; } + ShardName Name { get; } + AsyncOptions Options { get; } + void MarkSuccess(long processedCeiling); /// @@ -55,7 +58,6 @@ public interface ISubscriptionController // This is public because it's used by the generated code public interface ISubscriptionAgent: ISubscriptionController { - ShardName Name { get; } void MarkHighWater(long sequence); long Position { get; } @@ -75,7 +77,6 @@ public interface ISubscriptionAgent: ISubscriptionController Task RecordDeadLetterEventAsync(DeadLetterEvent @event); DateTimeOffset? PausedTime { get; } - AsyncOptions Options { get; } ISubscriptionMetrics Metrics { get; } Task ReplayAsync(SubscriptionExecutionRequest request, long highWaterMark, TimeSpan timeout); } diff --git a/src/Marten/Events/Daemon/ISubscriptionExecution.cs b/src/Marten/Events/Daemon/ISubscriptionExecution.cs index 4ae57d3458..7e6375499d 100644 --- a/src/Marten/Events/Daemon/ISubscriptionExecution.cs +++ b/src/Marten/Events/Daemon/ISubscriptionExecution.cs @@ -2,6 +2,7 @@ using System.Threading; using System.Threading.Tasks; using Marten.Events.Daemon.Internals; +using Marten.Storage; namespace Marten.Events.Daemon; @@ -14,4 +15,16 @@ public interface ISubscriptionExecution: IAsyncDisposable string DatabaseName { get; } ShardExecutionMode Mode { get; set; } + + bool TryBuildReplayExecutor(out IReplayExecutor executor); +} + +/// +/// Use to create an optimized projection or subscription replay in the case of rewinding all the way +/// back to sequence = 0 (projection rebuilds most likely) +/// +public interface IReplayExecutor +{ + Task StartAsync(SubscriptionExecutionRequest request, + ISubscriptionController controller, CancellationToken cancellation); } diff --git a/src/Marten/Events/Daemon/Internals/EventLoader.cs 
b/src/Marten/Events/Daemon/Internals/EventLoader.cs index 3c7dfcb3fd..e7e51783d4 100644 --- a/src/Marten/Events/Daemon/Internals/EventLoader.cs +++ b/src/Marten/Events/Daemon/Internals/EventLoader.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using JasperFx.Core; +using Marten.Events.Aggregation.Rebuilds; using Marten.Exceptions; using Marten.Internal.Sessions; using Marten.Services; @@ -82,6 +84,8 @@ public async Task LoadAsync(EventRequest request, var skippedEvents = 0; + var runtime = request.Runtime; + await using var reader = await session.ExecuteReaderAsync(_command, token).ConfigureAwait(false); while (await reader.ReadAsync(token).ConfigureAwait(false)) { @@ -102,7 +106,7 @@ public async Task LoadAsync(EventRequest request, { if (request.ErrorOptions.SkipUnknownEvents) { - request.Runtime.Logger.EventUnknown(e.EventTypeName); + runtime.Logger.EventUnknown(e.EventTypeName); skippedEvents++; } else @@ -115,9 +119,9 @@ public async Task LoadAsync(EventRequest request, { if (request.ErrorOptions.SkipSerializationErrors) { - request.Runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence); - request.Runtime.Logger.EventDeserializationExceptionDebug(e); - await request.Runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false); + runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence); + runtime.Logger.EventDeserializationExceptionDebug(e); + await runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false); skippedEvents++; } else diff --git a/src/Marten/Events/Daemon/Internals/EventPage.cs b/src/Marten/Events/Daemon/Internals/EventPage.cs index 0710213f07..485e3bd19e 100644 --- a/src/Marten/Events/Daemon/Internals/EventPage.cs +++ b/src/Marten/Events/Daemon/Internals/EventPage.cs @@ -13,6 +13,8 @@ public EventPage(long floor) public 
long Floor { get; } public long Ceiling { get; private set; } + public long HighWaterMark { get; set; } + public void CalculateCeiling(int batchSize, long highWaterMark, int skippedEvents = 0) { Ceiling = (Count + skippedEvents) == batchSize diff --git a/src/Marten/Events/Daemon/Internals/EventRange.cs b/src/Marten/Events/Daemon/Internals/EventRange.cs index ed8b0ffb9c..4cdbb950ff 100644 --- a/src/Marten/Events/Daemon/Internals/EventRange.cs +++ b/src/Marten/Events/Daemon/Internals/EventRange.cs @@ -114,4 +114,12 @@ public void SkipEventSequence(long eventSequence) events.RemoveAll(e => e.Sequence == eventSequence); Events = events; } + + public static EventRange CombineShallow(params EventRange[] ranges) + { + var floor = ranges.Min(x => x.SequenceFloor); + var ceiling = ranges.Max(x => x.SequenceCeiling); + + return new EventRange(ranges[0].ShardName, floor, ceiling); + } } diff --git a/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs b/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs index d2d481ceca..8d909cfa21 100644 --- a/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs +++ b/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -46,6 +47,13 @@ public GroupedProjectionExecution(AsyncProjectionShard shard, DocumentStore stor } public ShardExecutionMode Mode { get; set; } + public bool TryBuildReplayExecutor(out IReplayExecutor executor) + { + if (_store.Events.UseOptimizedProjectionRebuilds && _source.TryBuildReplayExecutor(_store, _database, out executor)) return true; + + executor = default; + return false; + } public string ProjectionShardIdentity { get; } @@ -162,7 +170,7 @@ private async Task applyBatchOperationsToDatabaseAsync(EventRangeGroup group, Do if (Mode == ShardExecutionMode.Continuous) { _logger.LogInformation("Shard 
'{ProjectionShardIdentity}': Executed updates for {Range}", - ProjectionShardIdentity, batch.Range); + ProjectionShardIdentity, group.Range); } } catch (Exception e) @@ -172,7 +180,7 @@ private async Task applyBatchOperationsToDatabaseAsync(EventRangeGroup group, Do _logger.LogError(e, "Failure in shard '{ProjectionShardIdentity}' trying to execute an update batch for {Range}", ProjectionShardIdentity, - batch.Range); + group.Range); throw; } } @@ -207,8 +215,13 @@ private async Task buildBatchAsync(EventRangeGroup group, ProjectionUpdateBatch batch = default; try { - batch = new ProjectionUpdateBatch(_store.Events, _store.Options.Projections, session, - group.Range, group.Cancellation, group.Agent.Mode); + batch = new ProjectionUpdateBatch(_store.Options.Projections, session, group.Agent.Mode, group.Cancellation) + { + ShouldApplyListeners = group.Agent.Mode == ShardExecutionMode.Continuous && group.Range.Events.Any() + }; + + // Mark the progression + batch.Queue.Post(group.Range.BuildProgressionOperation(_store.Events)); await group.ConfigureUpdateBatch(batch).ConfigureAwait(false); await batch.WaitForCompletion().ConfigureAwait(false); diff --git a/src/Marten/Events/Daemon/Internals/IDeadLetterQueue.cs b/src/Marten/Events/Daemon/Internals/IDeadLetterQueue.cs index fa36cbf27f..097656e269 100644 --- a/src/Marten/Events/Daemon/Internals/IDeadLetterQueue.cs +++ b/src/Marten/Events/Daemon/Internals/IDeadLetterQueue.cs @@ -7,4 +7,5 @@ public interface IDaemonRuntime { Task RecordDeadLetterEventAsync(DeadLetterEvent @event); ILogger Logger { get; } + long HighWaterMark(); } diff --git a/src/Marten/Events/Daemon/Internals/NulloDaemonRuntime.cs b/src/Marten/Events/Daemon/Internals/NulloDaemonRuntime.cs index 8ceb1d8bdc..a6781cd3c0 100644 --- a/src/Marten/Events/Daemon/Internals/NulloDaemonRuntime.cs +++ b/src/Marten/Events/Daemon/Internals/NulloDaemonRuntime.cs @@ -13,4 +13,11 @@ public Task RecordDeadLetterEventAsync(DeadLetterEvent @event) } public ILogger Logger { 
get; } = NullLogger.Instance; + + public long CurrentHighWaterMark { get; set; } + + public long HighWaterMark() + { + return CurrentHighWaterMark; + } } diff --git a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs index 866149507b..38de552161 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs @@ -35,15 +35,11 @@ private IMartenSession Session public ShardExecutionMode Mode { get; } - public bool IsDisposed() - { - return _session == null; - } + public bool ShouldApplyListeners { get; set; } - internal ProjectionUpdateBatch(EventGraph events, DaemonSettings settings, - DocumentSessionBase? session, EventRange range, CancellationToken token, ShardExecutionMode mode) + internal ProjectionUpdateBatch(DaemonSettings settings, + DocumentSessionBase? session, ShardExecutionMode mode, CancellationToken token) { - Range = range; _settings = settings; _session = session ?? 
throw new ArgumentNullException(nameof(session)); _token = token; @@ -55,13 +51,8 @@ internal ProjectionUpdateBatch(EventGraph events, DaemonSettings settings, }); startNewPage(session); - - var progressOperation = range.BuildProgressionOperation(events); - Queue.Post(progressOperation); } - public EventRange Range { get; } - public Task WaitForCompletion() { Queue.Complete(); @@ -216,7 +207,7 @@ public IReadOnlyList DocumentTypes() public async Task PostUpdateAsync(IMartenSession session) { - if (shouldApplyListeners()) + if (!ShouldApplyListeners) { return; } @@ -232,14 +223,9 @@ await listener.AfterCommitAsync((IDocumentSession)session, unitOfWorkData, _toke } } - private bool shouldApplyListeners() - { - return Mode == ShardExecutionMode.Rebuild || !Range.Events.Any(); - } - public async Task PreUpdateAsync(IMartenSession session) { - if (shouldApplyListeners()) + if (!ShouldApplyListeners) { return; } diff --git a/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs b/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs index 2bf0597a71..e995ce36f9 100644 --- a/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs +++ b/src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs @@ -146,6 +146,7 @@ public async Task StartAsync(SubscriptionExecutionRequest request) ErrorOptions = request.ErrorHandling; _runtime = request.Runtime; await _execution.EnsureStorageExists().ConfigureAwait(false); + _commandBlock.Post(Command.Started(_tracker.HighWaterMark, request.Floor)); _tracker.Publish(new ShardState(Name, request.Floor){Action = ShardAction.Started}); @@ -166,10 +167,20 @@ public async Task ReplayAsync(SubscriptionExecutionRequest request, long highWat try { await _execution.EnsureStorageExists().ConfigureAwait(false); - _tracker.Publish(new ShardState(Name, request.Floor) { Action = ShardAction.Started }); - _commandBlock.Post(Command.Started(highWaterMark, request.Floor)); - await 
_rebuild.Task.TimeoutAfterAsync((int)timeout.TotalMilliseconds).ConfigureAwait(false); + if (_execution.TryBuildReplayExecutor(out var executor)) + { + _logger.LogInformation("Starting optimized rebuild for projection/subscription {ShardName}", Name.Identity); + using var cancellationSource = new CancellationTokenSource(timeout); + await executor.StartAsync(request, this, cancellationSource.Token).ConfigureAwait(false); + } + else + { + _tracker.Publish(new ShardState(Name, request.Floor) { Action = ShardAction.Started }); + _commandBlock.Post(Command.Started(highWaterMark, request.Floor)); + + await _rebuild.Task.TimeoutAfterAsync((int)timeout.TotalMilliseconds).ConfigureAwait(false); + } } catch (Exception e) { @@ -235,6 +246,21 @@ internal async Task Apply(Command command) HighWaterMark = command.HighWaterMark; LastCommitted = LastEnqueued = command.LastCommitted; + if (LastCommitted == 0 && HighWaterMark > 0 && _execution.TryBuildReplayExecutor(out var executor)) + { + try + { + _logger.LogInformation("Starting optimized rebuild for projection/subscription {ShardName}", Name.Identity); + await executor.StartAsync(new SubscriptionExecutionRequest(0, ShardExecutionMode.CatchUp, ErrorOptions, _runtime), this, _cancellation.Token).ConfigureAwait(false); + _logger.LogInformation("Finished with optimized rebuild for projection/subscription {ShardName}, proceeding to normal, continuous operation", Name.Identity); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to perform an optimized rebuild/replay of subscription {ShardName}", Name.Identity); + } + } + + break; case CommandType.RangeCompleted: @@ -300,6 +326,10 @@ private async Task loadNextAsync() { var page = await _loader.LoadAsync(request, _cancellation.Token).ConfigureAwait(false); + // Passing this along helps the individual executions "know" when to switch from + // continuous mode to "catch up" and vice versa + page.HighWaterMark = HighWaterMark; + if (_logger.IsEnabled(LogLevel.Debug) && Mode == 
ShardExecutionMode.Continuous) { _logger.LogDebug("Loaded {Number} of Events from {Floor} to {Ceiling} for Subscription {Name}", page.Count, page.Floor, page.Ceiling, ProjectionShardIdentity); diff --git a/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs b/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs index b523182525..7b4f135753 100644 --- a/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs +++ b/src/Marten/Events/Daemon/Progress/ProjectionProgressStatement.cs @@ -17,7 +17,16 @@ public ProjectionProgressStatement(EventGraph events) protected override void configure(ICommandBuilder builder) { - builder.Append($"select name, last_seq_id from {_events.DatabaseSchemaName}.mt_event_progression"); + if (_events.UseOptimizedProjectionRebuilds) + { + builder.Append($"select name, last_seq_id, mode, rebuild_threshold, assigned_node from {_events.DatabaseSchemaName}.mt_event_progression"); + } + else + { + builder.Append($"select name, last_seq_id from {_events.DatabaseSchemaName}.mt_event_progression"); + } + + if (Name != null) { builder.Append(" where name = "); diff --git a/src/Marten/Events/Daemon/Progress/ShardStateSelector.cs b/src/Marten/Events/Daemon/Progress/ShardStateSelector.cs index 0ff11c66fa..e98a5e7ae2 100644 --- a/src/Marten/Events/Daemon/Progress/ShardStateSelector.cs +++ b/src/Marten/Events/Daemon/Progress/ShardStateSelector.cs @@ -1,4 +1,5 @@ -using System.Data.Common; +using System; +using System.Data.Common; using System.Threading; using System.Threading.Tasks; using Marten.Linq.Selectors; @@ -7,10 +8,18 @@ namespace Marten.Events.Daemon.Progress; internal class ShardStateSelector: ISelector { + private readonly EventGraph _events; + + public ShardStateSelector(EventGraph events) + { + _events = events; + } + public ShardState Resolve(DbDataReader reader) { var name = reader.GetFieldValue(0); var sequence = reader.GetFieldValue(1); + return new ShardState(name, sequence); } @@ -18,6 +27,20 @@ public 
async Task ResolveAsync(DbDataReader reader, CancellationToke { var name = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); var sequence = await reader.GetFieldValueAsync(1, token).ConfigureAwait(false); - return new ShardState(name, sequence); + var state = new ShardState(name, sequence); + + if (_events.UseOptimizedProjectionRebuilds) + { + var modeString = await reader.GetFieldValueAsync(2, token).ConfigureAwait(false); + if (Enum.TryParse(modeString, out var mode)) + { + state.Mode = mode; + } + + state.RebuildThreshold = await reader.GetFieldValueAsync(3, token).ConfigureAwait(false); + state.AssignedNodeNumber = await reader.GetFieldValueAsync(4, token).ConfigureAwait(false); + } + + return state; } } diff --git a/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs b/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs index a092602a08..36a74fe666 100644 --- a/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs +++ b/src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs @@ -116,7 +116,6 @@ private async Task rebuildProjection(IProjectionSource source, TimeSpan shardTim // Teardown the current state await teardownExistingProjectionProgress(source, token, agents).ConfigureAwait(false); - if (token.IsCancellationRequested) { return; diff --git a/src/Marten/Events/Daemon/ProjectionDaemon.cs b/src/Marten/Events/Daemon/ProjectionDaemon.cs index 007ef84efe..bd9f831e5c 100644 --- a/src/Marten/Events/Daemon/ProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/ProjectionDaemon.cs @@ -97,22 +97,30 @@ private async Task tryStartAgentAsync(ISubscriptionAgent agent, ShardExecu var highWaterMark = HighWaterMark(); var position = await agent .Options - .DetermineStartingPositionAsync(highWaterMark, agent.Name, mode, Database, _cancellation.Token).ConfigureAwait(false); + .DetermineStartingPositionAsync(highWaterMark, agent.Name, mode, Database, _cancellation.Token) + .ConfigureAwait(false); if (position.ShouldUpdateProgressFirst) { - await 
rewindAgentProgress(agent.Name.Identity, _cancellation.Token, position.Floor).ConfigureAwait(false); + await rewindAgentProgress(agent.Name.Identity, _cancellation.Token, position.Floor) + .ConfigureAwait(false); } var errorOptions = mode == ShardExecutionMode.Continuous ? _store.Options.Projections.Errors : _store.Options.Projections.RebuildErrors; - await agent.StartAsync(new SubscriptionExecutionRequest(position.Floor, mode, errorOptions, this)).ConfigureAwait(false); + await agent.StartAsync(new SubscriptionExecutionRequest(position.Floor, mode, errorOptions, this)) + .ConfigureAwait(false); agent.MarkHighWater(highWaterMark); _agents = _agents.AddOrUpdate(agent.Name.Identity, agent); } + catch (Exception ex) + { + Logger.LogError(ex, "Error trying to start agent {ShardName}", agent.Name.Identity); + return false; + } finally { _semaphore.Release(); diff --git a/src/Marten/Events/Daemon/ShardExecutionMode.cs b/src/Marten/Events/Daemon/ShardExecutionMode.cs index ff8adbb35a..87e92c7b9e 100644 --- a/src/Marten/Events/Daemon/ShardExecutionMode.cs +++ b/src/Marten/Events/Daemon/ShardExecutionMode.cs @@ -3,5 +3,6 @@ namespace Marten.Events.Daemon; public enum ShardExecutionMode { Continuous, - Rebuild + Rebuild, + CatchUp } diff --git a/src/Marten/Events/Daemon/ShardState.cs b/src/Marten/Events/Daemon/ShardState.cs index 012549bfb1..1897c2921c 100644 --- a/src/Marten/Events/Daemon/ShardState.cs +++ b/src/Marten/Events/Daemon/ShardState.cs @@ -2,6 +2,13 @@ namespace Marten.Events.Daemon; +public enum ShardMode +{ + none, + continuous, + rebuilding +} + /// /// Point in time state of a single projection shard or the high water mark /// @@ -31,6 +38,12 @@ internal ShardState(AsyncProjectionShard shard, ShardAction action): this(shard. 
Action = action; } + public long RebuildThreshold { get; set; } + + public ShardMode Mode { get; set; } = ShardMode.continuous; + + public int AssignedNodeNumber { get; set; } = 0; + public ShardAction Action { get; set; } = ShardAction.Updated; /// diff --git a/src/Marten/Events/EventDocumentStorage.cs b/src/Marten/Events/EventDocumentStorage.cs index 971d084eb8..233defbb9a 100644 --- a/src/Marten/Events/EventDocumentStorage.cs +++ b/src/Marten/Events/EventDocumentStorage.cs @@ -109,6 +109,16 @@ public IDeletion HardDeleteForDocument(IEvent document, string tenantId) throw new NotSupportedException(); } + public void SetIdentityFromString(IEvent document, string identityString) + { + throw new NotImplementedException(); + } + + public void SetIdentityFromGuid(IEvent document, Guid identityGuid) + { + throw new NotImplementedException(); + } + public string FromObject { get; } public Type SelectedType => typeof(IEvent); diff --git a/src/Marten/Events/EventGraph.FeatureSchema.cs b/src/Marten/Events/EventGraph.FeatureSchema.cs index de4128aea8..8c85e78bef 100644 --- a/src/Marten/Events/EventGraph.FeatureSchema.cs +++ b/src/Marten/Events/EventGraph.FeatureSchema.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using Marten.Events.Aggregation.Rebuilds; using Marten.Events.Archiving; using Marten.Events.Daemon; using Marten.Events.Projections; @@ -53,13 +54,18 @@ private IEnumerable createAllSchemaObjects() yield return sequence; - yield return new EventProgressionTable(DatabaseSchemaName); + yield return new EventProgressionTable(this); yield return new SystemFunction(DatabaseSchemaName, "mt_mark_event_progression", "varchar, bigint"); yield return new ArchiveStreamFunction(this); yield return new QuickAppendEventFunction(this); + if (UseOptimizedProjectionRebuilds) + { + yield return new AggregateRebuildTable(this); + } + foreach (var schemaSource in Options.Projections.All.OfType()) { diff --git 
a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index a925e807a0..f49482b874 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -66,6 +66,13 @@ internal EventGraph(StoreOptions options) _aggregateTypeByName = new Cache(findAggregateType); } + /// + /// Opt into some performance optimizations for projection rebuilds for both single stream and + /// multi-stream projections. This will result in new table columns and a potential database + /// migration. This will be a default in Marten 8. + /// + public bool UseOptimizedProjectionRebuilds { get; set; } + /// /// Opt into using PostgreSQL list partitioning. This can have significant performance and scalability benefits /// *if* you are also aggressively using event stream archiving diff --git a/src/Marten/Events/EventMapping.cs b/src/Marten/Events/EventMapping.cs index 587265c386..8262ebb137 100644 --- a/src/Marten/Events/EventMapping.cs +++ b/src/Marten/Events/EventMapping.cs @@ -323,6 +323,16 @@ public IDeletion HardDeleteForDocument(T document, string tenantId) throw new NotSupportedException(); } + public void SetIdentityFromString(T document, string identityString) + { + throw new NotImplementedException(); + } + + public void SetIdentityFromGuid(T document, Guid identityGuid) + { + throw new NotImplementedException(); + } + public override IEvent Wrap(object data) { return new Event((T)data) { EventTypeName = EventTypeName, DotNetTypeName = DotNetTypeName }; diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index c845442239..a043ec98d1 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -73,6 +73,13 @@ public interface IEventStoreOptions /// public IMessageOutbox MessageOutbox { get; set; } + /// + /// Opt into some performance optimizations for projection rebuilds for both single stream and + /// multi-stream projections. 
This will result in new table columns and a potential database + /// migration. This will be a default in Marten 8. + /// + public bool UseOptimizedProjectionRebuilds { get; set; } + /// /// Register an event type with Marten. This isn't strictly necessary for normal usage, /// but can help Marten with asynchronous projections where Marten hasn't yet encountered diff --git a/src/Marten/Events/IReadOnlyEventStoreOptions.cs b/src/Marten/Events/IReadOnlyEventStoreOptions.cs index cfc358f001..00436bcd87 100644 --- a/src/Marten/Events/IReadOnlyEventStoreOptions.cs +++ b/src/Marten/Events/IReadOnlyEventStoreOptions.cs @@ -69,4 +69,11 @@ public interface IReadOnlyEventStoreOptions /// aggregation projections /// IMessageOutbox MessageOutbox { get; set; } + + /// + /// Opt into some performance optimizations for projection rebuilds for both single stream and + /// multi-stream projections. This will result in new table columns and a potential database + /// migration. This will be a default in Marten 8. + /// + bool UseOptimizedProjectionRebuilds { get; set; } } diff --git a/src/Marten/Events/Projections/EventSlice.cs b/src/Marten/Events/Projections/EventSlice.cs index 771f9dd9bd..733e5b24b0 100644 --- a/src/Marten/Events/Projections/EventSlice.cs +++ b/src/Marten/Events/Projections/EventSlice.cs @@ -52,7 +52,7 @@ public EventSlice(TId id, Tenant tenant, IEnumerable? events = null) } public EventSlice(TId id, IQuerySession querySession, IEnumerable? 
events = null): this(id, - new Tenant(Tenancy.DefaultTenantId, querySession.Database), events) + new Tenant(querySession.TenantId, querySession.Database), events) { } diff --git a/src/Marten/Events/Projections/GeneratedProjection.cs b/src/Marten/Events/Projections/GeneratedProjection.cs index 91013431b4..20256dc2c7 100644 --- a/src/Marten/Events/Projections/GeneratedProjection.cs +++ b/src/Marten/Events/Projections/GeneratedProjection.cs @@ -7,6 +7,7 @@ using JasperFx.CodeGeneration; using JasperFx.Core.Reflection; using JasperFx.RuntimeCompiler; +using Marten.Events.Aggregation; using Marten.Events.Daemon; using Marten.Events.Daemon.Internals; using Marten.Storage; @@ -66,6 +67,20 @@ IProjection IProjectionSource.Build(DocumentStore store) return buildProjectionObject(store); } + public bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor) + { + generateIfNecessary(store); + + var projection = buildProjectionObject(store); + if (projection is IAggregationRuntime runtime) + { + return runtime.TryBuildReplayExecutor(store, database, out executor); + } + + executor = default; + return false; + } + IReadOnlyList IProjectionSource.AsyncProjectionShards(DocumentStore store) { diff --git a/src/Marten/Events/Projections/IProjectionSource.cs b/src/Marten/Events/Projections/IProjectionSource.cs index aa734c612d..ee3aa4c83b 100644 --- a/src/Marten/Events/Projections/IProjectionSource.cs +++ b/src/Marten/Events/Projections/IProjectionSource.cs @@ -38,6 +38,8 @@ ValueTask GroupEvents(DocumentStore store, IMartenDatabase daem /// into Marten's parallel blue/green deployment of this projection. 
/// public uint ProjectionVersion { get; set; } + + bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor); } /// @@ -48,3 +50,5 @@ public interface IProjectionSchemaSource { IEnumerable CreateSchemaObjects(EventGraph events); } + + diff --git a/src/Marten/Events/Projections/ProjectionWrapper.cs b/src/Marten/Events/Projections/ProjectionWrapper.cs index da0450ee83..8cd473b4cd 100644 --- a/src/Marten/Events/Projections/ProjectionWrapper.cs +++ b/src/Marten/Events/Projections/ProjectionWrapper.cs @@ -70,4 +70,10 @@ public ValueTask GroupEvents(DocumentStore store, IMartenDataba /// into Marten's parallel blue/green deployment of this projection. /// public uint ProjectionVersion { get; set; } = 1; + + public bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor) + { + executor = default; + return false; + } } diff --git a/src/Marten/Events/Projections/ScopedProjectionWrapper.cs b/src/Marten/Events/Projections/ScopedProjectionWrapper.cs index aee2ad7ecd..4780973267 100644 --- a/src/Marten/Events/Projections/ScopedProjectionWrapper.cs +++ b/src/Marten/Events/Projections/ScopedProjectionWrapper.cs @@ -159,4 +159,11 @@ public IProjection Build(DocumentStore store) } public uint ProjectionVersion { get; set; } + + public bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor) + { + // TODO -- this might still be possible + executor = default; + return false; + } } diff --git a/src/Marten/Events/QueryEventStore.cs b/src/Marten/Events/QueryEventStore.cs index 69c0f65771..a936555094 100644 --- a/src/Marten/Events/QueryEventStore.cs +++ b/src/Marten/Events/QueryEventStore.cs @@ -126,10 +126,7 @@ public async Task> FetchStreamAsync(string streamKey, long var aggregate = aggregator.Build(events, _session, state); var storage = _session.StorageFor(); - if (storage is IDocumentStorage s) - { - s.SetIdentity(aggregate, streamId); - } 
+ storage.SetIdentityFromGuid(aggregate, streamId); return aggregate; } @@ -152,10 +149,7 @@ public async Task> FetchStreamAsync(string streamKey, long } var storage = _session.StorageFor(); - if (storage is IDocumentStorage s) - { - s.SetIdentity(aggregate, streamId); - } + storage.SetIdentityFromGuid(aggregate, streamId); return aggregate; } @@ -173,10 +167,7 @@ public async Task> FetchStreamAsync(string streamKey, long var aggregate = aggregator.Build(events, _session, state); var storage = _session.StorageFor(); - if (storage is IDocumentStorage s) - { - s.SetIdentity(aggregate, streamKey); - } + storage.SetIdentityFromString(aggregate, streamKey); return aggregate; } @@ -195,10 +186,7 @@ public async Task> FetchStreamAsync(string streamKey, long var aggregate = await aggregator.BuildAsync(events, _session, state, token).ConfigureAwait(false); var storage = _session.StorageFor(); - if (storage is IDocumentStorage s) - { - s.SetIdentity(aggregate, streamKey); - } + storage.SetIdentityFromString(aggregate, streamKey); return aggregate; } diff --git a/src/Marten/Events/Schema/EventProgressionTable.cs b/src/Marten/Events/Schema/EventProgressionTable.cs index fc5694dbd8..3b2094df13 100644 --- a/src/Marten/Events/Schema/EventProgressionTable.cs +++ b/src/Marten/Events/Schema/EventProgressionTable.cs @@ -1,4 +1,4 @@ -using Weasel.Core; +using Marten.Events.Daemon; using Weasel.Postgresql; using Weasel.Postgresql.Tables; @@ -6,13 +6,22 @@ namespace Marten.Events.Schema; internal class EventProgressionTable: Table { - public EventProgressionTable(string schemaName): base(new PostgresqlObjectName(schemaName, "mt_event_progression")) + public const string Name = "mt_event_progression"; + + public EventProgressionTable(EventGraph eventGraph): base(new PostgresqlObjectName(eventGraph.DatabaseSchemaName, Name)) { AddColumn("name").AsPrimaryKey(); AddColumn("last_seq_id", "bigint").AllowNulls(); AddColumn("last_updated", "timestamp with time zone") 
.DefaultValueByExpression("(transaction_timestamp())"); + if (eventGraph.UseOptimizedProjectionRebuilds) + { + AddColumn("mode").DefaultValueByString(ShardMode.none.ToString()); + AddColumn("rebuild_threshold").DefaultValueByExpression("0"); + AddColumn("assigned_node").DefaultValueByExpression("0"); + } + PrimaryKeyName = "pk_mt_event_progression"; } } diff --git a/src/Marten/Internal/Storage/DocumentStorage.cs b/src/Marten/Internal/Storage/DocumentStorage.cs index a4d7f22659..674bffc48a 100644 --- a/src/Marten/Internal/Storage/DocumentStorage.cs +++ b/src/Marten/Internal/Storage/DocumentStorage.cs @@ -18,6 +18,7 @@ using Marten.Linq.SqlGeneration; using Marten.Linq.SqlGeneration.Filters; using Marten.Schema; +using Marten.Schema.Identity; using Marten.Services; using Marten.Storage; using Marten.Storage.Metadata; @@ -46,6 +47,10 @@ public abstract class DocumentStorage: IDocumentStorage, IHaveMe private readonly string[] _selectFields; private ISqlFragment? _defaultWhere; protected Action _setter; + protected Action _setFromString = (_, _) => throw new NotSupportedException(); + protected Action _setFromGuid = (_, _) => throw new NotSupportedException(); + + private readonly DocumentMapping _document; public DocumentStorage(StorageStyle storageStyle, DocumentMapping document) @@ -84,6 +89,27 @@ public DocumentStorage(StorageStyle storageStyle, DocumentMapping document) UseNumericRevisions = document.UseNumericRevisions; _setter = LambdaBuilder.Setter(document.IdMember)!; + if (typeof(TId) == typeof(Guid)) + { + _setFromGuid = _setter.As>(); + } + else if (typeof(TId) == typeof(string)) + { + _setFromString = _setter.As>(); + } + else if (document.IdStrategy is ValueTypeIdGeneration valueType) + { + if (valueType.SimpleType == typeof(Guid)) + { + var converter = valueType.CreateConverter(); + _setFromGuid = (doc, guid) => _setter(doc, converter(guid)); + } + else if (valueType.SimpleType == typeof(string)) + { + var converter = valueType.CreateConverter(); + 
_setFromString = (doc, s) => _setter(doc, converter(s)); + } + } DeleteFragment = _mapping.DeleteStyle == DeleteStyle.Remove ? new HardDelete(this) @@ -164,6 +190,16 @@ public void SetIdentity(T document, TId identity) _setter(document, identity); } + public void SetIdentityFromString(T document, string identityString) + { + _setFromString(document, identityString); + } + + public void SetIdentityFromGuid(T document, Guid identityGuid) + { + _setFromGuid(document, identityGuid); + } + public TenancyStyle TenancyStyle => _mapping.TenancyStyle; diff --git a/src/Marten/Internal/Storage/IDocumentStorage.cs b/src/Marten/Internal/Storage/IDocumentStorage.cs index 052ad9ee39..d66b0b03c9 100644 --- a/src/Marten/Internal/Storage/IDocumentStorage.cs +++ b/src/Marten/Internal/Storage/IDocumentStorage.cs @@ -112,6 +112,8 @@ public interface IDocumentStorage: IDocumentStorage where T : notnull void RemoveDirtyTracker(IMartenSession session, object id); IDeletion HardDeleteForDocument(T document, string tenantId); + void SetIdentityFromString(T document, string identityString); + void SetIdentityFromGuid(T document, Guid identityGuid); } public interface IDocumentStorage: IDocumentStorage where T : notnull where TId : notnull diff --git a/src/Marten/Internal/Storage/SubClassDocumentStorage.cs b/src/Marten/Internal/Storage/SubClassDocumentStorage.cs index 3709ffea75..605ecf1039 100644 --- a/src/Marten/Internal/Storage/SubClassDocumentStorage.cs +++ b/src/Marten/Internal/Storage/SubClassDocumentStorage.cs @@ -272,6 +272,16 @@ public IDeletion HardDeleteForDocument(T document, string tenantId) return _parent.HardDeleteForDocument(document, tenantId); } + public void SetIdentityFromString(T document, string identityString) + { + _parent.SetIdentityFromString(document, identityString); + } + + public void SetIdentityFromGuid(T document, Guid identityGuid) + { + _parent.SetIdentityFromGuid(document, identityGuid); + } + private IEnumerable extraFilters(ISqlFragment query, 
IMartenSession session) { yield return toBasicWhere(); diff --git a/src/Marten/Internal/ValueTypeInfo.cs b/src/Marten/Internal/ValueTypeInfo.cs index b702ba6586..cb3a5219f3 100644 --- a/src/Marten/Internal/ValueTypeInfo.cs +++ b/src/Marten/Internal/ValueTypeInfo.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Immutable; using System.Linq.Expressions; using System.Reflection; using FastExpressionCompiler; using JasperFx.Core.Reflection; +using Marten.Events; using Marten.Linq.Members.ValueCollections; namespace Marten.Internal; @@ -13,6 +15,7 @@ namespace Marten.Internal; /// public class ValueTypeInfo { + private object _converter; public Type OuterType { get; } public Type SimpleType { get; } public PropertyInfo ValueProperty { get; } @@ -37,6 +40,11 @@ public ValueTypeInfo(Type outerType, Type simpleType, PropertyInfo valueProperty public Func CreateConverter() { + if (_converter != null) + { + return (Func)_converter; + } + var inner = Expression.Parameter(typeof(TInner), "inner"); Expression builder; if (Builder != null) @@ -55,7 +63,8 @@ public Func CreateConverter() var lambda = Expression.Lambda>(builder, inner); - return lambda.CompileFast(); + _converter = lambda.CompileFast(); + return (Func)_converter; } public Func ValueAccessor() @@ -65,6 +74,34 @@ public Func ValueAccessor() var lambda = Expression.Lambda>(Expression.Call(outer, getter), outer); return lambda.CompileFast(); } + + public Func CreateAggregateIdentitySource() where TId : notnull + { + var e = Expression.Parameter(typeof(IEvent), "e"); + var eMember = SimpleType == typeof(Guid) + ? 
ReflectionHelper.GetProperty(x => x.StreamId) + : ReflectionHelper.GetProperty(x => x.StreamKey); + + var raw = Expression.Call(e, eMember.GetMethod); + Expression wrapped = null; + if (Builder != null) + { + wrapped = Expression.Call(null, Builder, raw); + } + else if (Ctor != null) + { + wrapped = Expression.New(Ctor, raw); + } + else + { + throw new NotSupportedException("Marten cannot build a type converter for strong typed id type " + + OuterType.FullNameInCode()); + } + + var lambda = Expression.Lambda>(wrapped, e); + + return lambda.CompileFast(); + } } internal class ValueTypeElementMember: ElementMember diff --git a/src/Marten/Storage/MartenDatabase.EventStorage.cs b/src/Marten/Storage/MartenDatabase.EventStorage.cs index 07833442c8..0c6ff0eb45 100644 --- a/src/Marten/Storage/MartenDatabase.EventStorage.cs +++ b/src/Marten/Storage/MartenDatabase.EventStorage.cs @@ -106,7 +106,7 @@ public async Task> AllProjectionProgress( var handler = (IQueryHandler>)new ListQueryHandler( new ProjectionProgressStatement(Options.EventGraph), - new ShardStateSelector()); + new ShardStateSelector(Options.EventGraph)); await using var conn = CreateConnection(); await conn.OpenAsync(token).ConfigureAwait(false); @@ -135,7 +135,7 @@ public async Task ProjectionProgressFor(ShardName name, var statement = new ProjectionProgressStatement(Options.EventGraph) { Name = name }; var handler = new OneResultHandler(statement, - new ShardStateSelector(), true, false); + new ShardStateSelector(Options.EventGraph), true, false); await using var conn = CreateConnection(); await conn.OpenAsync(token).ConfigureAwait(false); diff --git a/src/Marten/Storage/Tenant2.cs b/src/Marten/Storage/Tenant2.cs index 7cbf8d320a..e855277d1a 100644 --- a/src/Marten/Storage/Tenant2.cs +++ b/src/Marten/Storage/Tenant2.cs @@ -1,3 +1,5 @@ +using System; + namespace Marten.Storage; public class Tenant @@ -16,4 +18,34 @@ public static Tenant ForDatabase(IMartenDatabase database) { return new 
Tenant(Tenancy.DefaultTenantId, database); } + + protected bool Equals(Tenant other) + { + return TenantId == other.TenantId && Equals(Database, other.Database); + } + + public override bool Equals(object obj) + { + if (obj is null) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((Tenant)obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(TenantId, Database.Identifier); + } } diff --git a/src/Marten/StoreOptions.Identity.cs b/src/Marten/StoreOptions.Identity.cs index 025c4d71e1..5e5efc1bbc 100644 --- a/src/Marten/StoreOptions.Identity.cs +++ b/src/Marten/StoreOptions.Identity.cs @@ -68,6 +68,12 @@ private bool idMemberIsSettable(MemberInfo idMember) return ValueTypes.FirstOrDefault(x => x.OuterType == idType); } + internal ValueTypeInfo FindOrCreateValueType(Type idType) + { + var valueType = ValueTypes.FirstOrDefault(x => x.OuterType == idType); + return valueType ?? RegisterValueType(idType); + } + /// /// Register a custom value type with Marten. Doing this enables Marten /// to use this type correctly within LINQ expressions. 
The "value type" diff --git a/src/Marten/Subscriptions/SubscriptionExecution.cs b/src/Marten/Subscriptions/SubscriptionExecution.cs index d399c07100..7a05250689 100644 --- a/src/Marten/Subscriptions/SubscriptionExecution.cs +++ b/src/Marten/Subscriptions/SubscriptionExecution.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -49,8 +50,12 @@ private async Task executeRange(EventRange range) { await using var parent = (DocumentSessionBase)_store.OpenSession(SessionOptions.ForDatabase(_database)); - var batch = new ProjectionUpdateBatch(_store.Events, _store.Options.Projections, parent, - range, _cancellation.Token, Mode); + var batch = new ProjectionUpdateBatch(_store.Options.Projections, parent, Mode, _cancellation.Token) { + ShouldApplyListeners = Mode == ShardExecutionMode.Continuous && range.Events.Any() + };; + + // Mark the progression + batch.Queue.Post(range.BuildProgressionOperation(_store.Events)); await using var session = new ProjectionDocumentSession(_store, batch, new SessionOptions @@ -75,7 +80,7 @@ private async Task executeRange(EventRange range) if (Mode == ShardExecutionMode.Continuous) { _logger.LogInformation("Subscription '{ShardIdentity}': Executed for {Range}", - ShardIdentity, batch.Range); + ShardIdentity, range); } range.Agent.Metrics.UpdateProcessed(range.Size); @@ -144,4 +149,9 @@ public Task EnsureStorageExists() public string DatabaseName => _database.Identifier; public ShardExecutionMode Mode { get; set; } = ShardExecutionMode.Continuous; + public bool TryBuildReplayExecutor(out IReplayExecutor executor) + { + executor = default; + return false; + } } diff --git a/src/Marten/Util/RecentlyUsedCache.cs b/src/Marten/Util/RecentlyUsedCache.cs new file mode 100644 index 0000000000..44fb3f72dc --- /dev/null +++ b/src/Marten/Util/RecentlyUsedCache.cs @@ -0,0 +1,82 @@ +using System; +using System.Linq; +using JasperFx.Core; + +namespace 
Marten.Util; + + +// TODO -- move to JasperFx +public interface IAggregateCache +{ + bool TryFind(TKey key, out TItem item); + void Store(TKey key, TItem item); + void CompactIfNecessary(); +} + +public class NulloAggregateCache : IAggregateCache +{ + public bool TryFind(TKey key, out TItem item) + { + item = default; + return false; + } + + public void Store(TKey key, TItem item) + { + // nothing + } + + public void CompactIfNecessary() + { + // nothing + } +} + +public class RecentlyUsedCache: IAggregateCache +{ + private ImHashMap _items = ImHashMap.Empty; + private ImHashMap _times = ImHashMap.Empty; + + public int Limit = 100; + + public int Count => _items.Count(); + + public bool TryFind(TKey key, out TItem item) + { + if (_items.TryFind(key, out item)) + { + _times = _times.AddOrUpdate(key, DateTimeOffset.UtcNow); + return true; + } + + item = default; + return false; + } + + public void Store(TKey key, TItem item) + { + _items = _items.AddOrUpdate(key, item); + _times = _times.AddOrUpdate(key, DateTimeOffset.UtcNow); + } + + public void CompactIfNecessary() + { + var extraCount = _items.Count() - Limit; + if (extraCount <= 0) return; + + var toRemove = _times + .Enumerate() + .OrderBy(x => x.Value) + .Select(x => x.Key) + .Take(extraCount) + .ToArray(); + + foreach (var key in toRemove) + { + _items = _items.Remove(key); + _times = _times.Remove(key); // evict the timestamp too, otherwise stale keys are re-selected on the next compaction + } + } + + +} From 4aeac6bdab59f044f12aec7dbf8d84f3b8e057b1 Mon Sep 17 00:00:00 2001 From: "Jeremy D. 
Miller" Date: Tue, 8 Oct 2024 17:34:30 -0500 Subject: [PATCH 2/5] Upgraded Weasel, fixed a couple bugs related to externally managed partitioning --- src/Marten/Marten.csproj | 2 +- src/Marten/Storage/DocumentTable.cs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Marten/Marten.csproj b/src/Marten/Marten.csproj index 47b3b47107..421a07d41f 100644 --- a/src/Marten/Marten.csproj +++ b/src/Marten/Marten.csproj @@ -48,7 +48,7 @@ - + diff --git a/src/Marten/Storage/DocumentTable.cs b/src/Marten/Storage/DocumentTable.cs index 923b83ca49..720ea8e792 100644 --- a/src/Marten/Storage/DocumentTable.cs +++ b/src/Marten/Storage/DocumentTable.cs @@ -81,6 +81,8 @@ public DocumentTable(DocumentMapping mapping): base(mapping.TableName) Indexes.AddRange(mapping.Indexes); ForeignKeys.AddRange(mapping.ForeignKeys); + Partitioning = mapping.Partitioning; + if (mapping.Partitioning != null && !mapping.IgnorePartitions) { if (mapping.Partitioning.Columns.All(HasColumn)) From 01529aaa8b13be6fe67beba17a0eba74c5d6dc74 Mon Sep 17 00:00:00 2001 From: "Jeremy D. 
Miller" Date: Tue, 8 Oct 2024 17:37:00 -0500 Subject: [PATCH 3/5] Fixed a couple of stress tests --- src/StressTests/projections_with_IoC_services.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/StressTests/projections_with_IoC_services.cs b/src/StressTests/projections_with_IoC_services.cs index 0cd176de46..dc65bc2d62 100644 --- a/src/StressTests/projections_with_IoC_services.cs +++ b/src/StressTests/projections_with_IoC_services.cs @@ -314,7 +314,7 @@ public async Task use_projection_as_scoped_and_inline_on_martenStore() services.AddMartenStore(opts => { opts.Connection(ConnectionSource.ConnectionString); - opts.DatabaseSchemaName = "ioc"; + opts.DatabaseSchemaName = "ioc3"; opts.ApplyChangesLockId = opts.ApplyChangesLockId + 9; }).AddProjectionWithServices(ProjectionLifecycle.Inline, ServiceLifetime.Scoped, "MyProjection") .ApplyAllDatabaseChangesOnStartup(); @@ -345,7 +345,7 @@ public async Task get_async_shards_with_custom_name_on_martenStore() services.AddMartenStore(opts => { opts.Connection(ConnectionSource.ConnectionString); - opts.DatabaseSchemaName = "ioc"; + opts.DatabaseSchemaName = "ioc2"; opts.ApplyChangesLockId = opts.ApplyChangesLockId + 10; }).AddProjectionWithServices(ProjectionLifecycle.Async, ServiceLifetime.Scoped, "MyProjection") .ApplyAllDatabaseChangesOnStartup(); From c1677dffeebc58d233203f297ad6cd16541bdac2 Mon Sep 17 00:00:00 2001 From: "Jeremy D. 
Miller" Date: Tue, 8 Oct 2024 17:42:59 -0500 Subject: [PATCH 4/5] Fixed Daemon start_subscription_at_sequence_floor test Closes GH-3454 --- src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs b/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs index fb16547268..daaaaa3c82 100644 --- a/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs +++ b/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs @@ -62,6 +62,9 @@ public async Task run_events_through() [Fact] public async Task start_subscription_at_sequence_floor() { + // Just need it reset to nothing + StoreOptions(opts => { }); + using var daemon = await theStore.BuildProjectionDaemonAsync(); await daemon.StartAllAsync(); From 6292d3f8af5095235cada59b676c12fac0f2e579 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 8 Oct 2024 18:20:03 -0500 Subject: [PATCH 5/5] Fixed Daemon EventProjection_follow_up_operations test Closes GH-3453 --- .../EventProjection_follow_up_operations.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DaemonTests/DocumentTrackingByIdentity/EventProjection_follow_up_operations.cs b/src/DaemonTests/DocumentTrackingByIdentity/EventProjection_follow_up_operations.cs index 12a56fb169..29fb750544 100644 --- a/src/DaemonTests/DocumentTrackingByIdentity/EventProjection_follow_up_operations.cs +++ b/src/DaemonTests/DocumentTrackingByIdentity/EventProjection_follow_up_operations.cs @@ -34,7 +34,7 @@ public async Task rebuild_with_follow_up_operations_should_work() { Guid.NewGuid(), nestedEntity }, { Guid.NewGuid(), nestedEntity } })); - session.Events.Append(Guid.NewGuid(), new SomeOtherEntityWithNestedIdentifierPublished(guid)); + session.Events.Append(guid, new SomeOtherEntityWithNestedIdentifierPublished(guid)); await session.SaveChangesAsync();