Remove rescheduling RPCs, clean up scaling code
shanicky committed Jun 24, 2024
1 parent 81c1466 commit b08dbcc
Showing 13 changed files with 110 additions and 1,033 deletions.
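At a glance: fragment reschedules are now expressed as per-worker actor-count deltas (increased_actor_count / decreased_actor_count) instead of lists of parallel units, and the separate plan-generation RPC is removed in favor of direct rescheduling. The sketch below mirrors the new serde types in src/ctl/src/cmd_impl/meta/reschedule.rs to show the plan-file shape the ctl meta reschedule command consumes; the types are redeclared locally so the snippet compiles on its own, and the revision, worker IDs, and counts are made-up values.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Assumption: mirrors risingwave_meta::manager::WorkerId.
type WorkerId = u32;

#[derive(Serialize, Deserialize, Debug)]
struct WorkerReschedulePlan {
    increased_actor_count: HashMap<WorkerId, usize>,
    decreased_actor_count: HashMap<WorkerId, usize>,
}

#[derive(Serialize, Deserialize, Debug)]
struct ReschedulePayload {
    reschedule_revision: u64,
    #[serde(rename = "reschedule_plan")]
    worker_reschedule_plan: HashMap<u32, WorkerReschedulePlan>,
}

fn main() -> anyhow::Result<()> {
    // Fragment 1: move one actor from worker 2 to worker 3.
    let payload = ReschedulePayload {
        reschedule_revision: 42,
        worker_reschedule_plan: HashMap::from([(
            1,
            WorkerReschedulePlan {
                increased_actor_count: HashMap::from([(3, 1)]),
                decreased_actor_count: HashMap::from([(2, 1)]),
            },
        )]),
    };
    // Prints the YAML that the ctl `--from <file>` path reads back with serde_yaml.
    println!("{}", serde_yaml::to_string(&payload)?);
    Ok(())
}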
31 changes: 0 additions & 31 deletions proto/meta.proto
@@ -539,40 +539,9 @@ message TableParallelism {
}
}

-message GetReschedulePlanRequest {
-  uint64 revision = 1;
-
-  message WorkerChanges {
-    repeated uint32 include_worker_ids = 1;
-    repeated uint32 exclude_worker_ids = 2;
-    optional uint32 target_parallelism = 3;
-    optional uint32 target_parallelism_per_worker = 4;
-  }
-
-  message StableResizePolicy {
-    map<uint32, WorkerChanges> fragment_worker_changes = 1;
-  }
-
-  oneof policy {
-    // StableResizePolicy generates a stable ReschedulePlan without altering the distribution on workers that are not involved.
-    // Note that "stable" does not refer to the determinacy of the algorithm:
-    // repeated calls may yield different ReschedulePlan results.
-    StableResizePolicy stable_resize_policy = 2;
-  }
-}
-
-message GetReschedulePlanResponse {
-  uint64 revision = 1;
-  // The reschedule plan for each fragment.
-  map<uint32, Reschedule> reschedules = 2;
-  // TODO: refactor needed.
-  bool success = 3;
-}
-
service ScaleService {
  rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
  rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
-  rpc GetReschedulePlan(GetReschedulePlanRequest) returns (GetReschedulePlanResponse);
}

message MembersRequest {}
8 changes: 1 addition & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -156,19 +156,13 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
        .await?;
    if worker.worker_type() == WorkerType::ComputeNode {
        let pb_property = worker.worker_node.property.as_ref().unwrap();
-        // let parallel_unit_ids = worker
-        //     .worker_node
-        //     .parallel_units
-        //     .iter()
-        //     .map(|pu| pu.id as i32)
-        //     .collect_vec();
        let property = worker_property::ActiveModel {
            worker_id: Set(worker.worker_id() as _),
-            // parallel_unit_ids: Set(parallel_unit_ids.into()),
            is_streaming: Set(pb_property.is_streaming),
            is_serving: Set(pb_property.is_serving),
            is_unschedulable: Set(pb_property.is_unschedulable),
+            parallel_unit_ids: Set(Default::default()),
            parallelism: Set(worker.worker_node.parallelism as _),
        };
        WorkerProperty::insert(property)
            .exec(&meta_store_sql.conn)
178 changes: 90 additions & 88 deletions src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -15,14 +15,13 @@
use std::collections::{HashMap, HashSet};
use std::process::exit;

-use anyhow::{anyhow, Error, Result};
+use anyhow::{anyhow, Result};
use inquire::Confirm;
use itertools::Itertools;
-use regex::{Match, Regex};
+use risingwave_meta::manager::WorkerId;
use risingwave_pb::common::WorkerNode;
-use risingwave_pb::meta::get_reschedule_plan_request::PbPolicy;
use risingwave_pb::meta::table_fragments::ActorStatus;
-use risingwave_pb::meta::{GetClusterInfoResponse, GetReschedulePlanResponse, Reschedule};
+use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

@@ -34,16 +33,16 @@ pub struct ReschedulePayload {
    pub reschedule_revision: u64,

    #[serde(rename = "reschedule_plan")]
-    pub reschedule_plan: HashMap<u32, FragmentReschedulePlan>,
+    pub worker_reschedule_plan: HashMap<u32, WorkerReschedulePlan>,
}

#[derive(Serialize, Deserialize, Debug)]
-pub struct FragmentReschedulePlan {
-    #[serde(rename = "added_parallel_units")]
-    pub added_parallel_units: Vec<u32>,
+pub struct WorkerReschedulePlan {
+    #[serde(rename = "increased_actor_count")]
+    pub increased_actor_count: HashMap<WorkerId, usize>,

-    #[serde(rename = "removed_parallel_units")]
-    pub removed_parallel_units: Vec<u32>,
+    #[serde(rename = "decreased_actor_count")]
+    pub decreased_actor_count: HashMap<WorkerId, usize>,
}

#[derive(Debug)]
@@ -52,30 +51,42 @@ pub enum RescheduleInput {
    FilePath(String),
}

-impl From<FragmentReschedulePlan> for Reschedule {
-    fn from(value: FragmentReschedulePlan) -> Self {
-        let FragmentReschedulePlan {
-            added_parallel_units,
-            removed_parallel_units,
+impl From<WorkerReschedulePlan> for PbWorkerReschedule {
+    fn from(value: WorkerReschedulePlan) -> Self {
+        let WorkerReschedulePlan {
+            increased_actor_count,
+            decreased_actor_count,
        } = value;

-        Reschedule {
-            added_parallel_units,
-            removed_parallel_units,
+        PbWorkerReschedule {
+            increased_actor_count: increased_actor_count
+                .into_iter()
+                .map(|(k, v)| (k as _, v as _))
+                .collect(),
+            decreased_actor_count: decreased_actor_count
+                .into_iter()
+                .map(|(k, v)| (k as _, v as _))
+                .collect(),
        }
    }
}

-impl From<Reschedule> for FragmentReschedulePlan {
-    fn from(value: Reschedule) -> Self {
-        let Reschedule {
-            added_parallel_units,
-            removed_parallel_units,
+impl From<PbWorkerReschedule> for WorkerReschedulePlan {
+    fn from(value: PbWorkerReschedule) -> Self {
+        let PbWorkerReschedule {
+            increased_actor_count,
+            decreased_actor_count,
        } = value;

-        FragmentReschedulePlan {
-            added_parallel_units,
-            removed_parallel_units,
+        WorkerReschedulePlan {
+            increased_actor_count: increased_actor_count
+                .into_iter()
+                .map(|(k, v)| (k as _, v as _))
+                .collect(),
+            decreased_actor_count: decreased_actor_count
+                .into_iter()
+                .map(|(k, v)| (k as _, v as _))
+                .collect(),
        }
    }
}
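The two From impls above are inverses up to the integer casts, so a plan survives a round trip through the protobuf type. A hypothetical test for this module (the worker IDs and counts are made up):

#[cfg(test)]
mod conversion_tests {
    use std::collections::HashMap;

    use super::*;

    #[test]
    fn worker_reschedule_plan_round_trips() {
        // One actor moves from worker 2 to worker 3.
        let plan = WorkerReschedulePlan {
            increased_actor_count: HashMap::from([(3, 1)]),
            decreased_actor_count: HashMap::from([(2, 1)]),
        };
        let pb: PbWorkerReschedule = plan.into();
        let back: WorkerReschedulePlan = pb.into();
        assert_eq!(back.increased_actor_count, HashMap::from([(3, 1)]));
        assert_eq!(back.decreased_actor_count, HashMap::from([(2, 1)]));
    }
}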
@@ -128,13 +139,13 @@ pub async fn reschedule(
        let file = std::fs::File::open(path)?;
        let ReschedulePayload {
            reschedule_revision,
-            reschedule_plan,
+            worker_reschedule_plan,
        } = serde_yaml::from_reader(file)?;
        (
-            reschedule_plan
+            worker_reschedule_plan
                .into_iter()
-                .map(|(fragment_id, fragment_reschedule_plan)| {
-                    (fragment_id, fragment_reschedule_plan.into())
+                .map(|(fragment_id, worker_reschedule_plan)| {
+                    (fragment_id, worker_reschedule_plan.into())
                })
                .collect(),
            reschedule_revision,
@@ -145,12 +156,12 @@

    for (fragment_id, reschedule) in &reschedules {
        println!("For fragment #{}", fragment_id);
-        if !reschedule.removed_parallel_units.is_empty() {
-            println!("\tRemove: {:?}", reschedule.removed_parallel_units);
+        if !reschedule.decreased_actor_count.is_empty() {
+            println!("\tRemove: {:?}", reschedule.decreased_actor_count);
        }

-        if !reschedule.added_parallel_units.is_empty() {
-            println!("\tAdd: {:?}", reschedule.added_parallel_units);
+        if !reschedule.increased_actor_count.is_empty() {
+            println!("\tAdd: {:?}", reschedule.increased_actor_count);
        }

        println!();
@@ -177,64 +188,55 @@ pub async fn reschedule(
    Ok(())
}

-fn parse_plan(mut plan: String) -> Result<HashMap<u32, Reschedule>, Error> {
-    let mut reschedules = HashMap::new();
+fn parse_plan(plan: String) -> Result<HashMap<u32, PbWorkerReschedule>> {
+    let reschedules = HashMap::new();

+    // let regex = Regex::new(RESCHEDULE_MATCH_REGEXP)?;
+    //
+    // plan.retain(|c| !c.is_whitespace());
+    //
+    // for fragment_reschedule_plan in plan.split(';') {
+    //     let captures = regex
+    //         .captures(fragment_reschedule_plan)
+    //         .ok_or_else(|| anyhow!("plan \"{}\" format illegal", fragment_reschedule_plan))?;
+    //
+    //     let fragment_id = captures
+    //         .name(RESCHEDULE_FRAGMENT_KEY)
+    //         .and_then(|mat| mat.as_str().parse::<u32>().ok())
+    //         .ok_or_else(|| anyhow!("plan \"{}\" does not have a valid fragment id", plan))?;
+    //
+    //     let split_fn = |mat: Match<'_>| {
+    //         mat.as_str()
+    //             .split(',')
+    //             .map(|id_str| id_str.parse::<u32>().map_err(Error::msg))
+    //             .collect::<Result<Vec<_>>>()
+    //     };
+    //
+    //     let removed_parallel_units = captures
+    //         .name(RESCHEDULE_REMOVED_KEY)
+    //         .map(split_fn)
+    //         .transpose()?
+    //         .unwrap_or_default();
+    //     let added_parallel_units = captures
+    //         .name(RESCHEDULE_ADDED_KEY)
+    //         .map(split_fn)
+    //         .transpose()?
+    //         .unwrap_or_default();
+    //
+    //     if !(removed_parallel_units.is_empty() && added_parallel_units.is_empty()) {
+    //         reschedules.insert(
+    //             fragment_id,
+    //             Reschedule {
+    //                 added_parallel_units,
+    //                 removed_parallel_units,
+    //             },
+    //         );
+    //     }
+    // }

-    let regex = Regex::new(RESCHEDULE_MATCH_REGEXP)?;
-
-    plan.retain(|c| !c.is_whitespace());
-
-    for fragment_reschedule_plan in plan.split(';') {
-        let captures = regex
-            .captures(fragment_reschedule_plan)
-            .ok_or_else(|| anyhow!("plan \"{}\" format illegal", fragment_reschedule_plan))?;
-
-        let fragment_id = captures
-            .name(RESCHEDULE_FRAGMENT_KEY)
-            .and_then(|mat| mat.as_str().parse::<u32>().ok())
-            .ok_or_else(|| anyhow!("plan \"{}\" does not have a valid fragment id", plan))?;
-
-        let split_fn = |mat: Match<'_>| {
-            mat.as_str()
-                .split(',')
-                .map(|id_str| id_str.parse::<u32>().map_err(Error::msg))
-                .collect::<Result<Vec<_>>>()
-        };
-
-        let removed_parallel_units = captures
-            .name(RESCHEDULE_REMOVED_KEY)
-            .map(split_fn)
-            .transpose()?
-            .unwrap_or_default();
-        let added_parallel_units = captures
-            .name(RESCHEDULE_ADDED_KEY)
-            .map(split_fn)
-            .transpose()?
-            .unwrap_or_default();
-
-        if !(removed_parallel_units.is_empty() && added_parallel_units.is_empty()) {
-            reschedules.insert(
-                fragment_id,
-                Reschedule {
-                    added_parallel_units,
-                    removed_parallel_units,
-                },
-            );
-        }
-    }
    Ok(reschedules)
}

-pub async fn get_reschedule_plan(
-    context: &CtlContext,
-    policy: PbPolicy,
-    revision: u64,
-) -> Result<GetReschedulePlanResponse> {
-    let meta_client = context.meta_client().await?;
-    let response = meta_client.get_reschedule_plan(policy, revision).await?;
-    Ok(response)
-}

pub async fn unregister_workers(
    context: &CtlContext,
    workers: Vec<String>,
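A behavioral note on the new parse_plan: with the regex logic commented out, it returns an empty map for any input, so textual plans are effectively a no-op until the worker-based syntax is implemented. A hypothetical test pinning that behavior down, assuming it lives next to parse_plan in this module:

#[cfg(test)]
mod parse_plan_tests {
    use super::parse_plan;

    #[test]
    fn parse_plan_is_currently_a_no_op() {
        // Input in the old parallel-unit plan syntax (approximate);
        // the result is empty regardless of what is passed in.
        let reschedules = parse_plan("100-[1,2]+[3,4]".to_owned()).unwrap();
        assert!(reschedules.is_empty());
    }
}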
(Diffs for the remaining 10 changed files are not shown.)
