Implement batch loading of event sourcing aggregates (Flows, Tasks) #1024

Draft · wants to merge 7 commits into base: master
90 changes: 36 additions & 54 deletions src/domain/flow-system/services/src/flow/flow_query_service_impl.rs
@@ -13,7 +13,7 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use database_common::PaginationOpts;
use dill::{component, interface, Catalog};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use internal_error::ResultIntoInternal;
use kamu_core::DatasetOwnershipService;
use kamu_flow_system::*;
@@ -46,6 +46,23 @@ impl FlowQueryServiceImpl {
agent_config,
}
}

fn flow_state_stream<'a>(&'a self, flow_ids: FlowIDStream<'a>) -> FlowStateStream<'a> {
Contributor review comment: Formatting: missing blank line
Box::pin(async_stream::try_stream! {
// 32-item batching should give a performance boost,
// while keeping queries for long-lived datasets from becoming too heavy.
// This number was chosen without any performance measurements and is subject to change.
let batch_size = 32;
let mut chunks = flow_ids.try_chunks(batch_size);
while let Some(item) = chunks.next().await {
let ids = item.int_err()?;
let flows = Flow::load_multi(ids, self.flow_event_store.as_ref()).await.int_err()?;
for flow in flows {
yield flow.int_err()?.into();
}
}
})
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
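The new `flow_state_stream` helper batches the incoming flow ID stream with `futures::TryStreamExt::try_chunks` and loads each chunk with `Flow::load_multi`. Below is a minimal, self-contained sketch of that chunking pattern only; the stream contents, batch size, and error type are illustrative and a tokio runtime is assumed:

```rust
use futures::{stream, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() {
    // Stand-in for FlowIDStream: a fallible stream of IDs.
    let ids = stream::iter((1u64..=10).map(Ok::<_, String>));

    // Group successful items into batches of at most 4, preserving order.
    let mut chunks = ids.try_chunks(4);
    while let Some(batch) = chunks.next().await {
        match batch {
            // Each Ok batch could be passed to a load_multi-style call.
            Ok(batch_ids) => println!("would load {} aggregates in one call: {:?}", batch_ids.len(), batch_ids),
            // try_chunks surfaces the error together with the partially collected batch.
            Err(err) => eprintln!("stream error after {} items: {}", err.0.len(), err.1),
        }
    }
}
```

Note that `try_chunks` hands back the partially collected batch together with the error, which is why the helper in the PR converts each chunk with `int_err()` before loading.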
@@ -67,21 +84,11 @@ impl FlowQueryService for FlowQueryServiceImpl {
.get_count_flows_by_dataset(dataset_id, &filters)
.await?;

let dataset_id = dataset_id.clone();

let matched_stream = Box::pin(async_stream::try_stream! {
let relevant_flow_ids: Vec<_> = self
.flow_event_store
.get_all_flow_ids_by_dataset(&dataset_id, &filters, pagination)
.try_collect()
.await?;
let flow_ids_stream = self
.flow_event_store
.get_all_flow_ids_by_dataset(dataset_id, &filters, pagination);

// TODO: implement batch loading
for flow_id in relevant_flow_ids {
let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?;
yield flow.into();
}
});
let matched_stream = self.flow_state_stream(flow_ids_stream);

Ok(FlowStateListing {
matched_stream,
@@ -144,20 +151,12 @@ impl FlowQueryService for FlowQueryServiceImpl {

let account_dataset_ids: HashSet<DatasetID> = HashSet::from_iter(filtered_dataset_ids);

let matched_stream = Box::pin(async_stream::try_stream! {
let relevant_flow_ids: Vec<_> = self
.flow_event_store
.get_all_flow_ids_by_datasets(account_dataset_ids, &dataset_flow_filters, pagination)
.try_collect()
.await
.int_err()?;

// TODO: implement batch loading
for flow_id in relevant_flow_ids {
let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?;
yield flow.into();
}
});
let flow_ids_stream = self.flow_event_store.get_all_flow_ids_by_datasets(
account_dataset_ids,
&dataset_flow_filters,
pagination,
);
let matched_stream = self.flow_state_stream(flow_ids_stream);

Ok(FlowStateListing {
matched_stream,
@@ -209,19 +208,11 @@ impl FlowQueryService for FlowQueryServiceImpl {
.await
.int_err()?;

let matched_stream = Box::pin(async_stream::try_stream! {
let relevant_flow_ids: Vec<_> = self
.flow_event_store
.get_all_system_flow_ids(&filters, pagination)
.try_collect()
.await?;
let flow_ids_stream = self
.flow_event_store
.get_all_system_flow_ids(&filters, pagination);

// TODO: implement batch loading
for flow_id in relevant_flow_ids {
let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?;
yield flow.into();
}
});
let matched_stream = self.flow_state_stream(flow_ids_stream);

Ok(FlowStateListing {
matched_stream,
@@ -242,19 +233,10 @@ impl FlowQueryService for FlowQueryServiceImpl {
.get_count_all_flows(&empty_filters)
.await?;

let matched_stream = Box::pin(async_stream::try_stream! {
let all_flows: Vec<_> = self
.flow_event_store
.get_all_flow_ids(&empty_filters, pagination)
.try_collect()
.await?;

// TODO: implement batch loading
for flow_id in all_flows {
let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?;
yield flow.into();
}
});
let all_flows = self
.flow_event_store
.get_all_flow_ids(&empty_filters, pagination);
let matched_stream = self.flow_state_stream(all_flows);

Ok(FlowStateListing {
matched_stream,
18 changes: 10 additions & 8 deletions src/domain/task-system/services/src/task_agent_impl.rs
@@ -86,19 +86,21 @@ impl TaskAgentImpl {
})
.try_collect()
.await?;
let batch_size = running_task_ids.len();

for running_task_id in &running_task_ids {
// TODO: batch loading of tasks
let mut task = Task::load(*running_task_id, task_event_store.as_ref())
.await
.int_err()?;
let tasks = Task::load_multi(running_task_ids, task_event_store.as_ref())
.await
.int_err()?;

for task in tasks {
let mut t = task.int_err()?;

// Requeue
task.requeue(self.time_source.now()).int_err()?;
task.save(task_event_store.as_ref()).await.int_err()?;
t.requeue(self.time_source.now()).int_err()?;
t.save(task_event_store.as_ref()).await.int_err()?;
}

processed_running_tasks += running_task_ids.len();
processed_running_tasks += batch_size;
}

Ok(())
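`Task::load_multi` (generated by the aggregate macro further down) returns `Result<Vec<Result<Task, LoadError<…>>>, GetEventsError>`: the outer error covers the event-store read, the inner ones are per task, which is why the requeue loop unwraps each item with `int_err()` before saving. A standalone sketch of consuming that two-level shape, with plain strings standing in for the real types:

```rust
// Illustrative stand-in for the load_multi return shape:
// outer error = event store failure, inner errors = per-aggregate failures.
fn load_multi(ids: &[u64]) -> Result<Vec<Result<String, String>>, String> {
    Ok(ids
        .iter()
        .map(|id| {
            if *id == 0 {
                Err(format!("aggregate {id} not found"))
            } else {
                Ok(format!("aggregate-{id}"))
            }
        })
        .collect())
}

fn main() -> Result<(), String> {
    // The `?` propagates a storage-level failure for the whole batch...
    for item in load_multi(&[1, 0, 2])? {
        // ...while each aggregate still carries its own load result.
        match item {
            Ok(agg) => println!("requeue {agg}"),
            Err(e) => eprintln!("skipping: {e}"),
        }
    }
    Ok(())
}
```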


34 changes: 34 additions & 0 deletions src/infra/flow-system/postgres/src/postgres_flow_event_store.rs
@@ -367,6 +367,40 @@ impl EventStore<FlowState> for PostgresFlowEventStore {
})
}

fn get_events_multi(&self, queries: Vec<FlowID>) -> MultiEventStream<FlowID, FlowEvent> {
let flow_ids: Vec<i64> = queries.iter().map(|id| (*id).try_into().unwrap()).collect();

Box::pin(async_stream::stream! {
let mut tr = self.transaction.lock().await;
let connection_mut = tr
.connection_mut()
.await?;

let mut query_stream = sqlx::query!(
r#"
SELECT flow_id, event_id, event_payload
FROM flow_events
WHERE flow_id = ANY($1)
ORDER BY event_id ASC
"#,
&flow_ids,
).try_map(|event_row| {
let event = serde_json::from_value::<FlowEvent>(event_row.event_payload)
.map_err(|e| sqlx::Error::Decode(Box::new(e)))?;

Ok((FlowID::try_from(event_row.flow_id).unwrap(), // ids are always > 0
EventID::new(event_row.event_id),
event))
})
.fetch(connection_mut)
.map_err(|e| GetEventsError::Internal(e.int_err()));

while let Some((flow_id, event_id, event)) = query_stream.try_next().await? {
yield Ok((flow_id, event_id, event));
}
})
}

async fn save_events(
&self,
flow_id: &FlowID,
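The new `get_events_multi` loads events for the whole batch in one round trip using `flow_id = ANY($1)` and relies on `ORDER BY event_id` to keep events in application order. A hedged sqlx sketch of the same query pattern, using a made-up `events(entity_id BIGINT, event_id BIGINT, event_payload JSONB)` table and the plain `sqlx::query` API instead of the compile-time-checked `query!` macro used in the PR:

```rust
use sqlx::postgres::PgPoolOptions;
use sqlx::Row;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Connection string is a placeholder.
    let pool = PgPoolOptions::new()
        .connect("postgres://localhost/example")
        .await?;

    let ids: Vec<i64> = vec![1, 2, 3];

    // One query for the whole batch: ANY($1) matches against the bound array,
    // and the ORDER BY keeps each aggregate's events in application order.
    let rows = sqlx::query(
        r#"
        SELECT entity_id, event_id, event_payload
        FROM events
        WHERE entity_id = ANY($1)
        ORDER BY event_id ASC
        "#,
    )
    .bind(&ids)
    .fetch_all(&pool)
    .await?;

    for row in rows {
        let entity_id: i64 = row.get("entity_id");
        let event_id: i64 = row.get("event_id");
        println!("entity {entity_id}: event {event_id}");
    }
    Ok(())
}
```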


34 changes: 34 additions & 0 deletions src/infra/task-system/postgres/src/postgres_task_event_store.rs
@@ -184,6 +184,40 @@ impl EventStore<TaskState> for PostgresTaskEventStore {
})
}

fn get_events_multi(&self, queries: Vec<TaskID>) -> MultiEventStream<TaskID, TaskEvent> {
let task_ids: Vec<i64> = queries.iter().map(|id| (*id).try_into().unwrap()).collect();

Box::pin(async_stream::stream! {
let mut tr = self.transaction.lock().await;
let connection_mut = tr
.connection_mut()
.await?;

let mut query_stream = sqlx::query!(
r#"
SELECT task_id, event_id, event_payload
FROM task_events
WHERE task_id = ANY($1)
ORDER BY event_id ASC
"#,
&task_ids,
).try_map(|event_row| {
let event = serde_json::from_value::<TaskEvent>(event_row.event_payload)
.map_err(|e| sqlx::Error::Decode(Box::new(e)))?;

Ok((TaskID::try_from(event_row.task_id).unwrap(), // ids are always > 0
EventID::new(event_row.event_id),
event))
})
.fetch(connection_mut)
.map_err(|e| GetEventsError::Internal(e.int_err()));

while let Some((task_id, event_id, event)) = query_stream.try_next().await? {
yield Ok((task_id, event_id, event));
}
})
}

async fn save_events(
&self,
task_id: &TaskID,
17 changes: 17 additions & 0 deletions src/utils/event-sourcing-macros/src/lib.rs
@@ -72,6 +72,23 @@ pub fn derive_aggregate(tokens: proc_macro::TokenStream) -> proc_macro::TokenStr
Ok(Self(agg))
}

#[inline]
pub async fn load_multi(
queries: Vec<<#proj_type as ::event_sourcing::Projection>::Query>,
event_store: &#store_type,
) -> Result<Vec<Result<Self, LoadError<#proj_type>>>, GetEventsError> {
let aggs = ::event_sourcing::Aggregate::load_multi(queries, event_store).await?;
let mut result = vec![];
for agg in aggs {
let res = match agg {
Err(e) => Err(e),
Ok(a) => Ok(Self(a)),
};
result.push(res);
}
Ok(result)
}

#[inline]
pub async fn try_load(
query: <#proj_type as ::event_sourcing::Projection>::Query,
45 changes: 45 additions & 0 deletions src/utils/event-sourcing/src/aggregate.rs
@@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::HashMap;
use std::marker::PhantomData;

use internal_error::InternalError;
@@ -116,6 +117,50 @@
}
}

/// Loads multiple aggregates at once
///
/// Returns either a collection of per-aggregate load results or an error
/// when reading from the source event stream fails.
///
/// The "Ok" vector contains a result for every item of the `queries`
/// argument, and their order is preserved.
pub async fn load_multi(
queries: Vec<Proj::Query>,
event_store: &Store,
) -> Result<Vec<Result<Self, LoadError<Proj>>>, GetEventsError> {
use tokio_stream::StreamExt;

let mut event_stream = event_store.get_events_multi(queries.clone());
let mut agg_results: HashMap<Proj::Query, Result<Self, LoadError<Proj>>> = HashMap::new();

while let Some(res) = event_stream.next().await {
// If reading any event from the source stream fails,
// the function returns an error immediately
let (query, event_id, event) = res?;
let agg_result: Result<Aggregate<Proj, Store>, LoadError<Proj>> = match agg_results
.remove(&query)
{
None => Self::from_stored_event(query.clone(), event_id, event).map_err(Into::into),
Some(Ok(mut agg)) => match agg.apply_stored(event_id, event) {
Ok(_) => Ok(agg),
Err(err) => Err(err.into()),
},
Some(Err(err)) => Err(err),
};
agg_results.insert(query, agg_result);
}

let mut result: Vec<Result<Self, LoadError<Proj>>> = vec![];
for query in queries {
let item = match agg_results.remove(&query) {
None => Err(AggregateNotFoundError::new(query).into()),
Some(agg) => agg,
};
result.push(item);
}
Ok(result)
}

/// Same as [Aggregate::load()] but with extra control knobs
#[tracing::instrument(
level = "debug",
Expand Down