Skip to content

Commit

Permalink
Add new generic on PgStore and converter trait to decouple persistenc…
Browse files Browse the repository at this point in the history
…e from Aggregate::Event
  • Loading branch information
Johnabell committed Mar 23, 2024
1 parent 04c87d8 commit ff9bac4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
25 changes: 25 additions & 0 deletions src/sql/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde_json::Value;
use uuid::Uuid;

use crate::event::Event;
use crate::store::postgres::Converter;
use crate::store::StoreEvent;
use crate::types::SequenceNumber;

Expand All @@ -19,6 +20,30 @@ pub struct DbEvent {
pub version: Option<i32>,
}

impl DbEvent {
pub fn try_into_store_event<E, Schema>(self) -> Result<Option<StoreEvent<E>>, serde_json::Error>
where
Schema: Converter<E>,
{
#[cfg(feature = "upcasting")]
let payload = Schema::upcast(self.payload, self.version)?.into();
#[cfg(not(feature = "upcasting"))]
let payload = serde_json::from_value::<Schema>(self.payload)?.into();

Ok(match payload {
None => None,
Some(payload) => Some(StoreEvent {
id: self.id,
aggregate_id: self.aggregate_id,
payload,
occurred_on: self.occurred_on,
sequence_number: self.sequence_number,
version: self.version,
}),
})
}
}

impl<E: Event> TryInto<StoreEvent<E>> for DbEvent {
type Error = serde_json::Error;

Expand Down
2 changes: 2 additions & 0 deletions src/store/postgres/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::marker::PhantomData;
use std::sync::Arc;

use sqlx::{PgConnection, Pool, Postgres};
Expand Down Expand Up @@ -113,6 +114,7 @@ where
transactional_event_handlers: self.transactional_event_handlers,
event_buses: self.event_buses,
}),
_schema: PhantomData,
})
}
}
52 changes: 37 additions & 15 deletions src/store/postgres/event_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -22,16 +22,31 @@ use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop};
use crate::types::SequenceNumber;
use crate::{Aggregate, AggregateState};

pub trait Converter<E>: From<E> + Into<Option<E>> + Event {}

impl<T, E> Converter<E> for T where T: From<E> + Into<Option<E>> + Event {}

/// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a
/// pre-made implementation of an [`EventStore`] persisting on Postgres.
///
/// The store is protected by an [`Arc`] that allows it to be cloneable still having the same memory
/// reference.
pub struct PgStore<A>
///
/// To decouple persistence from the event types, it is possible to optionally, specify the
/// Database event schema for this store as a serializable type.
///
/// When events are persisted, they will first be converted via the `From` trait into the `Schema`
/// type, then serialized.
///
/// When events are read from the store, they will first be deserialized into the `Schema` and then
/// they can be converted into and option of the domain aggregate event. In this way it is possible
/// to deprecate events in core part of your application by returning `None` when converting.
pub struct PgStore<A, Schema = <A as Aggregate>::Event>
where
A: Aggregate,
{
pub(super) inner: Arc<InnerPgStore<A>>,
pub(super) _schema: PhantomData<Schema>,
}

pub(super) struct InnerPgStore<A>
Expand All @@ -46,10 +61,11 @@ where
pub(super) event_buses: Vec<Box<dyn EventBus<A> + Send>>,
}

impl<A> PgStore<A>
impl<A, Schema> PgStore<A, Schema>
where
A: Aggregate,
A::Event: Event + Sync,
A::Event: Send + Sync,
Schema: Converter<A::Event> + Event + Send + Sync,
{
/// Returns the name of the event store table
pub fn table_name(&self) -> &str {
Expand Down Expand Up @@ -83,17 +99,15 @@ where
let id: Uuid = Uuid::new_v4();

#[cfg(feature = "upcasting")]
let version: Option<i32> = {
use crate::event::Upcaster;
A::Event::current_version()
};
let version: Option<i32> = Schema::current_version();
#[cfg(not(feature = "upcasting"))]
let version: Option<i32> = None;
let schema = Schema::from(event);

let _ = sqlx::query(self.inner.statements.insert())
.bind(id)
.bind(aggregate_id)
.bind(Json(&event))
.bind(Json(&schema))
.bind(occurred_on)
.bind(sequence_number)
.bind(version)
Expand All @@ -103,7 +117,9 @@ where
Ok(StoreEvent {
id,
aggregate_id,
payload: event,
payload: schema.into().expect(
"This should always be true for converters assert event == Converter::from(event).into().unwrap()",
),
occurred_on,
sequence_number,
version,
Expand All @@ -119,7 +135,8 @@ where
Box::pin({
sqlx::query_as::<_, DbEvent>(self.inner.statements.select_all())
.fetch(executor)
.map(|res| Ok(res?.try_into()?))
.map(|res| Ok(res?.try_into_store_event::<_, Schema>()?))
.filter_map(|event| async { event.transpose() })
})
}
}
Expand All @@ -140,11 +157,12 @@ pub struct PgStoreLockGuard {
impl UnlockOnDrop for PgStoreLockGuard {}

#[async_trait]
impl<A> EventStore for PgStore<A>
impl<A, Schema> EventStore for PgStore<A, Schema>
where
A: Aggregate,
A::State: Send,
A::Event: Event + Send + Sync,
A::Event: Send + Sync,
Schema: Converter<A::Event> + Event + Send + Sync,
{
type Aggregate = A;
type Error = PgStoreError;
Expand All @@ -167,10 +185,13 @@ where
.fetch_all(&self.inner.pool)
.await?
.into_iter()
.map(|event| Ok(event.try_into()?))
.map(|event| Ok(event.try_into_store_event::<_, Schema>()?))
.filter_map(Result::transpose)
.collect::<Result<Vec<StoreEvent<A::Event>>, Self::Error>>()?)
}

// Note: https://github.com/rust-lang/rust-clippy/issues/12281
#[allow(clippy::blocks_in_conditions)]
#[tracing::instrument(skip_all, fields(aggregate_id = % aggregate_state.id()), err)]
async fn persist(
&self,
Expand Down Expand Up @@ -301,13 +322,14 @@ impl<T: Aggregate> std::fmt::Debug for PgStore<T> {
}
}

impl<A> Clone for PgStore<A>
impl<A, Schema> Clone for PgStore<A, Schema>
where
A: Aggregate,
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
_schema: PhantomData,
}
}
}

0 comments on commit ff9bac4

Please sign in to comment.