Skip to content

Commit

Permalink
New system "Archived" event for easier usage of event archiving. Closes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Dec 12, 2024
1 parent 8519af7 commit 9ad8680
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 32 deletions.
2 changes: 1 addition & 1 deletion docs/diagnostics.md
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ The `IMartenLogger` can be swapped out on any `IQuerySession` or `IDocumentSessi
// session to pipe Marten logging to the xUnit.Net output
theSession.Logger = new TestOutputMartenLogger(_output);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L301-L307' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_replacing_logger_per_session' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L307-L313' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_replacing_logger_per_session' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Previewing the PostgreSQL Query Plan
Expand Down
133 changes: 130 additions & 3 deletions docs/events/archiving.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task SampleArchive(IDocumentSession session, string streamId)
await session.SaveChangesAsync();
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L28-L36' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_archive_stream_usage' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L34-L42' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_archive_stream_usage' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

As in all cases with an `IDocumentSession`, you need to call `SaveChanges()` to commit the
Expand All @@ -55,7 +55,7 @@ var events = await theSession.Events
.Where(x => x.IsArchived)
.ToListAsync();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L228-L235' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_querying_for_archived_events' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L234-L241' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_querying_for_archived_events' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

You can also query for all events both archived and not archived with `MaybeArchived()`
Expand All @@ -67,7 +67,7 @@ like so:
var events = await theSession.Events.QueryAllRawEvents()
.Where(x => x.MaybeArchived()).ToListAsync();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L263-L268' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_query_for_maybe_archived_events' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/archiving_events.cs#L269-L274' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_query_for_maybe_archived_events' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Hot/Cold Storage Partitioning <Badge type="tip" text="7.25" />
Expand Down Expand Up @@ -112,4 +112,131 @@ table, copy all the existing data from the temp table to the new partitioned tab

## Archived Event <Badge type="tip" text="7.34" />

Marten has a built in event named `Archived` that can be appended to any event stream:

<!-- snippet: sample_Archived_event -->
<a id='snippet-sample_archived_event'></a>
```cs
namespace Marten.Events;

/// <summary>
/// The presence of this event marks a stream as "archived" when it is processed
/// by a single stream projection of any sort
/// </summary>
public record Archived(string Reason);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/Events/Archived.cs#L1-L11' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_archived_event' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

When this event is appended to an event stream *and* that event is processed through any type of single stream projection
for that event stream (snapshot or what we used to call a "self-aggregate", `SingleStreamProjection`, or `CustomProjection` with the `AggregateByStream` option),
Marten will automatically mark that entire event stream as archived as part of processing the projection. This applies for
both `Inline` and `Async` execution of projections.

Let's try to make this concrete by building a simple order processing system that might include this
aggregate:

<!-- snippet: sample_Order_for_optimized_command_handling -->
<a id='snippet-sample_order_for_optimized_command_handling'></a>
```cs
public class Item
{
public string Name { get; set; }
public bool Ready { get; set; }
}

public class Order
{
// This would be the stream id
public Guid Id { get; set; }

// This is important, by Marten convention this would
// be the
public int Version { get; set; }

public Order(OrderCreated created)
{
foreach (var item in created.Items)
{
Items[item.Name] = item;
}
}

public void Apply(IEvent<OrderShipped> shipped) => Shipped = shipped.Timestamp;
public void Apply(ItemReady ready) => Items[ready.Name].Ready = true;

public DateTimeOffset? Shipped { get; private set; }

public Dictionary<string, Item> Items { get; set; } = new();

public bool IsReadyToShip()
{
return Shipped == null && Items.Values.All(x => x.Ready);
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs#L23-L61' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_order_for_optimized_command_handling' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Next, let's say we're having the `Order` aggregate snapshotted so that it's updated every time new events
are captured like so:

<!-- snippet: sample_registering_Order_as_Inline -->
<a id='snippet-sample_registering_order_as_inline'></a>
```cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection("some connection string");

// The Order aggregate is updated Inline inside the
// same transaction as the events being appended
opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);

// Opt into an optimization for the inline aggregates
// used with FetchForWriting()
opts.Projections.UseIdentityMapForAggregates = true;
})

// This is also a performance optimization in Marten to disable the
// identity map tracking overall in Marten sessions if you don't
// need that tracking at runtime
.UseLightweightSessions();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs#L209-L230' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_registering_order_as_inline' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Now, let's say as a way to keep our application performing as well as possible, we'd like to be aggressive about archiving
shipped orders to keep the "hot" event storage table small. One way we can do that is to append the `Archived` event
as part of processing a command to ship an order like so:

<!-- snippet: sample_handling_shiporder_and_emitting_archived_event -->
<a id='snippet-sample_handling_shiporder_and_emitting_archived_event'></a>
```cs
public static async Task HandleAsync(ShipOrder command, IDocumentSession session)
{
var stream = await session.Events.FetchForWriting<Order>(command.OrderId);
var order = stream.Aggregate;

if (!order.Shipped.HasValue)
{
// Mark it as shipped
stream.AppendOne(new OrderShipped());

// But also, the order is done, so let's mark it as archived too!
stream.AppendOne(new Archived("Shipped"));

await session.SaveChangesAsync();
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/OptimizedCommandHandling.cs#L233-L252' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_handling_shiporder_and_emitting_archived_event' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

If an `Order` hasn't already shipped, one of the outcomes of that command handler executing is that the entire event stream
for the `Order` will be marked as archived.

::: info
This was originally conceived as a way to improve the Wolverine aggregate handler workflow usability while also encouraging
Marten users to take advantage of the event archiving feature.
:::
36 changes: 18 additions & 18 deletions docs/events/projections/read-aggregates.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ builder.Services.AddMarten(opts =>
opts.Projections.LiveStreamAggregation<Projections.Invoice>();
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L17-L32' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_live' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L18-L33' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_live' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Then we could use the `AggregateStreamAsync` API to read the current `Invoice` state for any
Expand All @@ -88,7 +88,7 @@ public static async Task read_live_invoice(
.Events.AggregateStreamAsync<Invoice>(invoiceId);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L35-L45' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_read_live_invoice' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L36-L46' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_read_live_invoice' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: info
Expand All @@ -110,7 +110,7 @@ builder.Services.AddMarten(opts =>
opts.Projections.Snapshot<Projections.Invoice>(SnapshotLifecycle.Inline);
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L49-L59' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_inline' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L50-L60' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_inline' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Then we can just treat the `Invoice` as any old Marten document (because it is) and use
Expand All @@ -127,7 +127,7 @@ public static async Task read_inline_invoice(
.LoadAsync<Invoice>(invoiceId);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L62-L72' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_read_inline_invoice' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L63-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_read_inline_invoice' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And lastly, if we wanted to run the `Invoice` snapshot updates as an asynchronous projection (maybe to take advantage
Expand All @@ -145,7 +145,7 @@ builder.Services.AddMarten(opts =>
})
.AddAsyncDaemon(DaemonMode.HotCold);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L76-L87' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_async' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L77-L88' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_async' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

We would still just the same `LoadAsync()` API, but you just hope that
Expand All @@ -166,7 +166,7 @@ builder.Services.AddMarten(opts =>
opts.Projections.Snapshot<Projections.Invoice>(SnapshotLifecycle.Inline);
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L49-L59' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_inline' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L50-L60' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_as_inline' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## FetchLatest <Badge type="tip" text="7.34" />
Expand Down Expand Up @@ -196,7 +196,7 @@ public static async Task read_latest(
.Events.FetchLatest<Projections.Invoice>(invoiceId);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L90-L101' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_read_latest_invoice' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L91-L102' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_read_latest_invoice' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Just to understand how this API works, under the covers, if `Invoice` is registered as:
Expand Down Expand Up @@ -225,7 +225,7 @@ builder.Services.AddMarten(opts =>
opts.Projections.Snapshot<Projections.Invoice>(SnapshotLifecycle.Inline);
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L105-L118' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_with_optimizations' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L106-L119' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configure_aggregate_with_optimizations' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: warning
Expand All @@ -247,10 +247,10 @@ fun, let's say we wrote a helper function like this:
```cs
public static class MutationExtensions
{
public static async Task<T> Mutate<T>(this IDocumentSession session, Guid id, Func<T, object[]> decider,
CancellationToken token = default) where T : class
public static async Task<Projections.Invoice> MutateInvoice(this IDocumentSession session, Guid id, Func<Projections.Invoice, IEnumerable<object>> decider,
CancellationToken token = default)
{
var stream = await session.Events.FetchForWriting<T>(id, token);
var stream = await session.Events.FetchForWriting<Projections.Invoice>(id, token);

// Decide what new events should be appended based on the current
// state of the aggregate and application logic
Expand All @@ -260,32 +260,32 @@ public static class MutationExtensions
// Persist any new events
await session.SaveChangesAsync(token);

return await session.Events.FetchLatest<T>(id, token);
return await session.Events.FetchLatest<Projections.Invoice>(id, token);
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L139-L160' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_mutation_extensions' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L140-L161' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_mutation_extensions' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And used it for a command handler something like this:

<!-- snippet: sample_invoice_approval_workflow_with_mutate -->
<a id='snippet-sample_invoice_approval_workflow_with_mutate'></a>
```cs
public static Task<Projections.Invoice> Approve(IDocumentSession session, Guid invoiceId)
public static Task Approve(IDocumentSession session, Guid invoiceId)
{
return session.Mutate<Projections.Invoice>(invoiceId, invoice =>
return session.MutateInvoice(invoiceId, invoice =>
{
if (invoice.Status != InvoiceStatus.Approved)
{
return new object[] { new InvoiceApproved() };
return [new InvoiceApproved()];
}

return Array.Empty<object>();
return [];
});
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L121-L136' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_invoice_approval_workflow_with_mutate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/FetchLatest.cs#L122-L137' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_invoice_approval_workflow_with_mutate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Okay, so for some context, if using the full fledged `UseIdentityMapForAggregates` + `FetchForWriting`, then `FetchLatest`
Expand Down
Loading

0 comments on commit 9ad8680

Please sign in to comment.