From e0f305e4fb7df5c8bfa6dd1b23275695a161c443 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 8 Mar 2024 17:45:26 +0800 Subject: [PATCH] fix: Correctly handle the Parallelism inference under the DefaultParallelism configuration. (#15543) Signed-off-by: Shanicky Chen --- src/common/src/config.rs | 2 +- src/meta/src/barrier/recovery.rs | 168 +++++++++++++++++++++++------ src/meta/src/rpc/ddl_controller.rs | 8 +- 3 files changed, 144 insertions(+), 34 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index f2e00232a38c9..ace9ab6f21fc9 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -350,7 +350,7 @@ pub struct MetaConfig { pub enable_dropped_column_reclaim: bool, } -#[derive(Clone, Debug, Default)] +#[derive(Copy, Clone, Debug, Default)] pub enum DefaultParallelism { #[default] Full, diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 9aed700ec199f..cd71f9eea707e 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -19,6 +19,7 @@ use std::time::{Duration, Instant}; use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::DefaultParallelism; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -747,6 +748,45 @@ impl GlobalBarrierManagerContext { Ok(()) } + // We infer the new parallelism strategy based on the prior level of parallelism of the table. + // If the parallelism strategy is Fixed or Auto, we won't make any modifications. + // For Custom, we'll assess the parallelism of the core fragment; + // if the parallelism is higher than the currently available parallelism, we'll set it to Adaptive. + // If it's lower, we'll set it to Fixed. + // If it was previously set to Adaptive, but the default_parallelism in the configuration isn’t Full, + // and it matches the actual fragment parallelism, in this case, it will be handled by downgrading to Fixed. + fn derive_target_parallelism( + available_parallelism: usize, + assigned_parallelism: TableParallelism, + actual_fragment_parallelism: Option, + default_parallelism: DefaultParallelism, + ) -> TableParallelism { + match assigned_parallelism { + TableParallelism::Custom => { + if let Some(fragment_parallelism) = actual_fragment_parallelism { + if fragment_parallelism >= available_parallelism { + TableParallelism::Adaptive + } else { + TableParallelism::Fixed(fragment_parallelism) + } + } else { + TableParallelism::Adaptive + } + } + TableParallelism::Adaptive => { + match (default_parallelism, actual_fragment_parallelism) { + (DefaultParallelism::Default(n), Some(fragment_parallelism)) + if fragment_parallelism == n.get() => + { + TableParallelism::Fixed(fragment_parallelism) + } + _ => TableParallelism::Adaptive, + } + } + _ => assigned_parallelism, + } + } + async fn scale_actors_v1(&self, workers: Vec) -> MetaResult<()> { let info = self.resolve_actor_info(workers.clone()).await?; @@ -758,43 +798,16 @@ impl GlobalBarrierManagerContext { return Ok(()); } - let current_parallelism = info + let available_parallelism = info .node_map .values() .flat_map(|worker_node| worker_node.parallel_units.iter()) .count(); - if current_parallelism == 0 { + if available_parallelism == 0 { return Err(anyhow!("no available parallel units for auto scaling").into()); } - /// We infer the new parallelism strategy based on the prior level of parallelism of the table. - /// If the parallelism strategy is Fixed or Auto, we won't make any modifications. - /// For Custom, we'll assess the parallelism of the core fragment; - /// if the parallelism is higher than the currently available parallelism, we'll set it to Auto. - /// If it's lower, we'll set it to Fixed. - fn derive_target_parallelism_for_custom( - current_parallelism: usize, - table: &TableFragments, - ) -> TableParallelism { - let derive_from_fragment = table.mview_fragment().or_else(|| table.sink_fragment()); - - if let TableParallelism::Custom = &table.assigned_parallelism { - if let Some(fragment) = derive_from_fragment { - let fragment_parallelism = fragment.get_actors().len(); - if fragment_parallelism >= current_parallelism { - TableParallelism::Adaptive - } else { - TableParallelism::Fixed(fragment_parallelism) - } - } else { - TableParallelism::Adaptive - } - } else { - table.assigned_parallelism - } - } - let all_table_parallelisms: HashMap<_, _> = { let guard = mgr.fragment_manager.get_fragment_read_guard().await; @@ -803,9 +816,18 @@ impl GlobalBarrierManagerContext { .iter() .filter(|&(_, table)| matches!(table.state(), State::Created)) .map(|(table_id, table)| { + let actual_fragment_parallelism = table + .mview_fragment() + .or_else(|| table.sink_fragment()) + .map(|fragment| fragment.get_actors().len()); ( *table_id, - derive_target_parallelism_for_custom(current_parallelism, table), + Self::derive_target_parallelism( + available_parallelism, + table.assigned_parallelism, + actual_fragment_parallelism, + self.env.opts.default_parallelism, + ), ) }) .collect() @@ -1089,3 +1111,89 @@ impl GlobalBarrierManager { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::num::NonZeroUsize; + + use super::*; + #[test] + fn test_derive_target_parallelism() { + // total 10, assigned custom, actual 5, default full -> fixed(5) + assert_eq!( + TableParallelism::Fixed(5), + GlobalBarrierManagerContext::derive_target_parallelism( + 10, + TableParallelism::Custom, + Some(5), + DefaultParallelism::Full, + ) + ); + + // total 10, assigned custom, actual 10, default full -> adaptive + assert_eq!( + TableParallelism::Adaptive, + GlobalBarrierManagerContext::derive_target_parallelism( + 10, + TableParallelism::Custom, + Some(10), + DefaultParallelism::Full, + ) + ); + + // total 10, assigned custom, actual 11, default full -> adaptive + assert_eq!( + TableParallelism::Adaptive, + GlobalBarrierManagerContext::derive_target_parallelism( + 10, + TableParallelism::Custom, + Some(11), + DefaultParallelism::Full, + ) + ); + + // total 10, assigned fixed(5), actual _, default full -> fixed(5) + assert_eq!( + TableParallelism::Adaptive, + GlobalBarrierManagerContext::derive_target_parallelism( + 10, + TableParallelism::Custom, + None, + DefaultParallelism::Full, + ) + ); + + // total 10, assigned adaptive, actual _, default full -> adaptive + assert_eq!( + TableParallelism::Adaptive, + GlobalBarrierManagerContext::derive_target_parallelism( + 10, + TableParallelism::Adaptive, + None, + DefaultParallelism::Full, + ) + ); + + // total 10, assigned adaptive, actual 5, default 5 -> fixed(5) + assert_eq!( + TableParallelism::Fixed(5), + GlobalBarrierManagerContext::derive_target_parallelism( + 10, + TableParallelism::Adaptive, + Some(5), + DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()), + ) + ); + + // total 10, assigned adaptive, actual 6, default 5 -> adaptive + assert_eq!( + TableParallelism::Adaptive, + GlobalBarrierManagerContext::derive_target_parallelism( + 10, + TableParallelism::Adaptive, + Some(6), + DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()), + ) + ); + } +} diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 040bcce10110f..d2fbab5acaf1c 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1399,9 +1399,11 @@ impl DdlController { // and the context that contains all information needed for building the // actors on the compute nodes. - let table_parallelism = match default_parallelism { - None => TableParallelism::Adaptive, - Some(parallelism) => TableParallelism::Fixed(parallelism.get()), + // If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE. + // Otherwise, it defaults to FIXED based on deduction. + let table_parallelism = match (default_parallelism, &self.env.opts.default_parallelism) { + (None, DefaultParallelism::Full) => TableParallelism::Adaptive, + _ => TableParallelism::Fixed(parallelism.get()), }; let table_fragments = TableFragments::new(