From 3a27704ffb61962034d566afceb6b34f5cf87753 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 16 Aug 2024 17:31:08 +0800 Subject: [PATCH] Improve parallelism handling: Check limits in `alter_parallelism.rs`, log warnings for adaptive mode in `ddl_controller.rs`. --- src/frontend/src/handler/alter_parallelism.rs | 33 +++++++-------- src/meta/src/rpc/ddl_controller.rs | 4 +- src/meta/src/stream/scale.rs | 1 + .../scale/streaming_parallelism.rs | 41 ++++++++++++++++++- 4 files changed, 59 insertions(+), 20 deletions(-) diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index d8f844bb73bee..3c6ab52f51e39 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -106,24 +106,25 @@ pub async fn handle_alter_parallelism( let mut builder = RwPgResponse::builder(stmt_type); - let parallelism = match &target_parallelism.parallelism { - Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => Some(available_parallelism), - Some(Parallelism::Fixed(FixedParallelism { parallelism })) => Some(*parallelism), - _ => None, + match &target_parallelism.parallelism { + Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { + if available_parallelism > VirtualNode::COUNT as u32 { + builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::COUNT)); + } + } + Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { + if *parallelism > VirtualNode::COUNT as u32 { + builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::COUNT)); + target_parallelism = PbTableParallelism { + parallelism: Some(PbParallelism::Fixed(FixedParallelism { + parallelism: VirtualNode::COUNT as u32, + })), + }; + } + } + _ => {} }; - if let Some(parallelism) = parallelism - && parallelism > VirtualNode::COUNT as u32 - { - target_parallelism = PbTableParallelism { - parallelism: Some(PbParallelism::Fixed(FixedParallelism { - parallelism: VirtualNode::COUNT as u32, - })), - }; - - builder = builder.notice("Available or provided parallelism exceeds the maximum parallelism limit, resetting to FIXED(256).".to_string()); - } - let catalog_writer = session.catalog_writer()?; catalog_writer .alter_parallelism(table_id, target_parallelism, deferred) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index fb3185f34cf3f..8062675156fe4 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1640,9 +1640,9 @@ impl DdlController { // If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE. // Otherwise, it defaults to FIXED based on deduction. let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) { - // If the parallelism is limited, the parallelism is set to FIXED. (None, DefaultParallelism::Full) if parallelism_limited => { - TableParallelism::Fixed(parallelism.get()) + tracing::warn!("Parallelism limited to 256 in ADAPTIVE mode"); + TableParallelism::Adaptive } (None, DefaultParallelism::Full) => TableParallelism::Adaptive, _ => TableParallelism::Fixed(parallelism.get()), diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index e85027be44891..a2a6ca7f1d83e 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2088,6 +2088,7 @@ impl ScaleController { } TableParallelism::Fixed(mut n) => { if n > VirtualNode::COUNT { + // This should be unreachable, but we still intercept it to prevent accidental modifications. tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); n = VirtualNode::COUNT } diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index 8f0b3d9e34dbf..bef6b1ca1f1c2 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -165,12 +165,18 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { session .run("select parallelism from rw_streaming_parallelism where name = 't'") .await? - .assert_result_eq(format!("FIXED({})", vnode_max)); + .assert_result_eq("ADAPTIVE"); + + session + .run("select distinct parallelism from rw_fragment_parallelism where name = 't'") + .await? + .assert_result_eq(format!("{}", vnode_max)); + Ok(()) } #[tokio::test] -async fn test_parallelism_exceed_virtual_node_max_alter() -> Result<()> { +async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { let vnode_max = VirtualNode::COUNT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; @@ -193,3 +199,34 @@ async fn test_parallelism_exceed_virtual_node_max_alter() -> Result<()> { .assert_result_eq(format!("FIXED({})", vnode_max)); Ok(()) } + +#[tokio::test] +async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { + let vnode_max = VirtualNode::COUNT; + let mut configuration = Configuration::for_scale(); + configuration.compute_nodes = 1; + configuration.compute_node_cores = vnode_max + 100; + let mut cluster = Cluster::start(configuration).await?; + let mut session = cluster.start_session(); + session.run("set streaming_parallelism = 1").await?; + session.run("create table t(v int)").await?; + session + .run("select parallelism from rw_streaming_parallelism where name = 't'") + .await? + .assert_result_eq("FIXED(1)"); + + session + .run("alter table t set parallelism = adaptive") + .await?; + session + .run("select parallelism from rw_streaming_parallelism where name = 't'") + .await? + .assert_result_eq("ADAPTIVE"); + + session + .run("select distinct parallelism from rw_fragment_parallelism where name = 't'") + .await? + .assert_result_eq(format!("{}", vnode_max)); + + Ok(()) +}