diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index e0a2fb390f42d..6d2dc256ec54a 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2435,12 +2435,15 @@ impl GlobalStreamManager { .await; })); + tracing::debug!("pausing tick lock in source manager"); let _source_pause_guard = self.source_manager.paused.lock().await; self.barrier_scheduler .run_config_change_command_with_pause(command) .await?; + tracing::info!("reschedule done"); + Ok(()) } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 8aef7a68d5483..14922ac3e3366 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -755,27 +755,51 @@ impl GlobalStreamManager { .map(|node| node.id) .collect::>(); - let reschedules = if deferred { - HashMap::new() - } else { + let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]); + + if deferred { + tracing::debug!( + "deferred mode enabled for job {}, set the parallelism directly to {:?}", + table_id, + parallelism + ); self.scale_controller + .as_ref() + .unwrap() + .post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment) + .await?; + } else { + let reschedules = self + .scale_controller .as_ref() .unwrap() .generate_table_resize_plan(TableResizePolicy { worker_ids, - table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(), + table_parallelisms: table_parallelism_assignment + .iter() + .map(|(id, parallelism)| (id.table_id, *parallelism)) + .collect(), }) - .await? - }; + .await?; - self.reschedule_actors( - reschedules, - RescheduleOptions { - resolve_no_shuffle_upstream: false, - }, - Some(HashMap::from([(TableId::new(table_id), parallelism)])), - ) - .await?; + if reschedules.is_empty() { + tracing::debug!("empty reschedule plan generated for job {}, set the parallelism directly to {:?}", table_id, parallelism); + self.scale_controller + .as_ref() + .unwrap() + .post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment) + .await?; + } else { + self.reschedule_actors( + reschedules, + RescheduleOptions { + resolve_no_shuffle_upstream: false, + }, + Some(table_parallelism_assignment), + ) + .await?; + } + }; Ok(()) }