Commit 8da630f

Update proto schemas, refactor Rust code & adjust tests for worker ID usage and debug traits

shanicky committed Jun 21, 2024
1 parent b6f4b47 commit 8da630f
Showing 32 changed files with 1,079 additions and 1,540 deletions.
8 changes: 6 additions & 2 deletions proto/common.proto
@@ -61,8 +61,10 @@ message WorkerNode {
   WorkerType type = 2;
   HostAddress host = 3;
   State state = 4;
-  // TODO #8940 `parallel_units` should be moved into `Property`
-  repeated ParallelUnit parallel_units = 5;
+
+  reserved 5;
+  reserved "parallel_units";
+
   Property property = 6;

   // Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
@@ -75,6 +77,8 @@ message WorkerNode {
   // It's populated by meta node, when the worker node is added by meta node.
   // It's not persistent in meta store.
   optional uint64 started_at = 9;
+
+  uint32 parallelism = 10;
 }

 message Buffer {
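Reserving field number 5 and the name `parallel_units` keeps the tag from ever being reused for an incompatible type; old messages that still carry field 5 are simply skipped as unknown. With the repeated field retired, a compute node advertises only a slot count, and consumers derive `WorkerSlotId`s from it on the fly. A minimal sketch of that consumer-side pattern, using hand-rolled stand-ins for the generated proto types (the real `WorkerNode` carries many more fields):

use std::collections::HashMap;

// Hand-rolled stand-ins for the generated proto types (assumption: the real
// `WorkerNode` has more fields; only the two used here are modeled).
#[derive(Clone)]
struct WorkerNode {
    id: u32,
    parallelism: u32,
}

// A (worker, slot) pair; the real `WorkerSlotId` packs both into one u64.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId {
    worker_id: u32,
    slot_idx: usize,
}

// Enumerate every slot in the cluster, mirroring the `0..parallelism` loops
// this commit introduces in place of iterating `parallel_units`.
fn worker_slots(workers: &[WorkerNode]) -> HashMap<WorkerSlotId, WorkerNode> {
    workers
        .iter()
        .flat_map(|w| {
            (0..w.parallelism as usize)
                .map(move |slot_idx| (WorkerSlotId { worker_id: w.id, slot_idx }, w.clone()))
        })
        .collect()
}

fn main() {
    let workers = vec![
        WorkerNode { id: 1, parallelism: 2 },
        WorkerNode { id: 2, parallelism: 3 },
    ];
    // Two workers with 2 and 3 slots yield 5 distinct worker slots.
    assert_eq!(worker_slots(&workers).len(), 5);
}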
4 changes: 3 additions & 1 deletion proto/meta.proto
@@ -246,8 +246,10 @@ message ListActorStatesResponse {
   message ActorState {
     uint32 actor_id = 1;
     uint32 fragment_id = 2;
-    uint32 parallel_unit_id = 3;
+    reserved 3;
+    reserved "parallel_unit_id";
     TableFragments.ActorStatus.ActorState state = 4;
+    uint32 worker_id = 5;
   }
   repeated ActorState states = 1;
 }
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
@@ -383,7 +383,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
         let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
             .iter()
             .flat_map(|worker| {
-                (0..(worker.parallel_units.len()))
+                (0..(worker.parallelism as usize))
                     .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
             })
             .collect();
8 changes: 4 additions & 4 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -160,7 +160,7 @@ impl WorkerNodeManager {
             .worker_nodes
             .iter()
             .flat_map(|worker| {
-                (0..worker.parallel_units.len())
+                (0..worker.parallelism as usize)
                     .map(move |i| (WorkerSlotId::new(worker.id, i), worker))
             })
             .collect();
@@ -337,7 +337,7 @@ impl WorkerNodeSelector {
         };
         worker_nodes
             .iter()
-            .map(|node| node.parallel_units.len())
+            .map(|node| node.parallelism as usize)
             .sum()
     }
@@ -424,7 +424,7 @@ mod tests {
             r#type: WorkerType::ComputeNode as i32,
             host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
             state: worker_node::State::Running as i32,
-            parallel_units: vec![],
+            parallelism: 0,
             property: Some(Property {
                 is_unschedulable: false,
                 is_serving: true,
@@ -438,7 +438,7 @@ mod tests {
             r#type: WorkerType::ComputeNode as i32,
             host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
             state: worker_node::State::Running as i32,
-            parallel_units: vec![],
+            parallelism: 0,
             property: Some(Property {
                 is_unschedulable: false,
                 is_serving: true,
8 changes: 7 additions & 1 deletion src/common/src/hash/consistent_hash/mapping.rs
@@ -33,7 +33,7 @@ use crate::util::iter_util::ZipEqDebug;
 // TODO: find a better place for this.
 pub type ActorId = u32;

-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
 pub struct WorkerSlotId(u64);

 impl WorkerSlotId {
@@ -68,6 +68,12 @@ impl Display for WorkerSlotId {
     }
 }

+impl Debug for WorkerSlotId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
+    }
+}
+
 /// Trait for items that can be used as keys in [`VnodeMapping`].
 pub trait VnodeMappingItem {
     /// The type of the item.
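The manual `Debug` impl prints a slot as `[worker:slot]` rather than the opaque packed integer the derived impl would show. A self-contained sketch of the scheme, assuming (consistent with the `new`, `worker_id`, and `slot_idx` accessors used above) that the worker ID sits in the high 32 bits of the `u64` and the slot index in the low 32 bits:

use std::fmt::{Debug, Formatter};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId(u64);

impl WorkerSlotId {
    // Assumed bit layout: worker ID in the high half, slot index in the low half.
    fn new(worker_id: u32, slot_idx: usize) -> Self {
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }
    fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }
    fn slot_idx(&self) -> u32 {
        self.0 as u32
    }
}

// Same shape as the impl added above: `{:?}` now renders `[worker:slot]`.
impl Debug for WorkerSlotId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
    }
}

fn main() {
    assert_eq!(format!("{:?}", WorkerSlotId::new(3, 7)), "[3:7]");
}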
34 changes: 9 additions & 25 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -37,7 +37,7 @@ pub fn place_vnode(
         .iter()
         .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
         .sorted_by_key(|w| w.id)
-        .map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlotId::new(w.id, idx)))
+        .map(|w| (0..w.parallelism as usize).map(|idx| WorkerSlotId::new(w.id, idx)))
         .collect();

     // Set serving parallelism to the minimum of total number of worker slots, specified
@@ -198,42 +198,23 @@

 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
-
     use risingwave_common::hash::WorkerSlotMapping;
     use risingwave_pb::common::worker_node::Property;
-    use risingwave_pb::common::{ParallelUnit, WorkerNode};
+    use risingwave_pb::common::WorkerNode;

-    use crate::hash::{ParallelUnitId, VirtualNode};
+    use crate::hash::{VirtualNode};
     use crate::vnode_mapping::vnode_placement::place_vnode;
     #[test]
     fn test_place_vnode() {
         assert_eq!(VirtualNode::COUNT, 256);

-        let mut pu_id_counter: ParallelUnitId = 0;
-        let mut pu_to_worker: HashMap<ParallelUnitId, u32> = Default::default();
         let serving_property = Property {
             is_unschedulable: false,
             is_serving: true,
             is_streaming: false,
         };

-        let mut gen_pus_for_worker =
-            |worker_node_id: u32, number: u32, pu_to_worker: &mut HashMap<ParallelUnitId, u32>| {
-                let mut results = vec![];
-                for i in 0..number {
-                    results.push(ParallelUnit {
-                        id: pu_id_counter + i,
-                        worker_node_id,
-                    })
-                }
-                pu_id_counter += number;
-                for pu in &results {
-                    pu_to_worker.insert(pu.id, pu.worker_node_id);
-                }
-                results
-            };
-
         let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
             assert_eq!(wm1.len(), 256);
             assert_eq!(wm2.len(), 256);
@@ -249,7 +230,8 @@ mod tests {

         let worker_1 = WorkerNode {
             id: 1,
-            parallel_units: gen_pus_for_worker(1, 1, &mut pu_to_worker),
+            // parallel_units: gen_pus_for_worker(1, 1, &mut pu_to_worker),
+            parallelism: 1,
             property: Some(serving_property.clone()),
             ..Default::default()
         };
@@ -264,7 +246,8 @@

         let worker_2 = WorkerNode {
             id: 2,
-            parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker),
+            // parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker),
+            parallelism: 50,
             property: Some(serving_property.clone()),
             ..Default::default()
         };
@@ -283,7 +266,8 @@

         let worker_3 = WorkerNode {
             id: 3,
-            parallel_units: gen_pus_for_worker(3, 60, &mut pu_to_worker),
+            // parallel_units: gen_pus_for_worker(3, 60, &mut pu_to_worker),
+            parallelism: 60,
             property: Some(serving_property),
             ..Default::default()
         };
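The test now describes each worker by a bare `parallelism` count. For intuition only, here is a hypothetical round-robin placement over the flattened worker slots; this is not the actual `place_vnode` algorithm (which also considers a previous mapping to limit vnode movement), but it exhibits the balance property the test relies on, namely that the 256 vnodes spread almost evenly across slots:

// Hypothetical helper, not RisingWave code: spread `vnode_count` vnodes
// round-robin over the slots of (worker_id, parallelism) pairs.
fn place_round_robin(workers: &[(u32, u32)], vnode_count: usize) -> Vec<(u32, usize)> {
    let slots: Vec<(u32, usize)> = workers
        .iter()
        .flat_map(|&(id, parallelism)| (0..parallelism as usize).map(move |i| (id, i)))
        .collect();
    assert!(!slots.is_empty());
    (0..vnode_count).map(|v| slots[v % slots.len()]).collect()
}

fn main() {
    // Workers 1, 2, 3 with parallelism 1, 50, 60, as in the test above: 111 slots.
    let mapping = place_round_robin(&[(1, 1), (2, 50), (3, 60)], 256);
    assert_eq!(mapping.len(), 256);
    // Round-robin over 111 slots gives every slot either 2 or 3 of the 256 vnodes.
    let mut load = std::collections::HashMap::new();
    for slot in &mapping {
        *load.entry(slot).or_insert(0u32) += 1;
    }
    assert!(load.values().all(|&n| n == 2 || n == 3));
}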
47 changes: 21 additions & 26 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap};

 use comfy_table::{Attribute, Cell, Row, Table};
 use itertools::Itertools;
@@ -88,40 +88,33 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
         revision,
     } = get_cluster_info(context).await?;

-    // Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
+    // Fragment ID -> [Worker ID -> [Actor ID]]
     let mut fragments = BTreeMap::new();
     // Fragment ID -> Table Fragments' State
     let mut fragment_states = HashMap::new();

     for table_fragment in &table_fragments {
         for (&id, fragment) in &table_fragment.fragments {
             for actor in &fragment.actors {
-                let parallel_unit = table_fragment
+                let worker_id = table_fragment
                     .actor_status
                     .get(&actor.actor_id)
                     .unwrap()
-                    .get_parallel_unit()
-                    .unwrap();
+                    .get_worker_node_id();

                 fragments
                     .entry(id)
-                    .or_insert_with(HashMap::new)
-                    .insert(parallel_unit.id, (parallel_unit, actor));
+                    .or_insert_with(BTreeMap::new)
+                    .entry(worker_id)
+                    .or_insert(BTreeSet::new())
+                    .insert(actor.actor_id);
             }
             fragment_states.insert(id, table_fragment.state());
         }
     }

-    // Parallel Unit ID -> Worker Node
-    let all_parallel_units: BTreeMap<_, _> = worker_nodes
-        .iter()
-        .flat_map(|worker_node| {
-            worker_node
-                .parallel_units
-                .iter()
-                .map(|parallel_unit| (parallel_unit.id, worker_node.clone()))
-        })
-        .collect();
-
     let mut table = Table::new();

     let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
@@ -132,11 +125,10 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
         }
     };

-    // Compute Node, Parallel Unit, Frag 1, Frag 2, ..., Frag N
+    // Compute Node, Frag 1, Frag 2, ..., Frag N
     table.set_header({
         let mut row = Row::new();
         row.add_cell("Compute Node".into());
-        row.add_cell("Parallel Unit".into());
         for &fid in fragments.keys() {
             let cell = Cell::new(format!("Frag {fid}"));
             let cell = cross_out_if_creating(cell, fid);
@@ -146,8 +138,8 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
     });

     let mut last_worker_id = None;
-    for (pu, worker) in all_parallel_units {
-        // Compute Node, Parallel Unit, Actor 1, Actor 11, -, ..., Actor N
+    for worker in worker_nodes {
+        // Compute Node, Actor 1, Actor 11, -, ..., Actor N
         let mut row = Row::new();
         row.add_cell(if last_worker_id == Some(worker.id) {
             "".into()
@@ -166,14 +158,17 @@
             ))
             .add_attribute(Attribute::Bold)
         });
-        row.add_cell(pu.into());
-        for (&fid, f) in &fragments {
-            let cell = if let Some((_pu, actor)) = f.get(&pu) {
-                actor.actor_id.into()
+        for (&fragment_id, worker_actors) in &fragments {
+            let cell = if let Some(actors) = worker_actors.get(&worker.id) {
+                actors
+                    .iter()
+                    .map(|actor| format!("{}", actor))
+                    .join(",")
+                    .into()
             } else {
                 "-".into()
             };
-            let cell = cross_out_if_creating(cell, fid);
+            let cell = cross_out_if_creating(cell, fragment_id);
             row.add_cell(cell);
         }
         table.add_row(row);
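The ctl table thus pivots from one row per parallel unit to one row per compute node, with each fragment column listing all of that worker's actors. A toy sketch of the new aggregation shape, `Fragment ID -> Worker ID -> {Actor ID}`, independent of the RisingWave types:

use std::collections::{BTreeMap, BTreeSet};

fn main() {
    // (fragment_id, worker_id, actor_id) triples as they would be drawn from
    // the actor statuses (toy values, not real cluster data).
    let actors = [(1u32, 10u32, 100u32), (1, 10, 101), (1, 11, 102), (2, 10, 103)];

    let mut fragments: BTreeMap<u32, BTreeMap<u32, BTreeSet<u32>>> = BTreeMap::new();
    for (fragment_id, worker_id, actor_id) in actors {
        fragments
            .entry(fragment_id)
            .or_insert_with(BTreeMap::new)
            .entry(worker_id)
            .or_insert_with(BTreeSet::new)
            .insert(actor_id);
    }

    // One cell per (worker, fragment): the comma-joined actor IDs, as in the
    // table-rendering loop above.
    for (fragment_id, per_worker) in &fragments {
        for (worker_id, actor_ids) in per_worker {
            let cell = actor_ids
                .iter()
                .map(|a| a.to_string())
                .collect::<Vec<_>>()
                .join(",");
            println!("frag {fragment_id} / worker {worker_id}: {cell}");
        }
    }
}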
15 changes: 8 additions & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -156,18 +156,19 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
             .await?;
         if worker.worker_type() == WorkerType::ComputeNode {
             let pb_property = worker.worker_node.property.as_ref().unwrap();
-            let parallel_unit_ids = worker
-                .worker_node
-                .parallel_units
-                .iter()
-                .map(|pu| pu.id as i32)
-                .collect_vec();
+            // let parallel_unit_ids = worker
+            //     .worker_node
+            //     .parallel_units
+            //     .iter()
+            //     .map(|pu| pu.id as i32)
+            //     .collect_vec();
             let property = worker_property::ActiveModel {
                 worker_id: Set(worker.worker_id() as _),
-                parallel_unit_ids: Set(parallel_unit_ids.into()),
+                // parallel_unit_ids: Set(parallel_unit_ids.into()),
                 is_streaming: Set(pb_property.is_streaming),
                 is_serving: Set(pb_property.is_serving),
                 is_unschedulable: Set(pb_property.is_unschedulable),
+                parallel_unit_ids: Set(Default::default()),
             };
             WorkerProperty::insert(property)
                 .exec(&meta_store_sql.conn)
src/frontend/src/catalog/system_catalog/rw_catalog/rw_actors.rs
@@ -23,7 +23,7 @@ struct RwActor {
     #[primary_key]
     actor_id: i32,
     fragment_id: i32,
-    parallel_unit_id: i32,
+    worker_id: i32,
     state: String,
 }
@@ -36,7 +36,7 @@ async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActor>> {
         .map(|state| RwActor {
             actor_id: state.actor_id as i32,
             fragment_id: state.fragment_id as i32,
-            parallel_unit_id: state.parallel_unit_id as i32,
+            worker_id: state.worker_id as i32,
             state: state.state().as_str_name().into(),
         })
         .collect())
src/frontend/src/catalog/system_catalog/rw_catalog/rw_parallel_units.rs
@@ -19,26 +19,24 @@ use crate::catalog::system_catalog::SysCatalogReaderImpl;
 use crate::error::Result;

 #[derive(Fields)]
-struct RwParallelUnit {
+struct RwWorkerSlot {
     #[primary_key]
-    id: i32,
+    slot_id: i32,
+    #[primary_key]
     worker_id: i32,
 }

 #[system_catalog(table, "rw_catalog.rw_parallel_units")]
-fn read_rw_parallel_units(reader: &SysCatalogReaderImpl) -> Result<Vec<RwParallelUnit>> {
+fn read_rw_parallel_units(reader: &SysCatalogReaderImpl) -> Result<Vec<RwWorkerSlot>> {
     let workers = reader.worker_node_manager.list_worker_nodes();

     Ok(workers
         .into_iter()
         .flat_map(|worker| {
-            worker
-                .parallel_units
-                .into_iter()
-                .map(move |unit| RwParallelUnit {
-                    id: unit.id as i32,
-                    worker_id: worker.id as i32,
-                })
+            (0..worker.parallelism).map(move |slot_id| RwWorkerSlot {
+                slot_id: slot_id as _,
+                worker_id: worker.id as _,
+            })
         })
         .collect())
 }
src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
@@ -55,7 +55,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
             port: host.map(|h| h.port.to_string()),
             r#type: worker.get_type().unwrap().as_str_name().into(),
             state: worker.get_state().unwrap().as_str_name().into(),
-            parallelism: worker.parallel_units.len() as i32,
+            parallelism: worker.parallelism as i32,
             is_streaming: property.map(|p| p.is_streaming),
             is_serving: property.map(|p| p.is_serving),
             is_unschedulable: property.map(|p| p.is_unschedulable),
4 changes: 2 additions & 2 deletions src/frontend/src/handler/show.rs
@@ -214,7 +214,7 @@ struct ShowClusterRow {
     addr: String,
     r#type: String,
     state: String,
-    parallel_units: String,
+    // parallel_units: String,
     is_streaming: Option<bool>,
     is_serving: Option<bool>,
     is_unschedulable: Option<bool>,
@@ -435,7 +435,7 @@
                 addr: addr.to_string(),
                 r#type: worker.get_type().unwrap().as_str_name().into(),
                 state: worker.get_state().unwrap().as_str_name().to_string(),
-                parallel_units: worker.parallel_units.into_iter().map(|pu| pu.id).join(", "),
+                // parallel_units: worker.parallel_units.into_iter().map(|pu| pu.id).join(", "),
                 is_streaming: property.map(|p| p.is_streaming),
                 is_serving: property.map(|p| p.is_serving),
                 is_unschedulable: property.map(|p| p.is_unschedulable),