From bf7205a4b57eda1713c1334fa244d5933fb53154 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 16 Sep 2024 14:49:38 +0800 Subject: [PATCH] do not rewrite alter parallelism in frontend Signed-off-by: Bugen Zhao --- src/frontend/src/handler/alter_parallelism.rs | 20 +++++++++---------- src/meta/src/stream/scale.rs | 2 -- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 23ccc6706fa0..3a823330e974 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -93,7 +93,7 @@ pub async fn handle_alter_parallelism( } }; - let mut target_parallelism = extract_table_parallelism(parallelism)?; + let target_parallelism = extract_table_parallelism(parallelism)?; let available_parallelism = session .env() @@ -103,26 +103,24 @@ pub async fn handle_alter_parallelism( .filter(|w| w.is_streaming_schedulable()) .map(|w| w.parallelism) .sum::(); - // TODO(var-vnode): get max parallelism from catalogs. - // Although the meta service will clamp the value for us, we should still check it here for better UI. - let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; let mut builder = RwPgResponse::builder(stmt_type); + // TODO(var-vnode): get correct max parallelism from catalogs. + // Although the meta service will clamp the value for us and print warnings there, + // we may still check it here for better UI. + let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; + match &target_parallelism.parallelism { Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { if available_parallelism > max_parallelism as u32 { - builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {max_parallelism}")); + builder = builder.notice("Available parallelism may exceed the maximum parallelism limit, the actual parallelism will be limited"); } } Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { if *parallelism > max_parallelism as u32 { - builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({max_parallelism})")); - target_parallelism = PbTableParallelism { - parallelism: Some(PbParallelism::Fixed(FixedParallelism { - parallelism: max_parallelism as u32, - })), - }; + builder = builder.notice("Provided parallelism may exceed the maximum parallelism limit, will be reset to FIXED(max_parallelism)"); + // Rewriting will be done in meta service. } } _ => {} diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 4a2604725e97..29cf74b152d0 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2226,8 +2226,6 @@ impl ScaleController { } TableParallelism::Fixed(mut n) => { if n > max_parallelism { - // This should be unreachable as it was already checked and rewritten in the frontend. - // We still intercept it to prevent accidental modifications. tracing::warn!("specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"); n = max_parallelism }