Use batch aggregation in flow recovery
andriidemus committed Jan 9, 2025
1 parent bb9af2a commit c4de068
Showing 5 changed files with 61 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/domain/flow-system/services/src/dependencies.rs
@@ -17,6 +17,7 @@ pub fn register_dependencies(catalog_builder: &mut CatalogBuilder) {
    catalog_builder.add::<FlowConfigurationServiceImpl>();
    catalog_builder.add::<FlowTriggerServiceImpl>();
    catalog_builder.add::<FlowAgentImpl>();
+   catalog_builder.add::<FlowStateHelper>();
    catalog_builder.add::<FlowQueryServiceImpl>();

    catalog_builder.add::<FlowAbortHelper>();
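Registering FlowStateHelper here is what lets the agent and query service below resolve it from the catalog. A minimal sketch of that register-then-resolve round trip with a trimmed-down stand-in component; only add() and get_one() appear in this diff, so CatalogBuilder::new() and build() are assumed from dill's public API:

use std::sync::Arc;

use dill::{component, CatalogBuilder};

// Trimmed-down stand-in for the real FlowStateHelper.
pub struct FlowStateHelper;

#[component(pub)]
impl FlowStateHelper {
    pub fn new() -> Self {
        Self
    }
}

fn main() {
    let mut catalog_builder = CatalogBuilder::new();
    catalog_builder.add::<FlowStateHelper>();

    let catalog = catalog_builder.build();
    // Resolution fails at runtime if the component was never registered.
    let _helper: Arc<FlowStateHelper> = catalog.get_one::<FlowStateHelper>().unwrap();
}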
13 changes: 5 additions & 8 deletions src/domain/flow-system/services/src/flow/flow_agent_impl.rs
@@ -32,6 +32,7 @@ use messaging_outbox::{
use time_source::SystemTimeSource;
use tracing::Instrument as _;

+use crate::flow::FlowStateHelper;
use crate::{
    FlowAbortHelper,
    FlowSchedulingHelper,
@@ -126,6 +127,7 @@ impl FlowAgentImpl {
        // Extract necessary dependencies
        let flow_event_store = target_catalog.get_one::<dyn FlowEventStore>().unwrap();
        let scheduling_helper = target_catalog.get_one::<FlowSchedulingHelper>().unwrap();
+       let flows_state_helper = target_catalog.get_one::<FlowStateHelper>().unwrap();

        // How many waiting flows do we have?
        let waiting_filters = AllFlowFilters {
@@ -152,13 +154,10 @@
            .try_collect()
            .await?;

-           // Process each waiting flow
-           for waiting_flow_id in &waiting_flow_ids {
-               // TODO: batch loading of flows
-               let flow = Flow::load(*waiting_flow_id, flow_event_store.as_ref())
-                   .await
-                   .int_err()?;
+           processed_waiting_flows += waiting_flow_ids.len();
+           let mut state_stream = flows_state_helper.get_stream(waiting_flow_ids);

+           while let Some(flow) = state_stream.try_next().await? {
                // We need to re-evaluate batching conditions only
                if let Some(FlowStartCondition::Batching(b)) = &flow.start_condition {
                    scheduling_helper
@@ -173,8 +172,6 @@
                        .await?;
                }
            }
-
-           processed_waiting_flows += waiting_flow_ids.len();
        }

        Ok(())
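The recovery loop above now drives a fallible stream instead of loading each flow by ID. A self-contained sketch of that consumption pattern, with stand-in types in place of the crate's FlowID, FlowState, and FlowStateStream (the error type and field names are illustrative):

use std::pin::Pin;

use futures::TryStreamExt; // brings try_next() into scope

// Stand-ins for the crate's flow types.
type FlowId = u64;
struct FlowState {
    id: FlowId,
}

// A pinned, boxed, fallible stream, analogous to FlowStateStream.
type StateStream = Pin<Box<dyn futures::Stream<Item = Result<FlowState, std::io::Error>> + Send>>;

async fn recover_flows(mut state_stream: StateStream) -> Result<(), std::io::Error> {
    // try_next() yields Ok(Some(state)) per item, Ok(None) at the end,
    // and Err as soon as any underlying batch load fails.
    while let Some(flow) = state_stream.try_next().await? {
        println!("re-evaluating start condition for flow {}", flow.id);
    }
    Ok(())
}

fn main() {
    let flows = futures::stream::iter(
        (0u64..3).map(|id| Ok::<_, std::io::Error>(FlowState { id })),
    );
    let stream: StateStream = Box::pin(flows);
    // futures' lightweight executor is enough to drive the loop here.
    futures::executor::block_on(recover_flows(stream)).unwrap();
}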
30 changes: 8 additions & 22 deletions src/domain/flow-system/services/src/flow/flow_query_service_impl.rs
@@ -19,6 +19,7 @@ use kamu_core::DatasetOwnershipService;
use kamu_flow_system::*;
use opendatafabric::{AccountID, DatasetID};

+use crate::flow::flow_state_helper::FlowStateHelper;
use crate::{FlowAbortHelper, FlowSchedulingHelper};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -28,6 +29,7 @@ pub struct FlowQueryServiceImpl {
    flow_event_store: Arc<dyn FlowEventStore>,
    dataset_ownership_service: Arc<dyn DatasetOwnershipService>,
    agent_config: Arc<FlowAgentConfig>,
+   flow_state: Arc<FlowStateHelper>,
}

#[component(pub)]
@@ -38,32 +40,16 @@ impl FlowQueryServiceImpl {
        flow_event_store: Arc<dyn FlowEventStore>,
        dataset_ownership_service: Arc<dyn DatasetOwnershipService>,
        agent_config: Arc<FlowAgentConfig>,
+       flow_state: Arc<FlowStateHelper>,
    ) -> Self {
        Self {
            catalog,
            flow_event_store,
            dataset_ownership_service,
            agent_config,
+           flow_state,
        }
    }

-   fn flow_state_stream(&self, flow_ids: Vec<FlowID>) -> FlowStateStream {
-       Box::pin(async_stream::try_stream! {
-           // 32-item batching will give a performance boost,
-           // but queries for long-lived datasets should not be too heavy.
-           // This number was chosen without any performance measurements. Subject to change.
-           let chunk_size = 32;
-           for chunk in flow_ids.chunks(chunk_size) {
-               let flows = Flow::load_multi(
-                   chunk.to_vec(),
-                   self.flow_event_store.as_ref()
-               ).await.int_err()?;
-               for flow in flows {
-                   yield flow.int_err()?.into();
-               }
-           }
-       })
-   }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -91,7 +77,7 @@ impl FlowQueryService for FlowQueryServiceImpl {
            .try_collect()
            .await?;

-       let matched_stream = self.flow_state_stream(relevant_flow_ids);
+       let matched_stream = self.flow_state.get_stream(relevant_flow_ids);

        Ok(FlowStateListing {
            matched_stream,
@@ -160,7 +146,7 @@ impl FlowQueryService for FlowQueryServiceImpl {
            .try_collect()
            .await
            .int_err()?;
-       let matched_stream = self.flow_state_stream(relevant_flow_ids);
+       let matched_stream = self.flow_state.get_stream(relevant_flow_ids);

        Ok(FlowStateListing {
            matched_stream,
@@ -218,7 +204,7 @@ impl FlowQueryService for FlowQueryServiceImpl {
            .try_collect()
            .await?;

-       let matched_stream = self.flow_state_stream(relevant_flow_ids);
+       let matched_stream = self.flow_state.get_stream(relevant_flow_ids);

        Ok(FlowStateListing {
            matched_stream,
@@ -244,7 +230,7 @@
            .get_all_flow_ids(&empty_filters, pagination)
            .try_collect()
            .await?;
-       let matched_stream = self.flow_state_stream(all_flows);
+       let matched_stream = self.flow_state.get_stream(all_flows);

        Ok(FlowStateListing {
            matched_stream,
45 changes: 45 additions & 0 deletions src/domain/flow-system/services/src/flow/flow_state_helper.rs
@@ -0,0 +1,45 @@
// Copyright Kamu Data, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use dill::component;
use internal_error::ResultIntoInternal;
use kamu_flow_system::{Flow, FlowEventStore, FlowID, FlowStateStream};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct FlowStateHelper {
    flow_event_store: Arc<dyn FlowEventStore>,
}

#[component(pub)]
impl FlowStateHelper {
    pub(crate) fn new(flow_event_store: Arc<dyn FlowEventStore>) -> Self {
        Self { flow_event_store }
    }

    pub fn get_stream(&self, flow_ids: Vec<FlowID>) -> FlowStateStream {
        Box::pin(async_stream::try_stream! {
            // 32-item batching will give a performance boost,
            // but queries for long-lived datasets should not be too heavy.
            // This number was chosen without any performance measurements. Subject to change.
            let chunk_size = 32;
            for chunk in flow_ids.chunks(chunk_size) {
                let flows = Flow::load_multi(
                    chunk.to_vec(),
                    self.flow_event_store.as_ref()
                ).await.int_err()?;
                for flow in flows {
                    yield flow.int_err()?.into();
                }
            }
        })
    }
}
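The chunked loading inside get_stream() is the heart of this commit: fetch aggregates in fixed-size batches, but expose a flat stream to callers. A self-contained sketch of the same pattern, assuming the futures and async-stream crates as dependencies; load_batch() is a placeholder for Flow::load_multi(), and String stands in for the real error type:

use std::pin::Pin;

use futures::{Stream, TryStreamExt};

type Id = u64;
struct State(Id);

type StateStream = Pin<Box<dyn Stream<Item = Result<State, String>> + Send>>;

// Placeholder for a real batch load such as Flow::load_multi().
async fn load_batch(ids: Vec<Id>) -> Result<Vec<State>, String> {
    Ok(ids.into_iter().map(State).collect())
}

fn state_stream(ids: Vec<Id>) -> StateStream {
    Box::pin(async_stream::try_stream! {
        // Load in chunks of 32, yielding items one by one so callers see
        // a flat stream regardless of the batching underneath.
        for chunk in ids.chunks(32) {
            for state in load_batch(chunk.to_vec()).await? {
                yield state;
            }
        }
    })
}

fn main() -> Result<(), String> {
    let states: Vec<State> =
        futures::executor::block_on(state_stream((0u64..100).collect()).try_collect())?;
    assert_eq!(states.len(), 100);
    assert_eq!(states[0].0, 0);
    Ok(())
}

Callers never see the chunk boundary, which is why the four listing methods above can switch from the old per-service helper to FlowStateHelper::get_stream() without any other change.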
2 changes: 2 additions & 0 deletions src/domain/flow-system/services/src/flow/mod.rs
@@ -11,8 +11,10 @@ mod flow_abort_helper;
mod flow_agent_impl;
mod flow_query_service_impl;
mod flow_scheduling_helper;
+mod flow_state_helper;

pub(crate) use flow_abort_helper::*;
pub use flow_agent_impl::*;
pub use flow_query_service_impl::*;
pub(crate) use flow_scheduling_helper::*;
+pub(crate) use flow_state_helper::*;
