diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c6ea0d7ed..c7ec0393f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,9 +11,10 @@ Recommendation: for ease of reading, use the following order: - Fixed --> -## [Unreleased] +## [0.217.2] - 2025-01-10 ### Changed - Updated to latest `datafusion` and `alloy` dependencies +- Performance improvements with batch loading of event sourcing aggregates ## [0.217.1] - 2025-01-09 ### Changed diff --git a/src/domain/flow-system/services/src/dependencies.rs b/src/domain/flow-system/services/src/dependencies.rs index a7e4cb07fc..45f7004b78 100644 --- a/src/domain/flow-system/services/src/dependencies.rs +++ b/src/domain/flow-system/services/src/dependencies.rs @@ -17,6 +17,7 @@ pub fn register_dependencies(catalog_builder: &mut CatalogBuilder) { catalog_builder.add::(); catalog_builder.add::(); catalog_builder.add::(); + catalog_builder.add::(); catalog_builder.add::(); catalog_builder.add::(); diff --git a/src/domain/flow-system/services/src/flow/flow_agent_impl.rs b/src/domain/flow-system/services/src/flow/flow_agent_impl.rs index 7976ef8386..db2b9f0aad 100644 --- a/src/domain/flow-system/services/src/flow/flow_agent_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_agent_impl.rs @@ -32,6 +32,7 @@ use messaging_outbox::{ use time_source::SystemTimeSource; use tracing::Instrument as _; +use crate::flow::FlowStateHelper; use crate::{ FlowAbortHelper, FlowSchedulingHelper, @@ -126,6 +127,7 @@ impl FlowAgentImpl { // Extract necessary dependencies let flow_event_store = target_catalog.get_one::().unwrap(); let scheduling_helper = target_catalog.get_one::().unwrap(); + let flows_state_helper = target_catalog.get_one::().unwrap(); // How many waiting flows do we have? let waiting_filters = AllFlowFilters { @@ -152,13 +154,10 @@ impl FlowAgentImpl { .try_collect() .await?; - // Process each waiting flow - for waiting_flow_id in &waiting_flow_ids { - // TODO: batch loading of flows - let flow = Flow::load(*waiting_flow_id, flow_event_store.as_ref()) - .await - .int_err()?; + processed_waiting_flows += waiting_flow_ids.len(); + let mut state_stream = flows_state_helper.get_stream(waiting_flow_ids); + while let Some(flow) = state_stream.try_next().await? 
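The recovery loop above now pulls waiting flows from a single batched stream instead of issuing one `Flow::load` per ID. A minimal sketch of that consumption pattern, using stand-in `FlowState` and `InternalError` types plus the `futures` and `tokio` crates (the real `FlowStateStream` alias lives in `kamu_flow_system` and may differ in detail):

```rust
use std::pin::Pin;

use futures::{stream, Stream, TryStreamExt};

// Stand-ins for the crate's flow state and error types (illustrative only).
#[derive(Debug)]
struct FlowState {
    id: u64,
}
#[derive(Debug)]
struct InternalError;

// Assumed shape of the stream returned by FlowStateHelper::get_stream().
type FlowStateStream = Pin<Box<dyn Stream<Item = Result<FlowState, InternalError>> + Send>>;

async fn process_waiting_flows(mut states: FlowStateStream) -> Result<usize, InternalError> {
    let mut processed = 0;
    // try_next() ends the loop on stream exhaustion and surfaces the first error,
    // which is how the agent bails out of recovery on a storage failure.
    while let Some(flow) = states.try_next().await? {
        let _ = flow.id; // re-evaluate the flow's start condition here
        processed += 1;
    }
    Ok(processed)
}

#[tokio::main]
async fn main() -> Result<(), InternalError> {
    let states: FlowStateStream =
        Box::pin(stream::iter((0..3).map(|id| Ok::<_, InternalError>(FlowState { id }))));
    assert_eq!(process_waiting_flows(states).await?, 3);
    Ok(())
}
```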
{ // We need to re-evaluate batching conditions only if let Some(FlowStartCondition::Batching(b)) = &flow.start_condition { scheduling_helper @@ -173,8 +172,6 @@ impl FlowAgentImpl { .await?; } } - - processed_waiting_flows += waiting_flow_ids.len(); } Ok(()) diff --git a/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs b/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs index 9dd8b4c59b..df3207907e 100644 --- a/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs @@ -19,6 +19,7 @@ use kamu_core::DatasetOwnershipService; use kamu_flow_system::*; use opendatafabric::{AccountID, DatasetID}; +use crate::flow::flow_state_helper::FlowStateHelper; use crate::{FlowAbortHelper, FlowSchedulingHelper}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -28,6 +29,7 @@ pub struct FlowQueryServiceImpl { flow_event_store: Arc, dataset_ownership_service: Arc, agent_config: Arc, + flow_state: Arc, } #[component(pub)] @@ -38,12 +40,14 @@ impl FlowQueryServiceImpl { flow_event_store: Arc, dataset_ownership_service: Arc, agent_config: Arc, + flow_state: Arc, ) -> Self { Self { catalog, flow_event_store, dataset_ownership_service, agent_config, + flow_state, } } } @@ -67,21 +71,13 @@ impl FlowQueryService for FlowQueryServiceImpl { .get_count_flows_by_dataset(dataset_id, &filters) .await?; - let dataset_id = dataset_id.clone(); - - let matched_stream = Box::pin(async_stream::try_stream! { - let relevant_flow_ids: Vec<_> = self - .flow_event_store - .get_all_flow_ids_by_dataset(&dataset_id, &filters, pagination) - .try_collect() - .await?; + let relevant_flow_ids: Vec<_> = self + .flow_event_store + .get_all_flow_ids_by_dataset(dataset_id, &filters, pagination) + .try_collect() + .await?; - // TODO: implement batch loading - for flow_id in relevant_flow_ids { - let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?; - yield flow.into(); - } - }); + let matched_stream = self.flow_state.get_stream(relevant_flow_ids); Ok(FlowStateListing { matched_stream, @@ -144,20 +140,13 @@ impl FlowQueryService for FlowQueryServiceImpl { let account_dataset_ids: HashSet = HashSet::from_iter(filtered_dataset_ids); - let matched_stream = Box::pin(async_stream::try_stream! { - let relevant_flow_ids: Vec<_> = self - .flow_event_store - .get_all_flow_ids_by_datasets(account_dataset_ids, &dataset_flow_filters, pagination) - .try_collect() - .await - .int_err()?; - - // TODO: implement batch loading - for flow_id in relevant_flow_ids { - let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?; - yield flow.into(); - } - }); + let relevant_flow_ids: Vec<_> = self + .flow_event_store + .get_all_flow_ids_by_datasets(account_dataset_ids, &dataset_flow_filters, pagination) + .try_collect() + .await + .int_err()?; + let matched_stream = self.flow_state.get_stream(relevant_flow_ids); Ok(FlowStateListing { matched_stream, @@ -209,19 +198,13 @@ impl FlowQueryService for FlowQueryServiceImpl { .await .int_err()?; - let matched_stream = Box::pin(async_stream::try_stream! 
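Every listing method in `FlowQueryServiceImpl` now follows the same shape: collect the page of relevant flow IDs from the event store, then hand them to `FlowStateHelper::get_stream`, which loads the aggregates chunk by chunk. A self-contained sketch of that producer side, assuming the `async-stream`, `futures`, and `tokio` crates; `FlowId`, `LoadFailed`, and the chunk size of 32 are illustrative stand-ins, not the crate's types:

```rust
use std::pin::Pin;

use async_stream::try_stream;
use futures::{Stream, TryStreamExt};

type FlowId = u64;

#[derive(Debug)]
struct LoadFailed;

// Stand-in for the batched aggregate loader (Flow::load_multi in the PR).
async fn load_multi(ids: &[FlowId]) -> Result<Vec<FlowId>, LoadFailed> {
    Ok(ids.to_vec())
}

// The helper turns a page of ids into a fallible stream, loading a chunk of
// aggregates per iteration instead of one event-store query per flow.
fn get_stream(ids: Vec<FlowId>) -> Pin<Box<dyn Stream<Item = Result<FlowId, LoadFailed>> + Send>> {
    Box::pin(try_stream! {
        for chunk in ids.chunks(32) {
            let loaded = load_multi(chunk).await?;
            for item in loaded {
                yield item;
            }
        }
    })
}

#[tokio::main]
async fn main() -> Result<(), LoadFailed> {
    // Caller side: collect ids first, then expose the stream as the listing result.
    let states: Vec<FlowId> = get_stream((0..100).collect()).try_collect().await?;
    assert_eq!(states.len(), 100);
    Ok(())
}
```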
{ - let relevant_flow_ids: Vec<_> = self - .flow_event_store - .get_all_system_flow_ids(&filters, pagination) - .try_collect() - .await?; + let relevant_flow_ids: Vec<_> = self + .flow_event_store + .get_all_system_flow_ids(&filters, pagination) + .try_collect() + .await?; - // TODO: implement batch loading - for flow_id in relevant_flow_ids { - let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?; - yield flow.into(); - } - }); + let matched_stream = self.flow_state.get_stream(relevant_flow_ids); Ok(FlowStateListing { matched_stream, @@ -242,19 +225,12 @@ impl FlowQueryService for FlowQueryServiceImpl { .get_count_all_flows(&empty_filters) .await?; - let matched_stream = Box::pin(async_stream::try_stream! { - let all_flows: Vec<_> = self - .flow_event_store - .get_all_flow_ids(&empty_filters, pagination) - .try_collect() - .await?; - - // TODO: implement batch loading - for flow_id in all_flows { - let flow = Flow::load(flow_id, self.flow_event_store.as_ref()).await.int_err()?; - yield flow.into(); - } - }); + let all_flows: Vec<_> = self + .flow_event_store + .get_all_flow_ids(&empty_filters, pagination) + .try_collect() + .await?; + let matched_stream = self.flow_state.get_stream(all_flows); Ok(FlowStateListing { matched_stream, diff --git a/src/domain/flow-system/services/src/flow/flow_state_helper.rs b/src/domain/flow-system/services/src/flow/flow_state_helper.rs new file mode 100644 index 0000000000..4334076dd3 --- /dev/null +++ b/src/domain/flow-system/services/src/flow/flow_state_helper.rs @@ -0,0 +1,45 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use dill::component; +use internal_error::ResultIntoInternal; +use kamu_flow_system::{Flow, FlowEventStore, FlowID, FlowStateStream}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub struct FlowStateHelper { + flow_event_store: Arc<dyn FlowEventStore>, +} + +#[component(pub)] +impl FlowStateHelper { + pub(crate) fn new(flow_event_store: Arc<dyn FlowEventStore>) -> Self { + Self { flow_event_store } + } + + pub fn get_stream(&self, flow_ids: Vec<FlowID>) -> FlowStateStream { + Box::pin(async_stream::try_stream! { + // 32-item batching will give a performance boost, + // but queries for long-lived datasets should not be too heavy. + // This number was chosen without any performance measurements. Subject to change.
+ let chunk_size = 32; + for chunk in flow_ids.chunks(chunk_size) { + let flows = Flow::load_multi( + chunk.to_vec(), + self.flow_event_store.as_ref() + ).await.int_err()?; + for flow in flows { + yield flow.int_err()?.into(); + } + } + }) + } +} diff --git a/src/domain/flow-system/services/src/flow/mod.rs b/src/domain/flow-system/services/src/flow/mod.rs index 098471329e..2400332ae5 100644 --- a/src/domain/flow-system/services/src/flow/mod.rs +++ b/src/domain/flow-system/services/src/flow/mod.rs @@ -11,8 +11,10 @@ mod flow_abort_helper; mod flow_agent_impl; mod flow_query_service_impl; mod flow_scheduling_helper; +mod flow_state_helper; pub(crate) use flow_abort_helper::*; pub use flow_agent_impl::*; pub use flow_query_service_impl::*; pub(crate) use flow_scheduling_helper::*; +pub(crate) use flow_state_helper::*; diff --git a/src/domain/task-system/services/src/task_agent_impl.rs b/src/domain/task-system/services/src/task_agent_impl.rs index 84adf0a9b2..2aa4f0ad7f 100644 --- a/src/domain/task-system/services/src/task_agent_impl.rs +++ b/src/domain/task-system/services/src/task_agent_impl.rs @@ -86,19 +86,21 @@ impl TaskAgentImpl { }) .try_collect() .await?; + let batch_size = running_task_ids.len(); - for running_task_id in &running_task_ids { - // TODO: batch loading of tasks - let mut task = Task::load(*running_task_id, task_event_store.as_ref()) - .await - .int_err()?; + let tasks = Task::load_multi(running_task_ids, task_event_store.as_ref()) + .await + .int_err()?; + + for task in tasks { + let mut t = task.int_err()?; // Requeue - task.requeue(self.time_source.now()).int_err()?; - task.save(task_event_store.as_ref()).await.int_err()?; + t.requeue(self.time_source.now()).int_err()?; + t.save(task_event_store.as_ref()).await.int_err()?; } - processed_running_tasks += running_task_ids.len(); + processed_running_tasks += batch_size; } Ok(()) diff --git a/src/domain/task-system/services/tests/tests/test_task_aggregate.rs b/src/domain/task-system/services/tests/tests/test_task_aggregate.rs index 428980285c..7cffb8ce4a 100644 --- a/src/domain/task-system/services/tests/tests/test_task_aggregate.rs +++ b/src/domain/task-system/services/tests/tests/test_task_aggregate.rs @@ -96,6 +96,42 @@ async fn test_task_save_load_update() { assert_eq!(task.outcome, Some(TaskOutcome::Cancelled)); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_task_load_multi() { + let event_store = InMemoryTaskEventStore::new(); + + for _ in 0..5000 { + let task_id = event_store.new_task_id().await.unwrap(); + let mut task = Task::new( + Utc::now(), + task_id, + LogicalPlanProbe::default().into(), + None, + ); + task.save(&event_store).await.unwrap(); + task.run(Utc::now()).unwrap(); + task.cancel(Utc::now()).unwrap(); + task.save(&event_store).await.unwrap(); + task.finish(Utc::now(), TaskOutcome::Cancelled).unwrap(); + task.save(&event_store).await.unwrap(); + } + + let delta = 2500; + let ids: Vec<TaskID> = (delta..(delta + 2000)).map(TaskID::new).collect(); + let tasks = Task::load_multi(ids, &event_store).await.unwrap(); + + for (i, task_res) in tasks.into_iter().enumerate() { + assert!(task_res.is_ok()); + let expected_id = TaskID::new(i as u64 + delta); + let task = task_res.unwrap(); + assert_eq!(task.task_id, expected_id); + assert_eq!(task.status(), TaskStatus::Finished); + assert_eq!(task.outcome, Some(TaskOutcome::Cancelled)); + } +} +
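As both the requeue loop in `TaskAgentImpl` and the test above rely on, `load_multi` returns one `Result` per requested ID, in request order: the outer `Result` covers the event-store read as a whole, the inner one a single aggregate that failed to rebuild. A toy model of that shape with stand-in `Task` and `LoadError` types (not the crate's):

```rust
#[derive(Debug)]
struct Task {
    id: u64,
    requeued: bool,
}

#[derive(Debug)]
struct LoadError(u64);

// Mimics the generated Task::load_multi: one entry per id, order preserved.
fn load_multi(ids: &[u64]) -> Vec<Result<Task, LoadError>> {
    ids.iter()
        .map(|&id| {
            if id % 2 == 0 {
                Ok(Task { id, requeued: false })
            } else {
                Err(LoadError(id)) // e.g. aggregate not found
            }
        })
        .collect()
}

fn main() {
    let ids = [2u64, 3, 4];
    let mut requeued = Vec::new();
    for res in load_multi(&ids) {
        match res {
            Ok(mut task) => {
                task.requeued = true; // mirrors t.requeue(...) in the agent
                requeued.push(task.id);
            }
            Err(err) => eprintln!("skipping unloadable task: {err:?}"),
        }
    }
    assert_eq!(requeued, vec![2, 4]);
}
```

The agent itself treats a failed inner result as fatal via `int_err()?`; the sketch merely logs and skips to stay short.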
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] diff --git a/src/infra/flow-system/postgres/.sqlx/query-ffff8fcd2b8ba38101e39fae7f226d466f20c5178f979303dd9974d008524e83.json b/src/infra/flow-system/postgres/.sqlx/query-ffff8fcd2b8ba38101e39fae7f226d466f20c5178f979303dd9974d008524e83.json new file mode 100644 index 0000000000..69eba0bb42 --- /dev/null +++ b/src/infra/flow-system/postgres/.sqlx/query-ffff8fcd2b8ba38101e39fae7f226d466f20c5178f979303dd9974d008524e83.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT flow_id, event_id, event_payload\n FROM flow_events\n WHERE flow_id = ANY($1)\n ORDER BY event_id ASC\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "flow_id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "event_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "event_payload", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "ffff8fcd2b8ba38101e39fae7f226d466f20c5178f979303dd9974d008524e83" +} diff --git a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs index 170594f01a..627c431b9e 100644 --- a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs +++ b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs @@ -367,6 +367,40 @@ impl EventStore for PostgresFlowEventStore { }) } + fn get_events_multi(&self, queries: Vec) -> MultiEventStream { + let flow_ids: Vec = queries.iter().map(|id| (*id).try_into().unwrap()).collect(); + + Box::pin(async_stream::stream! { + let mut tr = self.transaction.lock().await; + let connection_mut = tr + .connection_mut() + .await?; + + let mut query_stream = sqlx::query!( + r#" + SELECT flow_id, event_id, event_payload + FROM flow_events + WHERE flow_id = ANY($1) + ORDER BY event_id ASC + "#, + &flow_ids, + ).try_map(|event_row| { + let event = serde_json::from_value::(event_row.event_payload) + .map_err(|e| sqlx::Error::Decode(Box::new(e)))?; + + Ok((FlowID::try_from(event_row.flow_id).unwrap(), // ids are always > 0 + EventID::new(event_row.event_id), + event)) + }) + .fetch(connection_mut) + .map_err(|e| GetEventsError::Internal(e.int_err())); + + while let Some((flow_id, event_id, event)) = query_stream.try_next().await? 
{ + yield Ok((flow_id, event_id, event)); + } + }) + } + async fn save_events( &self, flow_id: &FlowID, diff --git a/src/infra/task-system/postgres/.sqlx/query-4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e.json b/src/infra/task-system/postgres/.sqlx/query-4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e.json new file mode 100644 index 0000000000..0a8a19b21d --- /dev/null +++ b/src/infra/task-system/postgres/.sqlx/query-4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT task_id, event_id, event_payload\n FROM task_events\n WHERE task_id = ANY($1)\n ORDER BY event_id ASC\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "task_id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "event_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "event_payload", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e" +} diff --git a/src/infra/task-system/postgres/src/postgres_task_event_store.rs b/src/infra/task-system/postgres/src/postgres_task_event_store.rs index b09c644fd0..c34a30300e 100644 --- a/src/infra/task-system/postgres/src/postgres_task_event_store.rs +++ b/src/infra/task-system/postgres/src/postgres_task_event_store.rs @@ -184,6 +184,40 @@ impl EventStore for PostgresTaskEventStore { }) } + fn get_events_multi(&self, queries: Vec) -> MultiEventStream { + let task_ids: Vec = queries.iter().map(|id| (*id).try_into().unwrap()).collect(); + + Box::pin(async_stream::stream! { + let mut tr = self.transaction.lock().await; + let connection_mut = tr + .connection_mut() + .await?; + + let mut query_stream = sqlx::query!( + r#" + SELECT task_id, event_id, event_payload + FROM task_events + WHERE task_id = ANY($1) + ORDER BY event_id ASC + "#, + &task_ids, + ).try_map(|event_row| { + let event = serde_json::from_value::(event_row.event_payload) + .map_err(|e| sqlx::Error::Decode(Box::new(e)))?; + + Ok((TaskID::try_from(event_row.task_id).unwrap(), // ids are always > 0 + EventID::new(event_row.event_id), + event)) + }) + .fetch(connection_mut) + .map_err(|e| GetEventsError::Internal(e.int_err())); + + while let Some((task_id, event_id, event)) = query_stream.try_next().await? 
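Both Postgres stores batch the fetch by binding the ID list as an `int8[]` and filtering with `= ANY($1)`, so a whole batch of aggregates is hydrated in one round-trip while `ORDER BY event_id` keeps each aggregate's events in chronological order. A rough sketch of the same query shape using sqlx's runtime API (assumes a reachable `PgPool`, sqlx built with the `postgres` and `json` features, and the task store's table and column names):

```rust
use sqlx::{PgPool, Row};

// Illustrative only: fetch events for many aggregates in a single round-trip.
async fn fetch_task_events(
    pool: &PgPool,
    task_ids: Vec<i64>,
) -> Result<Vec<(i64, i64, serde_json::Value)>, sqlx::Error> {
    let rows = sqlx::query(
        r#"
        SELECT task_id, event_id, event_payload
        FROM task_events
        WHERE task_id = ANY($1)
        ORDER BY event_id ASC
        "#,
    )
    .bind(&task_ids) // a Vec<i64> binds as a Postgres int8[] parameter
    .fetch_all(pool)
    .await?;

    rows.into_iter()
        .map(|row| {
            Ok((
                row.try_get("task_id")?,
                row.try_get("event_id")?,
                row.try_get("event_payload")?,
            ))
        })
        .collect()
}
```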
{ + yield Ok((task_id, event_id, event)); + } + }) + } + async fn save_events( &self, task_id: &TaskID, diff --git a/src/utils/event-sourcing-macros/src/lib.rs b/src/utils/event-sourcing-macros/src/lib.rs index dbc32186b4..493f8a039d 100644 --- a/src/utils/event-sourcing-macros/src/lib.rs +++ b/src/utils/event-sourcing-macros/src/lib.rs @@ -72,6 +72,23 @@ pub fn derive_aggregate(tokens: proc_macro::TokenStream) -> proc_macro::TokenStr Ok(Self(agg)) } + #[inline] + pub async fn load_multi( + queries: Vec<<#proj_type as ::event_sourcing::Projection>::Query>, + event_store: &#store_type, + ) -> Result>>, GetEventsError> { + let aggs = ::event_sourcing::Aggregate::load_multi(queries, event_store).await?; + let mut result = vec![]; + for agg in aggs { + let res = match agg { + Err(e) => Err(e), + Ok(a) => Ok(Self(a)), + }; + result.push(res); + } + Ok(result) + } + #[inline] pub async fn try_load( query: <#proj_type as ::event_sourcing::Projection>::Query, diff --git a/src/utils/event-sourcing/src/aggregate.rs b/src/utils/event-sourcing/src/aggregate.rs index 9b8e341a57..bff070bff8 100644 --- a/src/utils/event-sourcing/src/aggregate.rs +++ b/src/utils/event-sourcing/src/aggregate.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::HashMap; use std::marker::PhantomData; use internal_error::InternalError; @@ -116,6 +117,50 @@ where } } + /// Loads multiple aggregations + /// + /// Returns either collection of aggregation results or an error, when + /// failed to read from source stream. + /// + /// "Ok" vector contains results for every item from `queries` argument. + /// Order is preserved. + pub async fn load_multi( + queries: Vec, + event_store: &Store, + ) -> Result>>, GetEventsError> { + use tokio_stream::StreamExt; + + let mut event_stream = event_store.get_events_multi(queries.clone()); + let mut agg_results: HashMap>> = HashMap::new(); + + while let Some(res) = event_stream.next().await { + // When failed to read at least one event from source stream, + // function returns error result immediately + let (query, event_id, event) = res?; + let agg_result: Result, LoadError> = match agg_results + .remove(&query) + { + None => Self::from_stored_event(query.clone(), event_id, event).map_err(Into::into), + Some(Ok(mut agg)) => match agg.apply_stored(event_id, event) { + Ok(_) => Ok(agg), + Err(err) => Err(err.into()), + }, + Some(Err(err)) => Err(err), + }; + agg_results.insert(query, agg_result); + } + + let mut result: Vec>> = vec![]; + for query in queries { + let item = match agg_results.remove(&query) { + None => Err(AggregateNotFoundError::new(query).into()), + Some(agg) => agg, + }; + result.push(item); + } + Ok(result) + } + /// Same as [Aggregate::load()] but with extra control knobs #[tracing::instrument( level = "debug", diff --git a/src/utils/event-sourcing/src/event_store.rs b/src/utils/event-sourcing/src/event_store.rs index 687785f6fe..388a27642a 100644 --- a/src/utils/event-sourcing/src/event_store.rs +++ b/src/utils/event-sourcing/src/event_store.rs @@ -20,6 +20,26 @@ pub trait EventStore: Send + Sync { /// Returns the event history of an aggregate in chronological order fn get_events(&self, query: &Proj::Query, opts: GetEventsOpts) -> EventStream; + /// Returns event history of multiple aggregates in chronological order + /// Created to give a room for query optimisations when needed + fn get_events_multi( + &self, + queries: Vec, + ) -> MultiEventStream { + use 
tokio_stream::StreamExt; + let queries = queries.clone(); + + Box::pin(async_stream::try_stream! { + for query in queries { + let mut stream = self.get_events(&query, GetEventsOpts::default()); + while let Some(event) = stream.next().await { + let (event_id, event) = event?; + yield (query.clone(), event_id, event) + } + } + }) + } + /// Persists a series of events /// /// The `query` argument must be the same as query passed when retrieving @@ -42,6 +62,14 @@ pub type EventStream<'a, Event> = std::pin::Pin< Box> + Send + 'a>, >; +pub type MultiEventStream<'a, Query, Event> = std::pin::Pin< + Box< + dyn tokio_stream::Stream> + + Send + + 'a, + >, +>; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Default)] diff --git a/src/utils/event-sourcing/src/projection.rs b/src/utils/event-sourcing/src/projection.rs index a0f5b488ba..154ad4dd96 100644 --- a/src/utils/event-sourcing/src/projection.rs +++ b/src/utils/event-sourcing/src/projection.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::hash::Hash; + use crate::ProjectionEvent; /// Projections reconstruct some state from a series of events @@ -20,6 +22,9 @@ where Self: Clone, Self::Query: Clone, + Self::Query: Hash, + Self::Query: Eq, + Self: std::fmt::Debug, Self::Query: std::fmt::Debug,
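The new `Hash` and `Eq` bounds on `Projection::Query` exist because `Aggregate::load_multi` buckets the merged event stream into a `HashMap` keyed by query and then re-emits results in the caller's original order, turning queries that produced no events into `AggregateNotFoundError`. A simplified, self-contained model of that fold, with plain `Vec<E>` per key and `String` errors standing in for real aggregates and `LoadError`:

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Group interleaved (query, event) pairs by query, then restore request order.
fn fold_events<Q, E>(queries: &[Q], events: Vec<(Q, E)>) -> Vec<Result<Vec<E>, String>>
where
    Q: Eq + Hash + std::fmt::Debug,
{
    let mut grouped: HashMap<Q, Vec<E>> = HashMap::new();
    for (query, event) in events {
        grouped.entry(query).or_default().push(event);
    }

    queries
        .iter()
        .map(|q| {
            grouped
                .remove(q)
                .ok_or_else(|| format!("aggregate not found: {q:?}"))
        })
        .collect()
}

fn main() {
    let queries = vec![1u64, 2, 3];
    // Events for ids 1 and 3 arrive interleaved; id 2 has none and must error out.
    let events = vec![(1u64, "created"), (3, "created"), (1, "finished")];
    let results = fold_events(&queries, events);
    assert_eq!(results[0].as_ref().unwrap(), &vec!["created", "finished"]);
    assert!(results[1].is_err());
    assert_eq!(results[2].as_ref().unwrap(), &vec!["created"]);
}
```

Because the rows only need to be grouped, one ordered pass over the batched stream is enough; no per-aggregate query is issued.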