Skip to content

Commit

Permalink
fix: Use a warning when the parallelism does not meet the fixed requi…
Browse files Browse the repository at this point in the history
…rements (#16157)
  • Loading branch information
shanicky authored and yezizp2012 committed Apr 7, 2024
1 parent 9fbda61 commit ac3b425
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};
use tracing::warn;

use crate::barrier::{Command, Reschedule, StreamRpcManager};
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
Expand Down Expand Up @@ -2007,12 +2008,19 @@ impl ScaleController {
),
);
}
TableParallelism::Fixed(n) => {
if n > all_available_parallel_unit_ids.len() {
bail!(
"Not enough ParallelUnits available for fragment {}",
fragment_id
TableParallelism::Fixed(mut n) => {
let available_parallelism = all_available_parallel_unit_ids.len();

if n > available_parallelism {
warn!(
"not enough parallel units available for job {} fragment {}, required {}, resetting to {}",
table_id,
fragment_id,
n,
available_parallelism,
);

n = available_parallelism;
}

let rebalance_result =
Expand Down

0 comments on commit ac3b425

Please sign in to comment.