From 25d3329d04da008ba75cb4b6657d6875b39ec92a Mon Sep 17 00:00:00 2001 From: John Bell Date: Sat, 23 Mar 2024 01:35:54 +0000 Subject: [PATCH 01/17] Add new generic on PgStore and converter trait to decouple persistence from Aggregate::Event --- src/sql/event.rs | 25 +++++++++++++++ src/store/postgres/builder.rs | 2 ++ src/store/postgres/event_store.rs | 53 ++++++++++++++++++++++--------- 3 files changed, 65 insertions(+), 15 deletions(-) diff --git a/src/sql/event.rs b/src/sql/event.rs index ea90ba43..513a8f34 100644 --- a/src/sql/event.rs +++ b/src/sql/event.rs @@ -5,6 +5,7 @@ use serde_json::Value; use uuid::Uuid; use crate::event::Event; +use crate::store::postgres::Converter; use crate::store::StoreEvent; use crate::types::SequenceNumber; @@ -19,6 +20,30 @@ pub struct DbEvent { pub version: Option, } +impl DbEvent { + pub fn try_into_store_event(self) -> Result>, serde_json::Error> + where + Schema: Converter, + { + #[cfg(feature = "upcasting")] + let payload = Schema::upcast(self.payload, self.version)?.into(); + #[cfg(not(feature = "upcasting"))] + let payload = serde_json::from_value::(self.payload)?.into(); + + Ok(match payload { + None => None, + Some(payload) => Some(StoreEvent { + id: self.id, + aggregate_id: self.aggregate_id, + payload, + occurred_on: self.occurred_on, + sequence_number: self.sequence_number, + version: self.version, + }), + }) + } +} + impl TryInto> for DbEvent { type Error = serde_json::Error; diff --git a/src/store/postgres/builder.rs b/src/store/postgres/builder.rs index 53cada9b..4854a69d 100644 --- a/src/store/postgres/builder.rs +++ b/src/store/postgres/builder.rs @@ -1,3 +1,4 @@ +use std::marker::PhantomData; use std::sync::Arc; use sqlx::{PgConnection, Pool, Postgres}; @@ -113,6 +114,7 @@ where transactional_event_handlers: self.transactional_event_handlers, event_buses: self.event_buses, }), + _schema: PhantomData, }) } } diff --git a/src/store/postgres/event_store.rs b/src/store/postgres/event_store.rs index c9dc1809..df108f11 100644 --- a/src/store/postgres/event_store.rs +++ b/src/store/postgres/event_store.rs @@ -1,4 +1,4 @@ -use std::convert::TryInto; +use std::marker::PhantomData; use std::sync::Arc; use async_trait::async_trait; @@ -22,16 +22,31 @@ use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; use crate::types::SequenceNumber; use crate::{Aggregate, AggregateState}; +pub trait Converter: From + Into> + Event {} + +impl Converter for T where T: From + Into> + Event {} + /// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a /// pre-made implementation of an [`EventStore`] persisting on Postgres. /// /// The store is protected by an [`Arc`] that allows it to be cloneable still having the same memory /// reference. -pub struct PgStore +/// +/// To decouple persistence from the event types, it is possible to optionally, specify the +/// Database event schema for this store as a serializable type. +/// +/// When events are persisted, they will first be converted via the `From` trait into the `Schema` +/// type, then serialized. +/// +/// When events are read from the store, they will first be deserialized into the `Schema` and then +/// they can be converted into and option of the domain aggregate event. In this way it is possible +/// to deprecate events in core part of your application by returning `None` when converting. 
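+///
+/// As a rough sketch of the resulting type (the names `MyAggregate`, `MyEvent` and `MySchema`
+/// are illustrative, not part of this crate), a store declared with a converter type reads as:
+///
+/// ```ignore
+/// // MySchema: From<MyEvent> + Into<Option<MyEvent>> + Event, i.e. it satisfies `Converter<MyEvent>`.
+/// type MyStore = PgStore<MyAggregate, MySchema>;
+/// ```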
+pub struct PgStore::Event> where A: Aggregate, { pub(super) inner: Arc>, + pub(super) _schema: PhantomData, } pub(super) struct InnerPgStore @@ -46,10 +61,11 @@ where pub(super) event_buses: Vec + Send>>, } -impl PgStore +impl PgStore where A: Aggregate, - A::Event: Event + Sync, + A::Event: Send + Sync, + Schema: Converter + Event + Send + Sync, { /// Returns the name of the event store table pub fn table_name(&self) -> &str { @@ -83,17 +99,15 @@ where let id: Uuid = Uuid::new_v4(); #[cfg(feature = "upcasting")] - let version: Option = { - use crate::event::Upcaster; - A::Event::current_version() - }; + let version: Option = Schema::current_version(); #[cfg(not(feature = "upcasting"))] let version: Option = None; + let schema = Schema::from(event); let _ = sqlx::query(self.inner.statements.insert()) .bind(id) .bind(aggregate_id) - .bind(Json(&event)) + .bind(Json(&schema)) .bind(occurred_on) .bind(sequence_number) .bind(version) @@ -103,7 +117,9 @@ where Ok(StoreEvent { id, aggregate_id, - payload: event, + payload: schema.into().expect( + "This should always be true for converters assert event == Converter::from(event).into().unwrap()", + ), occurred_on, sequence_number, version, @@ -119,7 +135,9 @@ where Box::pin({ sqlx::query_as::<_, DbEvent>(self.inner.statements.select_all()) .fetch(executor) - .map(|res| Ok(res?.try_into()?)) + .map(|res| Ok(res?.try_into_store_event::<_, Schema>()?)) + .map(Result::transpose) + .filter_map(std::future::ready) }) } } @@ -140,11 +158,12 @@ pub struct PgStoreLockGuard { impl UnlockOnDrop for PgStoreLockGuard {} #[async_trait] -impl EventStore for PgStore +impl EventStore for PgStore where A: Aggregate, A::State: Send, - A::Event: Event + Send + Sync, + A::Event: Send + Sync, + Schema: Converter + Event + Send + Sync, { type Aggregate = A; type Error = PgStoreError; @@ -167,10 +186,13 @@ where .fetch_all(&self.inner.pool) .await? .into_iter() - .map(|event| Ok(event.try_into()?)) + .map(|event| Ok(event.try_into_store_event::<_, Schema>()?)) + .filter_map(Result::transpose) .collect::>, Self::Error>>()?) } + // Note: https://github.com/rust-lang/rust-clippy/issues/12281 + #[allow(clippy::blocks_in_conditions)] #[tracing::instrument(skip_all, fields(aggregate_id = % aggregate_state.id()), err)] async fn persist( &self, @@ -301,13 +323,14 @@ impl std::fmt::Debug for PgStore { } } -impl Clone for PgStore +impl Clone for PgStore where A: Aggregate, { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), + _schema: PhantomData, } } } From b136e79de805d956619599a408adf770b2cf2194 Mon Sep 17 00:00:00 2001 From: John Bell Date: Sun, 24 Mar 2024 22:13:13 +0000 Subject: [PATCH 02/17] Add ability to set schema in the store builder --- src/store/postgres/builder.rs | 36 +++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/store/postgres/builder.rs b/src/store/postgres/builder.rs index 4854a69d..9f781760 100644 --- a/src/store/postgres/builder.rs +++ b/src/store/postgres/builder.rs @@ -5,16 +5,17 @@ use sqlx::{PgConnection, Pool, Postgres}; use tokio::sync::RwLock; use crate::bus::EventBus; +use crate::event::Event; use crate::handler::{EventHandler, TransactionalEventHandler}; use crate::sql::migrations::{Migrations, MigrationsHandler}; use crate::sql::statements::{Statements, StatementsHandler}; use crate::store::postgres::{InnerPgStore, PgStoreError}; use crate::Aggregate; -use super::PgStore; +use super::{Converter, PgStore}; /// Struct used to build a brand new [`PgStore`]. 
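+///
+/// With the new `Schema` parameter the builder can be pointed at an explicit schema type; a
+/// minimal sketch of the intended call site, given an existing `pool` (`MyAggregate` and
+/// `MySchema` are illustrative names, not part of this crate):
+///
+/// ```ignore
+/// let store: PgStore<MyAggregate, MySchema> = PgStoreBuilder::new(pool)
+///     .with_schema::<MySchema>()
+///     .try_build()
+///     .await?;
+/// ```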
-pub struct PgStoreBuilder +pub struct PgStoreBuilder::Event> where A: Aggregate, { @@ -24,14 +25,15 @@ where transactional_event_handlers: Vec + Send>>, event_buses: Vec + Send>>, run_migrations: bool, + _schema: PhantomData, } -impl PgStoreBuilder +impl PgStoreBuilder::Event> where A: Aggregate, { /// Creates a new instance of a [`PgStoreBuilder`]. - pub fn new(pool: Pool) -> Self { + pub fn new(pool: Pool) -> PgStoreBuilder::Event> { PgStoreBuilder { pool, statements: Statements::new::(), @@ -39,9 +41,15 @@ where transactional_event_handlers: vec![], event_buses: vec![], run_migrations: true, + _schema: PhantomData, } } +} +impl PgStoreBuilder +where + A: Aggregate, +{ /// Set event handlers list pub fn with_event_handlers(mut self, event_handlers: Vec + Send>>) -> Self { self.event_handlers = event_handlers; @@ -92,6 +100,22 @@ where self } + /// Set the schema of the underlying PgStore. + pub fn with_schema(self) -> PgStoreBuilder + where + NewSchema: Converter + Event + Send + Sync, + { + PgStoreBuilder { + pool: self.pool, + statements: self.statements, + run_migrations: self.run_migrations, + event_handlers: self.event_handlers, + transactional_event_handlers: self.transactional_event_handlers, + event_buses: self.event_buses, + _schema: PhantomData, + } + } + /// This function runs all the needed [`Migrations`], atomically setting up the database if /// `run_migrations` isn't explicitly set to false. [`Migrations`] should be run only at application /// startup due to avoid performance issues. @@ -101,7 +125,7 @@ where /// # Errors /// /// Will return an `Err` if there's an error running [`Migrations`]. - pub async fn try_build(self) -> Result, sqlx::Error> { + pub async fn try_build(self) -> Result, sqlx::Error> { if self.run_migrations { Migrations::run::(&self.pool).await?; } @@ -114,7 +138,7 @@ where transactional_event_handlers: self.transactional_event_handlers, event_buses: self.event_buses, }), - _schema: PhantomData, + _schema: self._schema, }) } } From dea207354759cd397b4f879d9854151d8fabd8fd Mon Sep 17 00:00:00 2001 From: John Bell Date: Sun, 24 Mar 2024 22:14:10 +0000 Subject: [PATCH 03/17] Add example of how to use schema for both deprecating and upcasting use cases --- Makefile.toml | 2 + examples/schema/main.rs | 228 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 examples/schema/main.rs diff --git a/Makefile.toml b/Makefile.toml index 39fb91a4..8ef5af60 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -78,6 +78,7 @@ script = [ "cargo run --example readme --features=postgres", "cargo run --example rebuilder --features=rebuilder,postgres", "cargo run --example saga --features=postgres", + "cargo run --example schema --features=postgres", "cargo run --example shared_view --features=postgres", "cargo run --example store_crud --features=postgres", "cargo run --example transactional_view --features=postgres", @@ -95,6 +96,7 @@ script = [ "cargo clippy --example readme --features=postgres -- -D warnings", "cargo clippy --example rebuilder --features=rebuilder,postgres -- -D warnings", "cargo clippy --example saga --features=postgres -- -D warnings", + "cargo clippy --example schema --features=postgres -- -D warnings", "cargo clippy --example shared_view --features=postgres -- -D warnings", "cargo clippy --example store_crud --features=postgres -- -D warnings", "cargo clippy --example transactional_view --features=postgres -- -D warnings", diff --git a/examples/schema/main.rs b/examples/schema/main.rs new file mode 100644 
index 00000000..7bbe56fd --- /dev/null +++ b/examples/schema/main.rs @@ -0,0 +1,228 @@ +use std::marker::PhantomData; + +use esrs::manager::AggregateManager; +use esrs::store::postgres::{PgStore, PgStoreBuilder}; +use esrs::store::EventStore; +use esrs::Aggregate; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[path = "../common/lib.rs"] +mod common; + +use crate::common::util::new_pool; +use crate::common::CommonError; + +struct SchemaAggregate(PhantomData); + +impl Aggregate for SchemaAggregate +where + EventType: Default, +{ + const NAME: &'static str = "schema"; + type State = SchemaState; + type Command = SchemaCommand; + type Event = EventType; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } +} + +#[derive(Default)] +struct SchemaState { + events: Vec, +} + +enum SchemaCommand {} + +#[derive(Default, Debug)] +enum SchemaEventUpcasted { + #[default] + EmptyEvent, + EventB { + count: u64, + }, + EventC { + contents: String, + count: u64, + }, +} + +#[derive(Default, Debug)] +enum SchemaEvent { + #[default] + EmptyEvent, + EventB { + count: u64, + }, + EventC { + contents: String, + count: u64, + }, +} + +#[derive(Default)] +enum SchemaEventOld { + #[default] + EmptyEvent, + EventA { + contents: String, + }, + EventB { + count: u64, + }, +} + +#[derive(Deserialize, Serialize)] +enum Schema { + EmptyEvent, + EventA { contents: String }, + EventB { count: u64 }, + EventC { contents: String, count: u64 }, +} + +#[cfg(feature = "upcasting")] +impl esrs::event::Upcaster for Schema {} + +impl From for Schema { + fn from(value: SchemaEvent) -> Self { + match value { + SchemaEvent::EmptyEvent => Schema::EmptyEvent, + SchemaEvent::EventB { count } => Schema::EventB { count }, + SchemaEvent::EventC { contents, count } => Schema::EventC { contents, count }, + } + } +} + +impl From for Schema { + fn from(value: SchemaEventUpcasted) -> Self { + match value { + SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent, + SchemaEventUpcasted::EventB { count } => Schema::EventB { count }, + SchemaEventUpcasted::EventC { contents, count } => Schema::EventC { contents, count }, + } + } +} + +impl From for Schema { + fn from(value: SchemaEventOld) -> Self { + match value { + SchemaEventOld::EmptyEvent => Schema::EmptyEvent, + SchemaEventOld::EventA { contents } => Schema::EventA { contents }, + SchemaEventOld::EventB { count } => Schema::EventB { count }, + } + } +} + +impl From for Option { + fn from(val: Schema) -> Self { + match val { + Schema::EmptyEvent => Some(SchemaEventOld::EmptyEvent), + Schema::EventA { contents } => Some(SchemaEventOld::EventA { contents }), + Schema::EventB { count } => Some(SchemaEventOld::EventB { count }), + Schema::EventC { .. } => panic!("not supported"), + } + } +} + +impl From for Option { + fn from(val: Schema) -> Self { + match val { + Schema::EmptyEvent => Some(SchemaEvent::EmptyEvent), + Schema::EventA { .. 
} => None, + Schema::EventB { count } => Some(SchemaEvent::EventB { count }), + Schema::EventC { contents, count } => Some(SchemaEvent::EventC { contents, count }), + } + } +} + +impl From for Option { + fn from(val: Schema) -> Self { + match val { + Schema::EmptyEvent => Some(SchemaEventUpcasted::EmptyEvent), + Schema::EventA { contents } => Some(SchemaEventUpcasted::EventC { contents, count: 1 }), + Schema::EventB { count } => Some(SchemaEventUpcasted::EventB { count }), + Schema::EventC { contents, count } => Some(SchemaEventUpcasted::EventC { contents, count }), + } + } +} + +#[tokio::main] +async fn main() { + let pool = new_pool().await; + let aggregate_id: Uuid = Uuid::new_v4(); + + // Before deprecation of EventA + let old_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + SchemaEventOld::EmptyEvent, + SchemaEventOld::EventA { + contents: "this is soon to be deprecated".to_owned(), + }, + SchemaEventOld::EventB { count: 42 }, + ]; + + let manager = AggregateManager::new(old_store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let events = old_store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + + // After deprecation of EventA and addition of EventC + let new_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = new_store.by_aggregate_id(aggregate_id).await.unwrap(); + + assert_eq!(events.len(), 2); + + let events = vec![ + SchemaEvent::EmptyEvent, + SchemaEvent::EventB { count: 42 }, + SchemaEvent::EventC { + contents: "this is the new events".to_owned(), + count: 21, + }, + ]; + + let manager = AggregateManager::new(new_store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let _ = new_store.persist(&mut state, events).await.unwrap(); + + let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + // The deprecated event is skipped + assert_eq!(events.len(), 5); + + // After upcasting of EventA to EventC + let upcasting_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + let manager = AggregateManager::new(upcasting_store.clone()); + let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + // All the events are visible + assert_eq!(events.len(), 6); + + // The event has been upcasted + assert!(matches!(events[1], SchemaEventUpcasted::EventC { count: 1, .. })); +} From e249895934d5d47142659cc47e49c2c9b85beffc Mon Sep 17 00:00:00 2001 From: John Bell Date: Mon, 25 Mar 2024 09:16:54 +0000 Subject: [PATCH 04/17] Add description of schema example --- examples/schema/main.rs | 90 ++++++++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 19 deletions(-) diff --git a/examples/schema/main.rs b/examples/schema/main.rs index 7bbe56fd..24f00e57 100644 --- a/examples/schema/main.rs +++ b/examples/schema/main.rs @@ -1,9 +1,26 @@ +//! This example demonstrates several stages of the evolution of a system. +//! +//! The first part for an event store with no schema (where the `Aggregate::Event` type implement +//! `Event`. +//! +//! The next evolution of the system is the introduction of the `Schema` to the event store. This +//! removes the need for `Aggregate::Event` to implement `Event` instead relies on the +//! `From` and `Into>` on `Schema` and the fact that +//! 
`Schema` implements event. +//! +//! The next evolution demonstrates how an event can be deprecated using the `Schema` mechanism. A +//! new event variant is introduced and one is deprecated. The use of the schema enables the rest +//! of the system to not have to concern itself with the deprecated events. The only place it still +//! exists is as a variant of the `Schema` type. +//! +//! The final part is an alternative way for the system to evolve using the `Schema` mechanism as a +//! way to upcast from one shape of and event to another. use std::marker::PhantomData; use esrs::manager::AggregateManager; use esrs::store::postgres::{PgStore, PgStoreBuilder}; use esrs::store::EventStore; -use esrs::Aggregate; +use esrs::{Aggregate, AggregateState}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -44,20 +61,20 @@ struct SchemaState { enum SchemaCommand {} -#[derive(Default, Debug)] +#[derive(Default)] enum SchemaEventUpcasted { #[default] EmptyEvent, - EventB { + EventA { + contents: String, count: u64, }, - EventC { - contents: String, + EventB { count: u64, }, } -#[derive(Default, Debug)] +#[derive(Default)] enum SchemaEvent { #[default] EmptyEvent, @@ -70,6 +87,21 @@ enum SchemaEvent { }, } +#[derive(Default, Deserialize, Serialize)] +enum SchemaEventNoSchema { + #[default] + EmptyEvent, + EventA { + contents: String, + }, + EventB { + count: u64, + }, +} + +#[cfg(feature = "upcasting")] +impl esrs::event::Upcaster for SchemaEventNoSchema {} + #[derive(Default)] enum SchemaEventOld { #[default] @@ -108,7 +140,7 @@ impl From for Schema { match value { SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent, SchemaEventUpcasted::EventB { count } => Schema::EventB { count }, - SchemaEventUpcasted::EventC { contents, count } => Schema::EventC { contents, count }, + SchemaEventUpcasted::EventA { contents, count } => Schema::EventC { contents, count }, } } } @@ -149,9 +181,9 @@ impl From for Option { fn from(val: Schema) -> Self { match val { Schema::EmptyEvent => Some(SchemaEventUpcasted::EmptyEvent), - Schema::EventA { contents } => Some(SchemaEventUpcasted::EventC { contents, count: 1 }), + Schema::EventA { contents } => Some(SchemaEventUpcasted::EventA { contents, count: 1 }), Schema::EventB { count } => Some(SchemaEventUpcasted::EventB { count }), - Schema::EventC { contents, count } => Some(SchemaEventUpcasted::EventC { contents, count }), + Schema::EventC { contents, count } => Some(SchemaEventUpcasted::EventA { contents, count }), } } } @@ -161,7 +193,24 @@ async fn main() { let pool = new_pool().await; let aggregate_id: Uuid = Uuid::new_v4(); - // Before deprecation of EventA + // Before the schema + let schemaless_store: PgStore> = + PgStoreBuilder::new(pool.clone()).try_build().await.unwrap(); + + let events = vec![ + SchemaEventNoSchema::EmptyEvent, + SchemaEventNoSchema::EventA { + contents: "this is soon to be deprecated".to_owned(), + }, + SchemaEventNoSchema::EventB { count: 42 }, + ]; + + let mut state = AggregateState::with_id(aggregate_id); + let events = schemaless_store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + + // With schema before deprecation of EventA let old_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) .with_schema::() .try_build() @@ -178,9 +227,11 @@ async fn main() { let manager = AggregateManager::new(old_store.clone()); let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); - let events = old_store.persist(&mut state, events).await.unwrap(); + let _ = old_store.persist(&mut state, 
events).await.unwrap(); - assert_eq!(events.len(), 3); + let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + assert_eq!(events.len(), 6); // After deprecation of EventA and addition of EventC let new_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) @@ -191,7 +242,7 @@ async fn main() { let events = new_store.by_aggregate_id(aggregate_id).await.unwrap(); - assert_eq!(events.len(), 2); + assert_eq!(events.len(), 4); let events = vec![ SchemaEvent::EmptyEvent, @@ -208,10 +259,10 @@ async fn main() { let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; - // The deprecated event is skipped - assert_eq!(events.len(), 5); + // The deprecated events are skipped + assert_eq!(events.len(), 7); - // After upcasting of EventA to EventC + // After upcasting of EventA let upcasting_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) .with_schema::() .try_build() @@ -221,8 +272,9 @@ async fn main() { let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; // All the events are visible - assert_eq!(events.len(), 6); + assert_eq!(events.len(), 9); - // The event has been upcasted - assert!(matches!(events[1], SchemaEventUpcasted::EventC { count: 1, .. })); + // The events have been upcasted + assert!(matches!(events[1], SchemaEventUpcasted::EventA { count: 1, .. })); + assert!(matches!(events[4], SchemaEventUpcasted::EventA { count: 1, .. })); } From 0bef40c8132b5a997533af1aea27a39c0713f303 Mon Sep 17 00:00:00 2001 From: John Bell Date: Mon, 25 Mar 2024 21:32:59 +0000 Subject: [PATCH 05/17] Use explicit methods on the conversion trait in place of making use of from and to --- examples/schema/main.rs | 54 ++++++------ src/sql/event.rs | 10 +-- src/store/postgres/builder.rs | 8 +- src/store/postgres/event_store.rs | 131 +++++++++++++++++++++++++----- 4 files changed, 145 insertions(+), 58 deletions(-) diff --git a/examples/schema/main.rs b/examples/schema/main.rs index 24f00e57..e0a9d0ce 100644 --- a/examples/schema/main.rs +++ b/examples/schema/main.rs @@ -125,61 +125,55 @@ enum Schema { #[cfg(feature = "upcasting")] impl esrs::event::Upcaster for Schema {} -impl From for Schema { - fn from(value: SchemaEvent) -> Self { +impl esrs::store::postgres::Schema for Schema { + fn write(value: SchemaEvent) -> Self { match value { SchemaEvent::EmptyEvent => Schema::EmptyEvent, SchemaEvent::EventB { count } => Schema::EventB { count }, SchemaEvent::EventC { contents, count } => Schema::EventC { contents, count }, } } -} -impl From for Schema { - fn from(value: SchemaEventUpcasted) -> Self { - match value { - SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent, - SchemaEventUpcasted::EventB { count } => Schema::EventB { count }, - SchemaEventUpcasted::EventA { contents, count } => Schema::EventC { contents, count }, + fn read(self) -> Option { + match self { + Self::EmptyEvent => Some(SchemaEvent::EmptyEvent), + Self::EventA { .. 
} => None, + Self::EventB { count } => Some(SchemaEvent::EventB { count }), + Self::EventC { contents, count } => Some(SchemaEvent::EventC { contents, count }), } } } -impl From for Schema { - fn from(value: SchemaEventOld) -> Self { +impl esrs::store::postgres::Schema for Schema { + fn write(value: SchemaEventOld) -> Self { match value { SchemaEventOld::EmptyEvent => Schema::EmptyEvent, SchemaEventOld::EventA { contents } => Schema::EventA { contents }, SchemaEventOld::EventB { count } => Schema::EventB { count }, } } -} -impl From for Option { - fn from(val: Schema) -> Self { - match val { - Schema::EmptyEvent => Some(SchemaEventOld::EmptyEvent), - Schema::EventA { contents } => Some(SchemaEventOld::EventA { contents }), - Schema::EventB { count } => Some(SchemaEventOld::EventB { count }), - Schema::EventC { .. } => panic!("not supported"), + fn read(self) -> Option { + match self { + Self::EmptyEvent => Some(SchemaEventOld::EmptyEvent), + Self::EventA { contents } => Some(SchemaEventOld::EventA { contents }), + Self::EventB { count } => Some(SchemaEventOld::EventB { count }), + Self::EventC { .. } => panic!("not supported"), } } } -impl From for Option { - fn from(val: Schema) -> Self { - match val { - Schema::EmptyEvent => Some(SchemaEvent::EmptyEvent), - Schema::EventA { .. } => None, - Schema::EventB { count } => Some(SchemaEvent::EventB { count }), - Schema::EventC { contents, count } => Some(SchemaEvent::EventC { contents, count }), +impl esrs::store::postgres::Schema for Schema { + fn write(value: SchemaEventUpcasted) -> Self { + match value { + SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent, + SchemaEventUpcasted::EventB { count } => Schema::EventB { count }, + SchemaEventUpcasted::EventA { contents, count } => Schema::EventC { contents, count }, } } -} -impl From for Option { - fn from(val: Schema) -> Self { - match val { + fn read(self) -> Option { + match self { Schema::EmptyEvent => Some(SchemaEventUpcasted::EmptyEvent), Schema::EventA { contents } => Some(SchemaEventUpcasted::EventA { contents, count: 1 }), Schema::EventB { count } => Some(SchemaEventUpcasted::EventB { count }), diff --git a/src/sql/event.rs b/src/sql/event.rs index 513a8f34..22b6a734 100644 --- a/src/sql/event.rs +++ b/src/sql/event.rs @@ -5,7 +5,7 @@ use serde_json::Value; use uuid::Uuid; use crate::event::Event; -use crate::store::postgres::Converter; +use crate::store::postgres::Schema; use crate::store::StoreEvent; use crate::types::SequenceNumber; @@ -21,14 +21,14 @@ pub struct DbEvent { } impl DbEvent { - pub fn try_into_store_event(self) -> Result>, serde_json::Error> + pub fn try_into_store_event(self) -> Result>, serde_json::Error> where - Schema: Converter, + S: Schema, { #[cfg(feature = "upcasting")] - let payload = Schema::upcast(self.payload, self.version)?.into(); + let payload = S::upcast(self.payload, self.version)?.read(); #[cfg(not(feature = "upcasting"))] - let payload = serde_json::from_value::(self.payload)?.into(); + let payload = serde_json::from_value::(self.payload)?.read(); Ok(match payload { None => None, diff --git a/src/store/postgres/builder.rs b/src/store/postgres/builder.rs index 9f781760..34e09766 100644 --- a/src/store/postgres/builder.rs +++ b/src/store/postgres/builder.rs @@ -12,7 +12,7 @@ use crate::sql::statements::{Statements, StatementsHandler}; use crate::store::postgres::{InnerPgStore, PgStoreError}; use crate::Aggregate; -use super::{Converter, PgStore}; +use super::{PgStore, Schema}; /// Struct used to build a brand new [`PgStore`]. 
pub struct PgStoreBuilder::Event> @@ -46,7 +46,7 @@ where } } -impl PgStoreBuilder +impl PgStoreBuilder where A: Aggregate, { @@ -103,7 +103,7 @@ where /// Set the schema of the underlying PgStore. pub fn with_schema(self) -> PgStoreBuilder where - NewSchema: Converter + Event + Send + Sync, + NewSchema: Schema + Event + Send + Sync, { PgStoreBuilder { pool: self.pool, @@ -125,7 +125,7 @@ where /// # Errors /// /// Will return an `Err` if there's an error running [`Migrations`]. - pub async fn try_build(self) -> Result, sqlx::Error> { + pub async fn try_build(self) -> Result, sqlx::Error> { if self.run_migrations { Migrations::run::(&self.pool).await?; } diff --git a/src/store/postgres/event_store.rs b/src/store/postgres/event_store.rs index df108f11..41fc097c 100644 --- a/src/store/postgres/event_store.rs +++ b/src/store/postgres/event_store.rs @@ -22,9 +22,100 @@ use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; use crate::types::SequenceNumber; use crate::{Aggregate, AggregateState}; -pub trait Converter: From + Into> + Event {} +/// To support decoupling between the Aggregate::Event type and the schema of the DB table +/// in `PgStore` you can create a schema type that implements `Event` and `Schema` +/// where `E = Aggregate::Event`. +/// +/// Note: Although `Schema::read` returns an `Option` for any given event and implementation. +/// +/// The following must hold +/// +/// ```rust +/// # use serde::{Serialize, Deserialize}; +/// # use esrs::store::postgres::Schema as SchemaTrait; +/// # +/// # #[derive(Clone, Eq, PartialEq, Debug)] +/// # struct Event { +/// # a: u32, +/// # } +/// # +/// # #[derive(Serialize, Deserialize)] +/// # struct Schema { +/// # a: u32, +/// # } +/// # +/// # #[cfg(feature = "upcasting")] +/// # impl esrs::event::Upcaster for Schema {} +/// # +/// # impl SchemaTrait for Schema { +/// # fn write(Event { a }: Event) -> Self { +/// # Self { a } +/// # } +/// # +/// # fn read(self) -> Option { +/// # Some(Event { a: self.a }) +/// # } +/// # } +/// # +/// # let event = Event { a: 42 }; +/// assert_eq!(Some(event.clone()), Schema::write(event).read()); +/// ``` +pub trait Schema: Event { + /// Converts the event into the schema type. + fn write(event: E) -> Self; + + /// Converts the schema into the event type. + /// + /// This returns an option to enable skipping deprecated event which are persisted in the DB. + /// + /// Note: Although `Schema::read` returns an `Option` for any given event and implementation. 
+ /// + /// The following must hold + /// + /// ```rust + /// # use serde::{Serialize, Deserialize}; + /// # use esrs::store::postgres::Schema as SchemaTrait; + /// # + /// # #[derive(Clone, Eq, PartialEq, Debug)] + /// # struct Event { + /// # a: u32, + /// # } + /// # + /// # #[derive(Serialize, Deserialize)] + /// # struct Schema { + /// # a: u32, + /// # } + /// # + /// # #[cfg(feature = "upcasting")] + /// # impl esrs::event::Upcaster for Schema {} + /// # + /// # impl SchemaTrait for Schema { + /// # fn write(Event { a }: Event) -> Self { + /// # Self { a } + /// # } + /// # + /// # fn read(self) -> Option { + /// # Some(Event { a: self.a }) + /// # } + /// # } + /// # + /// # let event = Event { a: 42 }; + /// assert_eq!(Some(event.clone()), Schema::write(event).read()); + /// ``` + fn read(self) -> Option; +} -impl Converter for T where T: From + Into> + Event {} +impl Schema for E +where + E: Event, +{ + fn write(event: E) -> Self { + event + } + fn read(self) -> Option { + Some(self) + } +} /// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a /// pre-made implementation of an [`EventStore`] persisting on Postgres. @@ -33,14 +124,15 @@ impl Converter for T where T: From + Into> + Event {} /// reference. /// /// To decouple persistence from the event types, it is possible to optionally, specify the -/// Database event schema for this store as a serializable type. +/// Database event schema for this store as a type that implements `Event` and +/// `Schema`. /// -/// When events are persisted, they will first be converted via the `From` trait into the `Schema` -/// type, then serialized. +/// When events are persisted, they will first be converted to the `Schema` type using +/// `Schema::write` then serialized using the `Serialize` implementation on `Schema`. /// -/// When events are read from the store, they will first be deserialized into the `Schema` and then -/// they can be converted into and option of the domain aggregate event. In this way it is possible -/// to deprecate events in core part of your application by returning `None` when converting. +/// When events are read from the store, they will first be deserialized into the `Schema` type and +/// then converted into an `Option` using `Schema::read`. In this way it is possible +/// to remove deprecate events in core part of your application by returning `None` from `Schema::read`. 
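+///
+/// A condensed sketch of the two paths (error handling simplified; `payload` is the stored JSON
+/// value), approximating what `save_event` and `DbEvent::try_into_store_event` actually do:
+///
+/// ```ignore
+/// // write path: Aggregate::Event -> S -> JSON
+/// let json = serde_json::to_value(S::write(event))?;
+/// // read path: JSON -> S -> Option<Aggregate::Event>, with `None` results filtered out
+/// let event: Option<A::Event> = serde_json::from_value::<S>(payload)?.read();
+/// ```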
pub struct PgStore::Event> where A: Aggregate, @@ -61,11 +153,11 @@ where pub(super) event_buses: Vec + Send>>, } -impl PgStore +impl PgStore where A: Aggregate, A::Event: Send + Sync, - Schema: Converter + Event + Send + Sync, + S: Schema + Event + Send + Sync, { /// Returns the name of the event store table pub fn table_name(&self) -> &str { @@ -99,10 +191,10 @@ where let id: Uuid = Uuid::new_v4(); #[cfg(feature = "upcasting")] - let version: Option = Schema::current_version(); + let version: Option = S::current_version(); #[cfg(not(feature = "upcasting"))] let version: Option = None; - let schema = Schema::from(event); + let schema = S::write(event); let _ = sqlx::query(self.inner.statements.insert()) .bind(id) @@ -117,8 +209,9 @@ where Ok(StoreEvent { id, aggregate_id, - payload: schema.into().expect( - "This should always be true for converters assert event == Converter::from(event).into().unwrap()", + payload: schema.read().expect( + "For any type that implements Schema the following contract should be upheld:\ + assert_eq!(Some(event.clone()), Schema::write(event).read())", ), occurred_on, sequence_number, @@ -135,7 +228,7 @@ where Box::pin({ sqlx::query_as::<_, DbEvent>(self.inner.statements.select_all()) .fetch(executor) - .map(|res| Ok(res?.try_into_store_event::<_, Schema>()?)) + .map(|res| Ok(res?.try_into_store_event::<_, S>()?)) .map(Result::transpose) .filter_map(std::future::ready) }) @@ -158,12 +251,12 @@ pub struct PgStoreLockGuard { impl UnlockOnDrop for PgStoreLockGuard {} #[async_trait] -impl EventStore for PgStore +impl EventStore for PgStore where A: Aggregate, A::State: Send, A::Event: Send + Sync, - Schema: Converter + Event + Send + Sync, + S: Schema + Event + Send + Sync, { type Aggregate = A; type Error = PgStoreError; @@ -186,7 +279,7 @@ where .fetch_all(&self.inner.pool) .await? .into_iter() - .map(|event| Ok(event.try_into_store_event::<_, Schema>()?)) + .map(|event| Ok(event.try_into_store_event::<_, S>()?)) .filter_map(Result::transpose) .collect::>, Self::Error>>()?) 
} @@ -323,7 +416,7 @@ impl std::fmt::Debug for PgStore { } } -impl Clone for PgStore +impl Clone for PgStore where A: Aggregate, { From aec7d38023e3378b7e8ba4409a1adc08a7872453 Mon Sep 17 00:00:00 2001 From: John Bell Date: Tue, 26 Mar 2024 09:34:11 +0000 Subject: [PATCH 06/17] Refactor schema example and rename functions on trait --- examples/schema/aggregate.rs | 36 ++++ examples/schema/main.rs | 275 ++++++++++++++---------------- src/sql/event.rs | 4 +- src/store/postgres/event_store.rs | 26 +-- 4 files changed, 181 insertions(+), 160 deletions(-) create mode 100644 examples/schema/aggregate.rs diff --git a/examples/schema/aggregate.rs b/examples/schema/aggregate.rs new file mode 100644 index 00000000..1205fc86 --- /dev/null +++ b/examples/schema/aggregate.rs @@ -0,0 +1,36 @@ +use std::marker::PhantomData; + +use esrs::Aggregate; + +use crate::common::CommonError; + +pub(crate) struct SchemaAggregate(PhantomData); + +impl Aggregate for SchemaAggregate +where + EventType: Default, +{ + const NAME: &'static str = "schema"; + type State = SchemaState; + type Command = SchemaCommand; + type Event = EventType; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } +} + +#[derive(Default)] +pub(crate) struct SchemaState { + pub(crate) events: Vec, +} + +pub(crate) enum SchemaCommand {} diff --git a/examples/schema/main.rs b/examples/schema/main.rs index e0a9d0ce..bfbde9a6 100644 --- a/examples/schema/main.rs +++ b/examples/schema/main.rs @@ -15,169 +15,154 @@ //! //! The final part is an alternative way for the system to evolve using the `Schema` mechanism as a //! way to upcast from one shape of and event to another. 
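+//!
+//! Independently of the evolution step, every `Schema` implementation below is expected to
+//! uphold the round-trip property from the trait documentation for events that still exist in
+//! the domain:
+//!
+//! ```ignore
+//! assert_eq!(Some(event.clone()), Schema::from_event(event).to_event());
+//! ```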
-use std::marker::PhantomData; use esrs::manager::AggregateManager; use esrs::store::postgres::{PgStore, PgStoreBuilder}; use esrs::store::EventStore; -use esrs::{Aggregate, AggregateState}; +use esrs::AggregateState; use serde::{Deserialize, Serialize}; use uuid::Uuid; +mod aggregate; #[path = "../common/lib.rs"] mod common; use crate::common::util::new_pool; -use crate::common::CommonError; - -struct SchemaAggregate(PhantomData); - -impl Aggregate for SchemaAggregate -where - EventType: Default, -{ - const NAME: &'static str = "schema"; - type State = SchemaState; - type Command = SchemaCommand; - type Event = EventType; - type Error = CommonError; - - fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { - match command {} - } - - fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { - let mut events = state.events; - events.push(payload); +use aggregate::SchemaAggregate; - Self::State { events } - } +#[derive(Deserialize, Serialize)] +enum Schema { + A, + B { contents: String }, + C { count: u64 }, + D { contents: String, count: u64 }, } -#[derive(Default)] -struct SchemaState { - events: Vec, -} +#[cfg(feature = "upcasting")] +impl esrs::event::Upcaster for Schema {} -enum SchemaCommand {} - -#[derive(Default)] -enum SchemaEventUpcasted { - #[default] - EmptyEvent, - EventA { - contents: String, - count: u64, - }, - EventB { - count: u64, - }, -} +mod before_schema { + use serde::{Deserialize, Serialize}; -#[derive(Default)] -enum SchemaEvent { - #[default] - EmptyEvent, - EventB { - count: u64, - }, - EventC { - contents: String, - count: u64, - }, -} + #[derive(Default, Deserialize, Serialize)] + pub enum Event { + #[default] + A, + B { + contents: String, + }, + C { + count: u64, + }, + } -#[derive(Default, Deserialize, Serialize)] -enum SchemaEventNoSchema { - #[default] - EmptyEvent, - EventA { - contents: String, - }, - EventB { - count: u64, - }, + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Event {} } -#[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for SchemaEventNoSchema {} - -#[derive(Default)] -enum SchemaEventOld { - #[default] - EmptyEvent, - EventA { - contents: String, - }, - EventB { - count: u64, - }, -} - -#[derive(Deserialize, Serialize)] -enum Schema { - EmptyEvent, - EventA { contents: String }, - EventB { count: u64 }, - EventC { contents: String, count: u64 }, -} +mod after_schema { + use super::Schema; -#[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for Schema {} + #[derive(Default)] + pub enum Event { + #[default] + A, + B { + contents: String, + }, + C { + count: u64, + }, + } -impl esrs::store::postgres::Schema for Schema { - fn write(value: SchemaEvent) -> Self { - match value { - SchemaEvent::EmptyEvent => Schema::EmptyEvent, - SchemaEvent::EventB { count } => Schema::EventB { count }, - SchemaEvent::EventC { contents, count } => Schema::EventC { contents, count }, + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::B { contents } => Schema::B { contents }, + Event::C { count } => Schema::C { count }, + } } - } - fn read(self) -> Option { - match self { - Self::EmptyEvent => Some(SchemaEvent::EmptyEvent), - Self::EventA { .. 
} => None, - Self::EventB { count } => Some(SchemaEvent::EventB { count }), - Self::EventC { contents, count } => Some(SchemaEvent::EventC { contents, count }), + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { contents } => Some(Event::B { contents }), + Self::C { count } => Some(Event::C { count }), + Self::D { .. } => panic!("not supported"), + } } } } -impl esrs::store::postgres::Schema for Schema { - fn write(value: SchemaEventOld) -> Self { - match value { - SchemaEventOld::EmptyEvent => Schema::EmptyEvent, - SchemaEventOld::EventA { contents } => Schema::EventA { contents }, - SchemaEventOld::EventB { count } => Schema::EventB { count }, - } +mod with_deprecation { + use super::Schema; + + #[derive(Default)] + pub enum Event { + #[default] + A, + C { + count: u64, + }, + D { + contents: String, + count: u64, + }, } - fn read(self) -> Option { - match self { - Self::EmptyEvent => Some(SchemaEventOld::EmptyEvent), - Self::EventA { contents } => Some(SchemaEventOld::EventA { contents }), - Self::EventB { count } => Some(SchemaEventOld::EventB { count }), - Self::EventC { .. } => panic!("not supported"), + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::C { count } => Schema::C { count }, + Event::D { contents, count } => Schema::D { contents, count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { .. } => None, + Self::C { count } => Some(Event::C { count }), + Self::D { contents, count } => Some(Event::D { contents, count }), + } } } } -impl esrs::store::postgres::Schema for Schema { - fn write(value: SchemaEventUpcasted) -> Self { - match value { - SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent, - SchemaEventUpcasted::EventB { count } => Schema::EventB { count }, - SchemaEventUpcasted::EventA { contents, count } => Schema::EventC { contents, count }, - } +mod upcasting { + use super::Schema; + + #[derive(Default)] + pub enum Event { + #[default] + A, + B { + contents: String, + count: u64, + }, + C { + count: u64, + }, } - fn read(self) -> Option { - match self { - Schema::EmptyEvent => Some(SchemaEventUpcasted::EmptyEvent), - Schema::EventA { contents } => Some(SchemaEventUpcasted::EventA { contents, count: 1 }), - Schema::EventB { count } => Some(SchemaEventUpcasted::EventB { count }), - Schema::EventC { contents, count } => Some(SchemaEventUpcasted::EventA { contents, count }), + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::C { count } => Schema::C { count }, + Event::B { contents, count } => Schema::D { contents, count }, + } + } + + fn to_event(self) -> Option { + match self { + Schema::A => Some(Event::A), + Schema::B { contents } => Some(Event::B { contents, count: 1 }), + Schema::C { count } => Some(Event::C { count }), + Schema::D { contents, count } => Some(Event::B { contents, count }), + } } } } @@ -188,15 +173,15 @@ async fn main() { let aggregate_id: Uuid = Uuid::new_v4(); // Before the schema - let schemaless_store: PgStore> = + let schemaless_store: PgStore> = PgStoreBuilder::new(pool.clone()).try_build().await.unwrap(); let events = vec![ - SchemaEventNoSchema::EmptyEvent, - SchemaEventNoSchema::EventA { - contents: "this is soon to be deprecated".to_owned(), + before_schema::Event::A, + before_schema::Event::B { + contents: "hello world".to_owned(), }, - SchemaEventNoSchema::EventB { count: 
42 }, + before_schema::Event::C { count: 42 }, ]; let mut state = AggregateState::with_id(aggregate_id); @@ -205,18 +190,18 @@ async fn main() { assert_eq!(events.len(), 3); // With schema before deprecation of EventA - let old_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) + let old_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) .with_schema::() .try_build() .await .unwrap(); let events = vec![ - SchemaEventOld::EmptyEvent, - SchemaEventOld::EventA { - contents: "this is soon to be deprecated".to_owned(), + after_schema::Event::A, + after_schema::Event::B { + contents: "goodbye world".to_owned(), }, - SchemaEventOld::EventB { count: 42 }, + after_schema::Event::C { count: 42 }, ]; let manager = AggregateManager::new(old_store.clone()); @@ -228,7 +213,7 @@ async fn main() { assert_eq!(events.len(), 6); // After deprecation of EventA and addition of EventC - let new_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) + let new_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) .with_schema::() .try_build() .await @@ -239,9 +224,9 @@ async fn main() { assert_eq!(events.len(), 4); let events = vec![ - SchemaEvent::EmptyEvent, - SchemaEvent::EventB { count: 42 }, - SchemaEvent::EventC { + with_deprecation::Event::A, + with_deprecation::Event::C { count: 42 }, + with_deprecation::Event::D { contents: "this is the new events".to_owned(), count: 21, }, @@ -257,7 +242,7 @@ async fn main() { assert_eq!(events.len(), 7); // After upcasting of EventA - let upcasting_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) + let upcasting_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) .with_schema::() .try_build() .await @@ -269,6 +254,6 @@ async fn main() { assert_eq!(events.len(), 9); // The events have been upcasted - assert!(matches!(events[1], SchemaEventUpcasted::EventA { count: 1, .. })); - assert!(matches!(events[4], SchemaEventUpcasted::EventA { count: 1, .. })); + assert!(matches!(events[1], upcasting::Event::B { count: 1, .. })); + assert!(matches!(events[4], upcasting::Event::B { count: 1, .. })); } diff --git a/src/sql/event.rs b/src/sql/event.rs index 22b6a734..e8316bed 100644 --- a/src/sql/event.rs +++ b/src/sql/event.rs @@ -26,9 +26,9 @@ impl DbEvent { S: Schema, { #[cfg(feature = "upcasting")] - let payload = S::upcast(self.payload, self.version)?.read(); + let payload = S::upcast(self.payload, self.version)?.to_event(); #[cfg(not(feature = "upcasting"))] - let payload = serde_json::from_value::(self.payload)?.read(); + let payload = serde_json::from_value::(self.payload)?.to_event(); Ok(match payload { None => None, diff --git a/src/store/postgres/event_store.rs b/src/store/postgres/event_store.rs index 41fc097c..44aac8e0 100644 --- a/src/store/postgres/event_store.rs +++ b/src/store/postgres/event_store.rs @@ -48,21 +48,21 @@ use crate::{Aggregate, AggregateState}; /// # impl esrs::event::Upcaster for Schema {} /// # /// # impl SchemaTrait for Schema { -/// # fn write(Event { a }: Event) -> Self { +/// # fn from_event(Event { a }: Event) -> Self { /// # Self { a } /// # } /// # -/// # fn read(self) -> Option { +/// # fn to_event(self) -> Option { /// # Some(Event { a: self.a }) /// # } /// # } /// # /// # let event = Event { a: 42 }; -/// assert_eq!(Some(event.clone()), Schema::write(event).read()); +/// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); /// ``` pub trait Schema: Event { /// Converts the event into the schema type. 
- fn write(event: E) -> Self; + fn from_event(event: E) -> Self; /// Converts the schema into the event type. /// @@ -90,29 +90,29 @@ pub trait Schema: Event { /// # impl esrs::event::Upcaster for Schema {} /// # /// # impl SchemaTrait for Schema { - /// # fn write(Event { a }: Event) -> Self { + /// # fn from_event(Event { a }: Event) -> Self { /// # Self { a } /// # } /// # - /// # fn read(self) -> Option { + /// # fn to_event(self) -> Option { /// # Some(Event { a: self.a }) /// # } /// # } /// # /// # let event = Event { a: 42 }; - /// assert_eq!(Some(event.clone()), Schema::write(event).read()); + /// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); /// ``` - fn read(self) -> Option; + fn to_event(self) -> Option; } impl Schema for E where E: Event, { - fn write(event: E) -> Self { + fn from_event(event: E) -> Self { event } - fn read(self) -> Option { + fn to_event(self) -> Option { Some(self) } } @@ -194,7 +194,7 @@ where let version: Option = S::current_version(); #[cfg(not(feature = "upcasting"))] let version: Option = None; - let schema = S::write(event); + let schema = S::from_event(event); let _ = sqlx::query(self.inner.statements.insert()) .bind(id) @@ -209,9 +209,9 @@ where Ok(StoreEvent { id, aggregate_id, - payload: schema.read().expect( + payload: schema.to_event().expect( "For any type that implements Schema the following contract should be upheld:\ - assert_eq!(Some(event.clone()), Schema::write(event).read())", + assert_eq!(Some(event.clone()), Schema::from_event(event).to_event())", ), occurred_on, sequence_number, From d9441d85b29d690af3c8a34cb52b0fe0b30ae478 Mon Sep 17 00:00:00 2001 From: John Bell Date: Wed, 27 Mar 2024 16:53:52 +0000 Subject: [PATCH 07/17] Refactor example into three seperate example files --- examples/schema/adding_schema.rs | 184 ++++++++++++++++++ examples/schema/aggregate.rs | 36 ---- examples/schema/deprecating_events.rs | 219 ++++++++++++++++++++++ examples/schema/main.rs | 260 ++------------------------ examples/schema/upcasting.rs | 215 +++++++++++++++++++++ src/store/postgres/builder.rs | 4 +- 6 files changed, 632 insertions(+), 286 deletions(-) create mode 100644 examples/schema/adding_schema.rs delete mode 100644 examples/schema/aggregate.rs create mode 100644 examples/schema/deprecating_events.rs create mode 100644 examples/schema/upcasting.rs diff --git a/examples/schema/adding_schema.rs b/examples/schema/adding_schema.rs new file mode 100644 index 00000000..5c864566 --- /dev/null +++ b/examples/schema/adding_schema.rs @@ -0,0 +1,184 @@ +//! This example demonstrates how it is possible to add a schema to a store. +//! +//! The module `before_schema` represents the code in the system before the introduction of the +//! schema. The first part of the example shows the state of the system before the schema is +//! introduced. +//! +//! The module `after_schema` represents the code in the system after the introduction of the +//! schema. Similarly, the second part of the example shows how the to setup the PgStore in this +//! state and how the previously persisted events are visible via the schema. +//! +//! Note that the introduction of the schema removes the need for `Aggregate::Event` to implement +//! `Event` instead relies on the implementation of the `Schema` and the fact that `Schema` +//! implements `Event`. 
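+//!
+//! The heart of the change is a single trait implementation; a trimmed-down sketch of the one
+//! defined further down in `after_schema` (see that module for the full match arms):
+//!
+//! ```ignore
+//! impl esrs::store::postgres::Schema<Event> for Schema {
+//!     fn from_event(event: Event) -> Self { /* map each Event variant onto a Schema variant */ }
+//!     fn to_event(self) -> Option<Event> { /* map back, returning None only for dropped variants */ }
+//! }
+//! ```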
+ +use esrs::manager::AggregateManager; +use serde::{Deserialize, Serialize}; + +use sqlx::PgPool; +use uuid::Uuid; + +use esrs::store::postgres::{PgStore, PgStoreBuilder}; +use esrs::store::EventStore; +use esrs::AggregateState; + +use crate::common::CommonError; + +pub(crate) enum Command {} + +mod before_schema { + //! This module represents the code of the initial iteration of the system before the + //! introduction of the schema. + use super::*; + pub(crate) struct Aggregate; + + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "introducing_schema"; + type State = SchemaState; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct SchemaState { + pub(crate) events: Vec, + } + + // Required only before the schema is introduced, after the schema is introduced + // we do not need to make the type serializable. See other schema examples. + #[derive(Deserialize, Serialize)] + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Event {} +} + +mod after_schema { + //! This module represents the code after the introduction of the schema. + use super::*; + pub(crate) struct Aggregate; + + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "introducing_schema"; + type State = SchemaState; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct SchemaState { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + } + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::B { contents } => Schema::B { contents }, + Event::C { count } => Schema::C { count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { contents } => Some(Event::B { contents }), + Self::C { count } => Some(Event::C { count }), + } + } + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} +} + +pub(crate) async fn example(pool: PgPool) { + let aggregate_id: Uuid = Uuid::new_v4(); + + // Initial state of system before introduction of schema + { + use before_schema::{Aggregate, Event}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()).try_build().await.unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "hello world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let mut state = AggregateState::with_id(aggregate_id); + let events = store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + } + + // After introducing Schema + { + use after_schema::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + 
.try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "goodbye world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let manager = AggregateManager::new(store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let _ = store.persist(&mut state, events).await.unwrap(); + + let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + assert_eq!(events.len(), 6); + } +} diff --git a/examples/schema/aggregate.rs b/examples/schema/aggregate.rs deleted file mode 100644 index 1205fc86..00000000 --- a/examples/schema/aggregate.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::marker::PhantomData; - -use esrs::Aggregate; - -use crate::common::CommonError; - -pub(crate) struct SchemaAggregate(PhantomData); - -impl Aggregate for SchemaAggregate -where - EventType: Default, -{ - const NAME: &'static str = "schema"; - type State = SchemaState; - type Command = SchemaCommand; - type Event = EventType; - type Error = CommonError; - - fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { - match command {} - } - - fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { - let mut events = state.events; - events.push(payload); - - Self::State { events } - } -} - -#[derive(Default)] -pub(crate) struct SchemaState { - pub(crate) events: Vec, -} - -pub(crate) enum SchemaCommand {} diff --git a/examples/schema/deprecating_events.rs b/examples/schema/deprecating_events.rs new file mode 100644 index 00000000..ee92e9be --- /dev/null +++ b/examples/schema/deprecating_events.rs @@ -0,0 +1,219 @@ +//! This example demonstrates how it is possible to deprecate an event using the Schema mechanism +//! +//! The module `before_deprecation` represents the code in the system before the deprecation of an +//! event. The first part of the example shows the state of the system before the event is +//! deprecated. +//! +//! The module `after_deprecation` represents the code in the system after the deprecation of an +//! schema. Similarly, the second part of the example shows how the deprecation of the event will +//! play out in a running system. +//! +//! Note the only place that still required to reference the deprecated event is in the schema type +//! itself, the rest of the system can simply operate as if it never existed. + +use esrs::manager::AggregateManager; +use serde::{Deserialize, Serialize}; + +use sqlx::PgPool; +use uuid::Uuid; + +use esrs::store::postgres::{PgStore, PgStoreBuilder}; +use esrs::store::EventStore; +use esrs::AggregateState; + +use crate::common::CommonError; + +pub(crate) enum Command {} + +mod before_deprecation { + //! This module represents the code of the initial iteration of the system. 
+ use super::*; + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_deprecation"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::B { contents } => Schema::B { contents }, + Event::C { count } => Schema::C { count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { contents } => Some(Event::B { contents }), + Self::C { count } => Some(Event::C { count }), + } + } + } +} + +mod after_deprecation { + //! This module represents the code after `Event::B` has been deprecated and a new event + //! (`Event::C`) has been introduced + use super::*; + + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_deprecation"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + // Event B does not live here + pub enum Event { + A, + C { count: u64 }, + D { contents: String, count: u64 }, + } + + // Event B exists here just for deserialization + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + D { contents: String, count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::C { count } => Schema::C { count }, + Event::D { contents, count } => Schema::D { contents, count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { .. 
} => None, + Self::C { count } => Some(Event::C { count }), + Self::D { contents, count } => Some(Event::D { contents, count }), + } + } + } +} + +pub(crate) async fn example(pool: PgPool) { + let aggregate_id: Uuid = Uuid::new_v4(); + + // Initial state of system + { + use before_deprecation::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "goodbye world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let mut state = AggregateState::with_id(aggregate_id); + let events = store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + } + + // After deprecation + { + use after_deprecation::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = store.by_aggregate_id(aggregate_id).await.unwrap(); + + assert_eq!(events.len(), 2); + + let events = vec![ + Event::A, + Event::C { count: 42 }, + Event::D { + contents: "this is the new events".to_owned(), + count: 21, + }, + ]; + + let manager = AggregateManager::new(store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let _ = store.persist(&mut state, events).await.unwrap(); + + let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + // The deprecated events are skipped + assert_eq!(events.len(), 5); + } +} diff --git a/examples/schema/main.rs b/examples/schema/main.rs index bfbde9a6..93c7629d 100644 --- a/examples/schema/main.rs +++ b/examples/schema/main.rs @@ -1,259 +1,23 @@ -//! This example demonstrates several stages of the evolution of a system. +//! These example demonstrates several ways a system can evolve through the use of the Schema +//! mechanism. //! -//! The first part for an event store with no schema (where the `Aggregate::Event` type implement -//! `Event`. -//! -//! The next evolution of the system is the introduction of the `Schema` to the event store. This -//! removes the need for `Aggregate::Event` to implement `Event` instead relies on the -//! `From` and `Into>` on `Schema` and the fact that -//! `Schema` implements event. -//! -//! The next evolution demonstrates how an event can be deprecated using the `Schema` mechanism. A -//! new event variant is introduced and one is deprecated. The use of the schema enables the rest -//! of the system to not have to concern itself with the deprecated events. The only place it still -//! exists is as a variant of the `Schema` type. -//! -//! The final part is an alternative way for the system to evolve using the `Schema` mechanism as a -//! way to upcast from one shape of and event to another. +//! - The first example demonstrates how to add a schema to an existing store. +//! - The second example demonstrates how to use the schema mechanism to deprecate certain event +//! types. +//! - The third example demonstrates how to use the schema mechanism to upcast events. 
-use esrs::manager::AggregateManager; -use esrs::store::postgres::{PgStore, PgStoreBuilder}; -use esrs::store::EventStore; -use esrs::AggregateState; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -mod aggregate; +mod adding_schema; #[path = "../common/lib.rs"] mod common; +mod deprecating_events; +mod upcasting; use crate::common::util::new_pool; -use aggregate::SchemaAggregate; - -#[derive(Deserialize, Serialize)] -enum Schema { - A, - B { contents: String }, - C { count: u64 }, - D { contents: String, count: u64 }, -} - -#[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for Schema {} - -mod before_schema { - use serde::{Deserialize, Serialize}; - - #[derive(Default, Deserialize, Serialize)] - pub enum Event { - #[default] - A, - B { - contents: String, - }, - C { - count: u64, - }, - } - - #[cfg(feature = "upcasting")] - impl esrs::event::Upcaster for Event {} -} - -mod after_schema { - use super::Schema; - - #[derive(Default)] - pub enum Event { - #[default] - A, - B { - contents: String, - }, - C { - count: u64, - }, - } - - impl esrs::store::postgres::Schema for Schema { - fn from_event(value: Event) -> Self { - match value { - Event::A => Schema::A, - Event::B { contents } => Schema::B { contents }, - Event::C { count } => Schema::C { count }, - } - } - - fn to_event(self) -> Option { - match self { - Self::A => Some(Event::A), - Self::B { contents } => Some(Event::B { contents }), - Self::C { count } => Some(Event::C { count }), - Self::D { .. } => panic!("not supported"), - } - } - } -} - -mod with_deprecation { - use super::Schema; - - #[derive(Default)] - pub enum Event { - #[default] - A, - C { - count: u64, - }, - D { - contents: String, - count: u64, - }, - } - - impl esrs::store::postgres::Schema for Schema { - fn from_event(value: Event) -> Self { - match value { - Event::A => Schema::A, - Event::C { count } => Schema::C { count }, - Event::D { contents, count } => Schema::D { contents, count }, - } - } - - fn to_event(self) -> Option { - match self { - Self::A => Some(Event::A), - Self::B { .. 
} => None, - Self::C { count } => Some(Event::C { count }), - Self::D { contents, count } => Some(Event::D { contents, count }), - } - } - } -} - -mod upcasting { - use super::Schema; - - #[derive(Default)] - pub enum Event { - #[default] - A, - B { - contents: String, - count: u64, - }, - C { - count: u64, - }, - } - - impl esrs::store::postgres::Schema for Schema { - fn from_event(value: Event) -> Self { - match value { - Event::A => Schema::A, - Event::C { count } => Schema::C { count }, - Event::B { contents, count } => Schema::D { contents, count }, - } - } - - fn to_event(self) -> Option { - match self { - Schema::A => Some(Event::A), - Schema::B { contents } => Some(Event::B { contents, count: 1 }), - Schema::C { count } => Some(Event::C { count }), - Schema::D { contents, count } => Some(Event::B { contents, count }), - } - } - } -} #[tokio::main] async fn main() { let pool = new_pool().await; - let aggregate_id: Uuid = Uuid::new_v4(); - - // Before the schema - let schemaless_store: PgStore> = - PgStoreBuilder::new(pool.clone()).try_build().await.unwrap(); - - let events = vec![ - before_schema::Event::A, - before_schema::Event::B { - contents: "hello world".to_owned(), - }, - before_schema::Event::C { count: 42 }, - ]; - - let mut state = AggregateState::with_id(aggregate_id); - let events = schemaless_store.persist(&mut state, events).await.unwrap(); - - assert_eq!(events.len(), 3); - - // With schema before deprecation of EventA - let old_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) - .with_schema::() - .try_build() - .await - .unwrap(); - - let events = vec![ - after_schema::Event::A, - after_schema::Event::B { - contents: "goodbye world".to_owned(), - }, - after_schema::Event::C { count: 42 }, - ]; - - let manager = AggregateManager::new(old_store.clone()); - let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); - let _ = old_store.persist(&mut state, events).await.unwrap(); - - let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; - - assert_eq!(events.len(), 6); - - // After deprecation of EventA and addition of EventC - let new_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) - .with_schema::() - .try_build() - .await - .unwrap(); - - let events = new_store.by_aggregate_id(aggregate_id).await.unwrap(); - - assert_eq!(events.len(), 4); - - let events = vec![ - with_deprecation::Event::A, - with_deprecation::Event::C { count: 42 }, - with_deprecation::Event::D { - contents: "this is the new events".to_owned(), - count: 21, - }, - ]; - - let manager = AggregateManager::new(new_store.clone()); - let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); - let _ = new_store.persist(&mut state, events).await.unwrap(); - - let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; - - // The deprecated events are skipped - assert_eq!(events.len(), 7); - - // After upcasting of EventA - let upcasting_store: PgStore, Schema> = PgStoreBuilder::new(pool.clone()) - .with_schema::() - .try_build() - .await - .unwrap(); - let manager = AggregateManager::new(upcasting_store.clone()); - let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; - - // All the events are visible - assert_eq!(events.len(), 9); - - // The events have been upcasted - assert!(matches!(events[1], upcasting::Event::B { count: 1, .. })); - assert!(matches!(events[4], upcasting::Event::B { count: 1, .. 
})); + adding_schema::example(pool.clone()).await; + deprecating_events::example(pool.clone()).await; + upcasting::example(pool).await; } diff --git a/examples/schema/upcasting.rs b/examples/schema/upcasting.rs new file mode 100644 index 00000000..1d1b047a --- /dev/null +++ b/examples/schema/upcasting.rs @@ -0,0 +1,215 @@ +//! This example demonstrates how it is possible to use the schema mechanism for upcasting. +//! +//! The module `before_upcasting` represents the code in the system before the upcasting of an +//! event. The first part of the example shows the state of the system before the event is +//! upcasted. +//! +//! The module `after_upcasting` represents the code in the system after the upcasting of an +//! schema. Similarly, the second part of the example shows how the upcasting of the event will +//! play out in a running system. +//! +//! Note the only place that still required to reference the original shape of the event is in the +//! schema itself and the rest of the system can simply operate as if the event has always been of +//! this shape. + +use esrs::manager::AggregateManager; +use serde::{Deserialize, Serialize}; + +use sqlx::PgPool; +use uuid::Uuid; + +use esrs::store::postgres::{PgStore, PgStoreBuilder}; +use esrs::store::EventStore; +use esrs::AggregateState; + +use crate::common::CommonError; + +pub(crate) enum Command {} + +mod before_upcasting { + //! This module represents the code of the initial iteration of the system. + use super::*; + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_upcasting"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::B { contents } => Schema::B { contents }, + Event::C { count } => Schema::C { count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { contents } => Some(Event::B { contents }), + Self::C { count } => Some(Event::C { count }), + } + } + } +} + +mod after_upcasting { + //! 
This module represents the code after the upcasting has been implemented + use super::*; + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_upcasting"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String, count: u64 }, + C { count: u64 }, + } + + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + D { contents: String, count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::C { count } => Schema::C { count }, + Event::B { contents, count } => Schema::D { contents, count }, + } + } + + fn to_event(self) -> Option { + match self { + Schema::A => Some(Event::A), + Schema::B { contents } => Some(Event::B { contents, count: 1 }), + Schema::C { count } => Some(Event::C { count }), + Schema::D { contents, count } => Some(Event::B { contents, count }), + } + } + } +} + +pub(crate) async fn example(pool: PgPool) { + let aggregate_id: Uuid = Uuid::new_v4(); + + // Initial state of the system + { + use before_upcasting::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "goodbye world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let mut state = AggregateState::with_id(aggregate_id); + let events = store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + } + + // After upcasting before_upcasting::Event::B to after_upcasting::Event::B + { + use after_upcasting::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::C { count: 42 }, + Event::B { + contents: "this is the new events".to_owned(), + count: 21, + }, + ]; + + let manager = AggregateManager::new(store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let _ = store.persist(&mut state, events).await.unwrap(); + + let persisted_events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + // All the events are visible + assert_eq!(persisted_events.len(), 6); + + // The events have been upcasted + assert!(matches!(persisted_events[1], Event::B { count: 1, .. })); + } +} diff --git a/src/store/postgres/builder.rs b/src/store/postgres/builder.rs index 34e09766..24472de0 100644 --- a/src/store/postgres/builder.rs +++ b/src/store/postgres/builder.rs @@ -101,9 +101,9 @@ where } /// Set the schema of the underlying PgStore. 
- pub fn with_schema(self) -> PgStoreBuilder + pub fn with_schema(self) -> PgStoreBuilder where - NewSchema: Schema + Event + Send + Sync, + N: Schema + Event + Send + Sync, { PgStoreBuilder { pool: self.pool, From 3d86c46bb0ddcde8b00af087850fd1a90ec45208 Mon Sep 17 00:00:00 2001 From: John Bell Date: Wed, 27 Mar 2024 17:35:38 +0000 Subject: [PATCH 08/17] Rename Event -> Persistable and reorganize code --- examples/common/a.rs | 2 +- examples/common/b.rs | 2 +- examples/common/basic/mod.rs | 2 +- examples/readme/main.rs | 2 +- examples/saga/aggregate.rs | 2 +- examples/schema/adding_schema.rs | 8 +- examples/schema/deprecating_events.rs | 4 +- examples/schema/upcasting.rs | 4 +- examples/upcasting/a.rs | 2 +- examples/upcasting/b.rs | 2 +- src/event.rs | 31 -------- src/lib.rs | 1 - src/sql/event.rs | 34 ++++++++- src/sql/migrations.rs | 2 +- src/store/postgres/builder.rs | 4 +- src/store/postgres/event_store.rs | 103 +------------------------- src/store/postgres/mod.rs | 2 + src/store/postgres/schema.rs | 96 ++++++++++++++++++++++++ tests/aggregate/structs.rs | 2 +- 19 files changed, 153 insertions(+), 152 deletions(-) delete mode 100644 src/event.rs create mode 100644 src/store/postgres/schema.rs diff --git a/examples/common/a.rs b/examples/common/a.rs index 6b598f38..d134d10c 100644 --- a/examples/common/a.rs +++ b/examples/common/a.rs @@ -38,4 +38,4 @@ pub struct EventA { } #[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for EventA {} +impl esrs::sql::event::Upcaster for EventA {} diff --git a/examples/common/b.rs b/examples/common/b.rs index 305bf0b6..7a923a1c 100644 --- a/examples/common/b.rs +++ b/examples/common/b.rs @@ -38,4 +38,4 @@ pub struct EventB { } #[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for EventB {} +impl esrs::sql::event::Upcaster for EventB {} diff --git a/examples/common/basic/mod.rs b/examples/common/basic/mod.rs index 55b1e1b2..f08dd810 100644 --- a/examples/common/basic/mod.rs +++ b/examples/common/basic/mod.rs @@ -38,7 +38,7 @@ pub struct BasicEvent { } #[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for BasicEvent {} +impl esrs::sql::event::Upcaster for BasicEvent {} #[allow(dead_code)] #[derive(Debug, thiserror::Error)] diff --git a/examples/readme/main.rs b/examples/readme/main.rs index 62e2c007..14cf4882 100644 --- a/examples/readme/main.rs +++ b/examples/readme/main.rs @@ -99,7 +99,7 @@ pub enum BookEvent { } #[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for BookEvent {} +impl esrs::sql::event::Upcaster for BookEvent {} #[derive(Debug, Error)] pub enum BookError { diff --git a/examples/saga/aggregate.rs b/examples/saga/aggregate.rs index 9d901102..a289b427 100644 --- a/examples/saga/aggregate.rs +++ b/examples/saga/aggregate.rs @@ -36,4 +36,4 @@ pub enum SagaEvent { } #[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for SagaEvent {} +impl esrs::sql::event::Upcaster for SagaEvent {} diff --git a/examples/schema/adding_schema.rs b/examples/schema/adding_schema.rs index 5c864566..bf91521f 100644 --- a/examples/schema/adding_schema.rs +++ b/examples/schema/adding_schema.rs @@ -9,8 +9,8 @@ //! state and how the previously persisted events are visible via the schema. //! //! Note that the introduction of the schema removes the need for `Aggregate::Event` to implement -//! `Event` instead relies on the implementation of the `Schema` and the fact that `Schema` -//! implements `Event`. +//! `Persistable` instead relies on the implementation of `Schema` and the fact that `Schema` +//! implements `Persistable`. 
use esrs::manager::AggregateManager; use serde::{Deserialize, Serialize}; @@ -66,7 +66,7 @@ mod before_schema { } #[cfg(feature = "upcasting")] - impl esrs::event::Upcaster for Event {} + impl esrs::sql::event::Upcaster for Event {} } mod after_schema { @@ -129,7 +129,7 @@ mod after_schema { } #[cfg(feature = "upcasting")] - impl esrs::event::Upcaster for Schema {} + impl esrs::sql::event::Upcaster for Schema {} } pub(crate) async fn example(pool: PgPool) { diff --git a/examples/schema/deprecating_events.rs b/examples/schema/deprecating_events.rs index ee92e9be..5a56fcbc 100644 --- a/examples/schema/deprecating_events.rs +++ b/examples/schema/deprecating_events.rs @@ -67,7 +67,7 @@ mod before_deprecation { } #[cfg(feature = "upcasting")] - impl esrs::event::Upcaster for Schema {} + impl esrs::sql::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { @@ -135,7 +135,7 @@ mod after_deprecation { } #[cfg(feature = "upcasting")] - impl esrs::event::Upcaster for Schema {} + impl esrs::sql::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { diff --git a/examples/schema/upcasting.rs b/examples/schema/upcasting.rs index 1d1b047a..e752b4d7 100644 --- a/examples/schema/upcasting.rs +++ b/examples/schema/upcasting.rs @@ -68,7 +68,7 @@ mod before_upcasting { } #[cfg(feature = "upcasting")] - impl esrs::event::Upcaster for Schema {} + impl esrs::sql::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { @@ -132,7 +132,7 @@ mod after_upcasting { } #[cfg(feature = "upcasting")] - impl esrs::event::Upcaster for Schema {} + impl esrs::sql::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { diff --git a/examples/upcasting/a.rs b/examples/upcasting/a.rs index e51588e5..b65f0544 100644 --- a/examples/upcasting/a.rs +++ b/examples/upcasting/a.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; -use esrs::event::Upcaster; +use esrs::sql::event::Upcaster; use esrs::Aggregate; use crate::{Command, Error}; diff --git a/examples/upcasting/b.rs b/examples/upcasting/b.rs index 9fe56f0b..d0b1539a 100644 --- a/examples/upcasting/b.rs +++ b/examples/upcasting/b.rs @@ -3,7 +3,7 @@ use std::convert::{TryFrom, TryInto}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use esrs::event::Upcaster; +use esrs::sql::event::Upcaster; use esrs::Aggregate; use crate::{Command, Error}; diff --git a/src/event.rs b/src/event.rs deleted file mode 100644 index fd9c8a16..00000000 --- a/src/event.rs +++ /dev/null @@ -1,31 +0,0 @@ -use serde::de::DeserializeOwned; -use serde::Serialize; - -#[cfg(not(feature = "upcasting"))] -pub trait Event: Serialize + DeserializeOwned {} - -#[cfg(not(feature = "upcasting"))] -impl Event for T where T: Serialize + DeserializeOwned {} - -#[cfg(feature = "upcasting")] -pub trait Event: Serialize + DeserializeOwned + Upcaster {} - -#[cfg(feature = "upcasting")] -impl Event for T where T: Serialize + DeserializeOwned + Upcaster {} - -#[cfg(feature = "upcasting")] -pub trait Upcaster -where - Self: Sized, -{ - fn upcast(value: serde_json::Value, _version: Option) -> Result - where - Self: DeserializeOwned, - { - serde_json::from_value(value) - } - - fn current_version() -> Option { - None - } -} diff --git a/src/lib.rs b/src/lib.rs index 38d125ab..de7200c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,6 @@ mod 
aggregate; mod state; pub mod bus; -pub mod event; pub mod handler; pub mod manager; pub mod store; diff --git a/src/sql/event.rs b/src/sql/event.rs index e8316bed..8fd100b1 100644 --- a/src/sql/event.rs +++ b/src/sql/event.rs @@ -4,10 +4,40 @@ use chrono::{DateTime, Utc}; use serde_json::Value; use uuid::Uuid; -use crate::event::Event; use crate::store::postgres::Schema; use crate::store::StoreEvent; use crate::types::SequenceNumber; +use serde::de::DeserializeOwned; +use serde::Serialize; + +#[cfg(feature = "upcasting")] +pub trait Upcaster +where + Self: Sized, +{ + fn upcast(value: serde_json::Value, _version: Option) -> Result + where + Self: DeserializeOwned, + { + serde_json::from_value(value) + } + + fn current_version() -> Option { + None + } +} + +#[cfg(not(feature = "upcasting"))] +pub trait Persistable: Serialize + DeserializeOwned {} + +#[cfg(not(feature = "upcasting"))] +impl Persistable for T where T: Serialize + DeserializeOwned {} + +#[cfg(feature = "upcasting")] +pub trait Persistable: Serialize + DeserializeOwned + Upcaster {} + +#[cfg(feature = "upcasting")] +impl Persistable for T where T: Serialize + DeserializeOwned + Upcaster {} /// Event representation on the event store #[derive(sqlx::FromRow, serde::Serialize, serde::Deserialize, Debug)] @@ -44,7 +74,7 @@ impl DbEvent { } } -impl TryInto> for DbEvent { +impl TryInto> for DbEvent { type Error = serde_json::Error; fn try_into(self) -> Result, Self::Error> { diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 48ed2e8d..314911f7 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -63,7 +63,7 @@ mod tests { pub struct TestEvent; #[cfg(feature = "upcasting")] - impl crate::event::Upcaster for TestEvent {} + impl crate::sql::event::Upcaster for TestEvent {} impl Aggregate for TestAggregate { const NAME: &'static str = "test"; diff --git a/src/store/postgres/builder.rs b/src/store/postgres/builder.rs index 24472de0..777ca3f0 100644 --- a/src/store/postgres/builder.rs +++ b/src/store/postgres/builder.rs @@ -5,8 +5,8 @@ use sqlx::{PgConnection, Pool, Postgres}; use tokio::sync::RwLock; use crate::bus::EventBus; -use crate::event::Event; use crate::handler::{EventHandler, TransactionalEventHandler}; +use crate::sql::event::Persistable; use crate::sql::migrations::{Migrations, MigrationsHandler}; use crate::sql::statements::{Statements, StatementsHandler}; use crate::store::postgres::{InnerPgStore, PgStoreError}; @@ -103,7 +103,7 @@ where /// Set the schema of the underlying PgStore. 
pub fn with_schema(self) -> PgStoreBuilder where - N: Schema + Event + Send + Sync, + N: Schema + Persistable + Send + Sync, { PgStoreBuilder { pool: self.pool, diff --git a/src/store/postgres/event_store.rs b/src/store/postgres/event_store.rs index 44aac8e0..3860a943 100644 --- a/src/store/postgres/event_store.rs +++ b/src/store/postgres/event_store.rs @@ -13,110 +13,15 @@ use tokio::sync::RwLock; use uuid::Uuid; use crate::bus::EventBus; -use crate::event::Event; use crate::handler::{EventHandler, TransactionalEventHandler}; -use crate::sql::event::DbEvent; +use crate::sql::event::{DbEvent, Persistable}; use crate::sql::statements::{Statements, StatementsHandler}; use crate::store::postgres::PgStoreError; +use crate::store::postgres::Schema; use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; use crate::types::SequenceNumber; use crate::{Aggregate, AggregateState}; -/// To support decoupling between the Aggregate::Event type and the schema of the DB table -/// in `PgStore` you can create a schema type that implements `Event` and `Schema` -/// where `E = Aggregate::Event`. -/// -/// Note: Although `Schema::read` returns an `Option` for any given event and implementation. -/// -/// The following must hold -/// -/// ```rust -/// # use serde::{Serialize, Deserialize}; -/// # use esrs::store::postgres::Schema as SchemaTrait; -/// # -/// # #[derive(Clone, Eq, PartialEq, Debug)] -/// # struct Event { -/// # a: u32, -/// # } -/// # -/// # #[derive(Serialize, Deserialize)] -/// # struct Schema { -/// # a: u32, -/// # } -/// # -/// # #[cfg(feature = "upcasting")] -/// # impl esrs::event::Upcaster for Schema {} -/// # -/// # impl SchemaTrait for Schema { -/// # fn from_event(Event { a }: Event) -> Self { -/// # Self { a } -/// # } -/// # -/// # fn to_event(self) -> Option { -/// # Some(Event { a: self.a }) -/// # } -/// # } -/// # -/// # let event = Event { a: 42 }; -/// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); -/// ``` -pub trait Schema: Event { - /// Converts the event into the schema type. - fn from_event(event: E) -> Self; - - /// Converts the schema into the event type. - /// - /// This returns an option to enable skipping deprecated event which are persisted in the DB. - /// - /// Note: Although `Schema::read` returns an `Option` for any given event and implementation. - /// - /// The following must hold - /// - /// ```rust - /// # use serde::{Serialize, Deserialize}; - /// # use esrs::store::postgres::Schema as SchemaTrait; - /// # - /// # #[derive(Clone, Eq, PartialEq, Debug)] - /// # struct Event { - /// # a: u32, - /// # } - /// # - /// # #[derive(Serialize, Deserialize)] - /// # struct Schema { - /// # a: u32, - /// # } - /// # - /// # #[cfg(feature = "upcasting")] - /// # impl esrs::event::Upcaster for Schema {} - /// # - /// # impl SchemaTrait for Schema { - /// # fn from_event(Event { a }: Event) -> Self { - /// # Self { a } - /// # } - /// # - /// # fn to_event(self) -> Option { - /// # Some(Event { a: self.a }) - /// # } - /// # } - /// # - /// # let event = Event { a: 42 }; - /// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); - /// ``` - fn to_event(self) -> Option; -} - -impl Schema for E -where - E: Event, -{ - fn from_event(event: E) -> Self { - event - } - fn to_event(self) -> Option { - Some(self) - } -} - /// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a /// pre-made implementation of an [`EventStore`] persisting on Postgres. 
/// @@ -157,7 +62,7 @@ impl PgStore where A: Aggregate, A::Event: Send + Sync, - S: Schema + Event + Send + Sync, + S: Schema + Persistable + Send + Sync, { /// Returns the name of the event store table pub fn table_name(&self) -> &str { @@ -256,7 +161,7 @@ where A: Aggregate, A::State: Send, A::Event: Send + Sync, - S: Schema + Event + Send + Sync, + S: Schema + Persistable + Send + Sync, { type Aggregate = A; type Error = PgStoreError; diff --git a/src/store/postgres/mod.rs b/src/store/postgres/mod.rs index 3f55112a..8014fc66 100644 --- a/src/store/postgres/mod.rs +++ b/src/store/postgres/mod.rs @@ -1,8 +1,10 @@ pub use builder::*; pub use event_store::*; +pub use schema::*; mod builder; mod event_store; +mod schema; // Trait aliases are experimental. See issue #41517 // trait PgTransactionalEventHandler = TransactionalEventHandler where A: Aggregate; diff --git a/src/store/postgres/schema.rs b/src/store/postgres/schema.rs new file mode 100644 index 00000000..88743942 --- /dev/null +++ b/src/store/postgres/schema.rs @@ -0,0 +1,96 @@ +use crate::sql::event::Persistable; + +/// To support decoupling between the `Aggregate::Event` type and the schema of the DB table +/// in `PgStore` you can create a schema type that implements `Persistable` and `Schema` +/// where `E = Aggregate::Event`. +/// +/// Note: Although `Schema::read` returns an `Option` for any given event and implementation. +/// +/// The following must hold +/// +/// ```rust +/// # use serde::{Serialize, Deserialize}; +/// # use esrs::store::postgres::Schema as SchemaTrait; +/// # +/// # #[derive(Clone, Eq, PartialEq, Debug)] +/// # struct Event { +/// # a: u32, +/// # } +/// # +/// # #[derive(Serialize, Deserialize)] +/// # struct Schema { +/// # a: u32, +/// # } +/// # +/// # #[cfg(feature = "upcasting")] +/// # impl esrs::sql::event::Upcaster for Schema {} +/// # +/// # impl SchemaTrait for Schema { +/// # fn from_event(Event { a }: Event) -> Self { +/// # Self { a } +/// # } +/// # +/// # fn to_event(self) -> Option { +/// # Some(Event { a: self.a }) +/// # } +/// # } +/// # +/// # let event = Event { a: 42 }; +/// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); +/// ``` +pub trait Schema: Persistable { + /// Converts the event into the schema type. + fn from_event(event: E) -> Self; + + /// Converts the schema into the event type. + /// + /// This returns an option to enable skipping deprecated event which are persisted in the DB. + /// + /// Note: Although `Schema::read` returns an `Option` for any given event and implementation. 
+ /// + /// The following must hold + /// + /// ```rust + /// # use serde::{Serialize, Deserialize}; + /// # use esrs::store::postgres::Schema as SchemaTrait; + /// # + /// # #[derive(Clone, Eq, PartialEq, Debug)] + /// # struct Event { + /// # a: u32, + /// # } + /// # + /// # #[derive(Serialize, Deserialize)] + /// # struct Schema { + /// # a: u32, + /// # } + /// # + /// # #[cfg(feature = "upcasting")] + /// # impl esrs::sql::event::Upcaster for Schema {} + /// # + /// # impl SchemaTrait for Schema { + /// # fn from_event(Event { a }: Event) -> Self { + /// # Self { a } + /// # } + /// # + /// # fn to_event(self) -> Option { + /// # Some(Event { a: self.a }) + /// # } + /// # } + /// # + /// # let event = Event { a: 42 }; + /// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); + /// ``` + fn to_event(self) -> Option; +} + +impl Schema for E +where + E: Persistable, +{ + fn from_event(event: E) -> Self { + event + } + fn to_event(self) -> Option { + Some(self) + } +} diff --git a/tests/aggregate/structs.rs b/tests/aggregate/structs.rs index 08b481d7..4db8ac3c 100644 --- a/tests/aggregate/structs.rs +++ b/tests/aggregate/structs.rs @@ -11,7 +11,7 @@ pub struct TestEvent { } #[cfg(feature = "upcasting")] -impl esrs::event::Upcaster for TestEvent {} +impl esrs::sql::event::Upcaster for TestEvent {} #[derive(Debug, thiserror::Error)] pub enum TestError {} From 2edf45a342e93ba5bce5cd22cc6fc77f039bc489 Mon Sep 17 00:00:00 2001 From: John Bell Date: Wed, 27 Mar 2024 17:36:12 +0000 Subject: [PATCH 09/17] Add Schema support to PgRebuilder --- src/rebuilder/pg_rebuilder.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/rebuilder/pg_rebuilder.rs b/src/rebuilder/pg_rebuilder.rs index 57ee1715..d2b2d0de 100644 --- a/src/rebuilder/pg_rebuilder.rs +++ b/src/rebuilder/pg_rebuilder.rs @@ -1,23 +1,26 @@ +use std::marker::PhantomData; + use async_trait::async_trait; use futures::StreamExt; use sqlx::{PgConnection, Pool, Postgres, Transaction}; use uuid::Uuid; use crate::bus::EventBus; -use crate::event::Event; use crate::handler::{ReplayableEventHandler, TransactionalEventHandler}; use crate::rebuilder::Rebuilder; -use crate::store::postgres::{PgStore, PgStoreBuilder, PgStoreError}; +use crate::sql::event::Persistable; +use crate::store::postgres::{PgStore, PgStoreBuilder, PgStoreError, Schema}; use crate::store::{EventStore, StoreEvent}; use crate::Aggregate; -pub struct PgRebuilder +pub struct PgRebuilder::Event> where A: Aggregate, { event_handlers: Vec + Send>>, transactional_event_handlers: Vec + Send>>, event_buses: Vec + Send>>, + _schema: PhantomData, } impl PgRebuilder @@ -56,16 +59,18 @@ where event_handlers: vec![], transactional_event_handlers: vec![], event_buses: vec![], + _schema: PhantomData, } } } #[async_trait] -impl Rebuilder for PgRebuilder +impl Rebuilder for PgRebuilder where A: Aggregate, - A::Event: Event + Send + Sync, A::State: Send, + A::Event: Send + Sync, + S: Schema + Persistable + Send + Sync, { type Executor = Pool; type Error = PgStoreError; @@ -77,8 +82,9 @@ where /// events is processed by the mentioned handlers. /// Finally the events are passed to every configured [`EventBus`]. async fn by_aggregate_id(&self, pool: Pool) -> Result<(), Self::Error> { - let store: PgStore = PgStoreBuilder::new(pool.clone()) + let store: PgStore = PgStoreBuilder::new(pool.clone()) .without_running_migrations() + .with_schema::() .try_build() .await?; @@ -122,7 +128,8 @@ where /// events are handled. 
After the transaction ends, for each [`crate::handler::EventHandler`]
    /// and [`EventBus`], the events are handled.
    async fn all_at_once(&self, pool: Pool<Postgres>) -> Result<(), Self::Error> {
-        let store: PgStore<A> = PgStoreBuilder::new(pool.clone())
+        let store: PgStore<A, S> = PgStoreBuilder::new(pool.clone())
+            .with_schema::<S>()
             .without_running_migrations()
             .try_build()
             .await?;

From 6d68f7baf5cd6338e87b12ef70cbe25c4661f2fe Mon Sep 17 00:00:00 2001
From: John Bell
Date: Wed, 27 Mar 2024 17:45:44 +0000
Subject: [PATCH 10/17] Update read me with details of using Schema

---
 README.md | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/README.md b/README.md
index 5059a7d6..7499f0e4 100644
--- a/README.md
+++ b/README.md
@@ -212,6 +212,14 @@ store.persist(&mut state, events).await?;
 To alleviate the burden of writing all of this code, you can leverage the `AggregateManager` helper. An
 `AggregateManager` could be considered as a synchronous `CommandBus`.
 
+##### Decoupling `Aggregate::Event` from the database using `Schema`
+
+To avoid strong coupling between the domain events represented by `Aggregate::Event` and the persistence layer, it is possible to introduce a `Schema` type on the `PgStore`.
+
+This type must implement `Schema` and `Persistable`. The mechanism enables the domain events to evolve more freely. For example, it is possible to deprecate an event variant by making use of the schema (see [deprecating events example](examples/schema/deprecating_events.rs)). Additionally, this mechanism can be used as an alternative for upcasting (see [upcasting example](examples/schema/upcasting.rs)).
+
+For an example of how to introduce a schema to an existing application, see [introducing schema example](examples/schema/adding_schema.rs).
+
 ```rust
 let manager: AggregateManager<_> = AggregateManager::new(store);
 manager.handle_command(Default::default(), BookCommand::Buy { num_of_copies: 1 }).await

From 4a9077bb47bf57c5fdb4961f2bedc0476d0803bb Mon Sep 17 00:00:00 2001
From: John Bell
Date: Wed, 27 Mar 2024 18:30:44 +0000
Subject: [PATCH 11/17] Remove allow clippy lint and specify rust toolchain for
 CI

---
 .clippy.toml                      | 2 +-
 .github/workflows/ci.yml          | 6 ++++++
 Cargo.toml                        | 2 +-
 src/store/postgres/event_store.rs | 4 ++--
 4 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/.clippy.toml b/.clippy.toml
index 48204101..3b9db9df 100644
--- a/.clippy.toml
+++ b/.clippy.toml
@@ -1 +1 @@
-msrv = "1.58.0"
+msrv = "1.74.0"
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1f04da56..f79f1437 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -12,6 +12,11 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
+      - run: |
+          rustup override set 1.74.0
+          rustup component add rustfmt
+          rustup component add clippy
+          rustup component add rust-docs
       - uses: Swatinem/rust-cache@v2
       - uses: taiki-e/install-action@cargo-make
       - run: cargo make fmt-check
@@ -25,6 +30,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
+      - run: rustup override set 1.74.0
       - uses: Swatinem/rust-cache@v2
       - uses: taiki-e/install-action@cargo-make
       - name: Add hosts entries
diff --git a/Cargo.toml b/Cargo.toml
index 6418fe2a..f6f8b118 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
 name = "esrs"
 readme = "README.md"
 repository = "https://github.com/primait/event_sourcing.rs"
-rust-version = "1.58.0"
+rust-version = "1.74.0"
 version = "0.14.0"
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/src/store/postgres/event_store.rs 
b/src/store/postgres/event_store.rs
index 3860a943..fa727b32 100644
--- a/src/store/postgres/event_store.rs
+++ b/src/store/postgres/event_store.rs
@@ -189,8 +189,8 @@ where
             .collect::<Result<Vec<StoreEvent<A::Event>>, Self::Error>>()?)
     }
 
-    // Note: https://github.com/rust-lang/rust-clippy/issues/12281
-    #[allow(clippy::blocks_in_conditions)]
+    // Clippy introduced the `blocks_in_conditions` lint. With certain versions of rust and tracing this
+    // line throws an error; see: https://github.com/rust-lang/rust-clippy/issues/12281
     #[tracing::instrument(skip_all, fields(aggregate_id = % aggregate_state.id()), err)]
     async fn persist(
         &self,

From 608b9093b6765fa4ced678ff85fa2d9d0c33521f Mon Sep 17 00:00:00 2001
From: John Bell
Date: Wed, 27 Mar 2024 21:10:37 +0000
Subject: [PATCH 12/17] Update library version number

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index f6f8b118..4b91d1b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,7 +9,7 @@ name = "esrs"
 readme = "README.md"
 repository = "https://github.com/primait/event_sourcing.rs"
 rust-version = "1.74.0"
-version = "0.14.0"
+version = "0.15.0"
 
 [package.metadata.docs.rs]
 all-features = true

From 15efc7602f6b89cde317e58cfd6342c158ee72fe Mon Sep 17 00:00:00 2001
From: John Bell
Date: Wed, 27 Mar 2024 22:01:07 +0000
Subject: [PATCH 13/17] Fix documentation links and naming of methods

---
 src/store/postgres/event_store.rs | 15 ++++++++-------
 src/store/postgres/schema.rs      |  8 ++++----
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/src/store/postgres/event_store.rs b/src/store/postgres/event_store.rs
index fa727b32..cb1e80b5 100644
--- a/src/store/postgres/event_store.rs
+++ b/src/store/postgres/event_store.rs
@@ -29,15 +29,16 @@ use crate::{Aggregate, AggregateState};
 /// reference.
 ///
 /// To decouple persistence from the event types, it is possible to optionally, specify the
-/// Database event schema for this store as a type that implements `Event` and
-/// `Schema`.
+/// Database event schema for this store as a type that implements [`Persistable`] and
+/// [`Schema`].
 ///
-/// When events are persisted, they will first be converted to the `Schema` type using
-/// `Schema::write` then serialized using the `Serialize` implementation on `Schema`.
+/// When events are persisted, they will first be converted to the schema type using
+/// [`Schema::from_event`] and then serialized using the [`serde::Serialize`] implementation on the schema.
 ///
-/// When events are read from the store, they will first be deserialized into the `Schema` type and
-/// then converted into an `Option` using `Schema::read`. In this way it is possible
-/// to remove deprecate events in core part of your application by returning `None` from `Schema::read`.
+/// When events are read from the store, they will first be deserialized into the schema type and
+/// then converted into an [`Option`] using [`Schema::to_event`]. In this way
+/// it is possible to remove deprecated events from the core part of your application by returning [`None`]
+/// from [`Schema::to_event`]. 
pub struct PgStore::Event> where A: Aggregate, diff --git a/src/store/postgres/schema.rs b/src/store/postgres/schema.rs index 88743942..c0f24f8f 100644 --- a/src/store/postgres/schema.rs +++ b/src/store/postgres/schema.rs @@ -1,10 +1,10 @@ use crate::sql::event::Persistable; -/// To support decoupling between the `Aggregate::Event` type and the schema of the DB table -/// in `PgStore` you can create a schema type that implements `Persistable` and `Schema` +/// To support decoupling between the [`crate::Aggregate::Event`] type and the schema of the DB table +/// in [`super::PgStore`] you can create a schema type that implements [`Persistable`] and [`Schema`] /// where `E = Aggregate::Event`. /// -/// Note: Although `Schema::read` returns an `Option` for any given event and implementation. +/// Note: Although [`Schema::to_event`] returns an [`Option`] for any given event and implementation. /// /// The following must hold /// @@ -46,7 +46,7 @@ pub trait Schema: Persistable { /// /// This returns an option to enable skipping deprecated event which are persisted in the DB. /// - /// Note: Although `Schema::read` returns an `Option` for any given event and implementation. + /// Note: Although this function returns an [`Option`] for any given event and implementation. /// /// The following must hold /// From 0b165f82db21ec6e5a790279ddb1b80992596544 Mon Sep 17 00:00:00 2001 From: John Bell Date: Thu, 28 Mar 2024 11:55:35 +0000 Subject: [PATCH 14/17] Return Upcaster to original location --- examples/common/a.rs | 2 +- examples/common/b.rs | 2 +- examples/common/basic/mod.rs | 2 +- examples/readme/main.rs | 2 +- examples/saga/aggregate.rs | 2 +- examples/schema/adding_schema.rs | 4 ++-- examples/schema/deprecating_events.rs | 4 ++-- examples/schema/upcasting.rs | 4 ++-- examples/upcasting/a.rs | 2 +- examples/upcasting/b.rs | 2 +- src/event.rs | 17 +++++++++++++++++ src/lib.rs | 2 ++ src/sql/event.rs | 21 ++------------------- src/sql/migrations.rs | 2 +- src/store/postgres/schema.rs | 4 ++-- tests/aggregate/structs.rs | 2 +- 16 files changed, 38 insertions(+), 36 deletions(-) create mode 100644 src/event.rs diff --git a/examples/common/a.rs b/examples/common/a.rs index d134d10c..6b598f38 100644 --- a/examples/common/a.rs +++ b/examples/common/a.rs @@ -38,4 +38,4 @@ pub struct EventA { } #[cfg(feature = "upcasting")] -impl esrs::sql::event::Upcaster for EventA {} +impl esrs::event::Upcaster for EventA {} diff --git a/examples/common/b.rs b/examples/common/b.rs index 7a923a1c..305bf0b6 100644 --- a/examples/common/b.rs +++ b/examples/common/b.rs @@ -38,4 +38,4 @@ pub struct EventB { } #[cfg(feature = "upcasting")] -impl esrs::sql::event::Upcaster for EventB {} +impl esrs::event::Upcaster for EventB {} diff --git a/examples/common/basic/mod.rs b/examples/common/basic/mod.rs index f08dd810..55b1e1b2 100644 --- a/examples/common/basic/mod.rs +++ b/examples/common/basic/mod.rs @@ -38,7 +38,7 @@ pub struct BasicEvent { } #[cfg(feature = "upcasting")] -impl esrs::sql::event::Upcaster for BasicEvent {} +impl esrs::event::Upcaster for BasicEvent {} #[allow(dead_code)] #[derive(Debug, thiserror::Error)] diff --git a/examples/readme/main.rs b/examples/readme/main.rs index 14cf4882..62e2c007 100644 --- a/examples/readme/main.rs +++ b/examples/readme/main.rs @@ -99,7 +99,7 @@ pub enum BookEvent { } #[cfg(feature = "upcasting")] -impl esrs::sql::event::Upcaster for BookEvent {} +impl esrs::event::Upcaster for BookEvent {} #[derive(Debug, Error)] pub enum BookError { diff --git a/examples/saga/aggregate.rs 
b/examples/saga/aggregate.rs index a289b427..9d901102 100644 --- a/examples/saga/aggregate.rs +++ b/examples/saga/aggregate.rs @@ -36,4 +36,4 @@ pub enum SagaEvent { } #[cfg(feature = "upcasting")] -impl esrs::sql::event::Upcaster for SagaEvent {} +impl esrs::event::Upcaster for SagaEvent {} diff --git a/examples/schema/adding_schema.rs b/examples/schema/adding_schema.rs index bf91521f..8c007151 100644 --- a/examples/schema/adding_schema.rs +++ b/examples/schema/adding_schema.rs @@ -66,7 +66,7 @@ mod before_schema { } #[cfg(feature = "upcasting")] - impl esrs::sql::event::Upcaster for Event {} + impl esrs::event::Upcaster for Event {} } mod after_schema { @@ -129,7 +129,7 @@ mod after_schema { } #[cfg(feature = "upcasting")] - impl esrs::sql::event::Upcaster for Schema {} + impl esrs::event::Upcaster for Schema {} } pub(crate) async fn example(pool: PgPool) { diff --git a/examples/schema/deprecating_events.rs b/examples/schema/deprecating_events.rs index 5a56fcbc..ee92e9be 100644 --- a/examples/schema/deprecating_events.rs +++ b/examples/schema/deprecating_events.rs @@ -67,7 +67,7 @@ mod before_deprecation { } #[cfg(feature = "upcasting")] - impl esrs::sql::event::Upcaster for Schema {} + impl esrs::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { @@ -135,7 +135,7 @@ mod after_deprecation { } #[cfg(feature = "upcasting")] - impl esrs::sql::event::Upcaster for Schema {} + impl esrs::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { diff --git a/examples/schema/upcasting.rs b/examples/schema/upcasting.rs index e752b4d7..1d1b047a 100644 --- a/examples/schema/upcasting.rs +++ b/examples/schema/upcasting.rs @@ -68,7 +68,7 @@ mod before_upcasting { } #[cfg(feature = "upcasting")] - impl esrs::sql::event::Upcaster for Schema {} + impl esrs::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { @@ -132,7 +132,7 @@ mod after_upcasting { } #[cfg(feature = "upcasting")] - impl esrs::sql::event::Upcaster for Schema {} + impl esrs::event::Upcaster for Schema {} impl esrs::store::postgres::Schema for Schema { fn from_event(value: Event) -> Self { diff --git a/examples/upcasting/a.rs b/examples/upcasting/a.rs index b65f0544..e51588e5 100644 --- a/examples/upcasting/a.rs +++ b/examples/upcasting/a.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; -use esrs::sql::event::Upcaster; +use esrs::event::Upcaster; use esrs::Aggregate; use crate::{Command, Error}; diff --git a/examples/upcasting/b.rs b/examples/upcasting/b.rs index d0b1539a..9fe56f0b 100644 --- a/examples/upcasting/b.rs +++ b/examples/upcasting/b.rs @@ -3,7 +3,7 @@ use std::convert::{TryFrom, TryInto}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use esrs::sql::event::Upcaster; +use esrs::event::Upcaster; use esrs::Aggregate; use crate::{Command, Error}; diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 00000000..b0205ec1 --- /dev/null +++ b/src/event.rs @@ -0,0 +1,17 @@ +use serde::de::DeserializeOwned; + +pub trait Upcaster +where + Self: Sized, +{ + fn upcast(value: serde_json::Value, _version: Option) -> Result + where + Self: DeserializeOwned, + { + serde_json::from_value(value) + } + + fn current_version() -> Option { + None + } +} diff --git a/src/lib.rs b/src/lib.rs index de7200c9..99155a9e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,8 @@ mod aggregate; mod state; pub mod bus; 
+#[cfg(feature = "upcasting")] +pub mod event; pub mod handler; pub mod manager; pub mod store; diff --git a/src/sql/event.rs b/src/sql/event.rs index 8fd100b1..e07928f3 100644 --- a/src/sql/event.rs +++ b/src/sql/event.rs @@ -10,23 +10,6 @@ use crate::types::SequenceNumber; use serde::de::DeserializeOwned; use serde::Serialize; -#[cfg(feature = "upcasting")] -pub trait Upcaster -where - Self: Sized, -{ - fn upcast(value: serde_json::Value, _version: Option) -> Result - where - Self: DeserializeOwned, - { - serde_json::from_value(value) - } - - fn current_version() -> Option { - None - } -} - #[cfg(not(feature = "upcasting"))] pub trait Persistable: Serialize + DeserializeOwned {} @@ -34,10 +17,10 @@ pub trait Persistable: Serialize + DeserializeOwned {} impl Persistable for T where T: Serialize + DeserializeOwned {} #[cfg(feature = "upcasting")] -pub trait Persistable: Serialize + DeserializeOwned + Upcaster {} +pub trait Persistable: Serialize + DeserializeOwned + crate::event::Upcaster {} #[cfg(feature = "upcasting")] -impl Persistable for T where T: Serialize + DeserializeOwned + Upcaster {} +impl Persistable for T where T: Serialize + DeserializeOwned + crate::event::Upcaster {} /// Event representation on the event store #[derive(sqlx::FromRow, serde::Serialize, serde::Deserialize, Debug)] diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 314911f7..48ed2e8d 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -63,7 +63,7 @@ mod tests { pub struct TestEvent; #[cfg(feature = "upcasting")] - impl crate::sql::event::Upcaster for TestEvent {} + impl crate::event::Upcaster for TestEvent {} impl Aggregate for TestAggregate { const NAME: &'static str = "test"; diff --git a/src/store/postgres/schema.rs b/src/store/postgres/schema.rs index c0f24f8f..c9d8dbe2 100644 --- a/src/store/postgres/schema.rs +++ b/src/store/postgres/schema.rs @@ -23,7 +23,7 @@ use crate::sql::event::Persistable; /// # } /// # /// # #[cfg(feature = "upcasting")] -/// # impl esrs::sql::event::Upcaster for Schema {} +/// # impl esrs::event::Upcaster for Schema {} /// # /// # impl SchemaTrait for Schema { /// # fn from_event(Event { a }: Event) -> Self { @@ -65,7 +65,7 @@ pub trait Schema: Persistable { /// # } /// # /// # #[cfg(feature = "upcasting")] - /// # impl esrs::sql::event::Upcaster for Schema {} + /// # impl esrs::event::Upcaster for Schema {} /// # /// # impl SchemaTrait for Schema { /// # fn from_event(Event { a }: Event) -> Self { diff --git a/tests/aggregate/structs.rs b/tests/aggregate/structs.rs index 4db8ac3c..08b481d7 100644 --- a/tests/aggregate/structs.rs +++ b/tests/aggregate/structs.rs @@ -11,7 +11,7 @@ pub struct TestEvent { } #[cfg(feature = "upcasting")] -impl esrs::sql::event::Upcaster for TestEvent {} +impl esrs::event::Upcaster for TestEvent {} #[derive(Debug, thiserror::Error)] pub enum TestError {} From 69ef2d3327738dcf0949750a103fd276864e1d42 Mon Sep 17 00:00:00 2001 From: John Bell Date: Tue, 2 Apr 2024 11:54:52 +0100 Subject: [PATCH 15/17] Move persistable to postgres module --- src/rebuilder/pg_rebuilder.rs | 2 +- src/sql/event.rs | 15 +-------------- src/store/postgres/builder.rs | 2 +- src/store/postgres/event_store.rs | 3 ++- src/store/postgres/mod.rs | 1 + src/store/postgres/persistable.rs | 14 ++++++++++++++ src/store/postgres/schema.rs | 2 +- 7 files changed, 21 insertions(+), 18 deletions(-) create mode 100644 src/store/postgres/persistable.rs diff --git a/src/rebuilder/pg_rebuilder.rs b/src/rebuilder/pg_rebuilder.rs index d2b2d0de..b7b55879 100644 --- 
a/src/rebuilder/pg_rebuilder.rs +++ b/src/rebuilder/pg_rebuilder.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use crate::bus::EventBus; use crate::handler::{ReplayableEventHandler, TransactionalEventHandler}; use crate::rebuilder::Rebuilder; -use crate::sql::event::Persistable; +use crate::store::postgres::persistable::Persistable; use crate::store::postgres::{PgStore, PgStoreBuilder, PgStoreError, Schema}; use crate::store::{EventStore, StoreEvent}; use crate::Aggregate; diff --git a/src/sql/event.rs b/src/sql/event.rs index e07928f3..d2ec21f4 100644 --- a/src/sql/event.rs +++ b/src/sql/event.rs @@ -4,23 +4,10 @@ use chrono::{DateTime, Utc}; use serde_json::Value; use uuid::Uuid; +use crate::store::postgres::persistable::Persistable; use crate::store::postgres::Schema; use crate::store::StoreEvent; use crate::types::SequenceNumber; -use serde::de::DeserializeOwned; -use serde::Serialize; - -#[cfg(not(feature = "upcasting"))] -pub trait Persistable: Serialize + DeserializeOwned {} - -#[cfg(not(feature = "upcasting"))] -impl Persistable for T where T: Serialize + DeserializeOwned {} - -#[cfg(feature = "upcasting")] -pub trait Persistable: Serialize + DeserializeOwned + crate::event::Upcaster {} - -#[cfg(feature = "upcasting")] -impl Persistable for T where T: Serialize + DeserializeOwned + crate::event::Upcaster {} /// Event representation on the event store #[derive(sqlx::FromRow, serde::Serialize, serde::Deserialize, Debug)] diff --git a/src/store/postgres/builder.rs b/src/store/postgres/builder.rs index 777ca3f0..04b72088 100644 --- a/src/store/postgres/builder.rs +++ b/src/store/postgres/builder.rs @@ -6,12 +6,12 @@ use tokio::sync::RwLock; use crate::bus::EventBus; use crate::handler::{EventHandler, TransactionalEventHandler}; -use crate::sql::event::Persistable; use crate::sql::migrations::{Migrations, MigrationsHandler}; use crate::sql::statements::{Statements, StatementsHandler}; use crate::store::postgres::{InnerPgStore, PgStoreError}; use crate::Aggregate; +use super::persistable::Persistable; use super::{PgStore, Schema}; /// Struct used to build a brand new [`PgStore`]. diff --git a/src/store/postgres/event_store.rs b/src/store/postgres/event_store.rs index cb1e80b5..e39442a6 100644 --- a/src/store/postgres/event_store.rs +++ b/src/store/postgres/event_store.rs @@ -14,8 +14,9 @@ use uuid::Uuid; use crate::bus::EventBus; use crate::handler::{EventHandler, TransactionalEventHandler}; -use crate::sql::event::{DbEvent, Persistable}; +use crate::sql::event::DbEvent; use crate::sql::statements::{Statements, StatementsHandler}; +use crate::store::postgres::persistable::Persistable; use crate::store::postgres::PgStoreError; use crate::store::postgres::Schema; use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; diff --git a/src/store/postgres/mod.rs b/src/store/postgres/mod.rs index 8014fc66..9646b1c5 100644 --- a/src/store/postgres/mod.rs +++ b/src/store/postgres/mod.rs @@ -4,6 +4,7 @@ pub use schema::*; mod builder; mod event_store; +pub mod persistable; mod schema; // Trait aliases are experimental. 
See issue #41517 diff --git a/src/store/postgres/persistable.rs b/src/store/postgres/persistable.rs new file mode 100644 index 00000000..fb07a347 --- /dev/null +++ b/src/store/postgres/persistable.rs @@ -0,0 +1,14 @@ +use serde::de::DeserializeOwned; +use serde::Serialize; + +#[cfg(not(feature = "upcasting"))] +pub trait Persistable: Serialize + DeserializeOwned {} + +#[cfg(not(feature = "upcasting"))] +impl Persistable for T where T: Serialize + DeserializeOwned {} + +#[cfg(feature = "upcasting")] +pub trait Persistable: Serialize + DeserializeOwned + crate::event::Upcaster {} + +#[cfg(feature = "upcasting")] +impl Persistable for T where T: Serialize + DeserializeOwned + crate::event::Upcaster {} diff --git a/src/store/postgres/schema.rs b/src/store/postgres/schema.rs index c9d8dbe2..60f5e0df 100644 --- a/src/store/postgres/schema.rs +++ b/src/store/postgres/schema.rs @@ -1,4 +1,4 @@ -use crate::sql::event::Persistable; +use super::persistable::Persistable; /// To support decoupling between the [`crate::Aggregate::Event`] type and the schema of the DB table /// in [`super::PgStore`] you can create a schema type that implements [`Persistable`] and [`Schema`] From 6b80e4bbfb26824776f289fc398db448695cf879 Mon Sep 17 00:00:00 2001 From: John Bell Date: Tue, 2 Apr 2024 12:07:48 +0100 Subject: [PATCH 16/17] Remove sql feature --- Cargo.toml | 3 +-- src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4b91d1b2..70baebb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,7 @@ all-features = true [features] default = [] -sql = ["sqlx"] -postgres = ["sql", "sqlx/postgres", "typed-builder", "tokio"] +postgres = ["sqlx", "sqlx/postgres", "typed-builder", "tokio"] rebuilder = [] kafka = ["rdkafka", "typed-builder"] rabbit = ["lapin", "typed-builder"] diff --git a/src/lib.rs b/src/lib.rs index 99155a9e..8e22e1c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,7 +25,7 @@ pub mod store; #[cfg(feature = "rebuilder")] pub mod rebuilder; -#[cfg(feature = "sql")] +#[cfg(feature = "postgres")] pub mod sql; pub mod types { From 813aac72d0664674a947c14f9680a9d1f307b988 Mon Sep 17 00:00:00 2001 From: John Bell Date: Wed, 3 Apr 2024 14:40:31 +0100 Subject: [PATCH 17/17] Update change log --- CHANGELOG.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cb53156..62e95dff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +--- +## [0.15.0] - 2024-04-03 + +### Added + +- [[#191]]: Add new generic on `PgStore` and `Schema` trait to decouple persistence from `Aggregate::Event`. +- [[#187]]: Make the `AggregateManager` `deref` blanket implementation work for smart pointers. + +### Changed + +- [[#191]]: Updated MSRV to `1.74.0`. +- [[#191]]: Renamed `Event` trait to `Persistable` (this should not affect users of the library since users of the library benefit from a blanket implementation). + +### Removed + +- [[#191]]: Removed broken `sql` feature. + --- ## [0.14.0] - 2024-01-09
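
As a quick, hedged illustration of the workflow this series introduces, the sketch below wires a schema type into the store builder the way the bundled examples do. Only the `esrs` items that appear in the diffs above (`esrs::Aggregate`, `esrs::store::postgres::{PgStore, PgStoreBuilder, Schema}`, `with_schema`, and the optional `Upcaster` impl behind the `upcasting` feature) are taken from the source; the `Ticket*` names are hypothetical and exist purely for the example.

```rust
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

use esrs::store::postgres::{PgStore, PgStoreBuilder, Schema as SchemaTrait};
use esrs::Aggregate;

// Hypothetical domain types; only the esrs items mirror the API shown in these patches.
pub struct TicketAggregate;

#[derive(Default)]
pub struct TicketState {
    pub events: Vec<TicketEvent>,
}

pub enum TicketCommand {}

#[derive(Debug, thiserror::Error)]
#[error("ticket error")]
pub struct TicketError;

// The domain event no longer needs to be serializable itself once a schema is in place.
pub enum TicketEvent {
    Opened,
    Closed { reason: String },
}

impl Aggregate for TicketAggregate {
    const NAME: &'static str = "ticket";
    type State = TicketState;
    type Command = TicketCommand;
    type Event = TicketEvent;
    type Error = TicketError;

    fn handle_command(_state: &Self::State, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
        match command {}
    }

    fn apply_event(mut state: Self::State, payload: Self::Event) -> Self::State {
        state.events.push(payload);
        state
    }
}

// Persistence-side representation: the only type that has to know about every
// variant ever written to the events table.
#[derive(Serialize, Deserialize)]
pub enum TicketSchema {
    Opened,
    Closed { reason: String },
}

#[cfg(feature = "upcasting")]
impl esrs::event::Upcaster for TicketSchema {}

impl SchemaTrait<TicketEvent> for TicketSchema {
    fn from_event(event: TicketEvent) -> Self {
        match event {
            TicketEvent::Opened => TicketSchema::Opened,
            TicketEvent::Closed { reason } => TicketSchema::Closed { reason },
        }
    }

    // Returning `None` here is how a variant would later be deprecated.
    fn to_event(self) -> Option<TicketEvent> {
        match self {
            TicketSchema::Opened => Some(TicketEvent::Opened),
            TicketSchema::Closed { reason } => Some(TicketEvent::Closed { reason }),
        }
    }
}

async fn build_store(pool: PgPool) -> PgStore<TicketAggregate, TicketSchema> {
    PgStoreBuilder::new(pool)
        .with_schema::<TicketSchema>()
        .try_build()
        .await
        .expect("failed to run migrations and build the PgStore")
}
```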