Skip to content

Commit

Permalink
feat: modify the scale command to support both horizontal and vertica…
Browse files Browse the repository at this point in the history
…l scaling. (#12087) (#12091)
  • Loading branch information
github-actions[bot] authored Sep 6, 2023
1 parent 616edbd commit 4865ed6
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 22 deletions.
76 changes: 71 additions & 5 deletions src/ctl/src/cmd_impl/scale/resize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use serde_yaml;

use crate::cmd_impl::meta::ReschedulePayload;
use crate::common::CtlContext;
use crate::ScaleResizeCommands;
use crate::{ScaleCommon, ScaleHorizonCommands, ScaleVerticalCommands};

macro_rules! fail {
($($arg:tt)*) => {{
Expand All @@ -36,8 +36,74 @@ macro_rules! fail {
}};
}

pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
impl From<ScaleHorizonCommands> for ScaleCommandContext {
fn from(value: ScaleHorizonCommands) -> Self {
let ScaleHorizonCommands {
exclude_workers,
include_workers,
target_parallelism,
common:
ScaleCommon {
generate,
output,
yes,
fragments,
},
} = value;

Self {
exclude_workers,
include_workers,
target_parallelism,
generate,
output,
yes,
fragments,
target_parallelism_per_worker: None,
}
}
}

impl From<ScaleVerticalCommands> for ScaleCommandContext {
fn from(value: ScaleVerticalCommands) -> Self {
let ScaleVerticalCommands {
workers,
target_parallelism_per_worker,
common:
ScaleCommon {
generate,
output,
yes,
fragments,
},
} = value;

Self {
exclude_workers: None,
include_workers: workers,
target_parallelism: None,
generate,
output,
yes,
fragments,
target_parallelism_per_worker,
}
}
}

pub struct ScaleCommandContext {
exclude_workers: Option<Vec<String>>,
include_workers: Option<Vec<String>>,
target_parallelism: Option<u32>,
generate: bool,
output: Option<String>,
yes: bool,
fragments: Option<Vec<u32>>,
target_parallelism_per_worker: Option<u32>,
}

pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> {
let meta_client = ctl_ctx.meta_client().await?;

let GetClusterInfoResponse {
worker_nodes,
Expand Down Expand Up @@ -116,7 +182,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
streaming_workers_index_by_id.len()
);

let ScaleResizeCommands {
let ScaleCommandContext {
exclude_workers,
include_workers,
target_parallelism,
Expand All @@ -125,7 +191,7 @@ pub async fn resize(context: &CtlContext, resize: ScaleResizeCommands) -> anyhow
output,
yes,
fragments,
} = resize;
} = scale_ctx;

let worker_changes = {
let exclude_worker_ids =
Expand Down
59 changes: 42 additions & 17 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,8 @@ enum TableCommands {
List,
}

#[derive(clap::Args, Debug)]
#[clap(group(clap::ArgGroup::new("workers_group").required(true).multiple(true).args(&["include_workers", "exclude_workers", "target_parallelism"])))]
pub struct ScaleResizeCommands {
#[derive(clap::Args, Debug, Clone)]
pub struct ScaleHorizonCommands {
/// The worker that needs to be excluded during scheduling, worker_id and worker_host are both
/// supported
#[clap(
Expand All @@ -288,15 +287,12 @@ pub struct ScaleResizeCommands {
#[clap(long)]
target_parallelism: Option<u32>,

/// The target parallelism per worker, conflicts with `target_parallelism`, requires
/// `include_workers` to be set.
#[clap(
long,
requires = "include_workers",
conflicts_with = "target_parallelism"
)]
target_parallelism_per_worker: Option<u32>,
#[command(flatten)]
common: ScaleCommon,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ScaleCommon {
/// Will generate a plan supported by the `reschedule` command and save it to the provided path
/// by the `--output`.
#[clap(long, default_value_t = false)]
Expand All @@ -316,12 +312,37 @@ pub struct ScaleResizeCommands {
fragments: Option<Vec<u32>>,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ScaleVerticalCommands {
#[command(flatten)]
common: ScaleCommon,

/// The worker that needs to be scheduled, worker_id and worker_host are both
/// supported
#[clap(
long,
value_delimiter = ',',
value_name = "all or worker_id or worker_host, ..."
)]
workers: Option<Vec<String>>,

/// The target parallelism per worker, requires `workers` to be set.
#[clap(long, requires = "workers")]
target_parallelism_per_worker: Option<u32>,
}

#[derive(Subcommand, Debug)]
enum ScaleCommands {
/// The resize command scales the cluster by specifying the workers to be included and
/// excluded.
Resize(ScaleResizeCommands),
/// mark a compute node as unschedulable
/// Scale the compute nodes horizontally, alias of `horizon`
Resize(ScaleHorizonCommands),

/// Scale the compute nodes horizontally
Horizon(ScaleHorizonCommands),

/// Scale the compute nodes vertically
Vertical(ScaleVerticalCommands),

/// Mark a compute node as unschedulable
#[clap(verbatim_doc_comment)]
Cordon {
/// Workers that need to be cordoned, both id and host are supported.
Expand Down Expand Up @@ -616,8 +637,12 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Profile(ProfileCommands::Heap { dir }) => {
cmd_impl::profile::heap_profile(context, dir).await?
}
Commands::Scale(ScaleCommands::Resize(resize)) => {
cmd_impl::scale::resize(context, resize).await?
Commands::Scale(ScaleCommands::Horizon(resize))
| Commands::Scale(ScaleCommands::Resize(resize)) => {
cmd_impl::scale::resize(context, resize.into()).await?
}
Commands::Scale(ScaleCommands::Vertical(resize)) => {
cmd_impl::scale::resize(context, resize.into()).await?
}
Commands::Scale(ScaleCommands::Cordon { workers }) => {
cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
Expand Down

0 comments on commit 4865ed6

Please sign in to comment.