Refactor parallelism handling: Shift from parallel units to parallelism fields & simplify logic
shanicky committed Jul 1, 2024
1 parent 3795168 commit a48ef13
Showing 61 changed files with 2,076 additions and 2,907 deletions.
8 changes: 6 additions & 2 deletions proto/common.proto
@@ -61,8 +61,10 @@ message WorkerNode {
WorkerType type = 2;
HostAddress host = 3;
State state = 4;
// TODO #8940 `parallel_units` should be moved into `Property`
repeated ParallelUnit parallel_units = 5;

reserved 5;
reserved "parallel_units";

Property property = 6;

// Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
@@ -75,6 +77,8 @@ message WorkerNode {
// It's populated by meta node, when the worker node is added by meta node.
// It's not persistent in meta store.
optional uint64 started_at = 9;

uint32 parallelism = 10;
}

message Buffer {
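The change above drops the repeated `parallel_units` field from `WorkerNode` and replaces it with a single `parallelism` count; consumers now derive per-worker slots by enumerating indices `0..parallelism`, as the batch-side changes further down show. A minimal Rust sketch of that derivation, using hypothetical simplified stand-ins for the generated types (the real ones live in `risingwave_pb`):

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the prost-generated structs; only the
// fields relevant to this refactor are kept.
#[derive(Clone)]
struct WorkerNode {
    id: u32,
    parallelism: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId {
    worker_id: u32,
    slot_idx: u32,
}

// A worker's slots are just the indices 0..parallelism paired with its id.
fn worker_slots(worker: &WorkerNode) -> impl Iterator<Item = WorkerSlotId> + '_ {
    (0..worker.parallelism).map(move |i| WorkerSlotId {
        worker_id: worker.id,
        slot_idx: i,
    })
}

fn main() {
    let workers = vec![
        WorkerNode { id: 1, parallelism: 4 },
        WorkerNode { id: 2, parallelism: 2 },
    ];
    // Build the slot -> worker map the same way the batch executors below do.
    let slot_to_worker: HashMap<WorkerSlotId, WorkerNode> = workers
        .iter()
        .flat_map(|w| worker_slots(w).map(move |slot| (slot, w.clone())))
        .collect();
    assert_eq!(slot_to_worker.len(), 6);
}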
55 changes: 16 additions & 39 deletions proto/meta.proto
@@ -83,7 +83,9 @@ message TableFragments {
repeated stream_plan.StreamActor actors = 4;
// Vnode mapping (which should be set in upstream dispatcher) of the fragment.
// This field is always set to `Some`. For singleton, the parallel unit for all vnodes will be the same.
common.ParallelUnitMapping vnode_mapping = 5;
reserved 5;
reserved "vnode_mapping";

repeated uint32 state_table_ids = 6;
// Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
// but in some scenarios (e.g. Scaling) it will lead to a lot of duplicate code,
@@ -129,7 +131,10 @@ message ActorLocation {

message MigrationPlan {
// map<parallel_unit_id, parallel_unit>, the plan indicates that the actors will be migrated from old parallel unit to the new one.
map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 1;
reserved 1;
reserved "parallel_unit_migration_plan";

map<uint64, uint64> worker_slot_migration_plan = 2;
}
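In the new `worker_slot_migration_plan`, both keys and values are worker slots packed into a single `uint64`. This matches the `WorkerSlotId(u64)` newtype touched later in this commit; the exact bit layout is not shown in this diff, but a common packing (and an assumption here) is worker id in the high 32 bits and slot index in the low 32 bits:

fn encode_worker_slot(worker_id: u32, slot_idx: u32) -> u64 {
    // Assumed layout: high 32 bits = worker id, low 32 bits = slot index.
    ((worker_id as u64) << 32) | slot_idx as u64
}

fn decode_worker_slot(encoded: u64) -> (u32, u32) {
    ((encoded >> 32) as u32, encoded as u32)
}

fn main() {
    // One migration entry: whatever ran on worker 1, slot 0 moves to worker 3, slot 2.
    let plan: std::collections::HashMap<u64, u64> =
        [(encode_worker_slot(1, 0), encode_worker_slot(3, 2))].into();
    for (from, to) in &plan {
        assert_eq!(decode_worker_slot(*from), (1, 0));
        assert_eq!(decode_worker_slot(*to), (3, 2));
    }
}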

message FlushRequest {
@@ -244,8 +249,10 @@ message ListActorStatesResponse {
message ActorState {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 parallel_unit_id = 3;
reserved 3;
reserved "parallel_unit_id";
TableFragments.ActorStatus.ActorState state = 4;
uint32 worker_id = 5;
}
repeated ActorState states = 1;
}
@@ -492,16 +499,17 @@ message GetClusterInfoResponse {
uint64 revision = 5;
}

message Reschedule {
repeated uint32 added_parallel_units = 1;
repeated uint32 removed_parallel_units = 2;
message WorkerReschedule {
map<uint32, uint32> increased_actor_count = 1;
map<uint32, uint32> decreased_actor_count = 2;
}

message RescheduleRequest {
// reschedule plan for each fragment
map<uint32, Reschedule> reschedules = 1;
reserved "reschedules";
reserved 1;
uint64 revision = 2;
bool resolve_no_shuffle_upstream = 3;
map<uint32, WorkerReschedule> worker_reschedules = 4;
}
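`Reschedule` (lists of added/removed parallel units per fragment) is replaced by `WorkerReschedule`, which states per worker how many actors a fragment should gain or lose. A hedged sketch of what one entry of `worker_reschedules` expresses, using plain Rust maps rather than the real generated types:

use std::collections::HashMap;

// Hypothetical mirror of the WorkerReschedule message above; the real type is
// generated from meta.proto.
#[derive(Default)]
struct WorkerReschedule {
    increased_actor_count: HashMap<u32, u32>, // worker id -> actors to add
    decreased_actor_count: HashMap<u32, u32>, // worker id -> actors to remove
}

fn main() {
    // Move two actors of fragment 7 from worker 1 to worker 2.
    let mut reschedule = WorkerReschedule::default();
    reschedule.decreased_actor_count.insert(1, 2);
    reschedule.increased_actor_count.insert(2, 2);

    // Keyed by fragment id, as carried in RescheduleRequest.worker_reschedules.
    let worker_reschedules: HashMap<u32, WorkerReschedule> = [(7, reschedule)].into();
    assert_eq!(worker_reschedules[&7].increased_actor_count[&2], 2);
}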

message RescheduleResponse {
@@ -529,40 +537,9 @@ message TableParallelism {
}
}

message GetReschedulePlanRequest {
uint64 revision = 1;

message WorkerChanges {
repeated uint32 include_worker_ids = 1;
repeated uint32 exclude_worker_ids = 2;
optional uint32 target_parallelism = 3;
optional uint32 target_parallelism_per_worker = 4;
}

message StableResizePolicy {
map<uint32, WorkerChanges> fragment_worker_changes = 1;
}

oneof policy {
// The StableResizePolicy will generate a stable ReschedulePlan, without altering the distribution on WorkerId that's not involved.
// Note that this "Stable" doesn't refer to the "determinacy" of the algorithm.
// Multiple repeated calls may yield different ReschedulePlan results.
StableResizePolicy stable_resize_policy = 2;
}
}

message GetReschedulePlanResponse {
uint64 revision = 1;
// reschedule plan for each fragment
map<uint32, Reschedule> reschedules = 2;
// todo, refactor needed
bool success = 3;
}

service ScaleService {
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
rpc GetReschedulePlan(GetReschedulePlanRequest) returns (GetReschedulePlanResponse);
}

message MembersRequest {}
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
@@ -383,7 +383,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
.iter()
.flat_map(|worker| {
(0..(worker.parallel_units.len()))
(0..(worker.parallelism as usize))
.map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
})
.collect();
8 changes: 4 additions & 4 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -160,7 +160,7 @@ impl WorkerNodeManager {
.worker_nodes
.iter()
.flat_map(|worker| {
(0..worker.parallel_units.len())
(0..worker.parallelism as usize)
.map(move |i| (WorkerSlotId::new(worker.id, i), worker))
})
.collect();
@@ -337,7 +337,7 @@ impl WorkerNodeSelector {
};
worker_nodes
.iter()
.map(|node| node.parallel_units.len())
.map(|node| node.parallelism as usize)
.sum()
}

@@ -424,7 +424,7 @@ mod tests {
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallel_units: vec![],
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
@@ -438,7 +438,7 @@
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallel_units: vec![],
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
39 changes: 38 additions & 1 deletion src/common/src/hash/consistent_hash/mapping.rs
@@ -33,7 +33,7 @@ use crate::util::iter_util::ZipEqDebug;
// TODO: find a better place for this.
pub type ActorId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
@@ -68,6 +68,12 @@ impl Display for WorkerSlotId {
}
}

impl Debug for WorkerSlotId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
}
}

/// Trait for items that can be used as keys in [`VnodeMapping`].
pub trait VnodeMappingItem {
/// The type of the item.
@@ -282,12 +288,14 @@ pub mod marker {

/// A marker type for items of [`ActorId`].
pub struct Actor;

impl VnodeMappingItem for Actor {
type Item = ActorId;
}

/// A marker type for items of [`ParallelUnitId`].
pub struct ParallelUnit;

impl VnodeMappingItem for ParallelUnit {
type Item = ParallelUnitId;
}
@@ -323,6 +331,26 @@ impl ActorMapping {
self.transform(to_map)
}

/// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
let mut worker_actors = HashMap::new();
for (actor_id, worker_id) in actor_to_worker {
worker_actors
.entry(worker_id)
.or_insert(BTreeSet::new())
.insert(actor_id);
}

let mut actor_location = HashMap::new();
for (worker, actors) in worker_actors {
for (idx, &actor) in actors.iter().enumerate() {
actor_location.insert(*actor, WorkerSlotId::new(*worker, idx));
}
}

self.transform(&actor_location)
}

/// Create an actor mapping from the protobuf representation.
pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
@@ -441,6 +469,13 @@ impl ParallelUnitMapping {
}
}

impl WorkerSlotMapping {
/// Transform this worker slot mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
self.transform(to_map)
}
}

#[cfg(test)]
mod tests {
use std::iter::repeat_with;
@@ -450,11 +485,13 @@ mod tests {
use super::*;

struct Test;

impl VnodeMappingItem for Test {
type Item = u32;
}

struct Test2;

impl VnodeMappingItem for Test2 {
type Item = u32;
}
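The key detail of `to_worker_slot` above is how slot indices are assigned: actors on the same worker are collected into a `BTreeSet`, so they are ordered by actor id, and then numbered 0, 1, 2, … to form `WorkerSlotId`s. A standalone sketch of just that assignment, with simplified stand-in types:

use std::collections::{BTreeSet, HashMap};

type ActorId = u32;
type WorkerId = u32;

// Simplified stand-in for WorkerSlotId: (worker id, slot index).
type WorkerSlot = (WorkerId, usize);

// Assign each actor a deterministic slot on its worker: actors are grouped per
// worker, ordered by actor id, and numbered from 0 upwards.
fn assign_worker_slots(actor_to_worker: &HashMap<ActorId, WorkerId>) -> HashMap<ActorId, WorkerSlot> {
    let mut worker_actors: HashMap<WorkerId, BTreeSet<ActorId>> = HashMap::new();
    for (&actor, &worker) in actor_to_worker {
        worker_actors.entry(worker).or_default().insert(actor);
    }

    let mut actor_location = HashMap::new();
    for (worker, actors) in worker_actors {
        for (idx, actor) in actors.into_iter().enumerate() {
            actor_location.insert(actor, (worker, idx));
        }
    }
    actor_location
}

fn main() {
    let actor_to_worker = HashMap::from([(10, 1), (11, 2), (12, 1)]);
    let slots = assign_worker_slots(&actor_to_worker);
    // Actors 10 and 12 both live on worker 1; the smaller actor id gets slot 0.
    assert_eq!(slots[&10], (1, 0));
    assert_eq!(slots[&12], (1, 1));
    assert_eq!(slots[&11], (2, 0));
}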
31 changes: 6 additions & 25 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -37,7 +37,7 @@ pub fn place_vnode(
.iter()
.filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
.sorted_by_key(|w| w.id)
.map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlotId::new(w.id, idx)))
.map(|w| (0..w.parallelism as usize).map(|idx| WorkerSlotId::new(w.id, idx)))
.collect();

// Set serving parallelism to the minimum of total number of worker slots, specified
@@ -198,42 +198,23 @@ pub fn place_vnode(

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use risingwave_common::hash::WorkerSlotMapping;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{ParallelUnit, WorkerNode};
use risingwave_pb::common::WorkerNode;

use crate::hash::{ParallelUnitId, VirtualNode};
use crate::hash::VirtualNode;
use crate::vnode_mapping::vnode_placement::place_vnode;
#[test]
fn test_place_vnode() {
assert_eq!(VirtualNode::COUNT, 256);

let mut pu_id_counter: ParallelUnitId = 0;
let mut pu_to_worker: HashMap<ParallelUnitId, u32> = Default::default();
let serving_property = Property {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
};

let mut gen_pus_for_worker =
|worker_node_id: u32, number: u32, pu_to_worker: &mut HashMap<ParallelUnitId, u32>| {
let mut results = vec![];
for i in 0..number {
results.push(ParallelUnit {
id: pu_id_counter + i,
worker_node_id,
})
}
pu_id_counter += number;
for pu in &results {
pu_to_worker.insert(pu.id, pu.worker_node_id);
}
results
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
assert_eq!(wm1.len(), 256);
assert_eq!(wm2.len(), 256);
@@ -249,7 +230,7 @@

let worker_1 = WorkerNode {
id: 1,
parallel_units: gen_pus_for_worker(1, 1, &mut pu_to_worker),
parallelism: 1,
property: Some(serving_property.clone()),
..Default::default()
};
@@ -264,7 +245,7 @@

let worker_2 = WorkerNode {
id: 2,
parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker),
parallelism: 50,
property: Some(serving_property.clone()),
..Default::default()
};
@@ -283,7 +264,7 @@

let worker_3 = WorkerNode {
id: 3,
parallel_units: gen_pus_for_worker(3, 60, &mut pu_to_worker),
parallelism: 60,
property: Some(serving_property),
..Default::default()
};
Expand Down
