Skip to content

Commit

Permalink
feat: Add exclusive mode to vertical scaling (#13697)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Nov 28, 2023
1 parent ff90f8c commit edfb9b9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
22 changes: 21 additions & 1 deletion src/ctl/src/cmd_impl/scale/resize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::ops::Sub;
use std::process::exit;

use inquire::Confirm;
Expand Down Expand Up @@ -60,6 +61,7 @@ impl From<ScaleHorizonCommands> for ScaleCommandContext {
yes,
fragments,
target_parallelism_per_worker: None,
exclusive_for_vertical: false,
}
}
}
Expand All @@ -76,6 +78,7 @@ impl From<ScaleVerticalCommands> for ScaleCommandContext {
yes,
fragments,
},
exclusive,
} = value;

Self {
Expand All @@ -87,6 +90,7 @@ impl From<ScaleVerticalCommands> for ScaleCommandContext {
yes,
fragments,
target_parallelism_per_worker,
exclusive_for_vertical: exclusive,
}
}
}
Expand All @@ -100,6 +104,7 @@ pub struct ScaleCommandContext {
yes: bool,
fragments: Option<Vec<u32>>,
target_parallelism_per_worker: Option<u32>,
exclusive_for_vertical: bool,
}

pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> {
Expand Down Expand Up @@ -191,10 +196,11 @@ pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> any
output,
yes,
fragments,
exclusive_for_vertical,
} = scale_ctx;

let worker_changes = {
let exclude_worker_ids =
let mut exclude_worker_ids =
worker_input_to_worker_ids(exclude_workers.unwrap_or_default(), false);
let include_worker_ids =
worker_input_to_worker_ids(include_workers.unwrap_or_default(), true);
Expand Down Expand Up @@ -231,6 +237,20 @@ pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> any
}
}

if exclusive_for_vertical {
let all_worker_ids: HashSet<_> =
streaming_workers_index_by_id.keys().cloned().collect();

let include_worker_id_set: HashSet<_> = include_worker_ids.iter().cloned().collect();
let generated_exclude_worker_ids = all_worker_ids.sub(&include_worker_id_set);

exclude_worker_ids = exclude_worker_ids
.into_iter()
.chain(generated_exclude_worker_ids)
.unique()
.collect();
}

WorkerChanges {
include_worker_ids,
exclude_worker_ids,
Expand Down
4 changes: 4 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ pub struct ScaleVerticalCommands {
/// The target parallelism per worker, requires `workers` to be set.
#[clap(long, required = true)]
target_parallelism_per_worker: Option<u32>,

/// It will exclude all other workers to maintain the target parallelism only for the target workers.
#[clap(long, default_value_t = false)]
exclusive: bool,
}

#[derive(Subcommand, Debug)]
Expand Down

0 comments on commit edfb9b9

Please sign in to comment.