chore: deprecate get_reschedule_plan and clean up code #17584

Merged · 2 commits · Jul 5, 2024

proto/meta.proto (31 changes: 0 additions & 31 deletions)
@@ -529,40 +529,9 @@ message TableParallelism {
  }
}

-message GetReschedulePlanRequest {
-  uint64 revision = 1;
-
-  message WorkerChanges {
-    repeated uint32 include_worker_ids = 1;
-    repeated uint32 exclude_worker_ids = 2;
-    optional uint32 target_parallelism = 3;
-    optional uint32 target_parallelism_per_worker = 4;
-  }
-
-  message StableResizePolicy {
-    map<uint32, WorkerChanges> fragment_worker_changes = 1;
-  }
-
-  oneof policy {
-    // The StableResizePolicy will generate a stable ReschedulePlan, without altering the distribution on WorkerId that's not involved.
-    // Note that this "Stable" doesn't refer to the "determinacy" of the algorithm.
-    // Multiple repeated calls may yield different ReschedulePlan results.
-    StableResizePolicy stable_resize_policy = 2;
-  }
-}
-
-message GetReschedulePlanResponse {
-  uint64 revision = 1;
-  // reschedule plan for each fragment
-  map<uint32, Reschedule> reschedules = 2;
-  // todo, refactor needed
-  bool success = 3;
-}
-
service ScaleService {
  rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
  rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
-  rpc GetReschedulePlan(GetReschedulePlanRequest) returns (GetReschedulePlanResponse);
}

message MembersRequest {}
src/batch/src/executor/join/local_lookup_join.rs (2 changes: 1 addition & 1 deletion)
@@ -383,7 +383,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
        let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
            .iter()
            .flat_map(|worker| {
-                (0..(worker.parallel_units.len()))
+                (0..(worker.parallelism()))
                    .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
            })
            .collect();
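As a reading aid for this hunk and the similar ones below, here is a minimal, self-contained sketch of the pattern the PR switches to: enumerating one WorkerSlotId per unit of a worker's parallelism() instead of counting its parallel_units. The WorkerNode and WorkerSlotId types are simplified stand-ins written for illustration, not RisingWave's actual definitions.

    use std::collections::HashMap;

    // Simplified stand-in for the protobuf WorkerNode; only what the sketch needs.
    #[derive(Clone)]
    struct WorkerNode {
        id: u32,
        parallelism: usize,
    }

    impl WorkerNode {
        // Mirrors the accessor the diff switches to, replacing `parallel_units.len()`.
        fn parallelism(&self) -> usize {
            self.parallelism
        }
    }

    // Simplified stand-in for WorkerSlotId: (worker id, slot index within that worker).
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct WorkerSlotId(u32, usize);

    impl WorkerSlotId {
        fn new(worker_id: u32, slot: usize) -> Self {
            Self(worker_id, slot)
        }
    }

    fn main() {
        let worker_nodes = vec![
            WorkerNode { id: 1, parallelism: 2 },
            WorkerNode { id: 2, parallelism: 3 },
        ];

        // One (WorkerSlotId, WorkerNode) entry per slot, as in the executor builder above.
        let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
            .iter()
            .flat_map(|worker| {
                // `move` copies the `&WorkerNode` into the inner closure so the
                // returned iterator does not borrow the flat_map closure's frame.
                (0..worker.parallelism())
                    .map(move |i| (WorkerSlotId::new(worker.id, i), worker.clone()))
            })
            .collect();

        assert_eq!(worker_slot_mapping.len(), 2 + 3);
    }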
src/batch/src/worker_manager/worker_node_manager.rs (8 changes: 2 additions & 6 deletions)
@@ -160,8 +160,7 @@ impl WorkerNodeManager {
            .worker_nodes
            .iter()
            .flat_map(|worker| {
-                (0..worker.parallel_units.len())
-                    .map(move |i| (WorkerSlotId::new(worker.id, i), worker))
+                (0..worker.parallelism()).map(move |i| (WorkerSlotId::new(worker.id, i), worker))
            })
            .collect();

@@ -335,10 +334,7 @@ impl WorkerNodeSelector {
        } else {
            self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
        };
-        worker_nodes
-            .iter()
-            .map(|node| node.parallel_units.len())
-            .sum()
+        worker_nodes.iter().map(|node| node.parallelism()).sum()
    }

    pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerSlotMapping> {
src/common/src/vnode_mapping/vnode_placement.rs (2 changes: 1 addition & 1 deletion)
@@ -37,7 +37,7 @@ pub fn place_vnode(
        .iter()
        .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
        .sorted_by_key(|w| w.id)
-        .map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlotId::new(w.id, idx)))
+        .map(|w| (0..w.parallelism()).map(|idx| WorkerSlotId::new(w.id, idx)))
        .collect();

    // Set serving parallelism to the minimum of total number of worker slots, specified
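The place_vnode hunk above does the same slot enumeration, but only over serving workers and ordered by worker id. A rough std-only sketch of that computation follows; the real code iterates the protobuf WorkerNode and uses itertools' sorted_by_key, while the struct and helper below are simplified stand-ins for illustration.

    // Simplified stand-in for the protobuf WorkerNode used by place_vnode.
    struct WorkerNode {
        id: u32,
        parallelism: usize,
        is_serving: bool,
    }

    impl WorkerNode {
        fn parallelism(&self) -> usize {
            self.parallelism
        }
    }

    // Enumerate (worker id, slot index) pairs for serving workers, ordered by worker id.
    fn serving_worker_slots(workers: &[WorkerNode]) -> Vec<(u32, usize)> {
        let mut serving: Vec<&WorkerNode> = workers.iter().filter(|w| w.is_serving).collect();
        // The real code sorts with itertools' `sorted_by_key(|w| w.id)`; plain sort here.
        serving.sort_by_key(|w| w.id);
        serving
            .into_iter()
            .flat_map(|w| (0..w.parallelism()).map(move |idx| (w.id, idx)))
            .collect()
    }

    fn main() {
        let workers = vec![
            WorkerNode { id: 2, parallelism: 2, is_serving: true },
            WorkerNode { id: 1, parallelism: 1, is_serving: true },
            WorkerNode { id: 3, parallelism: 4, is_serving: false },
        ];
        // Worker 3 is excluded (not serving); slots come out ordered by worker id.
        assert_eq!(serving_worker_slots(&workers), vec![(1, 0), (2, 0), (2, 1)]);
    }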
src/ctl/src/cmd_impl/meta/reschedule.rs (13 changes: 1 addition & 12 deletions)
@@ -20,9 +20,8 @@ use inquire::Confirm;
use itertools::Itertools;
use regex::{Match, Regex};
use risingwave_pb::common::WorkerNode;
-use risingwave_pb::meta::get_reschedule_plan_request::PbPolicy;
use risingwave_pb::meta::table_fragments::ActorStatus;
-use risingwave_pb::meta::{GetClusterInfoResponse, GetReschedulePlanResponse, Reschedule};
+use risingwave_pb::meta::{GetClusterInfoResponse, Reschedule};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

@@ -225,16 +224,6 @@ fn parse_plan(mut plan: String) -> Result<HashMap<u32, Reschedule>, Error> {
    Ok(reschedules)
}

-pub async fn get_reschedule_plan(
-    context: &CtlContext,
-    policy: PbPolicy,
-    revision: u64,
-) -> Result<GetReschedulePlanResponse> {
-    let meta_client = context.meta_client().await?;
-    let response = meta_client.get_reschedule_plan(policy, revision).await?;
-    Ok(response)
-}
-
pub async fn unregister_workers(
    context: &CtlContext,
    workers: Vec<String>,