Improve flow queries performance
rmn-boiko committed Jan 3, 2025
1 parent a574f62 commit 90d4796
Showing 5 changed files with 128 additions and 58 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Fixed
- GQL API flow queries now fetch the dataset polling source only once per dataset (and only when an Ingest flow type is involved)

## [0.216.0] - 2024-12-30
### Changed
- Flight SQL protocol now supports anonymous and bearer token authentication
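The core of this change is a per-request memo of each dataset's polling source. A minimal, self-contained sketch of the idea (hypothetical, simplified types — the real code keys by `odf::DatasetID`, stores `odf::MetadataBlockTyped<odf::SetPollingSource>` blocks, and performs the lookup asynchronously):

```rust
use std::collections::HashMap;

type DatasetId = String;      // stands in for odf::DatasetID
type PollingSource = String;  // stands in for the SetPollingSource block

/// Memoizes polling-source lookups so each dataset is queried at most once.
struct PollingSourceCache {
    memo: HashMap<DatasetId, Option<PollingSource>>,
}

impl PollingSourceCache {
    fn get_or_fetch(
        &mut self,
        dataset_id: &DatasetId,
        fetch: impl FnOnce() -> Option<PollingSource>, // the expensive metadata query
    ) -> Option<PollingSource> {
        if let Some(cached) = self.memo.get(dataset_id) {
            return cached.clone(); // cache hit: no second metadata query
        }
        let fetched = fetch();
        self.memo.insert(dataset_id.clone(), fetched.clone());
        fetched
    }
}
```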
@@ -100,7 +100,10 @@ impl DatasetFlowRunsMut {
})?;

Ok(TriggerFlowResult::Success(TriggerFlowSuccess {
flow: Flow::new(flow_state),
flow: Flow::build_batch(vec![flow_state], ctx)
.await?
.pop()
.unwrap(),
}))
}

@@ -139,7 +142,10 @@ impl DatasetFlowRunsMut {

Ok(CancelScheduledTasksResult::Success(
CancelScheduledTasksSuccess {
flow: Flow::new(flow_state),
flow: Flow::build_batch(vec![flow_state], ctx)
.await?
.pop()
.unwrap(),
},
))
}
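Both mutation call sites above wrap a single flow state into a one-element batch, so `.pop().unwrap()` cannot fail: `build_batch` returns exactly one `Flow` per input state. A hypothetical convenience wrapper (not part of this commit, assuming the commit's own types — `fs::FlowState`, the async-graphql `Context`, and the crate's `Result` alias) would make that invariant explicit:

```rust
impl Flow {
    /// Hypothetical helper: builds a single Flow via the batch path, so one
    /// code path handles description pre-computation everywhere.
    pub async fn build_single(flow_state: fs::FlowState, ctx: &Context<'_>) -> Result<Self> {
        let mut flows = Self::build_batch(vec![flow_state], ctx).await?;
        // build_batch yields one Flow per input state, so popping from a
        // one-element batch always succeeds.
        Ok(flows.pop().expect("one input state yields exactly one flow"))
    }
}
```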
7 changes: 2 additions & 5 deletions src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
@@ -86,12 +86,9 @@ impl AccountFlowRuns {
.await
.int_err()?;

let matched_flows: Vec<_> = flows_state_listing
.matched_stream
.map_ok(Flow::new)
.try_collect()
.await?;
let matched_flow_states: Vec<_> = flows_state_listing.matched_stream.try_collect().await?;
let total_count = flows_state_listing.total_count;
let matched_flows = Flow::build_batch(matched_flow_states, ctx).await?;

Ok(FlowConnection::new(
matched_flows,
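The query-side change is the same in both `account_flow_runs.rs` and `dataset_flow_runs.rs`: instead of converting each state inside the stream with `map_ok(Flow::new)` — which let every element trigger its own lookups — the stream is drained into plain states first and a single batched pass builds all the `Flow`s. A self-contained illustration of the collect-then-batch shape (assumes the `futures` crate; `enrich` is a hypothetical stand-in for `Flow::build_batch`):

```rust
use futures::{stream, TryStreamExt};

async fn collect_then_batch() -> Result<Vec<String>, std::convert::Infallible> {
    let matched_stream = stream::iter([Ok(1u64), Ok(2), Ok(3)]);

    // Phase 1: drain the stream into plain states (no per-item lookups).
    let states: Vec<u64> = matched_stream.try_collect().await?;

    // Phase 2: one batched enrichment over all states, where shared
    // lookups (such as a dataset's polling source) can be deduplicated.
    let enrich = |ids: Vec<u64>| -> Vec<String> {
        ids.into_iter().map(|id| format!("flow-{id}")).collect()
    };
    Ok(enrich(states))
}
```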
12 changes: 6 additions & 6 deletions src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
@@ -53,7 +53,10 @@ impl DatasetFlowRuns {
.int_err()?;

Ok(GetFlowResult::Success(GetFlowSuccess {
flow: Flow::new(flow_state),
flow: Flow::build_batch(vec![flow_state], ctx)
.await?
.pop()
.unwrap(),
}))
}

@@ -106,12 +109,9 @@ impl DatasetFlowRuns {
.await
.int_err()?;

let matched_flows: Vec<_> = flows_state_listing
.matched_stream
.map_ok(Flow::new)
.try_collect()
.await?;
let matched_flow_states: Vec<_> = flows_state_listing.matched_stream.try_collect().await?;
let total_count = flows_state_listing.total_count;
let matched_flows = Flow::build_batch(matched_flow_states, ctx).await?;

Ok(FlowConnection::new(
matched_flows,
153 changes: 108 additions & 45 deletions src/adapter/graphql/src/queries/flows/flow.rs
@@ -7,6 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::HashMap;

use chrono::{DateTime, Utc};
use kamu_core::{DatasetChangesService, DatasetRegistry, DatasetRegistryExt, MetadataQueryService};
use kamu_flow_system::FlowResultDatasetUpdate;
@@ -28,60 +30,121 @@ use crate::utils;
#[derive(Clone)]
pub struct Flow {
flow_state: Box<fs::FlowState>,
description: FlowDescription,
}

#[Object]
impl Flow {
#[graphql(skip)]
pub fn new(flow_state: fs::FlowState) -> Self {
Self {
flow_state: Box::new(flow_state),
pub async fn build_batch(
flow_states: Vec<fs::FlowState>,
ctx: &Context<'_>,
) -> Result<Vec<Self>> {
let mut result: Vec<Self> = Vec::new();
// We use this HashMap to avoid querying the same dataset's polling
// source multiple times, and to cover the case when a dataset has no
// Ingest flows, in which case flow descriptions are built without
// looking up polling sources at all.
//
// It may also prove useful if we later add another entity that would
// otherwise cause duplicate requests.
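//
// Note that the cached value is an Option: a dataset with no active
// polling source is memoized as None, so even a negative result is
// fetched at most once per request.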
let mut dataset_polling_sources: HashMap<
opendatafabric::DatasetID,
Option<(
odf::Multihash,
odf::MetadataBlockTyped<odf::SetPollingSource>,
)>,
> = HashMap::new();

for flow_state in &flow_states {
let flow_description =
Self::build_description(ctx, flow_state, &mut dataset_polling_sources).await?;
result.push(Self {
flow_state: Box::new(flow_state.clone()),
description: flow_description,
});
}

Ok(result)
}

/// Unique identifier of the flow
async fn flow_id(&self) -> FlowID {
self.flow_state.flow_id.into()
}

/// Description of key flow parameters
async fn description(&self, ctx: &Context<'_>) -> Result<FlowDescription> {
Ok(match &self.flow_state.flow_key {
fs::FlowKey::Dataset(fk_dataset) => {
FlowDescription::Dataset(self.dataset_flow_description(ctx, fk_dataset).await?)
}
#[graphql(skip)]
async fn build_description(
ctx: &Context<'_>,
flow_state: &fs::FlowState,
dataset_polling_sources_maybe: &mut HashMap<
opendatafabric::DatasetID,
Option<(
odf::Multihash,
odf::MetadataBlockTyped<odf::SetPollingSource>,
)>,
>,
) -> Result<FlowDescription> {
Ok(match &flow_state.flow_key {
fs::FlowKey::Dataset(fk_dataset) => FlowDescription::Dataset(
Self::dataset_flow_description(
ctx,
flow_state,
fk_dataset,
dataset_polling_sources_maybe,
)
.await?,
),
fs::FlowKey::System(fk_system) => {
FlowDescription::System(self.system_flow_description(fk_system))
FlowDescription::System(Self::system_flow_description(fk_system))
}
})
}

/// Description of key flow parameters
async fn description(&self) -> FlowDescription {
self.description.clone()
}

#[graphql(skip)]
async fn dataset_flow_description(
&self,
ctx: &Context<'_>,
flow_state: &fs::FlowState,
dataset_key: &fs::FlowKeyDataset,
dataset_polling_sources: &mut HashMap<
opendatafabric::DatasetID,
Option<(
odf::Multihash,
odf::MetadataBlockTyped<odf::SetPollingSource>,
)>,
>,
) -> Result<FlowDescriptionDataset> {
Ok(match dataset_key.flow_type {
fs::DatasetFlowType::Ingest => {
let (dataset_registry, metadata_query_service, dataset_changes_svc) = from_catalog_n!(
ctx,
dyn DatasetRegistry,
dyn MetadataQueryService,
dyn DatasetChangesService
);
let target = dataset_registry
.get_dataset_by_ref(&dataset_key.dataset_id.as_local_ref())
.await
.int_err()?;

let maybe_polling_source = metadata_query_service
.get_active_polling_source(target)
.await
.int_err()?;
let maybe_polling_source = if let Some(existing_polling_source) =
dataset_polling_sources.get(&dataset_key.dataset_id)
{
existing_polling_source.clone()
} else {
let (dataset_registry, metadata_query_service) =
from_catalog_n!(ctx, dyn DatasetRegistry, dyn MetadataQueryService);
let target = dataset_registry
.get_dataset_by_ref(&dataset_key.dataset_id.as_local_ref())
.await
.int_err()?;

let polling_source_maybe = metadata_query_service
.get_active_polling_source(target)
.await
.int_err()?;

dataset_polling_sources
.insert(dataset_key.dataset_id.clone(), polling_source_maybe.clone());
polling_source_maybe
};
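// Note: HashMap::entry with or_insert_with would not fit here, since
// or_insert_with takes a synchronous closure while producing this value
// requires awaiting the registry and metadata services.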

let dataset_changes_svc = from_catalog_n!(ctx, dyn DatasetChangesService);
let ingest_result = FlowDescriptionUpdateResult::from_maybe_flow_outcome(
self.flow_state.outcome.as_ref(),
flow_state.outcome.as_ref(),
&dataset_key.dataset_id,
dataset_changes_svc.as_ref(),
)
@@ -94,7 +157,7 @@ impl Flow {
ingest_result,
})
} else {
let source_name = self.flow_state.primary_trigger().push_source_name();
let source_name = flow_state.primary_trigger().push_source_name();
FlowDescriptionDataset::PushIngest(FlowDescriptionDatasetPushIngest {
dataset_id: dataset_key.dataset_id.clone().into(),
source_name,
@@ -109,7 +172,7 @@
FlowDescriptionDataset::ExecuteTransform(FlowDescriptionDatasetExecuteTransform {
dataset_id: dataset_key.dataset_id.clone().into(),
transform_result: FlowDescriptionUpdateResult::from_maybe_flow_outcome(
self.flow_state.outcome.as_ref(),
flow_state.outcome.as_ref(),
&dataset_key.dataset_id,
dataset_changes_svc.as_ref(),
)
@@ -122,23 +185,23 @@
dataset_id: dataset_key.dataset_id.clone().into(),
compaction_result:
FlowDescriptionDatasetHardCompactionResult::from_maybe_flow_outcome(
self.flow_state.outcome.as_ref(),
flow_state.outcome.as_ref(),
),
})
}
fs::DatasetFlowType::Reset => {
FlowDescriptionDataset::Reset(FlowDescriptionDatasetReset {
dataset_id: dataset_key.dataset_id.clone().into(),
reset_result: FlowDescriptionResetResult::from_maybe_flow_outcome(
self.flow_state.outcome.as_ref(),
flow_state.outcome.as_ref(),
),
})
}
})
}

#[graphql(skip)]
fn system_flow_description(&self, system_key: &fs::FlowKeySystem) -> FlowDescriptionSystem {
fn system_flow_description(system_key: &fs::FlowKeySystem) -> FlowDescriptionSystem {
match system_key.flow_type {
fs::SystemFlowType::GC => {
FlowDescriptionSystem::GC(FlowDescriptionSystemGC { dummy: true })
@@ -236,25 +299,25 @@ impl Flow {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Union)]
#[derive(Union, Clone)]
enum FlowDescription {
#[graphql(flatten)]
Dataset(FlowDescriptionDataset),
#[graphql(flatten)]
System(FlowDescriptionSystem),
}

#[derive(Union)]
#[derive(Union, Clone)]
enum FlowDescriptionSystem {
GC(FlowDescriptionSystemGC),
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionSystemGC {
dummy: bool,
}

#[derive(Union)]
#[derive(Union, Clone)]
enum FlowDescriptionDataset {
PollingIngest(FlowDescriptionDatasetPollingIngest),
PushIngest(FlowDescriptionDatasetPushIngest),
@@ -263,53 +326,53 @@ enum FlowDescriptionDataset {
Reset(FlowDescriptionDatasetReset),
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionDatasetPollingIngest {
dataset_id: DatasetID,
ingest_result: Option<FlowDescriptionUpdateResult>,
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionDatasetPushIngest {
dataset_id: DatasetID,
source_name: Option<String>,
input_records_count: u64,
ingest_result: Option<FlowDescriptionUpdateResult>,
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionDatasetExecuteTransform {
dataset_id: DatasetID,
transform_result: Option<FlowDescriptionUpdateResult>,
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionDatasetHardCompaction {
dataset_id: DatasetID,
compaction_result: Option<FlowDescriptionDatasetHardCompactionResult>,
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionDatasetReset {
dataset_id: DatasetID,
reset_result: Option<FlowDescriptionResetResult>,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Union)]
#[derive(Union, Clone)]
enum FlowDescriptionUpdateResult {
UpToDate(FlowDescriptionUpdateResultUpToDate),
Success(FlowDescriptionUpdateResultSuccess),
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionUpdateResultUpToDate {
/// The value indicates whether the API cache was used
uncacheable: bool,
}

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionUpdateResultSuccess {
num_blocks: u64,
num_records: u64,
@@ -417,7 +480,7 @@ impl FlowDescriptionDatasetHardCompactionResult {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(SimpleObject)]
#[derive(SimpleObject, Clone)]
struct FlowDescriptionResetResult {
new_head: Multihash,
}
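The wave of `Clone` derives above follows from the new design: the description is computed once in `build_batch`, stored inside `Flow`, and the `description` resolver hands out clones instead of recomputing it per GraphQL request. A minimal sketch of that shape (hypothetical `FlowLike` type; assumes the `async-graphql` crate this adapter is built on):

```rust
use async_graphql::{Object, SimpleObject};

#[derive(SimpleObject, Clone)]
struct Description {
    summary: String,
}

struct FlowLike {
    // Precomputed once (e.g. in a batch constructor), then cloned on read.
    description: Description,
}

#[Object]
impl FlowLike {
    /// Description of key flow parameters
    async fn description(&self) -> Description {
        self.description.clone()
    }
}
```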
