diff --git a/src/ctl/src/cmd_impl/scale/resize.rs b/src/ctl/src/cmd_impl/scale/resize.rs
index 554952048e0d2..46063c9075024 100644
--- a/src/ctl/src/cmd_impl/scale/resize.rs
+++ b/src/ctl/src/cmd_impl/scale/resize.rs
@@ -27,7 +27,7 @@ use serde_yaml;
 
 use crate::cmd_impl::meta::ReschedulePayload;
 use crate::common::CtlContext;
-use crate::ScaleResizeCommands;
+use crate::{ScaleCommon, ScaleHorizonCommands, ScaleVerticalCommands};
 
 macro_rules! fail {
     ($($arg:tt)*) => {{
@@ -36,8 +36,74 @@ macro_rules! fail {
     }};
 }
 
-pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow::Result<()> {
-    let meta_client = context.meta_client().await?;
+impl From<ScaleHorizonCommands> for ScaleCommandContext {
+    fn from(value: ScaleHorizonCommands) -> Self {
+        let ScaleHorizonCommands {
+            exclude_workers,
+            include_workers,
+            target_parallelism,
+            common:
+                ScaleCommon {
+                    generate,
+                    output,
+                    yes,
+                    fragments,
+                },
+        } = value;
+
+        Self {
+            exclude_workers,
+            include_workers,
+            target_parallelism,
+            generate,
+            output,
+            yes,
+            fragments,
+            target_parallelism_per_worker: None,
+        }
+    }
+}
+
+impl From<ScaleVerticalCommands> for ScaleCommandContext {
+    fn from(value: ScaleVerticalCommands) -> Self {
+        let ScaleVerticalCommands {
+            workers,
+            target_parallelism_per_worker,
+            common:
+                ScaleCommon {
+                    generate,
+                    output,
+                    yes,
+                    fragments,
+                },
+        } = value;
+
+        Self {
+            exclude_workers: None,
+            include_workers: workers,
+            target_parallelism: None,
+            generate,
+            output,
+            yes,
+            fragments,
+            target_parallelism_per_worker,
+        }
+    }
+}
+
+pub struct ScaleCommandContext {
+    exclude_workers: Option<Vec<String>>,
+    include_workers: Option<Vec<String>>,
+    target_parallelism: Option<u32>,
+    generate: bool,
+    output: Option<String>,
+    yes: bool,
+    fragments: Option<Vec<u32>>,
+    target_parallelism_per_worker: Option<u32>,
+}
+
+pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> {
+    let meta_client = ctl_ctx.meta_client().await?;
 
     let GetClusterInfoResponse {
         worker_nodes,
@@ -116,7 +182,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
         streaming_workers_index_by_id.len()
     );
 
-    let ScaleResizeCommands {
+    let ScaleCommandContext {
         exclude_workers,
         include_workers,
         target_parallelism,
@@ -125,7 +191,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
         output,
         yes,
         fragments,
-    } = resize;
+    } = scale_ctx;
 
     let worker_changes = {
         let exclude_worker_ids =
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index a88bf8decdd63..2238a31b4e383 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -261,9 +261,8 @@ enum TableCommands {
     List,
 }
 
-#[derive(clap::Args, Debug)]
-#[clap(group(clap::ArgGroup::new("workers_group").required(true).multiple(true).args(&["include_workers", "exclude_workers", "target_parallelism"])))]
-pub struct ScaleResizeCommands {
+#[derive(clap::Args, Debug, Clone)]
+pub struct ScaleHorizonCommands {
     /// The worker that needs to be excluded during scheduling, worker_id and worker_host are both
     /// supported
     #[clap(
         long,
         value_delimiter = ',',
         value_name = "worker_id or worker_host, ..."
     )]
     exclude_workers: Option<Vec<String>>,
 
     /// The worker that needs to be included during scheduling, worker_id and worker_host are both
     /// supported
     #[clap(
         long,
         value_delimiter = ',',
         value_name = "all or worker_id or worker_host, ..."
     )]
     include_workers: Option<Vec<String>>,
 
     /// The target parallelism, currently, it is used to limit the target parallelism and only
     /// works when the scaling policy is `auto`.
     #[clap(long)]
     target_parallelism: Option<u32>,
 
-    /// The target parallelism per worker, conflicts with `target_parallelism`, requires
-    /// `include_workers` to be set.
-    #[clap(
-        long,
-        requires = "include_workers",
-        conflicts_with = "target_parallelism"
-    )]
-    target_parallelism_per_worker: Option<u32>,
+    #[command(flatten)]
+    common: ScaleCommon,
+}
 
+#[derive(clap::Args, Debug, Clone)]
+pub struct ScaleCommon {
     /// Will generate a plan supported by the `reschedule` command and save it to the provided path
     /// by the `--output`.
     #[clap(long, default_value_t = false)]
     generate: bool,
@@ -316,12 +312,37 @@ pub struct ScaleResizeCommands {
     fragments: Option<Vec<u32>>,
 }
 
+#[derive(clap::Args, Debug, Clone)]
+pub struct ScaleVerticalCommands {
+    #[command(flatten)]
+    common: ScaleCommon,
+
+    /// The worker that needs to be scheduled, worker_id and worker_host are both
+    /// supported
+    #[clap(
+        long,
+        value_delimiter = ',',
+        value_name = "all or worker_id or worker_host, ..."
+    )]
+    workers: Option<Vec<String>>,
+
+    /// The target parallelism per worker, requires `workers` to be set.
+    #[clap(long, requires = "workers")]
+    target_parallelism_per_worker: Option<u32>,
+}
+
 #[derive(Subcommand, Debug)]
 enum ScaleCommands {
-    /// The resize command scales the cluster by specifying the workers to be included and
-    /// excluded.
-    Resize(ScaleResizeCommands),
-    /// mark a compute node as unschedulable
+    /// Scale the compute nodes horizontally, alias of `horizon`
+    Resize(ScaleHorizonCommands),
+
+    /// Scale the compute nodes horizontally
+    Horizon(ScaleHorizonCommands),
+
+    /// Scale the compute nodes vertically
+    Vertical(ScaleVerticalCommands),
+
+    /// Mark a compute node as unschedulable
     #[clap(verbatim_doc_comment)]
     Cordon {
         /// Workers that need to be cordoned, both id and host are supported.
@@ -616,8 +637,12 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
         Commands::Profile(ProfileCommands::Heap { dir }) => {
             cmd_impl::profile::heap_profile(context, dir).await?
         }
-        Commands::Scale(ScaleCommands::Resize(resize)) => {
-            cmd_impl::scale::resize(context, resize).await?
-        }
+        Commands::Scale(ScaleCommands::Horizon(resize))
+        | Commands::Scale(ScaleCommands::Resize(resize)) => {
+            cmd_impl::scale::resize(context, resize.into()).await?
+        }
+        Commands::Scale(ScaleCommands::Vertical(resize)) => {
+            cmd_impl::scale::resize(context, resize.into()).await?
+        }
         Commands::Scale(ScaleCommands::Cordon { workers }) => {
             cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)