diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts
index 6c4093a2c9..450461b6f5 100644
--- a/docs/.vitepress/config.mts
+++ b/docs/.vitepress/config.mts
@@ -164,6 +164,8 @@ const config: UserConfig = {
           { text: 'Querying Events', link: '/events/querying' },
           { text: 'Metadata', link: '/events/metadata' },
           { text: 'Archiving Streams', link: '/events/archiving' },
+          { text: 'Optimizing Performance', link: '/events/optimizing' },
+
           {
             text: 'Projections Overview',
             link: '/events/projections/',
             collapsed: true,
             items: [
               {
diff --git a/docs/configuration/storeoptions.md b/docs/configuration/storeoptions.md
index 242214e5f1..b3299d5b87 100644
--- a/docs/configuration/storeoptions.md
+++ b/docs/configuration/storeoptions.md
@@ -211,7 +211,7 @@ public class ConfiguresItself
   }
 }
 ```
-snippet source | anchor
+snippet source | anchor
 
 The `DocumentMapping` type is the core configuration class representing how a document type is persisted or
@@ -235,7 +235,7 @@ public class ConfiguresItselfSpecifically
   }
 }
 ```
-snippet source | anchor
+snippet source | anchor
 
 ## Document Policies
diff --git a/docs/diagnostics.md b/docs/diagnostics.md
index 4e4ca37241..26792ef765 100644
--- a/docs/diagnostics.md
+++ b/docs/diagnostics.md
@@ -4,6 +4,37 @@ So far, Marten has diagnostics, command logging, and unit of work life cycle tra
 For information on accessing and previewing the database schema objects generated by Marten, see [Marten and Postgres Schema](/schema/)
 
+## Disabling Npgsql Logging
+
+The built-in Npgsql logging is turned on by default in Marten. To disable that logging so you
+can actually glean some value from your logs without blowing up the storage costs for your logging
+provider, use this flag:
+
+```cs
+var builder = Host.CreateDefaultBuilder();
+builder.ConfigureServices(services =>
+{
+    services.AddMarten(opts =>
+    {
+        opts.Connection(ConnectionSource.ConnectionString);
+
+        // Disable the absurdly verbose Npgsql logging
+        opts.DisableNpgsqlLogging = true;
+
+        opts.Events.AppendMode = EventAppendMode.Quick;
+        opts.Events.UseIdentityMapForInlineAggregates = true;
+
+        opts.Projections.Add(ProjectionLifecycle.Inline);
+    });
+});
+```
+snippet source | anchor
+
+The Marten team will be considering reversing the default for this behavior in Marten 8.
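+
+If you still want visibility into the SQL that Marten itself is issuing after turning the Npgsql logging
+off, one option is Marten's own `IMartenLogger` hook (described later in this page). A minimal sketch,
+assuming the built-in `ConsoleMartenLogger` is acceptable for your environment:
+
+```cs
+var builder = Host.CreateDefaultBuilder();
+builder.ConfigureServices(services =>
+{
+    services.AddMarten(opts =>
+    {
+        opts.Connection(ConnectionSource.ConnectionString);
+
+        // Npgsql logging is off...
+        opts.DisableNpgsqlLogging = true;
+
+        // ...but Marten's own command logging can still write to the console
+        opts.Logger(new ConsoleMartenLogger());
+    });
+});
+```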
+ ## Listening for Document Store Events ::: tip INFO @@ -508,7 +539,7 @@ The `IMartenLogger` can be swapped out on any `IQuerySession` or `IDocumentSessi // session to pipe Marten logging to the xUnit.Net output theSession.Logger = new TestOutputMartenLogger(_output); ``` -snippet source | anchor +snippet source | anchor ## Previewing the PostgreSQL Query Plan diff --git a/docs/documents/identity.md b/docs/documents/identity.md index 9ec555d505..90aa18e68c 100644 --- a/docs/documents/identity.md +++ b/docs/documents/identity.md @@ -337,7 +337,7 @@ public struct Task2Id public static Task2Id From(Guid value) => new Task2Id(value); } ``` -snippet source | anchor +snippet source | anchor In _all_ cases, the type name will have to be suffixed with "Id" (and it's case sensitive) to be considered by Marten to be diff --git a/docs/documents/storage.md b/docs/documents/storage.md index b5b80e0c3c..f92a5ea58e 100644 --- a/docs/documents/storage.md +++ b/docs/documents/storage.md @@ -55,7 +55,7 @@ public class Customer [Identity] public string Name { get; set; } } ``` -snippet source | anchor +snippet source | anchor ## Type Aliases diff --git a/docs/documents/storing.md b/docs/documents/storing.md index d3b4146303..4f38b9ca94 100644 --- a/docs/documents/storing.md +++ b/docs/documents/storing.md @@ -81,7 +81,7 @@ using (var session = theStore.LightweightSession()) session.SaveChanges(); } ``` -snippet source | anchor +snippet source | anchor ## Bulk Loading @@ -151,7 +151,7 @@ await store.BulkInsertDocumentsAsync(data, BulkInsertMode.InsertsOnly); // being loaded await store.BulkInsertDocumentsAsync(data, BulkInsertMode.OverwriteExisting); ``` -snippet source | anchor +snippet source | anchor The bulk insert feature can also be used with multi-tenanted documents, but in that @@ -173,5 +173,5 @@ using var store = DocumentStore.For(opts => // If multi-tenanted await store.BulkInsertDocumentsAsync("a tenant id", data); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/events/appending.md b/docs/events/appending.md index 830eeeb54f..63852fc4e1 100644 --- a/docs/events/appending.md +++ b/docs/events/appending.md @@ -1,8 +1,10 @@ # Appending Events ::: tip -Marten V5.4 introduced the new `FetchForWriting()` and `IEventStream` models that streamline the workflow of capturing events against -an aggregated "write" model. +For CQRS style command handlers that append events to an existing event stream, the Marten team very +strongly recommends the [FetchForWriting](/scenarios/command_handler_workflow) API. This API is used underneath +the Wolverine [Aggregate Handler Workflow](https://wolverinefx.net/guide/durability/marten/event-sourcing.html) that is probably the very simplest possible way to build command handlers +with Marten event sourcing today. ::: With Marten, events are captured and appended to logical "streams" of events. Marten provides @@ -18,21 +20,55 @@ The event data is persisted to two tables: Events can be captured by either starting a new stream or by appending events to an existing stream. In addition, Marten has some tricks up its sleeve for dealing with concurrency issues that may result from multiple transactions trying to simultaneously append events to the same stream. -## "Rich" vs "Quick" Appends +## "Rich" vs "Quick" Appends + +::: tip +Long story short, the new "Quick" model appears to provide much better performance and scalability. 
+:::
 
 Before diving into starting new event streams or appending events to existing streams, just know that
 there are two different modes of event appending you can use with Marten:
 
-snippet: sample_configuring_event_append_mode
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.Services.AddMarten(opts =>
+    {
+        // This is the default Marten behavior from 4.0 on
+        opts.Events.AppendMode = EventAppendMode.Rich;
+
+        // Lighter weight mode that should result in better
+        // performance, but with a loss of available metadata
+        // within inline projections
+        opts.Events.AppendMode = EventAppendMode.Quick;
+    })
+    .UseNpgsqlDataSource();
+```
+snippet source | anchor
+
 The classic `Rich` mode will append events in a two step process where the local session will
 first determine all possible metadata for the events about to be appended such that inline projections
 can use event versions and the global event sequence numbers at the time that the inline projections
 are created.
 
+::: warning
+If you are using `Inline` projections with the "Quick" mode, just be aware that you will not have access to the final
+event sequence or stream version at the time the projections are built. Marten _is_ able to set the stream version on
+a single stream projection document built `Inline`, but that's done on the server side. Just be warned.
+:::
+
 The newer `Quick` mode eschews version and sequence metadata in favor of performing the event append and stream creation
 operations with minimal overhead. The improved performance comes at the cost of not having the `IEvent.Version` and
 `IEvent.Sequence` information available at the time that inline projections are executed.
 
+From initial load testing, the "Quick" mode appears to lead to a 40-50% time reduction in Marten's process of appending
+events. Your results will vary of course. Maybe more importantly, the "Quick" mode seems to make a large positive difference
+in the functioning of the asynchronous projections and subscriptions by preventing the event "skipping" issue that
+can happen with the "Rich" mode when a system becomes slow under heavy loads. Lastly, the Marten team believes that the
+"Quick" mode can alleviate concurrency issues from trying to append events to the same stream without utilizing optimistic
+or exclusive locking on the stream.
+
 If using inline projections for a single stream (`SingleStreamProjection` or _snapshots_) and the `Quick` mode, the Marten team
 highly recommends using the `IRevisioned` interface on your projected aggregate documents so that Marten can "move"
 the version set by the database operations to the version of the projected documents loaded from the database later. Mapping a custom member
@@ -167,3 +203,4 @@ in the event store sequence due to failed transactions. Marten V4 introduced sup
 event sequence numbers that failed in a Marten transaction. This is done strictly to improve the functioning of the
 [async daemon](/events/projections/async-daemon) that looks for gaps in the event sequence to "know" how far
 it's safe to process asynchronous projections. If you see event rows in your database of type "tombstone", it's
 representative of failed transactions (maybe from optimistic concurrency violations, transient network issues, timeouts, etc.).
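+
+As a concrete illustration of the `IRevisioned` recommendation above for "Quick" appends with inline single
+stream projections, here is a minimal sketch using a hypothetical `Order` aggregate document:
+
+```cs
+using System;
+using Marten.Metadata;
+
+public class Order: IRevisioned
+{
+    public Guid Id { get; set; }
+    public bool Shipped { get; set; }
+
+    // Satisfies IRevisioned. Marten "moves" the stream version set by the
+    // database operations onto this member when the projected document
+    // is loaded from the database later
+    public int Version { get; set; }
+}
+```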
+
diff --git a/docs/events/archiving.md b/docs/events/archiving.md
index c6f18a9e74..0b8d5c0c8e 100644
--- a/docs/events/archiving.md
+++ b/docs/events/archiving.md
@@ -1,9 +1,24 @@
 # Archiving Event Streams
 
-New in Marten V4 is the ability to mark an event stream and all of its events as "archived." While
-in the future this may have serious optimization benefits when Marten is able to utilize
-Postgresql sharding, today it's metadata and default filtering in the Linq querying against event
-data as well as asynchronous projections inside of the [async daemon](/events/projections/async-daemon).
+Like most (all?) event stores, Marten is designed around the idea of events being persisted to a single, immutable
+log of events. All the same though, there are going to be problem domains where certain event streams become obsolete,
+maybe because a workflow is completed, maybe through time-based expiry rules, or maybe because a customer or user is removed
+from the system. To help optimize Marten's event store usage, you can take advantage of stream archiving to
+mark events as archived on a stream-by-stream basis.
+
+::: warning
+You can obviously use pure SQL to modify the events persisted by Marten. While that might be valuable in some cases,
+we urge you to be cautious about doing so.
+:::
+
+The impact of archiving an event stream is:
+
+* In the "classic" usage of Marten, the relevant stream and event rows are marked with `is_archived = TRUE`
+* With the "opt-in" table partitioning model for "hot/cold" storage described in the next section, the stream and event rows are
+  moved to the archived partition tables for streams and events
+* The [async daemon](/events/projections/async-daemon) subsystem that processes projections and subscriptions in a background
+  process automatically ignores archived events -- but that can be modified on a per projection/subscription basis
+* Archived events are excluded by default from any event data queries through the LINQ support in Marten
 
 To mark a stream as archived, it's just this syntax:
 
@@ -16,13 +31,18 @@ public async Task SampleArchive(IDocumentSession session, string streamId)
     await session.SaveChangesAsync();
 }
 ```
-snippet source | anchor
+snippet source | anchor
 
 As in all cases with an `IDocumentSession`, you need to call `SaveChanges()` to commit the
 unit of work.
 
-The `mt_events` and `mt_streams` tables now both have a boolean column named `is_archived`.
+::: tip
+At this point, you will also have to manually delete any projected aggregates based on the event streams being
+archived, if that is desirable
+:::
+
+The `mt_events` and `mt_streams` tables both have a boolean column named `is_archived`.
 
 Archived events are filtered out of all event Linq queries by default. But of course,
 there's a way to query for archived events with the `IsArchived` property of `IEvent`
 as shown below:
 
@@ -35,7 +55,7 @@ var events = await theSession.Events
     .Where(x => x.IsArchived)
     .ToListAsync();
 ```
-snippet source | anchor
+snippet source | anchor
 
 You can also query for all events both archived and not archived with `MaybeArchived()`
 like so:
 
@@ -47,5 +67,45 @@ var events = await theSession.Events.QueryAllRawEvents()
     .Where(x => x.MaybeArchived()).ToListAsync();
 ```
-snippet source | anchor
+snippet source | anchor
+
+## Hot/Cold Storage Partitioning
+
+::: warning
+This option will only be beneficial if you are being aggressive about marking obsolete, old, or expired event data
+as archived.
+:::
+
+Want your system using Marten to scale and perform even better than it already does? If you're leveraging
+event archiving in your application workflow, you can possibly derive some significant performance and scalability
+improvements by opting into using PostgreSQL native table partitioning on the event and event stream data
+to partition the "hot" (active) and "cold" (archived) events into separate partition tables.
+
+The long and short of this option is that it keeps the active `mt_streams` and `mt_events` tables smaller, which pretty
+well always results in better performance over time.
+
+The simple flag for this option is:
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.Services.AddMarten(opts =>
+{
+    opts.Connection("some connection string");
+
+    // Turn on the PostgreSQL table partitioning for
+    // hot/cold storage on archived events
+    opts.Events.UseArchivedStreamPartitioning = true;
+});
+```
+snippet source | anchor
+
+::: warning
+If you are turning this option on in an existing system, you may want to run the database schema migration script
+by hand rather than trying to let Marten do it automatically. The data migration from non-partitioned to partitioned
+will probably require system downtime because it actually has to copy the old table data to a temporary table, drop the
+old table, create the new partitioned table, copy all the existing data from the temp table to the new partitioned table,
+and finally drop the temporary table.
+:::
diff --git a/docs/events/optimizing.md b/docs/events/optimizing.md
new file mode 100644
index 0000000000..7b57f36a0b
--- /dev/null
+++ b/docs/events/optimizing.md
@@ -0,0 +1,47 @@
+# Optimizing for Performance and Scalability
+
+::: tip
+The asynchronous projection and subscription support can in some cases suffer some event "skipping" when transactions
+that are appending events become slower than the `StoreOptions.Projections.StaleSequenceThreshold` (the default is only 3 seconds).
+
+From initial testing, the `Quick` append mode seems to stop this problem altogether. This only seems to be an issue with
+very large data loads.
+:::
+
+Marten has several options to potentially increase the performance and scalability of a system that uses
+the event sourcing functionality:
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.Services.AddMarten(opts =>
+{
+    opts.Connection("some connection string");
+
+    // Turn on the PostgreSQL table partitioning for
+    // hot/cold storage on archived events
+    opts.Events.UseArchivedStreamPartitioning = true;
+
+    // Use the *much* faster workflow for appending events
+    // at the cost of *some* loss of metadata usage for
+    // inline projections
+    opts.Events.AppendMode = EventAppendMode.Quick;
+
+    // Little more involved, but this can reduce the number
+    // of database queries necessary to process inline projections
+    // during command handling with some significant
+    // caveats
+    opts.Events.UseIdentityMapForInlineAggregates = true;
+});
+```
+snippet source | anchor
+
+The archived stream option is further described in the section on [Hot/Cold Storage Partitioning](/events/archiving.html#hot-cold-storage-partitioning).
+
+See the ["Rich" vs "Quick" Appends](/events/appending.html#rich-vs-quick-appends) section for more information about the
+applicability and drawbacks of the "Quick" event appending.
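+
+As a corollary to the tip above, if you cannot adopt the `Quick` append mode but are hitting the event "skipping"
+behavior under heavy loads, the threshold itself is configurable. A minimal sketch, assuming the
+`StaleSequenceThreshold` setting named in the tip is a `TimeSpan`-valued member on `StoreOptions.Projections`:
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.Services.AddMarten(opts =>
+{
+    opts.Connection("some connection string");
+
+    // Give slower appending transactions more breathing room before
+    // the async daemon treats the sequence as "stale" (default is 3 seconds)
+    opts.Projections.StaleSequenceThreshold = TimeSpan.FromSeconds(10);
+});
+```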
+ +Lastly, see [Optimizing FetchForWriting with Inline Aggregates](/scenarios/command_handler_workflow.html#optimizing-fetchforwriting-with-inline-aggregates) for more information +about the `UseIdentityMapForInlineAggregates` option. diff --git a/docs/events/projections/custom-aggregates.md b/docs/events/projections/custom-aggregates.md index 6591d2550c..bf737c45ef 100644 --- a/docs/events/projections/custom-aggregates.md +++ b/docs/events/projections/custom-aggregates.md @@ -25,7 +25,7 @@ public class Increment { } ``` -snippet source | anchor +snippet source | anchor And a simple aggregate document type like this: @@ -49,7 +49,7 @@ public class StartAndStopAggregate: ISoftDeleted } } ``` -snippet source | anchor +snippet source | anchor As you can see, `StartAndStopAggregate` as a `Guid` as its identity and is also [soft-deleted](/documents/deletes.html#soft-deletes) when stored by @@ -124,7 +124,7 @@ public class StartAndStopProjection: CustomProjectionsnippet source | anchor +snippet source | anchor ## Custom Grouping diff --git a/docs/events/projections/ioc.md b/docs/events/projections/ioc.md index da6a2ee554..8b1808b7ca 100644 --- a/docs/events/projections/ioc.md +++ b/docs/events/projections/ioc.md @@ -56,7 +56,7 @@ public class ProductProjection: CustomProjection } } ``` -snippet source | anchor +snippet source | anchor Now, we *want* to use this projection at runtime within Marten, and need to register the projection @@ -75,6 +75,7 @@ using var host = await Host.CreateDefaultBuilder() { opts.Connection(ConnectionSource.ConnectionString); opts.DatabaseSchemaName = "ioc"; + opts.ApplyChangesLockId = opts.ApplyChangesLockId + 2; }) // Note that this is chained after the call to AddMarten() .AddProjectionWithServices( @@ -84,7 +85,7 @@ using var host = await Host.CreateDefaultBuilder() }) .StartAsync(); ``` -snippet source | anchor +snippet source | anchor Note that we're having to explicitly specify the projection lifecycle for the projection used within diff --git a/docs/scenarios/command_handler_workflow.md b/docs/scenarios/command_handler_workflow.md index c4300ce665..dd54b9ab50 100644 --- a/docs/scenarios/command_handler_workflow.md +++ b/docs/scenarios/command_handler_workflow.md @@ -279,3 +279,24 @@ public Task Handle4(MarkItemReady command, IDocumentSession session) ``` snippet source | anchor + +## Optimizing FetchForWriting with Inline Aggregates + +If you are utilizing `FetchForWriting()` for your command handlers -- and you really, really should! -- and at least +some of your aggregates are updated `Inline` as shown below: + +snippet: sample_registering_Order_as_Inline + +You can potentially gain some significant performance optimization by using the `UseIdentityMapForInlineAggregates` +flag shown above. To be clear, this optimization mostly helps when you have the combination in a command handler that: + +1. Uses `FetchForWriting` for an aggregate type +2. That aggregate type is updated or built through an `Inline` projection or snapshot + +With this optimization, Marten will take steps to make sure that it uses the version of the aggregate document that was +originally fetched by `FetchForWriting()` as the starting point for updating that aggregate in its `Inline` projection +with the events that were appended by the command itself. 
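+
+To make that combination concrete, here is a minimal sketch of a command handler, reusing the `Order`,
+`MarkItemReady`, and `ItemReady` types from the samples earlier on this page:
+
+```cs
+public static async Task Handle(MarkItemReady command, IDocumentSession session)
+{
+    // Fetches both the current state of the Order aggregate and a handle
+    // for appending new events to the same stream
+    var stream = await session.Events.FetchForWriting<Order>(command.OrderId);
+
+    // Decide on new events from the current state, but do *not*
+    // mutate stream.Aggregate itself!
+    if (stream.Aggregate.Items.TryGetValue(command.ItemName, out var item) && !item.Ready)
+    {
+        stream.AppendOne(new ItemReady(command.ItemName));
+    }
+
+    // With UseIdentityMapForInlineAggregates = true, Marten reuses the aggregate
+    // instance fetched above as the starting point for the Inline projection
+    // instead of loading it from the database again
+    await session.SaveChangesAsync();
+}
+```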
+
+**This optimization will be harmful if you alter the loaded aggregate in any way between `FetchForWriting()` and `SaveChangesAsync()`,
+because it can make the projected data that gets saved invalid.**
+
diff --git a/src/EventAppenderPerfTester/Program.cs b/src/EventAppenderPerfTester/Program.cs
index 343c3a95e3..68f5cdd567 100644
--- a/src/EventAppenderPerfTester/Program.cs
+++ b/src/EventAppenderPerfTester/Program.cs
@@ -5,6 +5,8 @@
 using Microsoft.Extensions.Hosting;
 using Oakton;
 
+#region sample_disabling_npgsql_logging
+
 var builder = Host.CreateDefaultBuilder();
 builder.ConfigureServices(services =>
 {
@@ -12,6 +14,7 @@
     {
         opts.Connection(ConnectionSource.ConnectionString);
 
+        // Disable the absurdly verbose Npgsql logging
         opts.DisableNpgsqlLogging = true;
 
         opts.Events.AppendMode = EventAppendMode.Quick;
@@ -21,5 +24,7 @@
     });
 });
 
+#endregion
+
 return await builder.RunOaktonCommands(args);
diff --git a/src/EventSourcingTests/Examples/Optimizations.cs b/src/EventSourcingTests/Examples/Optimizations.cs
new file mode 100644
index 0000000000..e1349c0d76
--- /dev/null
+++ b/src/EventSourcingTests/Examples/Optimizations.cs
@@ -0,0 +1,56 @@
+using System.Threading.Tasks;
+using Marten;
+using Marten.Events;
+using Microsoft.Extensions.Hosting;
+
+namespace EventSourcingTests.Examples;
+
+public class Optimizations
+{
+    public static async Task use_partitioning()
+    {
+
+        #region sample_turn_on_stream_archival_partitioning
+
+        var builder = Host.CreateApplicationBuilder();
+        builder.Services.AddMarten(opts =>
+        {
+            opts.Connection("some connection string");
+
+            // Turn on the PostgreSQL table partitioning for
+            // hot/cold storage on archived events
+            opts.Events.UseArchivedStreamPartitioning = true;
+        });
+
+        #endregion
+    }
+
+    public static async Task use_optimizations()
+    {
+
+        #region sample_turn_on_optimizations_for_event_sourcing
+
+        var builder = Host.CreateApplicationBuilder();
+        builder.Services.AddMarten(opts =>
+        {
+            opts.Connection("some connection string");
+
+            // Turn on the PostgreSQL table partitioning for
+            // hot/cold storage on archived events
+            opts.Events.UseArchivedStreamPartitioning = true;
+
+            // Use the *much* faster workflow for appending events
+            // at the cost of *some* loss of metadata usage for
+            // inline projections
+            opts.Events.AppendMode = EventAppendMode.Quick;
+
+            // Little more involved, but this can reduce the number
+            // of database queries necessary to process inline projections
+            // during command handling with some significant
+            // caveats
+            opts.Events.UseIdentityMapForInlineAggregates = true;
+        });
+
+        #endregion
+    }
+}
diff --git a/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs b/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs
index b9d9f96d75..fcf396e837 100644
--- a/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs
+++ b/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs
@@ -4,6 +4,8 @@
 using System.Threading.Tasks;
 using Marten;
 using Marten.Events;
+using Marten.Events.Projections;
+using Microsoft.Extensions.Hosting;
 using NSubstitute;
 
 namespace EventSourcingTests.Examples;
@@ -197,3 +199,32 @@ public Task Handle4(MarkItemReady command, IDocumentSession session)
 
     #endregion
 }
+
+public static class BootstrappingSample
+{
+    public static async Task bootstrap()
+    {
+        #region sample_registering_Order_as_Inline
+
+        var builder = Host.CreateApplicationBuilder();
+        builder.Services.AddMarten(opts =>
+            {
+                opts.Connection("some connection string");
+
+                // The Order aggregate is updated Inline inside the
+                //
same transaction as the events being appended + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + + // Opt into an optimization for the inline aggregates + // used with FetchForWriting() + opts.Projections.UseIdentityMapForInlineAggregates = true; + }) + + // This is also a performance optimization in Marten to disable the + // identity map tracking overall in Marten sessions if you don't + // need that tracking at runtime + .UseLightweightSessions(); + + #endregion + } +} diff --git a/src/EventSourcingTests/archiving_events.cs b/src/EventSourcingTests/archiving_events.cs index 5967f5cfdc..6854d373c1 100644 --- a/src/EventSourcingTests/archiving_events.cs +++ b/src/EventSourcingTests/archiving_events.cs @@ -6,6 +6,7 @@ using Marten.Events; using Marten.Events.Archiving; using Marten.Exceptions; +using Marten.Storage; using Marten.Testing.Harness; using Shouldly; using Weasel.Core; @@ -15,11 +16,11 @@ namespace EventSourcingTests; -public class archiving_events: IntegrationContext +public class archiving_events: OneOffConfigurationsContext { private readonly ITestOutputHelper _output; - public archiving_events(DefaultStoreFixture fixture, ITestOutputHelper output): base(fixture) + public archiving_events(ITestOutputHelper output) { _output = output; } @@ -34,9 +35,13 @@ public async Task SampleArchive(IDocumentSession session, string streamId) #endregion - [Fact] - public async Task archive_stream_by_guid() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task archive_stream_by_guid(bool usePartitioning) { + StoreOptions(opts => opts.Events.UseArchivedStreamPartitioning = usePartitioning); + var stream = Guid.NewGuid(); theSession.Events.StartStream(stream, new AEvent(), new BEvent(), new CEvent()); @@ -48,7 +53,7 @@ public async Task archive_stream_by_guid() stream1.IsArchived.ShouldBeFalse(); var isArchived = await theSession.Connection - .CreateCommand("select is_archived from mt_events where stream_id = :stream") + .CreateCommand($"select is_archived from {SchemaName}.mt_events where stream_id = :stream") .With("stream", stream).FetchListAsync(); // None of the events should be archived @@ -61,7 +66,50 @@ public async Task archive_stream_by_guid() stream2.IsArchived.ShouldBeTrue(); isArchived = await theSession.Connection - .CreateCommand("select is_archived from mt_events where stream_id = :stream") + .CreateCommand($"select is_archived from {SchemaName}.mt_events where stream_id = :stream") + .With("stream", stream).FetchListAsync(); + + // All of the events should be archived + isArchived.All(x => x).ShouldBeTrue(); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task archive_stream_by_guid_when_tenanted(bool usePartitioning) + { + StoreOptions(opts => + { + opts.Events.UseArchivedStreamPartitioning = usePartitioning; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + }); + + var stream = Guid.NewGuid(); + + var session = theStore.LightweightSession("one"); + session.Events.StartStream(stream, new AEvent(), new BEvent(), new CEvent()); + await session.SaveChangesAsync(); + + session.Logger = new TestOutputMartenLogger(_output); + + var stream1 = await session.Events.FetchStreamStateAsync(stream); + stream1.IsArchived.ShouldBeFalse(); + + var isArchived = await session.Connection + .CreateCommand($"select is_archived from {SchemaName}.mt_events where stream_id = :stream") + .With("stream", stream).FetchListAsync(); + + // None of the events should be archived + isArchived.All(x => !x).ShouldBeTrue(); + + 
session.Events.ArchiveStream(stream); + await session.SaveChangesAsync(); + + var stream2 = await session.Events.FetchStreamStateAsync(stream); + stream2.IsArchived.ShouldBeTrue(); + + isArchived = await session.Connection + .CreateCommand($"select is_archived from {SchemaName}.mt_events where stream_id = :stream") .With("stream", stream).FetchListAsync(); // All of the events should be archived @@ -76,7 +124,7 @@ public async Task fetch_stream_filters_out_archived_events() theSession.Events.StartStream(stream, new AEvent(), new BEvent(), new CEvent(), new CEvent(), new CEvent()); await theSession.SaveChangesAsync(); - await theSession.Connection.CreateCommand("update mt_events set is_archived = TRUE where version < 2") + await theSession.Connection.CreateCommand($"update {SchemaName}.mt_events set is_archived = TRUE where version < 2") .ExecuteNonQueryAsync(); var events = await theSession.Events.FetchStreamAsync(stream); @@ -85,10 +133,16 @@ await theSession.Connection.CreateCommand("update mt_events set is_archived = TR } - [Fact] - public async Task archive_stream_by_string() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task archive_stream_by_string(bool usePartitioning) { - UseStreamIdentity(StreamIdentity.AsString); + StoreOptions(opts => + { + opts.Events.UseArchivedStreamPartitioning = usePartitioning; + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); var stream = Guid.NewGuid().ToString(); @@ -99,7 +153,7 @@ public async Task archive_stream_by_string() stream1.IsArchived.ShouldBeFalse(); var isArchived = await theSession.Connection - .CreateCommand("select is_archived from string_events.mt_events where stream_id = :stream") + .CreateCommand($"select is_archived from {SchemaName}.mt_events where stream_id = :stream") .With("stream", stream).FetchListAsync(); // None of the events should be archived @@ -112,16 +166,20 @@ public async Task archive_stream_by_string() stream2.IsArchived.ShouldBeTrue(); isArchived = await theSession.Connection - .CreateCommand("select is_archived from string_events.mt_events where stream_id = :stream") + .CreateCommand($"select is_archived from {SchemaName}.mt_events where stream_id = :stream") .With("stream", stream).FetchListAsync(); // All of the events should be archived isArchived.All(x => x).ShouldBeTrue(); } - [Fact] - public async Task query_by_events_filters_out_archived_events_by_default() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task query_by_events_filters_out_archived_events_by_default(bool usePartitioning) { + StoreOptions(opts => opts.Events.UseArchivedStreamPartitioning = usePartitioning); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); var stream1 = Guid.NewGuid(); @@ -143,9 +201,13 @@ public async Task query_by_events_filters_out_archived_events_by_default() events.All(x => x.StreamId != stream2).ShouldBeTrue(); } - [Fact] - public async Task query_by_events_and_explicitly_search_for_archived_events() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task query_by_events_and_explicitly_search_for_archived_events(bool usePartitioning) { + StoreOptions(opts => opts.Events.UseArchivedStreamPartitioning = usePartitioning); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); var stream1 = Guid.NewGuid(); @@ -158,11 +220,11 @@ public async Task query_by_events_and_explicitly_search_for_archived_events() await theSession.SaveChangesAsync(); + theSession.Logger = new TestOutputMartenLogger(_output); + theSession.Events.ArchiveStream(stream2); 
await theSession.SaveChangesAsync(); - theSession.Logger = new TestOutputMartenLogger(_output); - #region sample_querying_for_archived_events var events = await theSession.Events @@ -176,9 +238,13 @@ public async Task query_by_events_and_explicitly_search_for_archived_events() events.All(x => x.StreamId == stream2).ShouldBeTrue(); } - [Fact] - public async Task query_by_events_and_explicitly_search_for_maybe_archived_events() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task query_by_events_and_explicitly_search_for_maybe_archived_events(bool usePartitioning) { + StoreOptions(opts => opts.Events.UseArchivedStreamPartitioning = usePartitioning); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); var stream1 = Guid.NewGuid(); @@ -207,9 +273,13 @@ public async Task query_by_events_and_explicitly_search_for_maybe_archived_event events.Count.ShouldBe(9); } - [Fact] - public async Task query_by_a_specific_event_filters_out_archived_events_by_default() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task query_by_a_specific_event_filters_out_archived_events_by_default(bool usePartitioning) { + StoreOptions(opts => opts.Events.UseArchivedStreamPartitioning = usePartitioning); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); var stream1 = Guid.NewGuid(); @@ -242,9 +312,13 @@ public async Task query_by_a_specific_event_filters_out_archived_events_by_defau events.All(x => x.Tracker != aEvent2.Tracker).ShouldBeTrue(); } - [Fact] - public async Task prevent_append_operation_for_archived_stream_on_sync_commit() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task prevent_append_operation_for_archived_stream_on_sync_commit(bool usePartitioning) { + StoreOptions(opts => opts.Events.UseArchivedStreamPartitioning = usePartitioning); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); var streamId = Guid.NewGuid(); @@ -262,9 +336,13 @@ public async Task prevent_append_operation_for_archived_stream_on_sync_commit() thrownException.Message.ShouldBe($"Attempted to append event to archived stream with Id '{streamId}'."); } - [Fact] - public async Task prevent_append_operation_for_archived_stream_on_async_commit() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task prevent_append_operation_for_archived_stream_on_async_commit(bool usePartitioning) { + StoreOptions(opts => opts.Events.UseArchivedStreamPartitioning = usePartitioning); + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); var streamId = Guid.NewGuid(); diff --git a/src/Marten.sln b/src/Marten.sln index 0182c60c0a..665f4fb4e4 100644 --- a/src/Marten.sln +++ b/src/Marten.sln @@ -116,10 +116,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventAppenderPerfTester", " EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StressTests", "StressTests\StressTests.csproj", "{C9D33381-3AD3-4005-B854-F04F10EA837F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Weasel.Core", "..\..\weasel\src\Weasel.Core\Weasel.Core.csproj", "{F1524942-FA31-4A11-A5DD-EE92B7806511}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Weasel.Postgresql", "..\..\weasel\src\Weasel.Postgresql\Weasel.Postgresql.csproj", "{0042C325-5957-4F76-AC4C-B1C417AAF93D}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -286,14 +282,6 @@ Global {C9D33381-3AD3-4005-B854-F04F10EA837F}.Debug|Any CPU.Build.0 = Debug|Any CPU 
{C9D33381-3AD3-4005-B854-F04F10EA837F}.Release|Any CPU.ActiveCfg = Release|Any CPU {C9D33381-3AD3-4005-B854-F04F10EA837F}.Release|Any CPU.Build.0 = Release|Any CPU - {F1524942-FA31-4A11-A5DD-EE92B7806511}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {F1524942-FA31-4A11-A5DD-EE92B7806511}.Debug|Any CPU.Build.0 = Debug|Any CPU - {F1524942-FA31-4A11-A5DD-EE92B7806511}.Release|Any CPU.ActiveCfg = Release|Any CPU - {F1524942-FA31-4A11-A5DD-EE92B7806511}.Release|Any CPU.Build.0 = Release|Any CPU - {0042C325-5957-4F76-AC4C-B1C417AAF93D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {0042C325-5957-4F76-AC4C-B1C417AAF93D}.Debug|Any CPU.Build.0 = Debug|Any CPU - {0042C325-5957-4F76-AC4C-B1C417AAF93D}.Release|Any CPU.ActiveCfg = Release|Any CPU - {0042C325-5957-4F76-AC4C-B1C417AAF93D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Marten/Events/Archiving/ArchiveStreamFunction.cs b/src/Marten/Events/Archiving/ArchiveStreamFunction.cs index 2638b1f8bb..bbc3e01343 100644 --- a/src/Marten/Events/Archiving/ArchiveStreamFunction.cs +++ b/src/Marten/Events/Archiving/ArchiveStreamFunction.cs @@ -1,4 +1,8 @@ using System.IO; +using System.Linq; +using JasperFx.Core; +using Marten.Events.Schema; +using Marten.Storage; using Weasel.Core; using Weasel.Postgresql; using Weasel.Postgresql.Functions; @@ -22,12 +26,52 @@ public override void WriteCreateStatement(Migrator rules, TextWriter writer) ? "streamid uuid" : "streamid varchar"; + if (_events.TenancyStyle == TenancyStyle.Conjoined) + { + argList += ", tenantid varchar"; + } + + var tenantWhere = _events.TenancyStyle == TenancyStyle.Conjoined ? " and tenant_id = tenantid" : ""; + + if (_events.UseArchivedStreamPartitioning) + { + writeWithPartitioning(writer, argList, tenantWhere); + } + else + { + writeSimple(writer, argList, tenantWhere); + } + } + + private void writeWithPartitioning(TextWriter writer, string argList, string tenantWhere) + { + var eventColumns = new EventsTable(_events).Columns.Where(x => x.Name != IsArchivedColumn.ColumnName) + .Select(x => x.Name).Join(", "); + + var streamColumns = new StreamsTable(_events).Columns.Where(x => x.Name != IsArchivedColumn.ColumnName) + .Select(x => x.Name).Join(", "); + + writer.WriteLine($@" +CREATE OR REPLACE FUNCTION {_events.DatabaseSchemaName}.{Name}({argList}) RETURNS VOID LANGUAGE plpgsql AS +$function$ +BEGIN + insert into {_events.DatabaseSchemaName}.mt_streams select {streamColumns}, TRUE from {_events.DatabaseSchemaName}.mt_streams where id = streamid {tenantWhere}; + insert into {_events.DatabaseSchemaName}.mt_events select {eventColumns}, TRUE from {_events.DatabaseSchemaName}.mt_events where stream_id = streamid {tenantWhere}; + delete from {_events.DatabaseSchemaName}.mt_events where stream_id = streamid and {IsArchivedColumn.ColumnName} = FALSE {tenantWhere}; + delete from {_events.DatabaseSchemaName}.mt_streams where id = streamid and {IsArchivedColumn.ColumnName} = FALSE {tenantWhere}; +END; +$function$; +"); + } + + private void writeSimple(TextWriter writer, string argList, string tenantWhere) + { writer.WriteLine($@" CREATE OR REPLACE FUNCTION {_events.DatabaseSchemaName}.{Name}({argList}) RETURNS VOID LANGUAGE plpgsql AS $function$ BEGIN - update {_events.DatabaseSchemaName}.mt_streams set {IsArchivedColumn.ColumnName} = TRUE where id = streamid; - update {_events.DatabaseSchemaName}.mt_events set is_archived = TRUE where stream_id = streamid; + update {_events.DatabaseSchemaName}.mt_streams 
set {IsArchivedColumn.ColumnName} = TRUE where id = streamid {tenantWhere}; + update {_events.DatabaseSchemaName}.mt_events set is_archived = TRUE where stream_id = streamid {tenantWhere}; END; $function$; "); diff --git a/src/Marten/Events/Archiving/ArchiveStreamOperation.cs b/src/Marten/Events/Archiving/ArchiveStreamOperation.cs index 6a35d46be5..193776b95e 100644 --- a/src/Marten/Events/Archiving/ArchiveStreamOperation.cs +++ b/src/Marten/Events/Archiving/ArchiveStreamOperation.cs @@ -5,6 +5,8 @@ using System.Threading.Tasks; using Marten.Internal; using Marten.Internal.Operations; +using Marten.Storage; +using NpgsqlTypes; using Weasel.Postgresql; namespace Marten.Events.Archiving; @@ -22,11 +24,24 @@ public ArchiveStreamOperation(EventGraph events, object streamId) public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) { - var parameter = - builder.AppendWithParameters($"select {_events.DatabaseSchemaName}.{ArchiveStreamFunction.Name}(?)")[0]; - parameter.Value = _streamId; + if (_events.TenancyStyle == TenancyStyle.Conjoined) + { + var parameters = builder.AppendWithParameters($"select {_events.DatabaseSchemaName}.{ArchiveStreamFunction.Name}(?, ?)"); + parameters[0].Value = _streamId; + parameters[0].NpgsqlDbType = _events.StreamIdDbType; + parameters[1].Value = session.TenantId; + parameters[1].NpgsqlDbType = NpgsqlDbType.Varchar; + } + else + { + var parameter = + builder.AppendWithParameters($"select {_events.DatabaseSchemaName}.{ArchiveStreamFunction.Name}(?)")[0]; + parameter.Value = _streamId; + + parameter.NpgsqlDbType = _events.StreamIdDbType; + } + - parameter.NpgsqlDbType = _events.StreamIdDbType; } diff --git a/src/Marten/Events/Projections/ProjectionOptions.cs b/src/Marten/Events/Projections/ProjectionOptions.cs index e89785c824..616988bfe5 100644 --- a/src/Marten/Events/Projections/ProjectionOptions.cs +++ b/src/Marten/Events/Projections/ProjectionOptions.cs @@ -88,6 +88,17 @@ internal IEnumerable allPlanners() internal IList All { get; } = new List(); + /// + /// Opt into a performance optimization that directs Marten to always use the identity map for an + /// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false. 
+ /// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten + /// + public bool UseIdentityMapForInlineAggregates + { + get => _options.Events.UseIdentityMapForInlineAggregates; + set => _options.Events.UseIdentityMapForInlineAggregates = value; + } + internal bool HasAnyAsyncProjections() { return All.Any(x => x.Lifecycle == ProjectionLifecycle.Async) || _subscriptions.Any(); diff --git a/src/Marten/Events/Schema/EventsTable.cs b/src/Marten/Events/Schema/EventsTable.cs index d920d379cd..bde1cf6d31 100644 --- a/src/Marten/Events/Schema/EventsTable.cs +++ b/src/Marten/Events/Schema/EventsTable.cs @@ -34,32 +34,69 @@ public EventsTable(EventGraph events): base(new PostgresqlObjectName(events.Data if (events.TenancyStyle == TenancyStyle.Conjoined) { - ForeignKeys.Add(new ForeignKey("fkey_mt_events_stream_id_tenant_id") + if (events.UseArchivedStreamPartitioning) { - ColumnNames = new[] { TenantIdColumn.Name, "stream_id" }, - LinkedNames = new[] { TenantIdColumn.Name, "id" }, - LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, "mt_streams") - }); - - Indexes.Add(new IndexDefinition("pk_mt_events_stream_and_version") + ForeignKeys.Add(new ForeignKey("fkey_mt_events_stream_id_tenant_id_is_archived") + { + ColumnNames = new[] { TenantIdColumn.Name, "stream_id", "is_archived" }, + LinkedNames = new[] { TenantIdColumn.Name, "id", "is_archived" }, + LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, "mt_streams") + }); + + Indexes.Add(new IndexDefinition("pk_mt_events_stream_and_version") + { + IsUnique = true, Columns = new[] { TenantIdColumn.Name, "stream_id", "version", "is_archived" } + }); + } + else { - IsUnique = true, Columns = new[] { TenantIdColumn.Name, "stream_id", "version" } - }); + ForeignKeys.Add(new ForeignKey("fkey_mt_events_stream_id_tenant_id") + { + ColumnNames = new[] { TenantIdColumn.Name, "stream_id" }, + LinkedNames = new[] { TenantIdColumn.Name, "id" }, + LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, "mt_streams") + }); + + Indexes.Add(new IndexDefinition("pk_mt_events_stream_and_version") + { + IsUnique = true, Columns = new[] { TenantIdColumn.Name, "stream_id", "version" } + }); + } + + } else { - ForeignKeys.Add(new ForeignKey("fkey_mt_events_stream_id") + if (events.UseArchivedStreamPartitioning) { - ColumnNames = new[] { "stream_id" }, - LinkedNames = new[] { "id" }, - LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, "mt_streams"), - OnDelete = CascadeAction.Cascade - }); - - Indexes.Add(new IndexDefinition("pk_mt_events_stream_and_version") + ForeignKeys.Add(new ForeignKey("fkey_mt_events_stream_id_is_archived") + { + ColumnNames = new[] { "stream_id", "is_archived" }, + LinkedNames = new[] { "id", "is_archived" }, + LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, "mt_streams"), + OnDelete = CascadeAction.Cascade + }); + + Indexes.Add(new IndexDefinition("pk_mt_events_stream_and_version") + { + IsUnique = true, Columns = new[] { "stream_id", "version", "is_archived" } + }); + } + else { - IsUnique = true, Columns = new[] { "stream_id", "version" } - }); + ForeignKeys.Add(new ForeignKey("fkey_mt_events_stream_id") + { + ColumnNames = new[] { "stream_id" }, + LinkedNames = new[] { "id" }, + LinkedTable = new PostgresqlObjectName(events.DatabaseSchemaName, "mt_streams"), + OnDelete = CascadeAction.Cascade + }); + + Indexes.Add(new IndexDefinition("pk_mt_events_stream_and_version") + { + IsUnique = true, Columns = new[] { "stream_id", 
"version" } + }); + } } if (events.EnableUniqueIndexOnEventId) diff --git a/src/Marten/Internal/CompiledQueries/CompiledQueryPlan.cs b/src/Marten/Internal/CompiledQueries/CompiledQueryPlan.cs index 2ed3210fa7..c40ba79b00 100644 --- a/src/Marten/Internal/CompiledQueries/CompiledQueryPlan.cs +++ b/src/Marten/Internal/CompiledQueries/CompiledQueryPlan.cs @@ -145,6 +145,11 @@ NpgsqlParameter ICommandBuilder.AppendParameter(T value) return usage.Parameter; } + NpgsqlParameter ICommandBuilder.AppendParameter(T value, NpgsqlDbType dbType) + { + throw new NotSupportedException(); + } + private int _parameterIndex = 0; NpgsqlParameter ICommandBuilder.AppendParameter(object value) @@ -184,6 +189,11 @@ void ICommandBuilder.AppendParameters(params object[] parameters) throw new NotSupportedException(); } + public IGroupedParameterBuilder CreateGroupedParameterBuilder(char? seperator = null) + { + throw new NotImplementedException(); + } + NpgsqlParameter[] ICommandBuilder.AppendWithParameters(string text) { _current ??= appendCommand(); diff --git a/src/Marten/Marten.csproj b/src/Marten/Marten.csproj index 01c993680d..a327809f2d 100644 --- a/src/Marten/Marten.csproj +++ b/src/Marten/Marten.csproj @@ -48,7 +48,7 @@ - + @@ -69,8 +69,5 @@ - - -