feat: modify the scale command to support both horizontal and vertical scaling. (#12087) #12091

Merged
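Before the diff, a minimal clap sketch of the reworked `scale` command layout: `horizon` keeps the worker include/exclude and total-parallelism flags, `vertical` takes `--workers` plus `--target-parallelism-per-worker`, `resize` is kept as a backwards-compatible variant handled like `horizon`, and all of them flatten a shared `ScaleCommon` flag group. The flag names mirror the diff below, but the binary name, the `HorizonArgs`/`VerticalArgs` type names, the simplified clap attributes, and the flattened top-level layout (no `scale` prefix) are illustrative assumptions rather than the PR's code.

```rust
// A minimal, self-contained sketch of the new command layout (assumptions, not
// the PR's exact code): type names such as `HorizonArgs`/`VerticalArgs`, the
// simplified clap attributes, and the flattened top-level layout are
// illustrative; in risingwave-ctl these variants live under the `scale`
// subcommand.
//
// Invocations this sketch parses (binary name is an assumption):
//   ctl horizon --include-workers 3,4 --exclude-workers 1
//   ctl vertical --workers 3,4 --target-parallelism-per-worker 8
use clap::{Args, Parser, Subcommand};

/// Flags shared by every scale subcommand, pulled in via `#[command(flatten)]`.
#[derive(Args, Debug, Clone)]
struct ScaleCommon {
    /// Generate a reschedule plan instead of applying it immediately.
    #[clap(long, default_value_t = false)]
    generate: bool,

    /// Path to write the generated plan to.
    #[clap(long)]
    output: Option<String>,

    /// Skip the interactive confirmation.
    #[clap(long, default_value_t = false)]
    yes: bool,

    /// Restrict the operation to the given fragment ids.
    #[clap(long, value_delimiter = ',')]
    fragments: Option<Vec<u32>>,
}

/// Horizontal scaling: move parallel units between workers.
#[derive(Args, Debug, Clone)]
struct HorizonArgs {
    /// Workers to include, by id or host.
    #[clap(long, value_delimiter = ',')]
    include_workers: Option<Vec<String>>,

    /// Workers to exclude, by id or host.
    #[clap(long, value_delimiter = ',')]
    exclude_workers: Option<Vec<String>>,

    /// Total target parallelism across the cluster.
    #[clap(long)]
    target_parallelism: Option<u32>,

    #[command(flatten)]
    common: ScaleCommon,
}

/// Vertical scaling: change the parallelism on the selected workers.
#[derive(Args, Debug, Clone)]
struct VerticalArgs {
    /// Workers to rescale, by id or host.
    #[clap(long, value_delimiter = ',')]
    workers: Option<Vec<String>>,

    /// Target parallelism per worker; only valid together with `--workers`.
    #[clap(long, requires = "workers")]
    target_parallelism_per_worker: Option<u32>,

    #[command(flatten)]
    common: ScaleCommon,
}

#[derive(Subcommand, Debug)]
enum Scale {
    /// Backwards-compatible variant, handled exactly like `horizon`.
    Resize(HorizonArgs),
    Horizon(HorizonArgs),
    Vertical(VerticalArgs),
}

#[derive(Parser, Debug)]
struct Cli {
    #[command(subcommand)]
    scale: Scale,
}

fn main() {
    // In the real code both horizontal variants and the vertical one are
    // converted into a single resize context; here we just parse and print.
    let cli = Cli::parse_from([
        "ctl",
        "vertical",
        "--workers",
        "3,4",
        "--target-parallelism-per-worker",
        "8",
    ]);
    println!("{cli:?}");
}
```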
src/ctl/src/cmd_impl/scale/resize.rs (71 additions & 5 deletions)
@@ -27,7 +27,7 @@ use serde_yaml;

use crate::cmd_impl::meta::ReschedulePayload;
use crate::common::CtlContext;
use crate::ScaleResizeCommands;
use crate::{ScaleCommon, ScaleHorizonCommands, ScaleVerticalCommands};

macro_rules! fail {
($($arg:tt)*) => {{
@@ -36,8 +36,74 @@ macro_rules! fail {
}};
}

pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
impl From<ScaleHorizonCommands> for ScaleCommandContext {
fn from(value: ScaleHorizonCommands) -> Self {
let ScaleHorizonCommands {
exclude_workers,
include_workers,
target_parallelism,
common:
ScaleCommon {
generate,
output,
yes,
fragments,
},
} = value;

Self {
exclude_workers,
include_workers,
target_parallelism,
generate,
output,
yes,
fragments,
target_parallelism_per_worker: None,
}
}
}

impl From<ScaleVerticalCommands> for ScaleCommandContext {
fn from(value: ScaleVerticalCommands) -> Self {
let ScaleVerticalCommands {
workers,
target_parallelism_per_worker,
common:
ScaleCommon {
generate,
output,
yes,
fragments,
},
} = value;

Self {
exclude_workers: None,
include_workers: workers,
target_parallelism: None,
generate,
output,
yes,
fragments,
target_parallelism_per_worker,
}
}
}

pub struct ScaleCommandContext {
exclude_workers: Option<Vec<String>>,
include_workers: Option<Vec<String>>,
target_parallelism: Option<u32>,
generate: bool,
output: Option<String>,
yes: bool,
fragments: Option<Vec<u32>>,
target_parallelism_per_worker: Option<u32>,
}

pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> {
let meta_client = ctl_ctx.meta_client().await?;

let GetClusterInfoResponse {
worker_nodes,
@@ -116,7 +182,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
streaming_workers_index_by_id.len()
);

let ScaleResizeCommands {
let ScaleCommandContext {
exclude_workers,
include_workers,
target_parallelism,
@@ -125,7 +191,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
output,
yes,
fragments,
} = resize;
} = scale_ctx;

let worker_changes = {
let exclude_worker_ids =
src/ctl/src/lib.rs (42 additions & 17 deletions)
@@ -261,9 +261,8 @@ enum TableCommands {
List,
}

#[derive(clap::Args, Debug)]
#[clap(group(clap::ArgGroup::new("workers_group").required(true).multiple(true).args(&["include_workers", "exclude_workers", "target_parallelism"])))]
pub struct ScaleResizeCommands {
#[derive(clap::Args, Debug, Clone)]
pub struct ScaleHorizonCommands {
/// The worker that needs to be excluded during scheduling, worker_id and worker_host are both
/// supported
#[clap(
@@ -288,15 +287,12 @@ pub struct ScaleResizeCommands {
#[clap(long)]
target_parallelism: Option<u32>,

/// The target parallelism per worker, conflicts with `target_parallelism`, requires
/// `include_workers` to be set.
#[clap(
long,
requires = "include_workers",
conflicts_with = "target_parallelism"
)]
target_parallelism_per_worker: Option<u32>,
#[command(flatten)]
common: ScaleCommon,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ScaleCommon {
/// Will generate a plan supported by the `reschedule` command and save it to the provided path
/// by the `--output`.
#[clap(long, default_value_t = false)]
@@ -316,12 +312,37 @@ pub struct ScaleResizeCommands {
fragments: Option<Vec<u32>>,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ScaleVerticalCommands {
#[command(flatten)]
common: ScaleCommon,

/// The worker that needs to be scheduled, worker_id and worker_host are both
/// supported
#[clap(
long,
value_delimiter = ',',
value_name = "all or worker_id or worker_host, ..."
)]
workers: Option<Vec<String>>,

/// The target parallelism per worker, requires `workers` to be set.
#[clap(long, requires = "workers")]
target_parallelism_per_worker: Option<u32>,
}

#[derive(Subcommand, Debug)]
enum ScaleCommands {
/// The resize command scales the cluster by specifying the workers to be included and
/// excluded.
Resize(ScaleResizeCommands),
/// mark a compute node as unschedulable
/// Scale the compute nodes horizontally, alias of `horizon`
Resize(ScaleHorizonCommands),

/// Scale the compute nodes horizontally
Horizon(ScaleHorizonCommands),

/// Scale the compute nodes vertically
Vertical(ScaleVerticalCommands),

/// Mark a compute node as unschedulable
#[clap(verbatim_doc_comment)]
Cordon {
/// Workers that need to be cordoned, both id and host are supported.
@@ -616,8 +637,12 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Profile(ProfileCommands::Heap { dir }) => {
cmd_impl::profile::heap_profile(context, dir).await?
}
Commands::Scale(ScaleCommands::Resize(resize)) => {
cmd_impl::scale::resize(context, resize).await?
Commands::Scale(ScaleCommands::Horizon(resize))
| Commands::Scale(ScaleCommands::Resize(resize)) => {
cmd_impl::scale::resize(context, resize.into()).await?
}
Commands::Scale(ScaleCommands::Vertical(resize)) => {
cmd_impl::scale::resize(context, resize.into()).await?
}
Commands::Scale(ScaleCommands::Cordon { workers }) => {
cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)