Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projection Rebuild optimization and other improvements to async projections & tests #3466

Merged
merged 5 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions docs/configuration/prebuilding.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ public static class Program
{
opts.AutoCreateSchemaObjects = AutoCreate.All;
opts.DatabaseSchemaName = "cli";
opts.DisableNpgsqlLogging = true;

opts.Events.UseOptimizedProjectionRebuilds = true;

opts.MultiTenantedWithSingleServer(
ConnectionSource.ConnectionString,
Expand All @@ -162,7 +165,7 @@ public static class Program
// *try* to use pre-generated code at runtime
opts.GeneratedCodeMode = TypeLoadMode.Auto;

opts.Schema.For<Activity>().AddSubClass<DaemonTests.TestingSupport.Trip>();
//opts.Schema.For<Activity>().AddSubClass<DaemonTests.TestingSupport.Trip>();

// You have to register all persisted document types ahead of time
// RegisterDocumentType<T>() is the equivalent of saying Schema.For<T>()
Expand All @@ -188,13 +191,13 @@ public static class Program
.Add(new SimpleProjection(), ProjectionLifecycle.Inline);

// This is actually important to register "live" aggregations too for the code generation
opts.Projections.LiveStreamAggregation<Trip>();
//opts.Projections.LiveStreamAggregation<Trip>();
}).AddAsyncDaemon(DaemonMode.Solo);
});
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CommandLineRunner/Program.cs#L28-L95' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_pre_build_types' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CommandLineRunner/Program.cs#L28-L98' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_pre_build_types' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Okay, after all that, there should be a new command line option called `codegen` for your project. Assuming
Expand Down
8 changes: 4 additions & 4 deletions docs/documents/multi-tenancy.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ builder.Services.AddMarten(opts =>
opts.Policies.PartitionMultiTenantedDocumentsUsingMartenManagement("tenants");
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L113-L130' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_marten_managed_tenant_partitioning' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L151-L168' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_marten_managed_tenant_partitioning' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The tenant to partition name mapping will be stored in a table created by Marten called `mt_tenant_partitions` with
Expand All @@ -467,7 +467,7 @@ await theStore
// with the named tenant ids
.AddMartenManagedTenantsAsync(CancellationToken.None, "a1", "a2", "a3");
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L56-L64' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_add_managed_tenants_at_runtime' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L57-L65' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_add_managed_tenants_at_runtime' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The API above will try to add any missing table partitions to all known document types. There is also a separate overload
Expand All @@ -492,7 +492,7 @@ public class DocThatShouldBeExempted1
public Guid Id { get; set; }
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L184-L192' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_donotpartitionattribute' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L222-L230' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_donotpartitionattribute' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

or exempt a single document type through the fluent interface:
Expand All @@ -502,7 +502,7 @@ or exempt a single document type through the fluent interface:
```cs
opts.Schema.For<DocThatShouldBeExempted2>().DoNotPartition();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L169-L173' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_exempt_from_partitioning_through_fluent_interface' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs#L207-L211' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_exempt_from_partitioning_through_fluent_interface' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Implementation Details
Expand Down
83 changes: 82 additions & 1 deletion docs/events/optimizing.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ builder.Services.AddMarten(opts =>
// during command handling with some significant
// caveats
opts.Events.UseIdentityMapForInlineAggregates = true;

// Opts into a mode where Marten is able to rebuild single
// stream projections faster by building one stream at a time
// Does require new table migrations for Marten 7 users though
opts.Events.UseOptimizedProjectionRebuilds = true;
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/Optimizations.cs#L31-L54' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_turn_on_optimizations_for_event_sourcing' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/Optimizations.cs#L31-L60' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_turn_on_optimizations_for_event_sourcing' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The archived stream option is further described in the section on [Hot/Cold Storage Partitioning](/events/archiving.html#hot-cold-storage-partitioning).
Expand All @@ -45,3 +50,79 @@ applicability and drawbacks of the "Quick" event appending.

Lastly, see [Optimizing FetchForWriting with Inline Aggregates](/scenarios/command_handler_workflow.html#optimizing-fetchforwriting-with-inline-aggregates) for more information
about the `UseIdentityMapForInlineAggregates` option.

## Caching for Asynchronous Projections

You may be able to wring out more throughput for aggregated projections (`SingleStreamProjection`, `MultiStreamProjection`, `CustomProjection`)
by opting into 2nd level caching of the aggregated projected documents during asynchronous projection building. You can
do that by setting a greater than zero value for `CacheLimitPerTenant` directly inside of the aforementioned projection types
like so:

<!-- snippet: sample_showing_fanout_rules -->
<a id='snippet-sample_showing_fanout_rules'></a>
```cs
public class DayProjection: MultiStreamProjection<Day, int>
{
public DayProjection()
{
// Tell the projection how to group the events
// by Day document
Identity<IDayEvent>(x => x.Day);

// This just lets the projection work independently
// on each Movement child of the Travel event
// as if it were its own event
FanOut<Travel, Movement>(x => x.Movements);

// You can also access Event data
FanOut<Travel, Stop>(x => x.Data.Stops);

ProjectionName = "Day";

// Opt into 2nd level caching of up to 100
// most recently encountered aggregates as a
// performance optimization
CacheLimitPerTenant = 1000;

// With large event stores of relatively small
// event objects, moving this number up from the
// default can greatly improve throughput and especially
// improve projection rebuild times
Options.BatchSize = 5000;
}

public void Apply(Day day, TripStarted e) => day.Started++;
public void Apply(Day day, TripEnded e) => day.Ended++;

public void Apply(Day day, Movement e)
{
switch (e.Direction)
{
case Direction.East:
day.East += e.Distance;
break;
case Direction.North:
day.North += e.Distance;
break;
case Direction.South:
day.South += e.Distance;
break;
case Direction.West:
day.West += e.Distance;
break;

default:
throw new ArgumentOutOfRangeException();
}
}

public void Apply(Day day, Stop e) => day.Stops++;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/ViewProjectionTests.cs#L132-L192' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_showing_fanout_rules' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Marten is using a most recently used cache for the projected documents that are being built by an aggregation projection
so that updates from new events can be directly applied to the in memory documents instead of having to constantly
load those documents over and over again from the database as new events trickle in. This is of course much more effective
when your projection is constantly updating a relatively small number of different aggregates.
14 changes: 7 additions & 7 deletions docs/events/projections/aggregate-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And register that projection like this:
Expand Down Expand Up @@ -236,7 +236,7 @@ public class Trip
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L112-L162' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L113-L163' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Or finally, you can use a method named `Create()` on a projection type as shown in this sample:
Expand Down Expand Up @@ -272,7 +272,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `Create()` method has to return either the aggregate document type or `Task<T>` where `T` is the aggregate document type. There must be an argument for the specific event type or `IEvent<T>` where `T` is the event type if you need access to event metadata. You can also take in an `IQuerySession` if you need to look up additional data as part of the transformation or `IEvent` in addition to the exact event type just to get at event metadata.
Expand Down Expand Up @@ -312,7 +312,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L168-L193' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_projectevent_in_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L169-L194' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_projectevent_in_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

I'm not personally that wild about using lots of inline Lambdas like the example above, and to that end, Marten now supports the `Apply()` method convention. Here's the same `TripProjection`, but this time using methods to mutate the `Trip` document:
Expand Down Expand Up @@ -348,7 +348,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `Apply()` methods can accept any combination of these arguments:
Expand Down Expand Up @@ -517,7 +517,7 @@ public class Trip
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L112-L162' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L113-L163' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Here's an example of using the various ways of doing `Trip` stream aggregation:
Expand Down Expand Up @@ -551,7 +551,7 @@ internal async Task use_a_stream_aggregation()
var trip = await session.Events.AggregateStreamAsync<Trip>(tripId);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L80-L108' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TestingSupport/TripProjectionWithCustomName.cs#L81-L109' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Aggregate Versioning
Expand Down
6 changes: 3 additions & 3 deletions docs/events/projections/custom-aggregates.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class Increment
{
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs#L462-L480' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom_aggregate_events' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs#L524-L542' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom_aggregate_events' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And a simple aggregate document type like this:
Expand All @@ -51,7 +51,7 @@ public class StartAndStopAggregate: ISoftDeleted
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs#L442-L460' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_startandstopaggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs#L504-L522' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_startandstopaggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

As you can see, `StartAndStopAggregate` as a `Guid` as its identity and is also [soft-deleted](/documents/deletes.html#soft-deletes) when stored by
Expand Down Expand Up @@ -126,7 +126,7 @@ public class StartAndStopProjection: CustomProjection<StartAndStopAggregate, Gui
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs#L482-L550' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom_aggregate_with_start_and_stop' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/CustomProjectionTests.cs#L544-L612' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom_aggregate_with_start_and_stop' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Custom Grouping
Expand Down
13 changes: 12 additions & 1 deletion docs/events/projections/multi-stream-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,17 @@ public class DayProjection: MultiStreamProjection<Day, int>
FanOut<Travel, Stop>(x => x.Data.Stops);

ProjectionName = "Day";

// Opt into 2nd level caching of up to 100
// most recently encountered aggregates as a
// performance optimization
CacheLimitPerTenant = 1000;

// With large event stores of relatively small
// event objects, moving this number up from the
// default can greatly improve throughput and especially
// improve projection rebuild times
Options.BatchSize = 5000;
}

public void Apply(Day day, TripStarted e) => day.Started++;
Expand Down Expand Up @@ -531,7 +542,7 @@ public class DayProjection: MultiStreamProjection<Day, int>
public void Apply(Day day, Stop e) => day.Stops++;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/ViewProjectionTests.cs#L132-L181' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_showing_fanout_rules' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/ViewProjectionTests.cs#L132-L192' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_showing_fanout_rules' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Using Custom Grouper with Fan Out Feature for Event Projections
Expand Down
Loading
Loading