From 854005e6fc021a4b99c43d2d2160929b7afc6b86 Mon Sep 17 00:00:00 2001 From: JT Date: Fri, 14 Jun 2024 21:00:15 +0800 Subject: [PATCH] Modernize EventSourcing quickstart --- docs/configuration/hostbuilder.md | 4 +- docs/configuration/storeoptions.md | 11 +- docs/diagnostics.md | 1156 ++++++++--------- docs/documents/deletes.md | 20 +- docs/documents/hierarchies.md | 2 +- docs/documents/querying/compiled-queries.md | 10 +- .../projections/aggregate-projections.md | 36 +- docs/events/projections/async-daemon.md | 2 +- docs/events/projections/index.md | 55 +- docs/events/projections/ioc.md | 2 +- docs/events/querying.md | 4 +- docs/events/quickstart.md | 315 ++--- docs/events/storage.md | 15 +- .../Examples/StartStreamSamples.cs | 2 +- .../Examples/event_store_quickstart.cs | 134 -- .../Projections/QuestParty.cs | 5 +- .../projections_with_IoC_services.cs | 4 - src/EventSourcingTests/QuestTypes.cs | 5 +- .../delete_single_event_stream.cs | 2 +- ...et_committed_events_from_listener_Tests.cs | 2 +- .../marking_stream_as_isnew_on_capture.cs | 2 +- .../query_against_event_documents_Tests.cs | 2 +- src/Marten.sln | 7 + src/samples/DocsSamples/DocsSamples.csproj | 14 + .../DocsSamples/EventSourcingQuickstart.cs | 163 +++ src/samples/DocsSamples/ExampleConstants.cs | 6 + 26 files changed, 1001 insertions(+), 979 deletions(-) create mode 100644 src/samples/DocsSamples/DocsSamples.csproj create mode 100644 src/samples/DocsSamples/EventSourcingQuickstart.cs create mode 100644 src/samples/DocsSamples/ExampleConstants.cs diff --git a/docs/configuration/hostbuilder.md b/docs/configuration/hostbuilder.md index e18a956817..4f6c7a3ae2 100644 --- a/docs/configuration/hostbuilder.md +++ b/docs/configuration/hostbuilder.md @@ -244,7 +244,7 @@ public interface IConfigureMarten void Configure(IServiceProvider services, StoreOptions options); } ``` -snippet source | anchor +snippet source | anchor You could alternatively implement a custom `IConfigureMarten` (or `IConfigureMarten where T : IDocumentStore` if you're [working with multiple databases](#working-with-multiple-marten-databases)) class like so: @@ -308,7 +308,7 @@ public interface IAsyncConfigureMarten ValueTask Configure(StoreOptions options, CancellationToken cancellationToken); } ``` -snippet source | anchor +snippet source | anchor As an example from the tests, here's a custom version that uses the Feature Management service: diff --git a/docs/configuration/storeoptions.md b/docs/configuration/storeoptions.md index 67b9d59696..242214e5f1 100644 --- a/docs/configuration/storeoptions.md +++ b/docs/configuration/storeoptions.md @@ -156,9 +156,18 @@ public abstract class MartenAttribute: Attribute /// /// public virtual void Modify(DocumentMapping mapping, MemberInfo member) { } + + /// + /// When used with the automatic type discovery (assembly scanning), this will be called + /// to make registrations to the Marten configuration with the type that this attribute + /// decorates + /// + /// + /// + public virtual void Register(Type discoveredType, StoreOptions options){} } ``` -snippet source | anchor +snippet source | anchor And decorate either classes or individual field or properties on a document type, your custom attribute will be diff --git a/docs/diagnostics.md b/docs/diagnostics.md index a8b7f0c992..4e4ca37241 100644 --- a/docs/diagnostics.md +++ b/docs/diagnostics.md @@ -1,578 +1,578 @@ -# Diagnostics and Instrumentation - -So far, Marten has diagnostics, command logging, and unit of work life cycle tracking. - -For information on accessing and previewing the database schema objects generated by Marten, see [Marten and Postgres Schema](/schema/) - -## Listening for Document Store Events - -::: tip INFO -All of the functionality in this section was added as part of Marten v0.8 -::: - -Marten has a facility for listening and even intercepting document persistence events with the `IDocumentSessionListener` interface: - - - -```cs -public interface IChangeListener -{ - /// - /// Used to carry out actions on potentially changed projected documents generated and updated - /// during the execution of asynchronous projections. This will give you "at most once" delivery guarantees - /// - /// - /// - /// - /// - Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token); - - /// - /// Used to carry out actions on potentially changed projected documents generated and updated - /// during the execution of asynchronous projections. This will execute *before* database changes - /// are committed. Use this for "at least once" delivery guarantees. - /// - /// - /// - /// - /// - Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token); -} - -/// -/// Used to listen to and intercept operations within an IDocumentSession.SaveChanges()/SaveChangesAsync() -/// operation -/// -public interface IDocumentSessionListener -{ - /// - /// After an IDocumentSession is committed - /// - /// - /// - /// - /// - Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token); - - /// - /// Called just after IDocumentSession.SaveChanges() is called, but before - /// any database calls are made - /// - /// - void BeforeSaveChanges(IDocumentSession session); - - /// - /// Called just after IDocumentSession.SaveChanges() is called, - /// but before any database calls are made - /// - /// - /// - /// - Task BeforeSaveChangesAsync(IDocumentSession session, CancellationToken token); - - /// - /// After an IDocumentSession is committed - /// - /// - /// - void AfterCommit(IDocumentSession session, IChangeSet commit); - - /// - /// Called after a document is loaded - /// - void DocumentLoaded(object id, object document); - - /// - /// Called after a document is explicitly added to a session - /// as a staged insert or update - /// - void DocumentAddedForStorage(object id, object document); -} -``` -snippet source | anchor - - -You can build and inject your own listeners by adding them to the `StoreOptions` object you use to configure a `DocumentStore`: - - - -```cs -var stub1 = new StubDocumentSessionListener(); -var stub2 = new StubDocumentSessionListener(); - -using (var store = SeparateStore(_ => - { - _.Connection(ConnectionSource.ConnectionString); - _.AutoCreateSchemaObjects = AutoCreate.All; - - _.Listeners.Add(stub1); - _.Listeners.Add(stub2); - })) -``` -snippet source | anchor - -```cs -var stub1 = new StubDocumentSessionListener(); -var stub2 = new StubDocumentSessionListener(); - -using (var store = SeparateStore(_ => - { - _.Connection(ConnectionSource.ConnectionString); - _.AutoCreateSchemaObjects = AutoCreate.All; - })) -``` -snippet source | anchor - - -The listeners can be used to modify an `IDocumentSession` and its related unit of work just before persisting. Marten itself will be using this mechanism -internally to perform projections in the future. - -The following fake, sample listener demonstrates how you can query into the pending changes before making a transactional commit, and also how to -query what was done after a commit is made: - - - -```cs -// DocumentSessionListenerBase is a helper abstract class in Marten -// with empty implementations of each method you may find helpful -public class SimpleSessionListener: DocumentSessionListenerBase -{ - public override void BeforeSaveChanges(IDocumentSession session) - { - // Use pending changes to preview what is about to be - // persisted - var pending = session.PendingChanges; - - // Careful here, Marten can only sort documents into "inserts" or "updates" based - // on whether or not Marten had to assign a new Id to that document upon DocumentStore() - pending.InsertsFor() - .Each(user => Debug.WriteLine($"New user: {user.UserName}")); - - pending.UpdatesFor() - .Each(user => Debug.WriteLine($"Updated user {user.UserName}")); - - pending.DeletionsFor() - .Each(d => Debug.WriteLine(d)); - - // This is a convenience method to find all the pending events - // organized into streams that will be appended to the event store - pending.Streams() - .Each(s => Debug.WriteLine(s)); - } - - public override void AfterCommit(IDocumentSession session, IChangeSet commit) - { - // See what was just persisted, and possibly carry out post - // commit actions - - var last = commit; - - last.Updated.Each(x => Debug.WriteLine($"{x} was updated")); - last.Deleted.Each(x => Debug.WriteLine($"{x} was deleted")); - last.Inserted.Each(x => Debug.WriteLine($"{x} was inserted")); - } -} -``` -snippet source | anchor - - -As of Marten 1.4, you can also register `IDocumentSessionListener` objects scoped to a particular session with the -`DocumentStore.OpenSession(SessionOptions)` signature. - -As of Marten v5, separate listeners will need to be registered for Document Store and Async Daemon. Adding listeners for Async Daemon are covered in the next section. - -## Listening for Async Daemon Events - -Use `AsyncListeners` to register session listeners that will ONLY be applied within the asynchronous daemon updates. - -::: tip INFO -Listeners will never get activated during projection rebuilds to safe guard against any side effects. -::: - -A sample listener: - - -```cs -public class FakeListener: IChangeListener -{ - public List Befores = new(); - public IList Changes = new List(); - - public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) - { - session.ShouldNotBeNull(); - Changes.Add(commit); - return Task.CompletedTask; - } - - public Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) - { - session.ShouldNotBeNull(); - Befores.Add(commit); - - Changes.Count.ShouldBeLessThan(Befores.Count); - - return Task.CompletedTask; - } -} -``` -snippet source | anchor - - -Wiring a Async Daemon listener: - - -```cs -var listener = new FakeListener(); -StoreOptions(x => -{ - x.Projections.Add(new TripProjectionWithCustomName(), ProjectionLifecycle.Async); - x.Projections.AsyncListeners.Add(listener); -}); -``` -snippet source | anchor - - -## Custom Logging - -Marten v0.8 comes with a new mechanism to plug in custom logging to the `IDocumentStore`, `IQuerySession`, and `IDocumentSession` activity: - - - -```cs -/// -/// Records command usage, schema changes, and sessions within Marten -/// -public interface IMartenLogger -{ - /// - /// Called when the session is initialized - /// - /// - /// - IMartenSessionLogger StartSession(IQuerySession session); - - /// - /// Capture any DDL executed at runtime by Marten - /// - /// - void SchemaChange(string sql); -} - -/// -/// Use to create custom logging within an IQuerySession or IDocumentSession -/// -public interface IMartenSessionLogger -{ - /// - /// Log a command that executed successfully - /// - /// - void LogSuccess(NpgsqlCommand command); - - /// - /// Log a command that failed - /// - /// - /// - void LogFailure(NpgsqlCommand command, Exception ex); - - /// - /// Log a command that executed successfully - /// - /// - void LogSuccess(NpgsqlBatch batch); - - /// - /// Log a batch that failed - /// - /// - /// - void LogFailure(NpgsqlBatch batch, Exception ex); - - /// - /// Log a message for generic errors - /// - /// - /// - /// - void LogFailure(Exception ex, string message); - - /// - /// Called immediately after committing an IDocumentSession - /// through SaveChanges() or SaveChangesAsync() - /// - /// - /// - void RecordSavedChanges(IDocumentSession session, IChangeSet commit); - - /// - /// Called just before a command is to be executed. Use this to create - /// performance logging of Marten operations - /// - /// - public void OnBeforeExecute(NpgsqlCommand command); - - /// - /// Called just before a command is to be executed. Use this to create - /// performance logging of Marten operations - /// - /// - public void OnBeforeExecute(NpgsqlBatch batch); -} -``` -snippet source | anchor - - -To apply these logging abstractions, you can either plug your own `IMartenLogger` into the `StoreOptions` object and allow that default logger to create the individual session loggers: - - - -```cs -var store = DocumentStore.For(_ => -{ - _.Logger(new ConsoleMartenLogger()); -}); -``` -snippet source | anchor - - -You can also directly apply a session logger to any `IQuerySession` or `IDocumentSession` like this: - - - -```cs -using var session = store.LightweightSession(); -// Replace the logger for only this one session -session.Logger = new RecordingLogger(); -``` -snippet source | anchor - - -The session logging is a different abstraction specifically so that you _could_ track database commands issued per session. In effect, my own shop is going to use this capability to understand what HTTP endpoints or service bus message handlers are being unnecessarily chatty in their database interactions. We also hope that the contextual logging of commands per document session makes it easier to understand how our systems behave. - - - -```cs -public class ConsoleMartenLogger: IMartenLogger, IMartenSessionLogger -{ - private Stopwatch? _stopwatch; - - public IMartenSessionLogger StartSession(IQuerySession session) - { - return this; - } - - public void SchemaChange(string sql) - { - Console.WriteLine("Executing DDL change:"); - Console.WriteLine(sql); - Console.WriteLine(); - } - - public void LogSuccess(NpgsqlCommand command) - { - Console.WriteLine(command.CommandText); - foreach (var p in command.Parameters.OfType()) - Console.WriteLine($" {p.ParameterName}: {GetParameterValue(p)}"); - } - - public void LogSuccess(NpgsqlBatch batch) - { - foreach (var command in batch.BatchCommands) - { - Console.WriteLine(command.CommandText); - foreach (var p in command.Parameters.OfType()) - Console.WriteLine($" {p.ParameterName}: {GetParameterValue(p)}"); - } - } - - private static object? GetParameterValue(NpgsqlParameter p) - { - if (p.Value is IList enumerable) - { - var result = ""; - for (var i = 0; i < Math.Min(enumerable.Count, 5); i++) - { - result += $"[{i}] {enumerable[i]}; "; - } - if (enumerable.Count > 5) result += $" + {enumerable.Count - 5} more"; - return result; - } - return p.Value; - } - - public void LogFailure(NpgsqlCommand command, Exception ex) - { - Console.WriteLine("Postgresql command failed!"); - Console.WriteLine(command.CommandText); - foreach (var p in command.Parameters.OfType()) - Console.WriteLine($" {p.ParameterName}: {p.Value}"); - Console.WriteLine(ex); - } - - public void LogFailure(NpgsqlBatch batch, Exception ex) - { - Console.WriteLine("Postgresql command failed!"); - foreach (var command in batch.BatchCommands) - { - Console.WriteLine(command.CommandText); - foreach (var p in command.Parameters.OfType()) - Console.WriteLine($" {p.ParameterName}: {p.Value}"); - } - - Console.WriteLine(ex); - } - - public void LogFailure(Exception ex, string message) - { - Console.WriteLine("Failure: " + message); - Console.WriteLine(ex.ToString()); - } - - public void RecordSavedChanges(IDocumentSession session, IChangeSet commit) - { - _stopwatch?.Stop(); - - var lastCommit = commit; - Console.WriteLine( - $"Persisted {lastCommit.Updated.Count()} updates in {_stopwatch?.ElapsedMilliseconds ?? 0} ms, {lastCommit.Inserted.Count()} inserts, and {lastCommit.Deleted.Count()} deletions"); - } - - public void OnBeforeExecute(NpgsqlCommand command) - { - _stopwatch = new Stopwatch(); - _stopwatch.Start(); - } - - public void OnBeforeExecute(NpgsqlBatch batch) - { - _stopwatch = new Stopwatch(); - _stopwatch.Start(); - } -} -``` -snippet source | anchor - - -## Accessing Diagnostics - -All the diagnostics are going to be exposed off of the `IDocumentStore.Diagnostics` property. Today, the only capabilities are to get a preview of the generated storage code or a preview of the ADO.NET code that will be generated for a LINQ query. - -## Previewing LINQ Queries - -Let's say that we have a small document type called `Trade`: - - - -```cs -public class Trade -{ - public int Id { get; set; } - - [DuplicateField] - public double Value { get; set; } -} -``` -snippet source | anchor - - -The `[DuplicateField]` attribute directs Marten to duplicate the value of `Value` into a separate database field for more efficient querying. Now, let's say that we want to search for every `Trade` document with a value of over 2,000, but we want to see the SQL query that Marten will build for that query first: - - - -```cs -// store is the active IDocumentStore -var queryable = theStore.QuerySession().Query().Where(x => x.Value > 2000); -var cmd = queryable.ToCommand(FetchType.FetchMany); - -Debug.WriteLine(cmd.CommandText); -``` -snippet source | anchor - - -The sql string in our debug window for the code above is: - -```sql -select d.data from mt_doc_trade as d where d.value > :arg0 -``` - -## Session Specific Logging - -The `IMartenLogger` can be swapped out on any `IQuerySession` or `IDocumentSession` like this example from the unit tests: - - - -```cs -// We frequently use this special marten logger per -// session to pipe Marten logging to the xUnit.Net output -theSession.Logger = new TestOutputMartenLogger(_output); -``` -snippet source | anchor - - -## Previewing the PostgreSQL Query Plan - -Marten has a helper to find and preview the [PostgreSQL EXPLAIN plan](http://www.postgresql.org/docs/9.5/static/using-explain.html) for a Linq query. Our hope is that this will be a valuable aid to teams who need face performance problems while using Marten. The syntax for fetching the EXPLAIN plan for the Linq query from the previous section is shown below: - - - -```cs -// Explain() is an extension method off of IQueryable -var plan = queryable.Explain(); -Console.WriteLine($"NodeType: {plan.NodeType}"); -Console.WriteLine($"RelationName: {plan.RelationName}"); -Console.WriteLine($"Alias: {plan.Alias}"); -Console.WriteLine($"StartupCost: {plan.StartupCost}"); -Console.WriteLine($"TotalCost: {plan.TotalCost}"); -Console.WriteLine($"PlanRows: {plan.PlanRows}"); -Console.WriteLine($"PlanWidth: {plan.PlanWidth}"); -``` -snippet source | anchor - - -The console output for the code below (on my box) was: - -```bash -NodeType: Seq Scan -RelationName: mt_doc_trade -Alias: d -StartupCost: 0 -TotalCost: 24.13 -PlanRows: 377 -PlanWidth: 36 -``` - -## Request Counting and Thresholds - -Marten has several facilities for improving system performance by reducing the number of network round trips to the server, but the first step maybe to -just understand what kinds of operations are being chatty in the first place. To that end, Marten exposes the request count for each `IQuerySession` or `IDocumentSession` that simply tells you how many commands have been issued to Postgresql by that session: - - - -```cs -using (var session = theStore.QuerySession()) -{ - var users = session.Query().ToList(); - var count = session.Query().Count(); - var any = session.Query().Any(); - - session.RequestCount.ShouldBe(3); -} -``` -snippet source | anchor - - -At this point, Marten does not have any built in support for asserting requests per session thresholds like other tools. While I think that we are uncomfortable with that functionality ever being turned on in production, it should be easily feasible to build those kinds of automated threshold testing like "fail the test if there were more than 25 requests issued for any given HTTP request." - -## Getting PostgreSQL server version - -Marten provides a helper method to fetch the PostgreSQL server version exposed via `IDocumentStore.Diagnostics`. This is helpful to enable feature toggles based on features available in PostgreSQL versions or perform any diagnostics. - - - -```cs -var pgVersion = theStore.Diagnostics.GetPostgresVersion(); -``` -snippet source | anchor - +# Diagnostics and Instrumentation + +So far, Marten has diagnostics, command logging, and unit of work life cycle tracking. + +For information on accessing and previewing the database schema objects generated by Marten, see [Marten and Postgres Schema](/schema/) + +## Listening for Document Store Events + +::: tip INFO +All of the functionality in this section was added as part of Marten v0.8 +::: + +Marten has a facility for listening and even intercepting document persistence events with the `IDocumentSessionListener` interface: + + + +```cs +public interface IChangeListener +{ + /// + /// Used to carry out actions on potentially changed projected documents generated and updated + /// during the execution of asynchronous projections. This will give you "at most once" delivery guarantees + /// + /// + /// + /// + /// + Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token); + + /// + /// Used to carry out actions on potentially changed projected documents generated and updated + /// during the execution of asynchronous projections. This will execute *before* database changes + /// are committed. Use this for "at least once" delivery guarantees. + /// + /// + /// + /// + /// + Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token); +} + +/// +/// Used to listen to and intercept operations within an IDocumentSession.SaveChanges()/SaveChangesAsync() +/// operation +/// +public interface IDocumentSessionListener +{ + /// + /// After an IDocumentSession is committed + /// + /// + /// + /// + /// + Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token); + + /// + /// Called just after IDocumentSession.SaveChanges() is called, but before + /// any database calls are made + /// + /// + void BeforeSaveChanges(IDocumentSession session); + + /// + /// Called just after IDocumentSession.SaveChanges() is called, + /// but before any database calls are made + /// + /// + /// + /// + Task BeforeSaveChangesAsync(IDocumentSession session, CancellationToken token); + + /// + /// After an IDocumentSession is committed + /// + /// + /// + void AfterCommit(IDocumentSession session, IChangeSet commit); + + /// + /// Called after a document is loaded + /// + void DocumentLoaded(object id, object document); + + /// + /// Called after a document is explicitly added to a session + /// as a staged insert or update + /// + void DocumentAddedForStorage(object id, object document); +} +``` +snippet source | anchor + + +You can build and inject your own listeners by adding them to the `StoreOptions` object you use to configure a `DocumentStore`: + + + +```cs +var stub1 = new StubDocumentSessionListener(); +var stub2 = new StubDocumentSessionListener(); + +using (var store = SeparateStore(_ => + { + _.Connection(ConnectionSource.ConnectionString); + _.AutoCreateSchemaObjects = AutoCreate.All; + + _.Listeners.Add(stub1); + _.Listeners.Add(stub2); + })) +``` +snippet source | anchor + +```cs +var stub1 = new StubDocumentSessionListener(); +var stub2 = new StubDocumentSessionListener(); + +using (var store = SeparateStore(_ => + { + _.Connection(ConnectionSource.ConnectionString); + _.AutoCreateSchemaObjects = AutoCreate.All; + })) +``` +snippet source | anchor + + +The listeners can be used to modify an `IDocumentSession` and its related unit of work just before persisting. Marten itself will be using this mechanism +internally to perform projections in the future. + +The following fake, sample listener demonstrates how you can query into the pending changes before making a transactional commit, and also how to +query what was done after a commit is made: + + + +```cs +// DocumentSessionListenerBase is a helper abstract class in Marten +// with empty implementations of each method you may find helpful +public class SimpleSessionListener: DocumentSessionListenerBase +{ + public override void BeforeSaveChanges(IDocumentSession session) + { + // Use pending changes to preview what is about to be + // persisted + var pending = session.PendingChanges; + + // Careful here, Marten can only sort documents into "inserts" or "updates" based + // on whether or not Marten had to assign a new Id to that document upon DocumentStore() + pending.InsertsFor() + .Each(user => Debug.WriteLine($"New user: {user.UserName}")); + + pending.UpdatesFor() + .Each(user => Debug.WriteLine($"Updated user {user.UserName}")); + + pending.DeletionsFor() + .Each(d => Debug.WriteLine(d)); + + // This is a convenience method to find all the pending events + // organized into streams that will be appended to the event store + pending.Streams() + .Each(s => Debug.WriteLine(s)); + } + + public override void AfterCommit(IDocumentSession session, IChangeSet commit) + { + // See what was just persisted, and possibly carry out post + // commit actions + + var last = commit; + + last.Updated.Each(x => Debug.WriteLine($"{x} was updated")); + last.Deleted.Each(x => Debug.WriteLine($"{x} was deleted")); + last.Inserted.Each(x => Debug.WriteLine($"{x} was inserted")); + } +} +``` +snippet source | anchor + + +As of Marten 1.4, you can also register `IDocumentSessionListener` objects scoped to a particular session with the +`DocumentStore.OpenSession(SessionOptions)` signature. + +As of Marten v5, separate listeners will need to be registered for Document Store and Async Daemon. Adding listeners for Async Daemon are covered in the next section. + +## Listening for Async Daemon Events + +Use `AsyncListeners` to register session listeners that will ONLY be applied within the asynchronous daemon updates. + +::: tip INFO +Listeners will never get activated during projection rebuilds to safe guard against any side effects. +::: + +A sample listener: + + +```cs +public class FakeListener: IChangeListener +{ + public List Befores = new(); + public IList Changes = new List(); + + public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) + { + session.ShouldNotBeNull(); + Changes.Add(commit); + return Task.CompletedTask; + } + + public Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) + { + session.ShouldNotBeNull(); + Befores.Add(commit); + + Changes.Count.ShouldBeLessThan(Befores.Count); + + return Task.CompletedTask; + } +} +``` +snippet source | anchor + + +Wiring a Async Daemon listener: + + +```cs +var listener = new FakeListener(); +StoreOptions(x => +{ + x.Projections.Add(new TripProjectionWithCustomName(), ProjectionLifecycle.Async); + x.Projections.AsyncListeners.Add(listener); +}); +``` +snippet source | anchor + + +## Custom Logging + +Marten v0.8 comes with a new mechanism to plug in custom logging to the `IDocumentStore`, `IQuerySession`, and `IDocumentSession` activity: + + + +```cs +/// +/// Records command usage, schema changes, and sessions within Marten +/// +public interface IMartenLogger +{ + /// + /// Called when the session is initialized + /// + /// + /// + IMartenSessionLogger StartSession(IQuerySession session); + + /// + /// Capture any DDL executed at runtime by Marten + /// + /// + void SchemaChange(string sql); +} + +/// +/// Use to create custom logging within an IQuerySession or IDocumentSession +/// +public interface IMartenSessionLogger +{ + /// + /// Log a command that executed successfully + /// + /// + void LogSuccess(NpgsqlCommand command); + + /// + /// Log a command that failed + /// + /// + /// + void LogFailure(NpgsqlCommand command, Exception ex); + + /// + /// Log a command that executed successfully + /// + /// + void LogSuccess(NpgsqlBatch batch); + + /// + /// Log a batch that failed + /// + /// + /// + void LogFailure(NpgsqlBatch batch, Exception ex); + + /// + /// Log a message for generic errors + /// + /// + /// + /// + void LogFailure(Exception ex, string message); + + /// + /// Called immediately after committing an IDocumentSession + /// through SaveChanges() or SaveChangesAsync() + /// + /// + /// + void RecordSavedChanges(IDocumentSession session, IChangeSet commit); + + /// + /// Called just before a command is to be executed. Use this to create + /// performance logging of Marten operations + /// + /// + public void OnBeforeExecute(NpgsqlCommand command); + + /// + /// Called just before a command is to be executed. Use this to create + /// performance logging of Marten operations + /// + /// + public void OnBeforeExecute(NpgsqlBatch batch); +} +``` +snippet source | anchor + + +To apply these logging abstractions, you can either plug your own `IMartenLogger` into the `StoreOptions` object and allow that default logger to create the individual session loggers: + + + +```cs +var store = DocumentStore.For(_ => +{ + _.Logger(new ConsoleMartenLogger()); +}); +``` +snippet source | anchor + + +You can also directly apply a session logger to any `IQuerySession` or `IDocumentSession` like this: + + + +```cs +using var session = store.LightweightSession(); +// Replace the logger for only this one session +session.Logger = new RecordingLogger(); +``` +snippet source | anchor + + +The session logging is a different abstraction specifically so that you _could_ track database commands issued per session. In effect, my own shop is going to use this capability to understand what HTTP endpoints or service bus message handlers are being unnecessarily chatty in their database interactions. We also hope that the contextual logging of commands per document session makes it easier to understand how our systems behave. + + + +```cs +public class ConsoleMartenLogger: IMartenLogger, IMartenSessionLogger +{ + private Stopwatch? _stopwatch; + + public IMartenSessionLogger StartSession(IQuerySession session) + { + return this; + } + + public void SchemaChange(string sql) + { + Console.WriteLine("Executing DDL change:"); + Console.WriteLine(sql); + Console.WriteLine(); + } + + public void LogSuccess(NpgsqlCommand command) + { + Console.WriteLine(command.CommandText); + foreach (var p in command.Parameters.OfType()) + Console.WriteLine($" {p.ParameterName}: {GetParameterValue(p)}"); + } + + public void LogSuccess(NpgsqlBatch batch) + { + foreach (var command in batch.BatchCommands) + { + Console.WriteLine(command.CommandText); + foreach (var p in command.Parameters.OfType()) + Console.WriteLine($" {p.ParameterName}: {GetParameterValue(p)}"); + } + } + + private static object? GetParameterValue(NpgsqlParameter p) + { + if (p.Value is IList enumerable) + { + var result = ""; + for (var i = 0; i < Math.Min(enumerable.Count, 5); i++) + { + result += $"[{i}] {enumerable[i]}; "; + } + if (enumerable.Count > 5) result += $" + {enumerable.Count - 5} more"; + return result; + } + return p.Value; + } + + public void LogFailure(NpgsqlCommand command, Exception ex) + { + Console.WriteLine("Postgresql command failed!"); + Console.WriteLine(command.CommandText); + foreach (var p in command.Parameters.OfType()) + Console.WriteLine($" {p.ParameterName}: {p.Value}"); + Console.WriteLine(ex); + } + + public void LogFailure(NpgsqlBatch batch, Exception ex) + { + Console.WriteLine("Postgresql command failed!"); + foreach (var command in batch.BatchCommands) + { + Console.WriteLine(command.CommandText); + foreach (var p in command.Parameters.OfType()) + Console.WriteLine($" {p.ParameterName}: {p.Value}"); + } + + Console.WriteLine(ex); + } + + public void LogFailure(Exception ex, string message) + { + Console.WriteLine("Failure: " + message); + Console.WriteLine(ex.ToString()); + } + + public void RecordSavedChanges(IDocumentSession session, IChangeSet commit) + { + _stopwatch?.Stop(); + + var lastCommit = commit; + Console.WriteLine( + $"Persisted {lastCommit.Updated.Count()} updates in {_stopwatch?.ElapsedMilliseconds ?? 0} ms, {lastCommit.Inserted.Count()} inserts, and {lastCommit.Deleted.Count()} deletions"); + } + + public void OnBeforeExecute(NpgsqlCommand command) + { + _stopwatch = new Stopwatch(); + _stopwatch.Start(); + } + + public void OnBeforeExecute(NpgsqlBatch batch) + { + _stopwatch = new Stopwatch(); + _stopwatch.Start(); + } +} +``` +snippet source | anchor + + +## Accessing Diagnostics + +All the diagnostics are going to be exposed off of the `IDocumentStore.Diagnostics` property. Today, the only capabilities are to get a preview of the generated storage code or a preview of the ADO.NET code that will be generated for a LINQ query. + +## Previewing LINQ Queries + +Let's say that we have a small document type called `Trade`: + + + +```cs +public class Trade +{ + public int Id { get; set; } + + [DuplicateField] + public double Value { get; set; } +} +``` +snippet source | anchor + + +The `[DuplicateField]` attribute directs Marten to duplicate the value of `Value` into a separate database field for more efficient querying. Now, let's say that we want to search for every `Trade` document with a value of over 2,000, but we want to see the SQL query that Marten will build for that query first: + + + +```cs +// store is the active IDocumentStore +var queryable = theStore.QuerySession().Query().Where(x => x.Value > 2000); +var cmd = queryable.ToCommand(FetchType.FetchMany); + +Debug.WriteLine(cmd.CommandText); +``` +snippet source | anchor + + +The sql string in our debug window for the code above is: + +```sql +select d.data from mt_doc_trade as d where d.value > :arg0 +``` + +## Session Specific Logging + +The `IMartenLogger` can be swapped out on any `IQuerySession` or `IDocumentSession` like this example from the unit tests: + + + +```cs +// We frequently use this special marten logger per +// session to pipe Marten logging to the xUnit.Net output +theSession.Logger = new TestOutputMartenLogger(_output); +``` +snippet source | anchor + + +## Previewing the PostgreSQL Query Plan + +Marten has a helper to find and preview the [PostgreSQL EXPLAIN plan](http://www.postgresql.org/docs/9.5/static/using-explain.html) for a Linq query. Our hope is that this will be a valuable aid to teams who need face performance problems while using Marten. The syntax for fetching the EXPLAIN plan for the Linq query from the previous section is shown below: + + + +```cs +// Explain() is an extension method off of IQueryable +var plan = queryable.Explain(); +Console.WriteLine($"NodeType: {plan.NodeType}"); +Console.WriteLine($"RelationName: {plan.RelationName}"); +Console.WriteLine($"Alias: {plan.Alias}"); +Console.WriteLine($"StartupCost: {plan.StartupCost}"); +Console.WriteLine($"TotalCost: {plan.TotalCost}"); +Console.WriteLine($"PlanRows: {plan.PlanRows}"); +Console.WriteLine($"PlanWidth: {plan.PlanWidth}"); +``` +snippet source | anchor + + +The console output for the code below (on my box) was: + +```bash +NodeType: Seq Scan +RelationName: mt_doc_trade +Alias: d +StartupCost: 0 +TotalCost: 24.13 +PlanRows: 377 +PlanWidth: 36 +``` + +## Request Counting and Thresholds + +Marten has several facilities for improving system performance by reducing the number of network round trips to the server, but the first step maybe to +just understand what kinds of operations are being chatty in the first place. To that end, Marten exposes the request count for each `IQuerySession` or `IDocumentSession` that simply tells you how many commands have been issued to Postgresql by that session: + + + +```cs +using (var session = theStore.QuerySession()) +{ + var users = session.Query().ToList(); + var count = session.Query().Count(); + var any = session.Query().Any(); + + session.RequestCount.ShouldBe(3); +} +``` +snippet source | anchor + + +At this point, Marten does not have any built in support for asserting requests per session thresholds like other tools. While I think that we are uncomfortable with that functionality ever being turned on in production, it should be easily feasible to build those kinds of automated threshold testing like "fail the test if there were more than 25 requests issued for any given HTTP request." + +## Getting PostgreSQL server version + +Marten provides a helper method to fetch the PostgreSQL server version exposed via `IDocumentStore.Diagnostics`. This is helpful to enable feature toggles based on features available in PostgreSQL versions or perform any diagnostics. + + + +```cs +var pgVersion = theStore.Diagnostics.GetPostgresVersion(); +``` +snippet source | anchor + diff --git a/docs/documents/deletes.md b/docs/documents/deletes.md index ad330ed168..cf0c401169 100644 --- a/docs/documents/deletes.md +++ b/docs/documents/deletes.md @@ -285,7 +285,7 @@ public void query_is_soft_deleted_docs() .Single().ShouldBe("bar"); } ``` -snippet source | anchor +snippet source | anchor ### Fetching Documents Deleted Before or After a Specific Time @@ -391,7 +391,23 @@ The easiest way to expose the metadata about whether or not a document is delete and when it was deleted is to implement the `ISoftDeleted` interface as shown in this sample document: -snippet: sample_implementing_ISoftDeleted + + +```cs +public class MySoftDeletedDoc: ISoftDeleted +{ + // Always have to have an identity of some sort + public Guid Id { get; set; } + + // Is the document deleted? From ISoftDeleted + public bool Deleted { get; set; } + + // When was the document deleted? From ISoftDeleted + public DateTimeOffset? DeletedAt { get; set; } +} +``` +snippet source | anchor + Implementing `ISoftDeleted` on your document means that: diff --git a/docs/documents/hierarchies.md b/docs/documents/hierarchies.md index 26053af4ca..a596d9caf6 100644 --- a/docs/documents/hierarchies.md +++ b/docs/documents/hierarchies.md @@ -116,7 +116,7 @@ public query_with_inheritance(ITestOutputHelper output) // _.Schema.For().AddSubClassHierarchy(); // this, however, will use the assembly // of type ISmurf to get all its' subclasses/implementations. - // In projects with many types, this approach will be undvisable. + // In projects with many types, this approach will be unadvisable. _.Connection(ConnectionSource.ConnectionString); _.AutoCreateSchemaObjects = AutoCreate.All; diff --git a/docs/documents/querying/compiled-queries.md b/docs/documents/querying/compiled-queries.md index 94a4138445..661f972a22 100644 --- a/docs/documents/querying/compiled-queries.md +++ b/docs/documents/querying/compiled-queries.md @@ -25,12 +25,12 @@ All compiled queries are classes that implement the `ICompiledQuery ```cs -public interface ICompiledQuery +public interface ICompiledQuery : ICompiledQueryMarker where TDoc: notnull { Expression, TOut>> QueryIs(); } ``` -snippet source | anchor +snippet source | anchor In its simplest usage, let's say that we want to find the first user document with a certain first name. That class would look like this: @@ -175,7 +175,7 @@ public interface ICompiledListQuery: ICompiledListQuery { } ``` -snippet source | anchor +snippet source | anchor A sample usage of this type of query is shown below: @@ -206,7 +206,7 @@ public interface ICompiledListQuery: ICompiledQuerysnippet source | anchor +snippet source | anchor A sample usage of this type of query is shown below: @@ -428,7 +428,7 @@ public interface ICompiledQuery: ICompiledQuery { } ``` -snippet source | anchor +snippet source | anchor And an example: diff --git a/docs/events/projections/aggregate-projections.md b/docs/events/projections/aggregate-projections.md index 4be33dae09..85676b1614 100644 --- a/docs/events/projections/aggregate-projections.md +++ b/docs/events/projections/aggregate-projections.md @@ -7,28 +7,30 @@ view called `QuestParty` that creates an aggregated view of `MembersJoined`, `Me ```cs -public class QuestParty +public sealed record QuestParty(Guid Id, List Members) { - public List Members { get; set; } = new(); - public IList Slayed { get; } = new List(); - public string Key { get; set; } - public string Name { get; set; } - - // In this particular case, this is also the stream id for the quest events - public Guid Id { get; set; } - // These methods take in events and update the QuestParty - public void Apply(MembersJoined joined) => Members.Fill(joined.Members); - public void Apply(MembersDeparted departed) => Members.RemoveAll(x => departed.Members.Contains(x)); - public void Apply(QuestStarted started) => Name = started.Name; + public static QuestParty Create(QuestStarted started) => new(started.QuestId, []); + public static QuestParty Apply(MembersJoined joined, QuestParty party) => + party with + { + Members = party.Members.Union(joined.Members).ToList() + }; - public override string ToString() - { - return $"Quest party '{Name}' is {Members.Join(", ")}"; - } + public static QuestParty Apply(MembersDeparted departed, QuestParty party) => + party with + { + Members = party.Members.Where(x => !departed.Members.Contains(x)).ToList() + }; + + public static QuestParty Apply(MembersEscaped escaped, QuestParty party) => + party with + { + Members = party.Members.Where(x => !escaped.Members.Contains(x)).ToList() + }; } ``` -snippet source | anchor +snippet source | anchor Once again, here's the class diagram of the key projection types inside of Marten, but please note the `SingleStreamProjection`: diff --git a/docs/events/projections/async-daemon.md b/docs/events/projections/async-daemon.md index fe37a3781c..a56be11c98 100644 --- a/docs/events/projections/async-daemon.md +++ b/docs/events/projections/async-daemon.md @@ -404,7 +404,7 @@ public static async Task UseAsyncDaemon(IDocumentStore store, CancellationToken snippet source | anchor -## Open Telemetry and Metrics ~~~~ +## Open Telemetry and Metrics ::: info All of these facilities are used automatically by Marten. diff --git a/docs/events/projections/index.md b/docs/events/projections/index.md index 2c93a81595..5c368d5be1 100644 --- a/docs/events/projections/index.md +++ b/docs/events/projections/index.md @@ -40,28 +40,30 @@ Sticking with the fantasy theme, the `QuestParty` class shown below could be use ```cs -public class QuestParty +public sealed record QuestParty(Guid Id, List Members) { - public List Members { get; set; } = new(); - public IList Slayed { get; } = new List(); - public string Key { get; set; } - public string Name { get; set; } - - // In this particular case, this is also the stream id for the quest events - public Guid Id { get; set; } - // These methods take in events and update the QuestParty - public void Apply(MembersJoined joined) => Members.Fill(joined.Members); - public void Apply(MembersDeparted departed) => Members.RemoveAll(x => departed.Members.Contains(x)); - public void Apply(QuestStarted started) => Name = started.Name; + public static QuestParty Create(QuestStarted started) => new(started.QuestId, []); + public static QuestParty Apply(MembersJoined joined, QuestParty party) => + party with + { + Members = party.Members.Union(joined.Members).ToList() + }; - public override string ToString() - { - return $"Quest party '{Name}' is {Members.Join(", ")}"; - } + public static QuestParty Apply(MembersDeparted departed, QuestParty party) => + party with + { + Members = party.Members.Where(x => !departed.Members.Contains(x)).ToList() + }; + + public static QuestParty Apply(MembersEscaped escaped, QuestParty party) => + party with + { + Members = party.Members.Where(x => !escaped.Members.Contains(x)).ToList() + }; } ``` -snippet source | anchor +snippet source | anchor Marten provides the ability to use `IEvent` metadata within your projections, assuming that you're not trying to run the aggregations inline. @@ -126,20 +128,17 @@ You can always fetch a stream of events and build an aggregate completely live f ```cs -await using (var session = store.LightweightSession()) -{ - // questId is the id of the stream - var party = session.Events.AggregateStream(questId); - Console.WriteLine(party); +await using var session2 = store.LightweightSession(); +// questId is the id of the stream +var party = await session2.Events.AggregateStreamAsync(questId); - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); +var party_at_version_3 = await session2.Events + .AggregateStreamAsync(questId, 3); - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); -} +var party_yesterday = await session2.Events + .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); ``` -snippet source | anchor +snippet source | anchor There is also a matching asynchronous `AggregateStreamAsync()` mechanism as well. Additionally, you can do stream aggregations in batch queries with diff --git a/docs/events/projections/ioc.md b/docs/events/projections/ioc.md index b8b48ece50..da6a2ee554 100644 --- a/docs/events/projections/ioc.md +++ b/docs/events/projections/ioc.md @@ -56,7 +56,7 @@ public class ProductProjection: CustomProjection } } ``` -snippet source | anchor +snippet source | anchor Now, we *want* to use this projection at runtime within Marten, and need to register the projection diff --git a/docs/events/querying.md b/docs/events/querying.md index bf51fc74bd..721f72c42b 100644 --- a/docs/events/querying.md +++ b/docs/events/querying.md @@ -33,7 +33,7 @@ public async Task load_event_stream_async(IDocumentSession session, Guid streamI .FetchStreamAsync(streamId, timestamp: DateTime.UtcNow.AddDays(-1)); } ``` -snippet source | anchor +snippet source | anchor The data returned is a list of `IEvent` objects, where each is a (internal) strongly-typed `Event` object shown below: @@ -260,7 +260,7 @@ public async Task load_a_single_event_asynchronously(IDocumentSession session, G var event2 = await session.Events.LoadAsync(eventId); } ``` -snippet source | anchor +snippet source | anchor ## Querying Directly Against Event Data diff --git a/docs/events/quickstart.md b/docs/events/quickstart.md index 85c01e2463..b77f64a78f 100644 --- a/docs/events/quickstart.md +++ b/docs/events/quickstart.md @@ -1,144 +1,25 @@ # Event Store Quick Start -There is not anything special you need to do to enable the event store functionality in Marten, and it obeys the same rules about automatic schema generation described in [schema](/schema/). Marten is just a client library, and there's nothing to install other than the Marten NuGet. +There's nothing special you need to do to enable the event store functionality in Marten, it obeys the same rules of automatic schema generation as described in [schema](/schema/). Given you've followed the [Getting Started](/getting-started) guide, you're all ready to go. Because I’ve read way too much epic fantasy fiction, my sample problem domain is an application that records, analyses, and visualizes the status of heroic quests (destroying the One Ring, recovering Aldur's Orb, recovering the Horn of Valere, etc.). During a quest, you may want to record events like: ```cs -public class ArrivedAtLocation -{ - public int Day { get; set; } - - public string Location { get; set; } - - public override string ToString() - { - return $"Arrived at {Location} on Day {Day}"; - } -} +public sealed record ArrivedAtLocation(Guid QuestId, int Day, string Location); -public class MembersJoined -{ - public MembersJoined() - { - } - - public MembersJoined(int day, string location, params string[] members) - { - Day = day; - Location = location; - Members = members; - } - - public Guid QuestId { get; set; } - - public int Day { get; set; } - - public string Location { get; set; } - - public string[] Members { get; set; } - - public override string ToString() - { - return $"Members {Members.Join(", ")} joined at {Location} on Day {Day}"; - } - - protected bool Equals(MembersJoined other) - { - return QuestId.Equals(other.QuestId) && Day == other.Day && Location == other.Location && Members.SequenceEqual(other.Members); - } - - public override bool Equals(object obj) - { - if (ReferenceEquals(null, obj)) return false; - if (ReferenceEquals(this, obj)) return true; - if (obj.GetType() != this.GetType()) return false; - return Equals((MembersJoined) obj); - } - - public override int GetHashCode() - { - return HashCode.Combine(QuestId, Day, Location, Members); - } -} - -public class QuestStarted -{ - public string Name { get; set; } - public Guid Id { get; set; } - - public override string ToString() - { - return $"Quest {Name} started"; - } - - protected bool Equals(QuestStarted other) - { - return Name == other.Name && Id.Equals(other.Id); - } - - public override bool Equals(object obj) - { - if (ReferenceEquals(null, obj)) return false; - if (ReferenceEquals(this, obj)) return true; - if (obj.GetType() != this.GetType()) return false; - return Equals((QuestStarted) obj); - } - - public override int GetHashCode() - { - return HashCode.Combine(Name, Id); - } -} - -public class QuestEnded -{ - public string Name { get; set; } - public Guid Id { get; set; } - - public override string ToString() - { - return $"Quest {Name} ended"; - } -} - -public class MembersDeparted -{ - public Guid Id { get; set; } +public sealed record MembersJoined(Guid QuestId, int Day, string Location, string[] Members); - public Guid QuestId { get; set; } +public sealed record QuestStarted(Guid QuestId, string Name); - public int Day { get; set; } +public sealed record QuestEnded(Guid QuestId, string Name); - public string Location { get; set; } +public sealed record MembersDeparted(Guid QuestId, int Day, string Location, string[] Members); - public string[] Members { get; set; } - - public override string ToString() - { - return $"Members {Members.Join(", ")} departed at {Location} on Day {Day}"; - } -} - -public class MembersEscaped -{ - public Guid Id { get; set; } - - public Guid QuestId { get; set; } - - public string Location { get; set; } - - public string[] Members { get; set; } - - public override string ToString() - { - return $"Members {Members.Join(", ")} escaped from {Location}"; - } -} +public sealed record MembersEscaped(Guid QuestId, string Location, string[] Members); ``` -snippet source | anchor +snippet source | anchor @@ -146,99 +27,151 @@ public class MembersEscaped ```cs var store = DocumentStore.For(_ => { - _.Connection(ConnectionSource.ConnectionString); - _.Projections.Snapshot(SnapshotLifecycle.Inline); + _.Connection(ExampleConstants.ConnectionString); }); var questId = Guid.NewGuid(); -await using (var session = store.LightweightSession()) -{ - var started = new QuestStarted { Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Sam"); +await using var session = store.LightweightSession(); +var started = new QuestStarted(questId, "Destroy the One Ring"); +var joined1 = new MembersJoined(questId,1, "Hobbiton", ["Frodo", "Sam"]); - // Start a brand new stream and commit the new events as - // part of a transaction - session.Events.StartStream(questId, started, joined1); +// Start a brand new stream and commit the new events as +// part of a transaction +session.Events.StartStream(questId, started, joined1); - // Append more events to the same stream - var joined2 = new MembersJoined(3, "Buckland", "Merry", "Pippen"); - var joined3 = new MembersJoined(10, "Bree", "Aragorn"); - var arrived = new ArrivedAtLocation { Day = 15, Location = "Rivendell" }; - session.Events.Append(questId, joined2, joined3, arrived); +// Append more events to the same stream +var joined2 = new MembersJoined(questId,3, "Buckland", ["Merry", "Pippen"]); +var joined3 = new MembersJoined(questId,10, "Bree", ["Aragorn"]); +var arrived = new ArrivedAtLocation(questId, 15, "Rivendell"); +session.Events.Append(questId, joined2, joined3, arrived); - // Save the pending changes to db - await session.SaveChangesAsync(); -} +// Save the pending changes to db +await session.SaveChangesAsync(); ``` -snippet source | anchor +snippet source | anchor -In addition to generic `StartStream`, `IEventStore` has a non-generic `StartStream` overload that let you pass explicit type. +#### Aggregating Events - - +At some point we would like to know what members are currently part of the quest party. To keep things simple, we're going to use Marten's _live_ stream aggregation feature to model a `QuestParty` that updates itself based on our events: + + + ```cs -await using (var session = store.LightweightSession()) +public sealed record QuestParty(Guid Id, List Members) { - var started = new QuestStarted { Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Sam"); - - // Start a brand new stream and commit the new events as - // part of a transaction - session.Events.StartStream(typeof(Quest), questId, started, joined1); - await session.SaveChangesAsync(); + // These methods take in events and update the QuestParty + public static QuestParty Create(QuestStarted started) => new(started.QuestId, []); + public static QuestParty Apply(MembersJoined joined, QuestParty party) => + party with + { + Members = party.Members.Union(joined.Members).ToList() + }; + + public static QuestParty Apply(MembersDeparted departed, QuestParty party) => + party with + { + Members = party.Members.Where(x => !departed.Members.Contains(x)).ToList() + }; + + public static QuestParty Apply(MembersEscaped escaped, QuestParty party) => + party with + { + Members = party.Members.Where(x => !escaped.Members.Contains(x)).ToList() + }; } ``` -snippet source | anchor +snippet source | anchor -Now, we would at some point like to see the current state of the quest party to check up on where they're at, who is in the party, and maybe how many monsters they've slain along the way. To keep things simple, we're going to use Marten's live stream aggregation feature to model a `QuestParty` that can update itself based on our events: +Next, we'll use the live projection to aggregate the quest stream for a single quest party like this: - - + + ```cs -public class QuestParty -{ - public List Members { get; set; } = new(); - public IList Slayed { get; } = new List(); - public string Key { get; set; } - public string Name { get; set; } +await using var session2 = store.LightweightSession(); +// questId is the id of the stream +var party = await session2.Events.AggregateStreamAsync(questId); - // In this particular case, this is also the stream id for the quest events - public Guid Id { get; set; } +var party_at_version_3 = await session2.Events + .AggregateStreamAsync(questId, 3); + +var party_yesterday = await session2.Events + .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); +``` +snippet source | anchor + + +Simple, right? The above code will load the events from the database and run them through the `Create` & `Apply` handlers of the `QuestParty` projection, returning the current state of our party. + +What about the quest itself? On top of seeing our in-progress quest, we also want the ability to query our entire history of past quests. For this, we'll create an _inline_ projection that persists our Quest state to the database as the events are being written: + + + +```cs +public sealed record Quest(Guid Id, List Members, List Slayed, string Name, bool isFinished); + +public sealed class QuestProjection: SingleStreamProjection +{ + public static Quest Create(QuestStarted started) => new(started.QuestId, [], [], started.Name, false); + public static Quest Apply(MembersJoined joined, Quest party) => + party with + { + Members = party.Members.Union(joined.Members).ToList() + }; + + public static Quest Apply(MembersDeparted departed, Quest party) => + party with + { + Members = party.Members.Where(x => !departed.Members.Contains(x)).ToList() + }; + + public static Quest Apply(MembersEscaped escaped, Quest party) => + party with + { + Members = party.Members.Where(x => !escaped.Members.Contains(x)).ToList() + }; + + public static Quest Apply(QuestEnded ended, Quest party) => + party with { isFinished = true }; - // These methods take in events and update the QuestParty - public void Apply(MembersJoined joined) => Members.Fill(joined.Members); - public void Apply(MembersDeparted departed) => Members.RemoveAll(x => departed.Members.Contains(x)); - public void Apply(QuestStarted started) => Name = started.Name; - - public override string ToString() - { - return $"Quest party '{Name}' is {Members.Join(", ")}"; - } } ``` -snippet source | anchor +snippet source | anchor -And next, we'll use a live projection to build an aggregate for a single quest party like this: +Our projection should be registered to the document store like so: - - + + ```cs -await using (var session = store.LightweightSession()) +var store = DocumentStore.For(_ => { - // questId is the id of the stream - var party = session.Events.AggregateStream(questId); - Console.WriteLine(party); + _.Connection(ExampleConstants.ConnectionString); + _.Projections.Add(ProjectionLifecycle.Inline); // [!code ++] +}); +``` +snippet source | anchor + - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); +Then we can persist some events and immediately query the state of our quest: - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); -} + + +```cs +await using var session = store.LightweightSession(); + +var started = new QuestStarted(questId, "Destroy the One Ring"); +var joined1 = new MembersJoined(questId, 1, "Hobbiton", ["Frodo", "Sam"]); + +session.Events.StartStream(questId, started, joined1); +await session.SaveChangesAsync(); + +// we can now query the quest state like any other Marten document +var questState = await session.LoadAsync(questId); + +var finishedQuests = await session.Query().Where(x => x.isFinished).ToListAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/events/storage.md b/docs/events/storage.md index 2c8f873d27..6c1c4aa688 100644 --- a/docs/events/storage.md +++ b/docs/events/storage.md @@ -119,4 +119,17 @@ you have the option to add some extra, pre-canned indexes. Right now the only op is to add a unique index back on the `id` column that would be useful for references to external systems like so: -snippet: sample_using_optional_event_store_indexes + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection("some connection string"); + + // Add the unique index to the id field + opts.Events.EnableUniqueIndexOnEventId = true; +}); +``` +snippet source | anchor + diff --git a/src/EventSourcingTests/Examples/StartStreamSamples.cs b/src/EventSourcingTests/Examples/StartStreamSamples.cs index 8f5d8a1992..95f71f3bba 100644 --- a/src/EventSourcingTests/Examples/StartStreamSamples.cs +++ b/src/EventSourcingTests/Examples/StartStreamSamples.cs @@ -110,4 +110,4 @@ private object[] determineNewEvents(Guid streamId) { throw new NotImplementedException(); } -} \ No newline at end of file +} diff --git a/src/EventSourcingTests/Examples/event_store_quickstart.cs b/src/EventSourcingTests/Examples/event_store_quickstart.cs index b69505ec56..6b8be3c519 100644 --- a/src/EventSourcingTests/Examples/event_store_quickstart.cs +++ b/src/EventSourcingTests/Examples/event_store_quickstart.cs @@ -1,119 +1,11 @@ using System; -using System.Linq; using System.Threading.Tasks; -using JasperFx.Core; -using EventSourcingTests.Projections; using Marten; -using Marten.Events.Projections; -using Marten.Testing.Harness; -using Shouldly; namespace EventSourcingTests.Examples; public class event_store_quickstart { - public async Task capture_events() - { - #region sample_event-store-quickstart - - var store = DocumentStore.For(_ => - { - _.Connection(ConnectionSource.ConnectionString); - _.Projections.Snapshot(SnapshotLifecycle.Inline); - }); - - var questId = Guid.NewGuid(); - - await using (var session = store.LightweightSession()) - { - var started = new QuestStarted { Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Sam"); - - // Start a brand new stream and commit the new events as - // part of a transaction - session.Events.StartStream(questId, started, joined1); - - // Append more events to the same stream - var joined2 = new MembersJoined(3, "Buckland", "Merry", "Pippen"); - var joined3 = new MembersJoined(10, "Bree", "Aragorn"); - var arrived = new ArrivedAtLocation { Day = 15, Location = "Rivendell" }; - session.Events.Append(questId, joined2, joined3, arrived); - - // Save the pending changes to db - await session.SaveChangesAsync(); - } - - #endregion - - - #region sample_event-store-start-stream-with-explicit-type - - await using (var session = store.LightweightSession()) - { - var started = new QuestStarted { Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Sam"); - - // Start a brand new stream and commit the new events as - // part of a transaction - session.Events.StartStream(typeof(Quest), questId, started, joined1); - await session.SaveChangesAsync(); - } - - #endregion - - #region sample_event-store-start-stream-with-no-type - - await using (var session = store.LightweightSession()) - { - var started = new QuestStarted { Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Sam"); - - // Start a brand new stream and commit the new events as - // part of a transaction - // no stream type will be stored in database - session.Events.StartStream(questId, started, joined1); - await session.SaveChangesAsync(); - } - - #endregion - - #region sample_events-fetching-stream - - await using (var session = store.LightweightSession()) - { - var events = await session.Events.FetchStreamAsync(questId); - events.Each(evt => - { - Console.WriteLine($"{evt.Version}.) {evt.Data}"); - }); - } - - #endregion - - #region sample_events-aggregate-on-the-fly - - await using (var session = store.LightweightSession()) - { - // questId is the id of the stream - var party = session.Events.AggregateStream(questId); - Console.WriteLine(party); - - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); - } - - #endregion - - await using (var session = store.LightweightSession()) - { - var party = session.Load(questId); - Console.WriteLine(party); - } - } - #region sample_using-fetch-stream public void load_event_stream(IDocumentSession session, Guid streamId) @@ -166,30 +58,4 @@ public async Task load_a_single_event_asynchronously(IDocumentSession session, G #endregion - #region sample_using_live_transformed_events - - public void using_live_transformed_events(IDocumentSession session) - { - var started = new QuestStarted { Name = "Find the Orb" }; - var joined = new MembersJoined - { - Day = 2, Location = "Faldor's Farm", Members = new string[] { "Garion", "Polgara", "Belgarath" } - }; - var slayed1 = new MonsterSlayed { Name = "Troll" }; - var slayed2 = new MonsterSlayed { Name = "Dragon" }; - - MembersJoined joined2 = - new MembersJoined { Day = 5, Location = "Sendaria", Members = new string[] { "Silk", "Barak" } }; - - session.Events.StartStream(started, joined, slayed1, slayed2); - session.SaveChanges(); - - // Our MonsterDefeated documents are created inline - // with the SaveChanges() call above and are available - // for querying - session.Query().Count() - .ShouldBe(2); - } - - #endregion } diff --git a/src/EventSourcingTests/Projections/QuestParty.cs b/src/EventSourcingTests/Projections/QuestParty.cs index c9a2b87da8..eb1d61b39c 100644 --- a/src/EventSourcingTests/Projections/QuestParty.cs +++ b/src/EventSourcingTests/Projections/QuestParty.cs @@ -2,10 +2,11 @@ using System.Collections.Generic; using System.Linq; using JasperFx.Core; +using Marten.Events.Aggregation; namespace EventSourcingTests.Projections; -#region sample_QuestParty + public class QuestParty { public List Members { get; set; } = new(); @@ -27,7 +28,7 @@ public override string ToString() return $"Quest party '{Name}' is {Members.Join(", ")}"; } } -#endregion + public class QuestFinishingParty: QuestParty { diff --git a/src/EventSourcingTests/Projections/projections_with_IoC_services.cs b/src/EventSourcingTests/Projections/projections_with_IoC_services.cs index 088cbe5388..21cdc1e034 100644 --- a/src/EventSourcingTests/Projections/projections_with_IoC_services.cs +++ b/src/EventSourcingTests/Projections/projections_with_IoC_services.cs @@ -229,8 +229,6 @@ public async Task can_apply_database_changes_at_runtime_with_projection_with_ser [Fact] public async Task use_projection_as_singleton_and_inline_on_martenStore() { - #region sample_registering_projection_built_by_services - using var host = await Host.CreateDefaultBuilder() .ConfigureServices(services => { @@ -249,8 +247,6 @@ public async Task use_projection_as_singleton_and_inline_on_martenStore() }) .StartAsync(); - #endregion - var store = host.Services.GetRequiredService(); await using var session = store.LightweightSession(); diff --git a/src/EventSourcingTests/QuestTypes.cs b/src/EventSourcingTests/QuestTypes.cs index 8222ae055c..2ac5362667 100644 --- a/src/EventSourcingTests/QuestTypes.cs +++ b/src/EventSourcingTests/QuestTypes.cs @@ -9,8 +9,7 @@ public class Quest public Guid Id { get; set; } } -#region sample_sample-events -public class ArrivedAtLocation +public class ArrivedAtLocationT { public int Day { get; set; } @@ -141,8 +140,6 @@ public override string ToString() } } -#endregion - public class Issue { public Guid Id { get; set; } diff --git a/src/EventSourcingTests/delete_single_event_stream.cs b/src/EventSourcingTests/delete_single_event_stream.cs index ec449e1b43..fe6783a147 100644 --- a/src/EventSourcingTests/delete_single_event_stream.cs +++ b/src/EventSourcingTests/delete_single_event_stream.cs @@ -208,4 +208,4 @@ public void delete_stream_by_string_key_multi_tenanted() } } -} \ No newline at end of file +} diff --git a/src/EventSourcingTests/get_committed_events_from_listener_Tests.cs b/src/EventSourcingTests/get_committed_events_from_listener_Tests.cs index de7acdcfa4..cd522362d6 100644 --- a/src/EventSourcingTests/get_committed_events_from_listener_Tests.cs +++ b/src/EventSourcingTests/get_committed_events_from_listener_Tests.cs @@ -107,4 +107,4 @@ public void get_correct_events_across_multiple_stream() } -} \ No newline at end of file +} diff --git a/src/EventSourcingTests/marking_stream_as_isnew_on_capture.cs b/src/EventSourcingTests/marking_stream_as_isnew_on_capture.cs index 33fe4af7b3..1a08165a19 100644 --- a/src/EventSourcingTests/marking_stream_as_isnew_on_capture.cs +++ b/src/EventSourcingTests/marking_stream_as_isnew_on_capture.cs @@ -46,4 +46,4 @@ public void should_be_existing_stream_on_append_event() public marking_stream_as_isnew_on_capture(DefaultStoreFixture fixture) : base(fixture) { } -} \ No newline at end of file +} diff --git a/src/EventSourcingTests/query_against_event_documents_Tests.cs b/src/EventSourcingTests/query_against_event_documents_Tests.cs index ad0fcf04f9..b3279f1a82 100644 --- a/src/EventSourcingTests/query_against_event_documents_Tests.cs +++ b/src/EventSourcingTests/query_against_event_documents_Tests.cs @@ -237,4 +237,4 @@ public query_against_event_documents_Tests(ITestOutputHelper output) _output = output; theStore.Advanced.Clean.DeleteAllEventData(); } -} \ No newline at end of file +} diff --git a/src/Marten.sln b/src/Marten.sln index cfc4a793cc..10c05409b5 100644 --- a/src/Marten.sln +++ b/src/Marten.sln @@ -106,6 +106,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "OData", "OData", "{93FC8079 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AspNetODataWithMarten", "AspNetODataWithMarten\AspNetODataWithMarten.csproj", "{93298A4C-2B88-4EF3-9FC8-6B2AEF52F628}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DocsSamples", "samples\DocsSamples\DocsSamples.csproj", "{4B8AF0ED-590B-44B7-93BE-6182FE88C719}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -252,6 +254,10 @@ Global {93298A4C-2B88-4EF3-9FC8-6B2AEF52F628}.Debug|Any CPU.Build.0 = Debug|Any CPU {93298A4C-2B88-4EF3-9FC8-6B2AEF52F628}.Release|Any CPU.ActiveCfg = Release|Any CPU {93298A4C-2B88-4EF3-9FC8-6B2AEF52F628}.Release|Any CPU.Build.0 = Release|Any CPU + {4B8AF0ED-590B-44B7-93BE-6182FE88C719}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4B8AF0ED-590B-44B7-93BE-6182FE88C719}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4B8AF0ED-590B-44B7-93BE-6182FE88C719}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4B8AF0ED-590B-44B7-93BE-6182FE88C719}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -291,6 +297,7 @@ Global {00DBDB5E-FE7B-4886-A989-181331DDD148} = {79961196-DB50-4AD3-B749-D231799BCF2E} {93FC8079-48C4-4D3B-8C17-3944A903674F} = {79961196-DB50-4AD3-B749-D231799BCF2E} {93298A4C-2B88-4EF3-9FC8-6B2AEF52F628} = {93FC8079-48C4-4D3B-8C17-3944A903674F} + {4B8AF0ED-590B-44B7-93BE-6182FE88C719} = {79961196-DB50-4AD3-B749-D231799BCF2E} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {52B7158F-0A24-47D9-9CF7-3FA94170041A} diff --git a/src/samples/DocsSamples/DocsSamples.csproj b/src/samples/DocsSamples/DocsSamples.csproj new file mode 100644 index 0000000000..b115fcea85 --- /dev/null +++ b/src/samples/DocsSamples/DocsSamples.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + Docs.Samples + + + + + + + diff --git a/src/samples/DocsSamples/EventSourcingQuickstart.cs b/src/samples/DocsSamples/EventSourcingQuickstart.cs new file mode 100644 index 0000000000..142fe46b22 --- /dev/null +++ b/src/samples/DocsSamples/EventSourcingQuickstart.cs @@ -0,0 +1,163 @@ +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; + +namespace Docs.Samples; + + + +#region sample_sample-events + +public sealed record ArrivedAtLocation(Guid QuestId, int Day, string Location); + +public sealed record MembersJoined(Guid QuestId, int Day, string Location, string[] Members); + +public sealed record QuestStarted(Guid QuestId, string Name); + +public sealed record QuestEnded(Guid QuestId, string Name); + +public sealed record MembersDeparted(Guid QuestId, int Day, string Location, string[] Members); + +public sealed record MembersEscaped(Guid QuestId, string Location, string[] Members); + + +#endregion + + +#region sample_QuestParty + +public sealed record QuestParty(Guid Id, List Members) +{ + // These methods take in events and update the QuestParty + public static QuestParty Create(QuestStarted started) => new(started.QuestId, []); + public static QuestParty Apply(MembersJoined joined, QuestParty party) => + party with + { + Members = party.Members.Union(joined.Members).ToList() + }; + + public static QuestParty Apply(MembersDeparted departed, QuestParty party) => + party with + { + Members = party.Members.Where(x => !departed.Members.Contains(x)).ToList() + }; + + public static QuestParty Apply(MembersEscaped escaped, QuestParty party) => + party with + { + Members = party.Members.Where(x => !escaped.Members.Contains(x)).ToList() + }; +} + +#endregion + +#region sample_Quest +public sealed record Quest(Guid Id, List Members, List Slayed, string Name, bool isFinished); + +public sealed class QuestProjection: SingleStreamProjection +{ + public static Quest Create(QuestStarted started) => new(started.QuestId, [], [], started.Name, false); + public static Quest Apply(MembersJoined joined, Quest party) => + party with + { + Members = party.Members.Union(joined.Members).ToList() + }; + + public static Quest Apply(MembersDeparted departed, Quest party) => + party with + { + Members = party.Members.Where(x => !departed.Members.Contains(x)).ToList() + }; + + public static Quest Apply(MembersEscaped escaped, Quest party) => + party with + { + Members = party.Members.Where(x => !escaped.Members.Contains(x)).ToList() + }; + + public static Quest Apply(QuestEnded ended, Quest party) => + party with { isFinished = true }; + +} + +#endregion + + +public class EventSourcingQuickstart +{ + public async Task capture_events() + { + #region sample_event-store-quickstart + + var store = DocumentStore.For(_ => + { + _.Connection(ExampleConstants.ConnectionString); + }); + + var questId = Guid.NewGuid(); + + await using var session = store.LightweightSession(); + var started = new QuestStarted(questId, "Destroy the One Ring"); + var joined1 = new MembersJoined(questId,1, "Hobbiton", ["Frodo", "Sam"]); + + // Start a brand new stream and commit the new events as + // part of a transaction + session.Events.StartStream(questId, started, joined1); + + // Append more events to the same stream + var joined2 = new MembersJoined(questId,3, "Buckland", ["Merry", "Pippen"]); + var joined3 = new MembersJoined(questId,10, "Bree", ["Aragorn"]); + var arrived = new ArrivedAtLocation(questId, 15, "Rivendell"); + session.Events.Append(questId, joined2, joined3, arrived); + + // Save the pending changes to db + await session.SaveChangesAsync(); + + #endregion + + #region sample_events-aggregate-on-the-fly + + await using var session2 = store.LightweightSession(); + // questId is the id of the stream + var party = await session2.Events.AggregateStreamAsync(questId); + + var party_at_version_3 = await session2.Events + .AggregateStreamAsync(questId, 3); + + var party_yesterday = await session2.Events + .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); + + #endregion + + } + + public async Task quest_projection() + { + #region sample_adding-quest-projection + var store = DocumentStore.For(_ => + { + _.Connection(ExampleConstants.ConnectionString); + _.Projections.Add(ProjectionLifecycle.Inline); // [!code ++] + }); + #endregion + + var questId = Guid.NewGuid(); + + #region sample_querying-quest-projection + await using var session = store.LightweightSession(); + + var started = new QuestStarted(questId, "Destroy the One Ring"); + var joined1 = new MembersJoined(questId, 1, "Hobbiton", ["Frodo", "Sam"]); + + session.Events.StartStream(questId, started, joined1); + await session.SaveChangesAsync(); + + // we can now query the quest state like any other Marten document + var questState = await session.LoadAsync(questId); + + var finishedQuests = await session.Query().Where(x => x.isFinished).ToListAsync(); + + #endregion + + } +} diff --git a/src/samples/DocsSamples/ExampleConstants.cs b/src/samples/DocsSamples/ExampleConstants.cs new file mode 100644 index 0000000000..cc5276f6e6 --- /dev/null +++ b/src/samples/DocsSamples/ExampleConstants.cs @@ -0,0 +1,6 @@ +namespace Docs.Samples; + +public static class ExampleConstants +{ + public static readonly string ConnectionString = string.Empty; +}