diff --git a/src/ctl/src/cmd_impl/scale/resize.rs b/src/ctl/src/cmd_impl/scale/resize.rs index 59c2280d17873..4829ae58f9788 100644 --- a/src/ctl/src/cmd_impl/scale/resize.rs +++ b/src/ctl/src/cmd_impl/scale/resize.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; +use std::ops::Sub; use std::process::exit; use inquire::Confirm; @@ -60,6 +61,7 @@ impl From for ScaleCommandContext { yes, fragments, target_parallelism_per_worker: None, + exclusive_for_vertical: false, } } } @@ -76,6 +78,7 @@ impl From for ScaleCommandContext { yes, fragments, }, + exclusive, } = value; Self { @@ -87,6 +90,7 @@ impl From for ScaleCommandContext { yes, fragments, target_parallelism_per_worker, + exclusive_for_vertical: exclusive, } } } @@ -100,6 +104,7 @@ pub struct ScaleCommandContext { yes: bool, fragments: Option>, target_parallelism_per_worker: Option, + exclusive_for_vertical: bool, } pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> { @@ -191,10 +196,11 @@ pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> any output, yes, fragments, + exclusive_for_vertical, } = scale_ctx; let worker_changes = { - let exclude_worker_ids = + let mut exclude_worker_ids = worker_input_to_worker_ids(exclude_workers.unwrap_or_default(), false); let include_worker_ids = worker_input_to_worker_ids(include_workers.unwrap_or_default(), true); @@ -231,6 +237,20 @@ pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> any } } + if exclusive_for_vertical { + let all_worker_ids: HashSet<_> = + streaming_workers_index_by_id.keys().cloned().collect(); + + let include_worker_id_set: HashSet<_> = include_worker_ids.iter().cloned().collect(); + let generated_exclude_worker_ids = all_worker_ids.sub(&include_worker_id_set); + + exclude_worker_ids = exclude_worker_ids + .into_iter() + .chain(generated_exclude_worker_ids) + .unique() + .collect(); + } + WorkerChanges { include_worker_ids, exclude_worker_ids, diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 09f383016aded..281c9839afe1e 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -348,6 +348,10 @@ pub struct ScaleVerticalCommands { /// The target parallelism per worker, requires `workers` to be set. #[clap(long, required = true)] target_parallelism_per_worker: Option, + + /// It will exclude all other workers to maintain the target parallelism only for the target workers. + #[clap(long, default_value_t = false)] + exclusive: bool, } #[derive(Subcommand, Debug)]