Commit

feat(meta): deprecate parallel unit (#17523)

shanicky authored Jul 23, 2024
1 parent 40e5629 commit 007e802
Showing 56 changed files with 2,241 additions and 1,726 deletions.
8 changes: 6 additions & 2 deletions proto/common.proto
@@ -61,8 +61,10 @@ message WorkerNode {
   WorkerType type = 2;
   HostAddress host = 3;
   State state = 4;
-  // TODO #8940 `parallel_units` should be moved into `Property`
-  repeated ParallelUnit parallel_units = 5;
+
+  reserved 5;
+  reserved "parallel_units";
+
   Property property = 6;

   // Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
@@ -75,6 +77,8 @@ message WorkerNode {
   // It's populated by meta node, when the worker node is added by meta node.
   // It's not persistent in meta store.
   optional uint64 started_at = 9;
+
+  uint32 parallelism = 10;
 }

 message Buffer {
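The upshot for anything that consumes `WorkerNode` is that a worker's capacity is now a single count (field 10) instead of a repeated list of `ParallelUnit`s (field 5, now reserved). A minimal sketch of the before/after shape, using hand-written stand-ins rather than the actual prost-generated types:

// Sketch only: stand-ins for the generated `WorkerNode` message, before and after.
struct WorkerNodeOld {
    id: u32,
    parallel_units: Vec<u32>, // field 5, one entry per parallel unit (deprecated)
}

struct WorkerNodeNew {
    id: u32,
    parallelism: u32, // field 10, a plain slot count
}

// Cluster capacity was the sum of list lengths; it is now a sum of counts.
fn total_slots_old(workers: &[WorkerNodeOld]) -> usize {
    workers.iter().map(|w| w.parallel_units.len()).sum()
}

fn total_slots_new(workers: &[WorkerNodeNew]) -> u64 {
    workers.iter().map(|w| u64::from(w.parallelism)).sum()
}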
30 changes: 22 additions & 8 deletions proto/meta.proto
@@ -81,9 +81,13 @@ message TableFragments {
     uint32 fragment_type_mask = 2;
     FragmentDistributionType distribution_type = 3;
     repeated stream_plan.StreamActor actors = 4;
-    // NOTE: vnode_mapping is deprecated, we will generate the vnode_mapping by actors' bitmaps
-    // Vnode mapping (which should be set in upstream dispatcher) of the fragment.
-    // This field is always set to `Some`. For singleton, the parallel unit for all vnodes will be the same.
-    common.ParallelUnitMapping vnode_mapping = 5;
+
+    reserved 5;
+    reserved "vnode_mapping";
+
     repeated uint32 state_table_ids = 6;
     // Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
     // but in some scenarios (e.g. Scaling) it will lead to a lot of duplicate code,
@@ -128,8 +132,13 @@ message ActorLocation {
 }

 message MigrationPlan {
-  // NOTE: parallel_unit_migration_plan is deprecated, using worker_slot_migration_plan instead
-  // map<parallel_unit_id, parallel_unit>, the plan indicates that the actors will be migrated from old parallel unit to the new one.
-  map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 1;
+  reserved 1;
+  reserved "parallel_unit_migration_plan";
+
+  // map<src_worker_slot_id, dst_worker_slot_id>, the plan indicates that the actors will be migrated from old worker_slot to the new one.
+  map<uint64, uint64> worker_slot_migration_plan = 2;
 }

 message FlushRequest {
@@ -244,8 +253,10 @@ message ListActorStatesResponse {
   message ActorState {
     uint32 actor_id = 1;
     uint32 fragment_id = 2;
-    uint32 parallel_unit_id = 3;
+    reserved 3;
+    reserved "parallel_unit_id";
     TableFragments.ActorStatus.ActorState state = 4;
+    uint32 worker_id = 5;
   }
   repeated ActorState states = 1;
 }
@@ -507,16 +518,19 @@ message GetClusterInfoResponse {
   uint64 revision = 5;
 }

-message Reschedule {
-  repeated uint32 added_parallel_units = 1;
-  repeated uint32 removed_parallel_units = 2;
+// For each fragment that needs to be rescheduled, there will be a WorkerReschedule,
+// indicating on which workers the actors of this fragment need to be changed and by how many.
+message WorkerReschedule {
+  // worker_id -> actor_diff
+  map<uint32, int32> worker_actor_diff = 1;
 }

 message RescheduleRequest {
-  // reschedule plan for each fragment
-  map<uint32, Reschedule> reschedules = 1;
+  reserved "reschedules";
+  reserved 1;
   uint64 revision = 2;
   bool resolve_no_shuffle_upstream = 3;
+  map<uint32, WorkerReschedule> worker_reschedules = 4;
 }

 message RescheduleResponse {
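`WorkerReschedule` reduces a reschedule plan to per-worker actor-count deltas. A hedged sketch of applying such a diff to a fragment's current placement; only the two map shapes follow the proto, while the function and variable names are invented for illustration:

use std::collections::HashMap;

// Apply a `worker_actor_diff` (worker id -> signed actor-count change) to the
// current actor counts of one fragment. Sketch only, not meta-node code.
fn apply_worker_reschedule(
    current: &HashMap<u32, u32>,           // worker id -> current actor count
    worker_actor_diff: &HashMap<u32, i32>, // worker id -> +N / -N actors
) -> HashMap<u32, u32> {
    let mut target = current.clone();
    for (&worker_id, &diff) in worker_actor_diff {
        let count = target.entry(worker_id).or_insert(0);
        // A negative diff must not remove more actors than the worker runs.
        *count = count
            .checked_add_signed(diff)
            .expect("reschedule would make an actor count negative or overflow");
    }
    target.retain(|_, &mut count| count > 0); // drop workers left with no actors
    target
}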
4 changes: 2 additions & 2 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -425,7 +425,7 @@ mod tests {
             r#type: WorkerType::ComputeNode as i32,
             host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
             state: worker_node::State::Running as i32,
-            parallel_units: vec![],
+            parallelism: 0,
             property: Some(Property {
                 is_unschedulable: false,
                 is_serving: true,
@@ -439,7 +439,7 @@
             r#type: WorkerType::ComputeNode as i32,
             host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
             state: worker_node::State::Running as i32,
-            parallel_units: vec![],
+            parallelism: 0,
             property: Some(Property {
                 is_unschedulable: false,
                 is_serving: true,
32 changes: 32 additions & 0 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -329,6 +329,31 @@ impl ActorMapping {
         self.transform(to_map)
     }

+    /// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
+    pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
+        let mut worker_actors = HashMap::new();
+        for actor_id in self.iter_unique() {
+            let worker_id = actor_to_worker
+                .get(&actor_id)
+                .cloned()
+                .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
+
+            worker_actors
+                .entry(worker_id)
+                .or_insert(BTreeSet::new())
+                .insert(actor_id);
+        }
+
+        let mut actor_location = HashMap::new();
+        for (worker, actors) in worker_actors {
+            for (idx, &actor) in actors.iter().enumerate() {
+                actor_location.insert(actor, WorkerSlotId::new(worker, idx));
+            }
+        }
+
+        self.transform(&actor_location)
+    }
+
     /// Create an actor mapping from the protobuf representation.
     pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
         assert_eq!(proto.original_indices.len(), proto.data.len());
@@ -447,6 +472,13 @@ impl ParallelUnitMapping {
     }
 }

+impl WorkerSlotMapping {
+    /// Transform this worker slot mapping to an actor mapping, essentially `transform`.
+    pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
+        self.transform(to_map)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::iter::repeat_with;
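The slot assignment above is deterministic: each worker's actors go into a `BTreeSet`, so they are ordered by actor id, and the i-th actor on a worker takes slot i. The same idea as a self-contained sketch, with `WorkerSlotId` simplified to a tuple (the crate's real type packs worker id and slot index into a `u64`):

use std::collections::{BTreeSet, HashMap};

type ActorId = u32;
type WorkerId = u32;
type WorkerSlotId = (WorkerId, usize); // simplified stand-in

// Group actors by worker, then enumerate each group in ascending actor-id
// order to hand out slot indices.
fn assign_slots(actor_to_worker: &HashMap<ActorId, WorkerId>) -> HashMap<ActorId, WorkerSlotId> {
    let mut worker_actors: HashMap<WorkerId, BTreeSet<ActorId>> = HashMap::new();
    for (&actor, &worker) in actor_to_worker {
        worker_actors.entry(worker).or_default().insert(actor);
    }

    let mut slots = HashMap::new();
    for (worker, actors) in worker_actors {
        for (idx, &actor) in actors.iter().enumerate() {
            slots.insert(actor, (worker, idx));
        }
    }
    slots
}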
29 changes: 5 additions & 24 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -198,42 +198,23 @@ pub fn place_vnode(

 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
-
     use risingwave_common::hash::WorkerSlotMapping;
     use risingwave_pb::common::worker_node::Property;
-    use risingwave_pb::common::{ParallelUnit, WorkerNode};
+    use risingwave_pb::common::WorkerNode;

-    use crate::hash::{ParallelUnitId, VirtualNode};
+    use crate::hash::VirtualNode;
     use crate::vnode_mapping::vnode_placement::place_vnode;
     #[test]
     fn test_place_vnode() {
         assert_eq!(VirtualNode::COUNT, 256);

-        let mut pu_id_counter: ParallelUnitId = 0;
-        let mut pu_to_worker: HashMap<ParallelUnitId, u32> = Default::default();
         let serving_property = Property {
             is_unschedulable: false,
             is_serving: true,
             is_streaming: false,
         };

-        let mut gen_pus_for_worker =
-            |worker_node_id: u32, number: u32, pu_to_worker: &mut HashMap<ParallelUnitId, u32>| {
-                let mut results = vec![];
-                for i in 0..number {
-                    results.push(ParallelUnit {
-                        id: pu_id_counter + i,
-                        worker_node_id,
-                    })
-                }
-                pu_id_counter += number;
-                for pu in &results {
-                    pu_to_worker.insert(pu.id, pu.worker_node_id);
-                }
-                results
-            };
-
         let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
             assert_eq!(wm1.len(), 256);
             assert_eq!(wm2.len(), 256);
@@ -249,7 +230,7 @@

         let worker_1 = WorkerNode {
             id: 1,
-            parallel_units: gen_pus_for_worker(1, 1, &mut pu_to_worker),
+            parallelism: 1,
             property: Some(serving_property.clone()),
             ..Default::default()
         };
@@ -264,7 +245,7 @@

         let worker_2 = WorkerNode {
             id: 2,
-            parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker),
+            parallelism: 50,
             property: Some(serving_property.clone()),
             ..Default::default()
         };
@@ -283,7 +264,7 @@

         let worker_3 = WorkerNode {
             id: 3,
-            parallel_units: gen_pus_for_worker(3, 60, &mut pu_to_worker),
+            parallelism: 60,
             property: Some(serving_property),
             ..Default::default()
         };
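With parallel units gone, serving capacity is expressed by `parallelism` alone, which is what the updated fixtures encode (workers with 1, 50, and 60 slots). The sketch below shows only the proportional split such a placement aims at; it is not the actual `place_vnode` algorithm, which also tries to stay close to an existing mapping when one is provided:

// Sketch only: split `vnode_count` vnodes across workers in proportion to
// their `parallelism`, handing the rounding remainder out one by one.
fn expected_vnode_share(parallelism: &[u32], vnode_count: usize) -> Vec<usize> {
    let total: u32 = parallelism.iter().sum();
    assert!(total > 0, "at least one serving slot is required");
    let mut shares: Vec<usize> = parallelism
        .iter()
        .map(|&p| vnode_count * p as usize / total as usize)
        .collect();
    // The flooring above leaves fewer than `parallelism.len()` vnodes over.
    let mut rest = vnode_count - shares.iter().sum::<usize>();
    for share in &mut shares {
        if rest == 0 {
            break;
        }
        *share += 1;
        rest -= 1;
    }
    shares
}

// For the test's workers: expected_vnode_share(&[1, 50, 60], 256) == [3, 115, 138].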
47 changes: 21 additions & 26 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap};

 use comfy_table::{Attribute, Cell, Row, Table};
 use itertools::Itertools;
@@ -88,40 +88,33 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
         revision,
     } = get_cluster_info(context).await?;

-    // Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
+    // Fragment ID -> [Worker ID -> [Actor ID]]
     let mut fragments = BTreeMap::new();
     // Fragment ID -> Table Fragments' State
     let mut fragment_states = HashMap::new();

     for table_fragment in &table_fragments {
         for (&id, fragment) in &table_fragment.fragments {
             for actor in &fragment.actors {
-                let parallel_unit = table_fragment
+                let worker_id = table_fragment
                     .actor_status
                     .get(&actor.actor_id)
                     .unwrap()
-                    .get_parallel_unit()
-                    .unwrap();
+                    .get_worker_node_id();

                 fragments
                     .entry(id)
-                    .or_insert_with(HashMap::new)
-                    .insert(parallel_unit.id, (parallel_unit, actor));
+                    .or_insert_with(BTreeMap::new)
+                    .entry(worker_id)
+                    .or_insert(BTreeSet::new())
+                    .insert(actor.actor_id);
             }
             fragment_states.insert(id, table_fragment.state());
         }
     }

-    // Parallel Unit ID -> Worker Node
-    let all_parallel_units: BTreeMap<_, _> = worker_nodes
-        .iter()
-        .flat_map(|worker_node| {
-            worker_node
-                .parallel_units
-                .iter()
-                .map(|parallel_unit| (parallel_unit.id, worker_node.clone()))
-        })
-        .collect();
-
     let mut table = Table::new();

     let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
@@ -132,11 +125,10 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
         }
     };

-    // Compute Node, Parallel Unit, Frag 1, Frag 2, ..., Frag N
+    // Compute Node, Frag 1, Frag 2, ..., Frag N
     table.set_header({
         let mut row = Row::new();
         row.add_cell("Compute Node".into());
-        row.add_cell("Parallel Unit".into());
         for &fid in fragments.keys() {
             let cell = Cell::new(format!("Frag {fid}"));
             let cell = cross_out_if_creating(cell, fid);
@@ -146,8 +138,8 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
     });

     let mut last_worker_id = None;
-    for (pu, worker) in all_parallel_units {
-        // Compute Node, Parallel Unit, Actor 1, Actor 11, -, ..., Actor N
+    for worker in worker_nodes {
+        // Compute Node, Actor 1, Actor 11, -, ..., Actor N
         let mut row = Row::new();
         row.add_cell(if last_worker_id == Some(worker.id) {
             "".into()
@@ -166,14 +158,17 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
                 ))
                 .add_attribute(Attribute::Bold)
         });
-        row.add_cell(pu.into());
-        for (&fid, f) in &fragments {
-            let cell = if let Some((_pu, actor)) = f.get(&pu) {
-                actor.actor_id.into()
+        for (&fragment_id, worker_actors) in &fragments {
+            let cell = if let Some(actors) = worker_actors.get(&worker.id) {
+                actors
+                    .iter()
+                    .map(|actor| format!("{}", actor))
+                    .join(",")
+                    .into()
             } else {
                 "-".into()
             };
-            let cell = cross_out_if_creating(cell, fid);
+            let cell = cross_out_if_creating(cell, fragment_id);
             row.add_cell(cell);
         }
         table.add_row(row);
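With rows now keyed by worker instead of parallel unit, each fragment cell lists every actor of that fragment running on the worker, joined by commas. A hypothetical rendering, with all ids and addresses invented for illustration:

+----------------------------+--------+--------+
| Compute Node               | Frag 1 | Frag 2 |
+----------------------------+--------+--------+
| ComputeNode 1 (host1:5688) | 1,3    | 7      |
| ComputeNode 2 (host2:5688) | 2,4    | -      |
+----------------------------+--------+--------+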
8 changes: 1 addition & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -156,18 +156,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
             .await?;
         if worker.worker_type() == WorkerType::ComputeNode {
             let pb_property = worker.worker_node.property.as_ref().unwrap();
-            let parallel_unit_ids = worker
-                .worker_node
-                .parallel_units
-                .iter()
-                .map(|pu| pu.id as i32)
-                .collect_vec();
             let property = worker_property::ActiveModel {
                 worker_id: Set(worker.worker_id() as _),
-                parallel_unit_ids: Set(parallel_unit_ids.into()),
                 is_streaming: Set(pb_property.is_streaming),
                 is_serving: Set(pb_property.is_serving),
                 is_unschedulable: Set(pb_property.is_unschedulable),
+                parallelism: Set(worker.worker_node.parallelism() as _),
             };
             WorkerProperty::insert(property)
                 .exec(&meta_store_sql.conn)
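On the metadata side, the etcd-to-SQL migration now persists one `parallelism` value per compute node in `worker_property` instead of a serialized list of parallel-unit ids. A sketch of the row shape before and after; these structs are illustrative, the real SeaORM `ActiveModel` is the one in the diff above:

// Illustrative row shapes for the `worker_property` table, not the actual models.
struct WorkerPropertyOld {
    worker_id: i32,
    parallel_unit_ids: Vec<i32>, // column dropped by this commit
    is_streaming: bool,
    is_serving: bool,
    is_unschedulable: bool,
}

struct WorkerPropertyNew {
    worker_id: i32,
    parallelism: i32, // replaces the id list
    is_streaming: bool,
    is_serving: bool,
    is_unschedulable: bool,
}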