From 90d4796374705045ec300559e5566c13cd5434b8 Mon Sep 17 00:00:00 2001
From: rmn-boiko
Date: Fri, 3 Jan 2025 17:32:27 +0100
Subject: [PATCH 1/2] Improve flow queries performance

---
 CHANGELOG.md                                  |   4 +
 .../flows_mut/dataset_flow_runs_mut.rs        |  10 +-
 .../src/queries/accounts/account_flow_runs.rs |   7 +-
 .../src/queries/datasets/dataset_flow_runs.rs |  12 +-
 src/adapter/graphql/src/queries/flows/flow.rs | 153 ++++++++++++------
 5 files changed, 128 insertions(+), 58 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c71377dbc..b2c845c94e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@
 - Fixed
 -->
 
+## [Unreleased]
+### Fixed
+- GQL API flow queries now fetch the dataset polling source only once per dataset (and only when the flow type is Ingest)
+
 ## [0.216.0] - 2024-12-30
 ### Changed
 - Flight SQL protocol now supports anonymous and bearer token authentication
diff --git a/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_runs_mut.rs b/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_runs_mut.rs
index 9107b30342..c2bb240a16 100644
--- a/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_runs_mut.rs
+++ b/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_runs_mut.rs
@@ -100,7 +100,10 @@ impl DatasetFlowRunsMut {
         })?;
 
         Ok(TriggerFlowResult::Success(TriggerFlowSuccess {
-            flow: Flow::new(flow_state),
+            flow: Flow::build_batch(vec![flow_state], ctx)
+                .await?
+                .pop()
+                .unwrap(),
         }))
     }
 
@@ -139,7 +142,10 @@ impl DatasetFlowRunsMut {
 
         Ok(CancelScheduledTasksResult::Success(
             CancelScheduledTasksSuccess {
-                flow: Flow::new(flow_state),
+                flow: Flow::build_batch(vec![flow_state], ctx)
+                    .await?
+                    .pop()
+                    .unwrap(),
             },
         ))
     }
diff --git a/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs b/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
index 7fb5416e85..6a889934ee 100644
--- a/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
+++ b/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
@@ -86,12 +86,9 @@ impl AccountFlowRuns {
             .await
             .int_err()?;
 
-        let matched_flows: Vec<_> = flows_state_listing
-            .matched_stream
-            .map_ok(Flow::new)
-            .try_collect()
-            .await?;
+        let matched_flow_states: Vec<_> = flows_state_listing.matched_stream.try_collect().await?;
         let total_count = flows_state_listing.total_count;
+        let matched_flows = Flow::build_batch(matched_flow_states, ctx).await?;
 
         Ok(FlowConnection::new(
             matched_flows,
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs b/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
index 9aad65a504..777511a119 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
@@ -53,7 +53,10 @@ impl DatasetFlowRuns {
             .int_err()?;
 
         Ok(GetFlowResult::Success(GetFlowSuccess {
-            flow: Flow::new(flow_state),
+            flow: Flow::build_batch(vec![flow_state], ctx)
+                .await?
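+                // `build_batch` returns exactly one `Flow` per input state, so `pop().unwrap()` cannot fail here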
+ .pop() + .unwrap(), })) } @@ -106,12 +109,9 @@ impl DatasetFlowRuns { .await .int_err()?; - let matched_flows: Vec<_> = flows_state_listing - .matched_stream - .map_ok(Flow::new) - .try_collect() - .await?; + let matched_flow_states: Vec<_> = flows_state_listing.matched_stream.try_collect().await?; let total_count = flows_state_listing.total_count; + let matched_flows = Flow::build_batch(matched_flow_states, ctx).await?; Ok(FlowConnection::new( matched_flows, diff --git a/src/adapter/graphql/src/queries/flows/flow.rs b/src/adapter/graphql/src/queries/flows/flow.rs index 8b35e4769a..6c36b23a88 100644 --- a/src/adapter/graphql/src/queries/flows/flow.rs +++ b/src/adapter/graphql/src/queries/flows/flow.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::HashMap; + use chrono::{DateTime, Utc}; use kamu_core::{DatasetChangesService, DatasetRegistry, DatasetRegistryExt, MetadataQueryService}; use kamu_flow_system::FlowResultDatasetUpdate; @@ -28,15 +30,41 @@ use crate::utils; #[derive(Clone)] pub struct Flow { flow_state: Box, + description: FlowDescription, } #[Object] impl Flow { #[graphql(skip)] - pub fn new(flow_state: fs::FlowState) -> Self { - Self { - flow_state: Box::new(flow_state), + pub async fn build_batch( + flow_states: Vec, + ctx: &Context<'_>, + ) -> Result> { + let mut result: Vec = Vec::new(); + // We need this HashMap to avoid multiple queries to the same dataset polling + // source and cover cases when dataset has no Ingest flows, so we will + // build flow descriptions without searching of polling sources + // + // In addition it might be useful if we will add another entity which cause + // duplicate requests + let mut dataset_polling_sources: HashMap< + opendatafabric::DatasetID, + Option<( + odf::Multihash, + odf::MetadataBlockTyped, + )>, + > = HashMap::new(); + + for flow_state in &flow_states { + let flow_description = + Self::build_description(ctx, flow_state, &mut dataset_polling_sources).await?; + result.push(Self { + flow_state: Box::new(flow_state.clone()), + description: flow_description, + }); } + + Ok(result) } /// Unique identifier of the flow @@ -44,44 +72,79 @@ impl Flow { self.flow_state.flow_id.into() } - /// Description of key flow parameters - async fn description(&self, ctx: &Context<'_>) -> Result { - Ok(match &self.flow_state.flow_key { - fs::FlowKey::Dataset(fk_dataset) => { - FlowDescription::Dataset(self.dataset_flow_description(ctx, fk_dataset).await?) 
- } + #[graphql(skip)] + async fn build_description( + ctx: &Context<'_>, + flow_state: &fs::FlowState, + dataset_polling_sources_maybe: &mut HashMap< + opendatafabric::DatasetID, + Option<( + odf::Multihash, + odf::MetadataBlockTyped, + )>, + >, + ) -> Result { + Ok(match &flow_state.flow_key { + fs::FlowKey::Dataset(fk_dataset) => FlowDescription::Dataset( + Self::dataset_flow_description( + ctx, + flow_state, + fk_dataset, + dataset_polling_sources_maybe, + ) + .await?, + ), fs::FlowKey::System(fk_system) => { - FlowDescription::System(self.system_flow_description(fk_system)) + FlowDescription::System(Self::system_flow_description(fk_system)) } }) } + /// Description of key flow parameters + async fn description(&self) -> FlowDescription { + self.description.clone() + } + #[graphql(skip)] async fn dataset_flow_description( - &self, ctx: &Context<'_>, + flow_state: &fs::FlowState, dataset_key: &fs::FlowKeyDataset, + dataset_polling_sources: &mut HashMap< + opendatafabric::DatasetID, + Option<( + odf::Multihash, + odf::MetadataBlockTyped, + )>, + >, ) -> Result { Ok(match dataset_key.flow_type { fs::DatasetFlowType::Ingest => { - let (dataset_registry, metadata_query_service, dataset_changes_svc) = from_catalog_n!( - ctx, - dyn DatasetRegistry, - dyn MetadataQueryService, - dyn DatasetChangesService - ); - let target = dataset_registry - .get_dataset_by_ref(&dataset_key.dataset_id.as_local_ref()) - .await - .int_err()?; - - let maybe_polling_source = metadata_query_service - .get_active_polling_source(target) - .await - .int_err()?; + let maybe_polling_source = if let Some(existing_polling_source) = + dataset_polling_sources.get(&dataset_key.dataset_id) + { + existing_polling_source.clone() + } else { + let (dataset_registry, metadata_query_service) = + from_catalog_n!(ctx, dyn DatasetRegistry, dyn MetadataQueryService); + let target = dataset_registry + .get_dataset_by_ref(&dataset_key.dataset_id.as_local_ref()) + .await + .int_err()?; + + let polling_source_maybe = metadata_query_service + .get_active_polling_source(target) + .await + .int_err()?; + + dataset_polling_sources + .insert(dataset_key.dataset_id.clone(), polling_source_maybe.clone()); + polling_source_maybe + }; + let dataset_changes_svc = from_catalog_n!(ctx, dyn DatasetChangesService); let ingest_result = FlowDescriptionUpdateResult::from_maybe_flow_outcome( - self.flow_state.outcome.as_ref(), + flow_state.outcome.as_ref(), &dataset_key.dataset_id, dataset_changes_svc.as_ref(), ) @@ -94,7 +157,7 @@ impl Flow { ingest_result, }) } else { - let source_name = self.flow_state.primary_trigger().push_source_name(); + let source_name = flow_state.primary_trigger().push_source_name(); FlowDescriptionDataset::PushIngest(FlowDescriptionDatasetPushIngest { dataset_id: dataset_key.dataset_id.clone().into(), source_name, @@ -109,7 +172,7 @@ impl Flow { FlowDescriptionDataset::ExecuteTransform(FlowDescriptionDatasetExecuteTransform { dataset_id: dataset_key.dataset_id.clone().into(), transform_result: FlowDescriptionUpdateResult::from_maybe_flow_outcome( - self.flow_state.outcome.as_ref(), + flow_state.outcome.as_ref(), &dataset_key.dataset_id, dataset_changes_svc.as_ref(), ) @@ -122,7 +185,7 @@ impl Flow { dataset_id: dataset_key.dataset_id.clone().into(), compaction_result: FlowDescriptionDatasetHardCompactionResult::from_maybe_flow_outcome( - self.flow_state.outcome.as_ref(), + flow_state.outcome.as_ref(), ), }) } @@ -130,7 +193,7 @@ impl Flow { FlowDescriptionDataset::Reset(FlowDescriptionDatasetReset { dataset_id: 
dataset_key.dataset_id.clone().into(), reset_result: FlowDescriptionResetResult::from_maybe_flow_outcome( - self.flow_state.outcome.as_ref(), + flow_state.outcome.as_ref(), ), }) } @@ -138,7 +201,7 @@ impl Flow { } #[graphql(skip)] - fn system_flow_description(&self, system_key: &fs::FlowKeySystem) -> FlowDescriptionSystem { + fn system_flow_description(system_key: &fs::FlowKeySystem) -> FlowDescriptionSystem { match system_key.flow_type { fs::SystemFlowType::GC => { FlowDescriptionSystem::GC(FlowDescriptionSystemGC { dummy: true }) @@ -236,7 +299,7 @@ impl Flow { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Union)] +#[derive(Union, Clone)] enum FlowDescription { #[graphql(flatten)] Dataset(FlowDescriptionDataset), @@ -244,17 +307,17 @@ enum FlowDescription { System(FlowDescriptionSystem), } -#[derive(Union)] +#[derive(Union, Clone)] enum FlowDescriptionSystem { GC(FlowDescriptionSystemGC), } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionSystemGC { dummy: bool, } -#[derive(Union)] +#[derive(Union, Clone)] enum FlowDescriptionDataset { PollingIngest(FlowDescriptionDatasetPollingIngest), PushIngest(FlowDescriptionDatasetPushIngest), @@ -263,13 +326,13 @@ enum FlowDescriptionDataset { Reset(FlowDescriptionDatasetReset), } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionDatasetPollingIngest { dataset_id: DatasetID, ingest_result: Option, } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionDatasetPushIngest { dataset_id: DatasetID, source_name: Option, @@ -277,19 +340,19 @@ struct FlowDescriptionDatasetPushIngest { ingest_result: Option, } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionDatasetExecuteTransform { dataset_id: DatasetID, transform_result: Option, } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionDatasetHardCompaction { dataset_id: DatasetID, compaction_result: Option, } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionDatasetReset { dataset_id: DatasetID, reset_result: Option, @@ -297,19 +360,19 @@ struct FlowDescriptionDatasetReset { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Union)] +#[derive(Union, Clone)] enum FlowDescriptionUpdateResult { UpToDate(FlowDescriptionUpdateResultUpToDate), Success(FlowDescriptionUpdateResultSuccess), } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionUpdateResultUpToDate { /// The value indicates whether the api cache was used uncacheable: bool, } -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionUpdateResultSuccess { num_blocks: u64, num_records: u64, @@ -417,7 +480,7 @@ impl FlowDescriptionDatasetHardCompactionResult { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(SimpleObject)] +#[derive(SimpleObject, Clone)] struct FlowDescriptionResetResult { new_head: Multihash, } From b993dfa5cbbccc8623ecbc6613706d1cf7808766 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Mon, 6 Jan 2025 12:36:37 +0100 Subject: [PATCH 2/2] Fix review comments - Add description builder struct --- src/adapter/graphql/src/queries/flows/flow.rs | 369 +----------------- .../src/queries/flows/flow_description.rs | 363 +++++++++++++++++ src/adapter/graphql/src/queries/flows/mod.rs | 1 + 
.../src/services/metadata_query_service.rs | 13 +- .../services/metadata_query_service_impl.rs | 8 +- 5 files changed, 377 insertions(+), 377 deletions(-) create mode 100644 src/adapter/graphql/src/queries/flows/flow_description.rs diff --git a/src/adapter/graphql/src/queries/flows/flow.rs b/src/adapter/graphql/src/queries/flows/flow.rs index 6c36b23a88..77efbfdd3a 100644 --- a/src/adapter/graphql/src/queries/flows/flow.rs +++ b/src/adapter/graphql/src/queries/flows/flow.rs @@ -7,13 +7,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::HashMap; - -use chrono::{DateTime, Utc}; -use kamu_core::{DatasetChangesService, DatasetRegistry, DatasetRegistryExt, MetadataQueryService}; -use kamu_flow_system::FlowResultDatasetUpdate; -use {kamu_flow_system as fs, opendatafabric as odf}; +use kamu_flow_system as fs; +use super::flow_description::{FlowDescription, FlowDescriptionBuilder}; use super::{ FlowConfigurationSnapshot, FlowEvent, @@ -41,25 +37,12 @@ impl Flow { ctx: &Context<'_>, ) -> Result> { let mut result: Vec = Vec::new(); - // We need this HashMap to avoid multiple queries to the same dataset polling - // source and cover cases when dataset has no Ingest flows, so we will - // build flow descriptions without searching of polling sources - // - // In addition it might be useful if we will add another entity which cause - // duplicate requests - let mut dataset_polling_sources: HashMap< - opendatafabric::DatasetID, - Option<( - odf::Multihash, - odf::MetadataBlockTyped, - )>, - > = HashMap::new(); + let mut flow_description_builder = FlowDescriptionBuilder::new(); - for flow_state in &flow_states { - let flow_description = - Self::build_description(ctx, flow_state, &mut dataset_polling_sources).await?; + for flow_state in flow_states { + let flow_description = flow_description_builder.build(ctx, &flow_state).await?; result.push(Self { - flow_state: Box::new(flow_state.clone()), + flow_state: Box::new(flow_state), description: flow_description, }); } @@ -72,143 +55,11 @@ impl Flow { self.flow_state.flow_id.into() } - #[graphql(skip)] - async fn build_description( - ctx: &Context<'_>, - flow_state: &fs::FlowState, - dataset_polling_sources_maybe: &mut HashMap< - opendatafabric::DatasetID, - Option<( - odf::Multihash, - odf::MetadataBlockTyped, - )>, - >, - ) -> Result { - Ok(match &flow_state.flow_key { - fs::FlowKey::Dataset(fk_dataset) => FlowDescription::Dataset( - Self::dataset_flow_description( - ctx, - flow_state, - fk_dataset, - dataset_polling_sources_maybe, - ) - .await?, - ), - fs::FlowKey::System(fk_system) => { - FlowDescription::System(Self::system_flow_description(fk_system)) - } - }) - } - /// Description of key flow parameters async fn description(&self) -> FlowDescription { self.description.clone() } - #[graphql(skip)] - async fn dataset_flow_description( - ctx: &Context<'_>, - flow_state: &fs::FlowState, - dataset_key: &fs::FlowKeyDataset, - dataset_polling_sources: &mut HashMap< - opendatafabric::DatasetID, - Option<( - odf::Multihash, - odf::MetadataBlockTyped, - )>, - >, - ) -> Result { - Ok(match dataset_key.flow_type { - fs::DatasetFlowType::Ingest => { - let maybe_polling_source = if let Some(existing_polling_source) = - dataset_polling_sources.get(&dataset_key.dataset_id) - { - existing_polling_source.clone() - } else { - let (dataset_registry, metadata_query_service) = - from_catalog_n!(ctx, dyn DatasetRegistry, dyn MetadataQueryService); - let target = dataset_registry - 
.get_dataset_by_ref(&dataset_key.dataset_id.as_local_ref()) - .await - .int_err()?; - - let polling_source_maybe = metadata_query_service - .get_active_polling_source(target) - .await - .int_err()?; - - dataset_polling_sources - .insert(dataset_key.dataset_id.clone(), polling_source_maybe.clone()); - polling_source_maybe - }; - - let dataset_changes_svc = from_catalog_n!(ctx, dyn DatasetChangesService); - let ingest_result = FlowDescriptionUpdateResult::from_maybe_flow_outcome( - flow_state.outcome.as_ref(), - &dataset_key.dataset_id, - dataset_changes_svc.as_ref(), - ) - .await - .int_err()?; - - if maybe_polling_source.is_some() { - FlowDescriptionDataset::PollingIngest(FlowDescriptionDatasetPollingIngest { - dataset_id: dataset_key.dataset_id.clone().into(), - ingest_result, - }) - } else { - let source_name = flow_state.primary_trigger().push_source_name(); - FlowDescriptionDataset::PushIngest(FlowDescriptionDatasetPushIngest { - dataset_id: dataset_key.dataset_id.clone().into(), - source_name, - input_records_count: 0, // TODO - ingest_result, - }) - } - } - fs::DatasetFlowType::ExecuteTransform => { - let dataset_changes_svc = from_catalog_n!(ctx, dyn DatasetChangesService); - - FlowDescriptionDataset::ExecuteTransform(FlowDescriptionDatasetExecuteTransform { - dataset_id: dataset_key.dataset_id.clone().into(), - transform_result: FlowDescriptionUpdateResult::from_maybe_flow_outcome( - flow_state.outcome.as_ref(), - &dataset_key.dataset_id, - dataset_changes_svc.as_ref(), - ) - .await - .int_err()?, - }) - } - fs::DatasetFlowType::HardCompaction => { - FlowDescriptionDataset::HardCompaction(FlowDescriptionDatasetHardCompaction { - dataset_id: dataset_key.dataset_id.clone().into(), - compaction_result: - FlowDescriptionDatasetHardCompactionResult::from_maybe_flow_outcome( - flow_state.outcome.as_ref(), - ), - }) - } - fs::DatasetFlowType::Reset => { - FlowDescriptionDataset::Reset(FlowDescriptionDatasetReset { - dataset_id: dataset_key.dataset_id.clone().into(), - reset_result: FlowDescriptionResetResult::from_maybe_flow_outcome( - flow_state.outcome.as_ref(), - ), - }) - } - }) - } - - #[graphql(skip)] - fn system_flow_description(system_key: &fs::FlowKeySystem) -> FlowDescriptionSystem { - match system_key.flow_type { - fs::SystemFlowType::GC => { - FlowDescriptionSystem::GC(FlowDescriptionSystemGC { dummy: true }) - } - } - } - /// Status of the flow async fn status(&self) -> FlowStatus { self.flow_state.status().into() @@ -298,211 +149,3 @@ impl Flow { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -#[derive(Union, Clone)] -enum FlowDescription { - #[graphql(flatten)] - Dataset(FlowDescriptionDataset), - #[graphql(flatten)] - System(FlowDescriptionSystem), -} - -#[derive(Union, Clone)] -enum FlowDescriptionSystem { - GC(FlowDescriptionSystemGC), -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionSystemGC { - dummy: bool, -} - -#[derive(Union, Clone)] -enum FlowDescriptionDataset { - PollingIngest(FlowDescriptionDatasetPollingIngest), - PushIngest(FlowDescriptionDatasetPushIngest), - ExecuteTransform(FlowDescriptionDatasetExecuteTransform), - HardCompaction(FlowDescriptionDatasetHardCompaction), - Reset(FlowDescriptionDatasetReset), -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionDatasetPollingIngest { - dataset_id: DatasetID, - ingest_result: Option, -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionDatasetPushIngest { - dataset_id: DatasetID, - source_name: Option, - 
input_records_count: u64, - ingest_result: Option, -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionDatasetExecuteTransform { - dataset_id: DatasetID, - transform_result: Option, -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionDatasetHardCompaction { - dataset_id: DatasetID, - compaction_result: Option, -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionDatasetReset { - dataset_id: DatasetID, - reset_result: Option, -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -#[derive(Union, Clone)] -enum FlowDescriptionUpdateResult { - UpToDate(FlowDescriptionUpdateResultUpToDate), - Success(FlowDescriptionUpdateResultSuccess), -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionUpdateResultUpToDate { - /// The value indicates whether the api cache was used - uncacheable: bool, -} - -#[derive(SimpleObject, Clone)] -struct FlowDescriptionUpdateResultSuccess { - num_blocks: u64, - num_records: u64, - updated_watermark: Option>, -} - -impl FlowDescriptionUpdateResult { - async fn from_maybe_flow_outcome( - maybe_outcome: Option<&fs::FlowOutcome>, - dataset_id: &odf::DatasetID, - dataset_changes_service: &dyn DatasetChangesService, - ) -> Result, InternalError> { - if let Some(outcome) = maybe_outcome { - match outcome { - fs::FlowOutcome::Success(result) => match result { - fs::FlowResult::Empty - | fs::FlowResult::DatasetCompact(_) - | fs::FlowResult::DatasetReset(_) => Ok(None), - fs::FlowResult::DatasetUpdate(update) => match update { - FlowResultDatasetUpdate::Changed(update_result) => { - let increment = dataset_changes_service - .get_increment_between( - dataset_id, - update_result.old_head.as_ref(), - &update_result.new_head, - ) - .await - .int_err()?; - - Ok(Some(Self::Success(FlowDescriptionUpdateResultSuccess { - num_blocks: increment.num_blocks, - num_records: increment.num_records, - updated_watermark: increment.updated_watermark, - }))) - } - FlowResultDatasetUpdate::UpToDate(up_to_date_result) => { - Ok(Some(Self::UpToDate(FlowDescriptionUpdateResultUpToDate { - uncacheable: up_to_date_result.uncacheable, - }))) - } - }, - }, - _ => Ok(None), - } - } else { - Ok(None) - } - } -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -#[derive(Union, Debug, Clone)] -enum FlowDescriptionDatasetHardCompactionResult { - NothingToDo(FlowDescriptionHardCompactionNothingToDo), - Success(FlowDescriptionHardCompactionSuccess), -} - -#[derive(SimpleObject, Debug, Clone)] -struct FlowDescriptionHardCompactionSuccess { - original_blocks_count: u64, - resulting_blocks_count: u64, - new_head: Multihash, -} - -#[derive(SimpleObject, Debug, Clone)] -#[graphql(complex)] -pub struct FlowDescriptionHardCompactionNothingToDo { - pub _dummy: String, -} - -#[ComplexObject] -impl FlowDescriptionHardCompactionNothingToDo { - async fn message(&self) -> String { - "Nothing to do".to_string() - } -} - -impl FlowDescriptionDatasetHardCompactionResult { - fn from_maybe_flow_outcome(maybe_outcome: Option<&fs::FlowOutcome>) -> Option { - if let Some(outcome) = maybe_outcome { - match outcome { - fs::FlowOutcome::Success(result) => match result { - fs::FlowResult::DatasetUpdate(_) | fs::FlowResult::DatasetReset(_) => None, - fs::FlowResult::Empty => Some(Self::NothingToDo( - FlowDescriptionHardCompactionNothingToDo { - _dummy: "Nothing to do".to_string(), - }, - )), - fs::FlowResult::DatasetCompact(compact) => { - 
Some(Self::Success(FlowDescriptionHardCompactionSuccess {
-                            original_blocks_count: compact.old_num_blocks as u64,
-                            resulting_blocks_count: compact.new_num_blocks as u64,
-                            new_head: compact.new_head.clone().into(),
-                        }))
-                    }
-                },
-                _ => None,
-            }
-        } else {
-            None
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[derive(SimpleObject, Clone)]
-struct FlowDescriptionResetResult {
-    new_head: Multihash,
-}
-
-impl FlowDescriptionResetResult {
-    fn from_maybe_flow_outcome(maybe_outcome: Option<&fs::FlowOutcome>) -> Option<Self> {
-        if let Some(outcome) = maybe_outcome {
-            match outcome {
-                fs::FlowOutcome::Success(result) => match result {
-                    fs::FlowResult::Empty
-                    | fs::FlowResult::DatasetCompact(_)
-                    | fs::FlowResult::DatasetUpdate(_) => None,
-                    fs::FlowResult::DatasetReset(reset_result) => Some(Self {
-                        new_head: reset_result.new_head.clone().into(),
-                    }),
-                },
-                _ => None,
-            }
-        } else {
-            None
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/graphql/src/queries/flows/flow_description.rs b/src/adapter/graphql/src/queries/flows/flow_description.rs
new file mode 100644
index 0000000000..7954dab57e
--- /dev/null
+++ b/src/adapter/graphql/src/queries/flows/flow_description.rs
@@ -0,0 +1,363 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::collections::HashMap;
+
+use chrono::{DateTime, Utc};
+use kamu_core::{
+    DatasetChangesService,
+    DatasetRegistry,
+    DatasetRegistryExt,
+    MetadataQueryService,
+    PollingSourceBlockInfo,
+};
+use kamu_flow_system::FlowResultDatasetUpdate;
+use {kamu_flow_system as fs, opendatafabric as odf};
+
+use crate::prelude::*;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Union, Clone)]
+pub(crate) enum FlowDescription {
+    #[graphql(flatten)]
+    Dataset(FlowDescriptionDataset),
+    #[graphql(flatten)]
+    System(FlowDescriptionSystem),
+}
+
+#[derive(Union, Clone)]
+pub(crate) enum FlowDescriptionSystem {
+    GC(FlowDescriptionSystemGC),
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionSystemGC {
+    dummy: bool,
+}
+
+#[derive(Union, Clone)]
+pub(crate) enum FlowDescriptionDataset {
+    PollingIngest(FlowDescriptionDatasetPollingIngest),
+    PushIngest(FlowDescriptionDatasetPushIngest),
+    ExecuteTransform(FlowDescriptionDatasetExecuteTransform),
+    HardCompaction(FlowDescriptionDatasetHardCompaction),
+    Reset(FlowDescriptionDatasetReset),
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionDatasetPollingIngest {
+    dataset_id: DatasetID,
+    ingest_result: Option<FlowDescriptionUpdateResult>,
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionDatasetPushIngest {
+    dataset_id: DatasetID,
+    source_name: Option<String>,
+    input_records_count: u64,
+    ingest_result: Option<FlowDescriptionUpdateResult>,
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionDatasetExecuteTransform {
+    dataset_id: DatasetID,
+    transform_result: Option<FlowDescriptionUpdateResult>,
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionDatasetHardCompaction {
+    dataset_id: DatasetID,
+    compaction_result:
+        Option<FlowDescriptionDatasetHardCompactionResult>,
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionDatasetReset {
+    dataset_id: DatasetID,
+    reset_result: Option<FlowDescriptionResetResult>,
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Union, Clone)]
+pub(crate) enum FlowDescriptionUpdateResult {
+    UpToDate(FlowDescriptionUpdateResultUpToDate),
+    Success(FlowDescriptionUpdateResultSuccess),
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionUpdateResultUpToDate {
+    /// The value indicates whether the API cache was used
+    uncacheable: bool,
+}
+
+#[derive(SimpleObject, Clone)]
+pub(crate) struct FlowDescriptionUpdateResultSuccess {
+    num_blocks: u64,
+    num_records: u64,
+    updated_watermark: Option<DateTime<Utc>>,
+}
+
+impl FlowDescriptionUpdateResult {
+    async fn from_maybe_flow_outcome(
+        maybe_outcome: Option<&fs::FlowOutcome>,
+        dataset_id: &odf::DatasetID,
+        dataset_changes_service: &dyn DatasetChangesService,
+    ) -> Result<Option<Self>, InternalError> {
+        if let Some(outcome) = maybe_outcome {
+            match outcome {
+                fs::FlowOutcome::Success(result) => match result {
+                    fs::FlowResult::Empty
+                    | fs::FlowResult::DatasetCompact(_)
+                    | fs::FlowResult::DatasetReset(_) => Ok(None),
+                    fs::FlowResult::DatasetUpdate(update) => match update {
+                        FlowResultDatasetUpdate::Changed(update_result) => {
+                            let increment = dataset_changes_service
+                                .get_increment_between(
+                                    dataset_id,
+                                    update_result.old_head.as_ref(),
+                                    &update_result.new_head,
+                                )
+                                .await
+                                .int_err()?;
+
+                            Ok(Some(Self::Success(FlowDescriptionUpdateResultSuccess {
+                                num_blocks: increment.num_blocks,
+                                num_records: increment.num_records,
+                                updated_watermark: increment.updated_watermark,
+                            })))
+                        }
+                        FlowResultDatasetUpdate::UpToDate(up_to_date_result) => {
+                            Ok(Some(Self::UpToDate(FlowDescriptionUpdateResultUpToDate {
+                                uncacheable: up_to_date_result.uncacheable,
+                            })))
+                        }
+                    },
+                },
+                _ => Ok(None),
+            }
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Union, Debug, Clone)]
+enum FlowDescriptionDatasetHardCompactionResult {
+    NothingToDo(FlowDescriptionHardCompactionNothingToDo),
+    Success(FlowDescriptionHardCompactionSuccess),
+}
+
+#[derive(SimpleObject, Debug, Clone)]
+struct FlowDescriptionHardCompactionSuccess {
+    original_blocks_count: u64,
+    resulting_blocks_count: u64,
+    new_head: Multihash,
+}
+
+#[derive(SimpleObject, Debug, Clone)]
+#[graphql(complex)]
+pub struct FlowDescriptionHardCompactionNothingToDo {
+    pub _dummy: String,
+}
+
+#[ComplexObject]
+impl FlowDescriptionHardCompactionNothingToDo {
+    async fn message(&self) -> String {
+        "Nothing to do".to_string()
+    }
+}
+
+impl FlowDescriptionDatasetHardCompactionResult {
+    fn from_maybe_flow_outcome(maybe_outcome: Option<&fs::FlowOutcome>) -> Option<Self> {
+        if let Some(outcome) = maybe_outcome {
+            match outcome {
+                fs::FlowOutcome::Success(result) => match result {
+                    fs::FlowResult::DatasetUpdate(_) | fs::FlowResult::DatasetReset(_) => None,
+                    fs::FlowResult::Empty => Some(Self::NothingToDo(
+                        FlowDescriptionHardCompactionNothingToDo {
+                            _dummy: "Nothing to do".to_string(),
+                        },
+                    )),
+                    fs::FlowResult::DatasetCompact(compact) => {
+                        Some(Self::Success(FlowDescriptionHardCompactionSuccess {
+                            original_blocks_count: compact.old_num_blocks as u64,
+                            resulting_blocks_count: compact.new_num_blocks as u64,
+                            new_head: compact.new_head.clone().into(),
+                        }))
+                    }
+                },
+                _ => None,
+            }
+        } else {
+            None
+        }
+    }
+}
+
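+// Note: each `from_maybe_flow_outcome` helper in this module maps a flow
+// outcome to at most one description variant, returning `None` for results
+// produced by other flow types.
+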
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(SimpleObject, Clone)]
+struct FlowDescriptionResetResult {
+    new_head: Multihash,
+}
+
+impl FlowDescriptionResetResult {
+    fn from_maybe_flow_outcome(maybe_outcome: Option<&fs::FlowOutcome>) -> Option<Self> {
+        if let Some(outcome) = maybe_outcome {
+            match outcome {
+                fs::FlowOutcome::Success(result) => match result {
+                    fs::FlowResult::Empty
+                    | fs::FlowResult::DatasetCompact(_)
+                    | fs::FlowResult::DatasetUpdate(_) => None,
+                    fs::FlowResult::DatasetReset(reset_result) => Some(Self {
+                        new_head: reset_result.new_head.clone().into(),
+                    }),
+                },
+                _ => None,
+            }
+        } else {
+            None
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct FlowDescriptionBuilder {
+    // Caches the polling source of each dataset so that a batch of flows
+    // queries it at most once per dataset. Datasets that have no Ingest flows
+    // are covered too: their descriptions are built without any lookup.
+    //
+    // The same caching may also prove useful if another entity ever starts
+    // causing duplicate requests.
+    dataset_polling_sources: HashMap<odf::DatasetID, Option<PollingSourceBlockInfo>>,
+}
+
+impl FlowDescriptionBuilder {
+    pub fn new() -> Self {
+        Self {
+            dataset_polling_sources: HashMap::new(),
+        }
+    }
+
+    pub async fn build(
+        &mut self,
+        ctx: &Context<'_>,
+        flow_state: &fs::FlowState,
+    ) -> Result<FlowDescription> {
+        Ok(match &flow_state.flow_key {
+            fs::FlowKey::Dataset(fk_dataset) => FlowDescription::Dataset(
+                self.dataset_flow_description(ctx, flow_state, fk_dataset)
+                    .await?,
+            ),
+            fs::FlowKey::System(fk_system) => {
+                FlowDescription::System(self.system_flow_description(fk_system))
+            }
+        })
+    }
+
+    fn system_flow_description(&self, system_key: &fs::FlowKeySystem) -> FlowDescriptionSystem {
+        match system_key.flow_type {
+            fs::SystemFlowType::GC => {
+                FlowDescriptionSystem::GC(FlowDescriptionSystemGC { dummy: true })
+            }
+        }
+    }
+
+    async fn dataset_flow_description(
+        &mut self,
+        ctx: &Context<'_>,
+        flow_state: &fs::FlowState,
+        dataset_key: &fs::FlowKeyDataset,
+    ) -> Result<FlowDescriptionDataset> {
+        Ok(match dataset_key.flow_type {
+            fs::DatasetFlowType::Ingest => {
+                let maybe_polling_source = if let Some(existing_polling_source) =
+                    self.dataset_polling_sources.get(&dataset_key.dataset_id)
+                {
+                    existing_polling_source.clone()
+                } else {
+                    let (dataset_registry, metadata_query_service) =
+                        from_catalog_n!(ctx, dyn DatasetRegistry, dyn MetadataQueryService);
+                    let target = dataset_registry
+                        .get_dataset_by_ref(&dataset_key.dataset_id.as_local_ref())
+                        .await
+                        .int_err()?;
+
+                    let polling_source_maybe = metadata_query_service
+                        .get_active_polling_source(target)
+                        .await
+                        .int_err()?;
+
+                    self.dataset_polling_sources
+                        .insert(dataset_key.dataset_id.clone(), polling_source_maybe.clone());
+                    polling_source_maybe
+                };
+
+                let dataset_changes_svc = from_catalog_n!(ctx, dyn DatasetChangesService);
+                let ingest_result = FlowDescriptionUpdateResult::from_maybe_flow_outcome(
+                    flow_state.outcome.as_ref(),
+                    &dataset_key.dataset_id,
+                    dataset_changes_svc.as_ref(),
+                )
+                .await
+                .int_err()?;
+
+                if maybe_polling_source.is_some() {
+                    FlowDescriptionDataset::PollingIngest(FlowDescriptionDatasetPollingIngest {
+                        dataset_id: dataset_key.dataset_id.clone().into(),
+                        ingest_result,
+                    })
+                } else {
+                    let source_name = flow_state.primary_trigger().push_source_name();
+                    FlowDescriptionDataset::PushIngest(FlowDescriptionDatasetPushIngest {
+                        dataset_id: dataset_key.dataset_id.clone().into(),
+                        source_name,
+                        input_records_count: 0, // TODO
+                        ingest_result,
+                    })
+                }
+            }
+            fs::DatasetFlowType::ExecuteTransform => {
+                let dataset_changes_svc = from_catalog_n!(ctx, dyn DatasetChangesService);
+
+                FlowDescriptionDataset::ExecuteTransform(FlowDescriptionDatasetExecuteTransform {
+                    dataset_id: dataset_key.dataset_id.clone().into(),
+                    transform_result: FlowDescriptionUpdateResult::from_maybe_flow_outcome(
+                        flow_state.outcome.as_ref(),
+                        &dataset_key.dataset_id,
+                        dataset_changes_svc.as_ref(),
+                    )
+                    .await
+                    .int_err()?,
+                })
+            }
+            fs::DatasetFlowType::HardCompaction => {
+                FlowDescriptionDataset::HardCompaction(FlowDescriptionDatasetHardCompaction {
+                    dataset_id: dataset_key.dataset_id.clone().into(),
+                    compaction_result:
+                        FlowDescriptionDatasetHardCompactionResult::from_maybe_flow_outcome(
+                            flow_state.outcome.as_ref(),
+                        ),
+                })
+            }
+            fs::DatasetFlowType::Reset => {
+                FlowDescriptionDataset::Reset(FlowDescriptionDatasetReset {
+                    dataset_id: dataset_key.dataset_id.clone().into(),
+                    reset_result: FlowDescriptionResetResult::from_maybe_flow_outcome(
+                        flow_state.outcome.as_ref(),
+                    ),
+                })
+            }
+        })
+    }
+}
diff --git a/src/adapter/graphql/src/queries/flows/mod.rs b/src/adapter/graphql/src/queries/flows/mod.rs
index 0d47153bec..1048a5ff7f 100644
--- a/src/adapter/graphql/src/queries/flows/mod.rs
+++ b/src/adapter/graphql/src/queries/flows/mod.rs
@@ -9,6 +9,7 @@
 
 mod flow;
 mod flow_config_snapshot;
+mod flow_description;
 mod flow_event;
 mod flow_outcome;
 mod flow_start_condition;
diff --git a/src/domain/core/src/services/metadata_query_service.rs b/src/domain/core/src/services/metadata_query_service.rs
index cc1c28cf27..607f3d5e50 100644
--- a/src/domain/core/src/services/metadata_query_service.rs
+++ b/src/domain/core/src/services/metadata_query_service.rs
@@ -21,13 +21,7 @@ pub trait MetadataQueryService: Send + Sync {
     async fn get_active_polling_source(
         &self,
         target: ResolvedDataset,
-    ) -> Result<
-        Option<(
-            odf::Multihash,
-            odf::MetadataBlockTyped<odf::SetPollingSource>,
-        )>,
-        InternalError,
-    >;
+    ) -> Result<Option<PollingSourceBlockInfo>, InternalError>;
 
     /// Returns the set of active push sources
     async fn get_active_push_sources(
@@ -49,3 +43,8 @@ pub trait MetadataQueryService: Send + Sync {
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub type PollingSourceBlockInfo = (
+    odf::Multihash,
+    odf::MetadataBlockTyped<odf::SetPollingSource>,
+);
diff --git a/src/infra/core/src/services/metadata_query_service_impl.rs b/src/infra/core/src/services/metadata_query_service_impl.rs
index e9bc47175d..bd154e7c69 100644
--- a/src/infra/core/src/services/metadata_query_service_impl.rs
+++ b/src/infra/core/src/services/metadata_query_service_impl.rs
@@ -27,13 +27,7 @@ impl MetadataQueryService for MetadataQueryServiceImpl {
     async fn get_active_polling_source(
         &self,
         target: ResolvedDataset,
-    ) -> Result<
-        Option<(
-            odf::Multihash,
-            odf::MetadataBlockTyped<odf::SetPollingSource>,
-        )>,
-        InternalError,
-    > {
+    ) -> Result<Option<PollingSourceBlockInfo>, InternalError> {
        // TODO: Support source evolution
        Ok(target
            .as_metadata_chain()
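
Editor's appendix: the core of both patches is a single memoized lookup per dataset across a whole batch of flows. Reduced to a standalone sketch it looks like the following; the names `DescriptionBuilder`, `fetch_source`, and `SourceInfo` are illustrative stand-ins, not APIs from this codebase.

    use std::collections::HashMap;

    // Illustrative stand-ins for odf::DatasetID and PollingSourceBlockInfo.
    type DatasetId = String;
    type SourceInfo = Option<String>;

    struct DescriptionBuilder {
        // One entry per dataset; a `None` result is cached too, so a dataset
        // without a polling source is also looked up only once.
        polling_sources: HashMap<DatasetId, SourceInfo>,
        lookups: usize,
    }

    impl DescriptionBuilder {
        fn new() -> Self {
            Self {
                polling_sources: HashMap::new(),
                lookups: 0,
            }
        }

        // Stand-in for the expensive metadata-chain query
        // (`MetadataQueryService::get_active_polling_source` in the patch).
        fn fetch_source(&mut self, id: &str) -> SourceInfo {
            self.lookups += 1;
            id.starts_with("polling").then(|| format!("source-of-{id}"))
        }

        // Returns the cached polling source, querying at most once per dataset.
        fn source_for(&mut self, id: &str) -> SourceInfo {
            if let Some(cached) = self.polling_sources.get(id) {
                return cached.clone();
            }
            let fetched = self.fetch_source(id);
            self.polling_sources.insert(id.to_string(), fetched.clone());
            fetched
        }
    }

    fn main() {
        let mut builder = DescriptionBuilder::new();
        // Five flow states touching only two distinct datasets.
        let batch = ["polling-a", "polling-a", "push-b", "push-b", "polling-a"];
        for id in batch {
            let _ = builder.source_for(id);
        }
        assert_eq!(builder.lookups, 2);
        println!("lookups performed: {}", builder.lookups);
    }

Caching the `None` case matters: a dataset that has no active polling source is also queried only once, which is why the builder stores `Option<PollingSourceBlockInfo>` rather than only the hits.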