From ac3b425a02461d7e15ea31e9b5ba4290b642425e Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 5 Apr 2024 15:46:18 +0800 Subject: [PATCH] fix: Use a warning when the parallelism does not meet the fixed requirements (#16157) --- src/meta/src/stream/scale.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 5f64aad214efb..a7ca37f55f0be 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -51,6 +51,7 @@ use tokio::sync::oneshot::Receiver; use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; use tokio::time::{Instant, MissedTickBehavior}; +use tracing::warn; use crate::barrier::{Command, Reschedule, StreamRpcManager}; use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; @@ -2007,12 +2008,19 @@ impl ScaleController { ), ); } - TableParallelism::Fixed(n) => { - if n > all_available_parallel_unit_ids.len() { - bail!( - "Not enough ParallelUnits available for fragment {}", - fragment_id + TableParallelism::Fixed(mut n) => { + let available_parallelism = all_available_parallel_unit_ids.len(); + + if n > available_parallelism { + warn!( + "not enough parallel units available for job {} fragment {}, required {}, resetting to {}", + table_id, + fragment_id, + n, + available_parallelism, ); + + n = available_parallelism; } let rebalance_result =