From dbe51b5a8478f5cc1bfd4f34be1b247be1526263 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 5 Jul 2024 18:03:11 +0800 Subject: [PATCH] Refactor: Removed ParallelUnitId, simplified actor handling. Remove `ParallelUnit` and `parallel_unit` keywords. Removed `ParallelUnitMapping`, updated comments & fields for `WorkerSlotMapping`. Refactor Protobuf defs, update Rust actor status usage Signed-off-by: Shanicky Chen --- proto/common.proto | 13 +- proto/meta.proto | 14 +- .../src/hash/consistent_hash/mapping.rs | 107 +---------- src/common/src/hash/consistent_hash/vnode.rs | 4 - src/ctl/src/cmd_impl/meta/cluster_info.rs | 4 +- src/ctl/src/cmd_impl/meta/reschedule.rs | 4 +- src/ctl/src/lib.rs | 12 +- src/frontend/src/scheduler/local.rs | 2 +- src/meta/model_v2/src/lib.rs | 5 - src/meta/service/src/stream_service.rs | 2 +- src/meta/src/barrier/command.rs | 4 +- src/meta/src/controller/fragment.rs | 21 +-- src/meta/src/controller/streaming_job.rs | 10 +- src/meta/src/error.rs | 8 - src/meta/src/manager/catalog/fragment.rs | 49 ++--- src/meta/src/manager/cluster.rs | 4 +- src/meta/src/manager/id.rs | 6 - src/meta/src/model/stream.rs | 20 +- src/meta/src/rpc/metrics.rs | 16 +- src/meta/src/stream/scale.rs | 19 +- src/meta/src/stream/stream_graph/actor.rs | 2 +- src/meta/src/stream/test_scale.rs | 177 +++++------------- src/prost/src/lib.rs | 15 ++ src/tests/simulation/src/ctl_ext.rs | 11 +- 24 files changed, 132 insertions(+), 397 deletions(-) diff --git a/proto/common.proto b/proto/common.proto index 8b1d19d5f96a3..b34fcce42b47e 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -35,8 +35,11 @@ enum WorkerType { WORKER_TYPE_META = 5; } -message ParallelUnit { - uint32 id = 1; +// renamed from `ParallelUnit` +message ActorLocation { + // previous `id` field for parallel unit id is deprecated. + reserved "id"; + reserved 1; uint32 worker_node_id = 2; } @@ -90,12 +93,6 @@ message Buffer { bytes body = 2; } -// Vnode mapping for stream fragments. Stores mapping from virtual node to parallel unit id. -message ParallelUnitMapping { - repeated uint32 original_indices = 1; - repeated uint32 data = 2; -} - // Vnode mapping for stream fragments. Stores mapping from virtual node to (worker id, slot index). message WorkerSlotMapping { repeated uint32 original_indices = 1; diff --git a/proto/meta.proto b/proto/meta.proto index 840572d874232..0192b18ec690c 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -65,8 +65,8 @@ message TableFragments { // Running normally RUNNING = 2; } - // Current on which parallel unit - common.ParallelUnit parallel_unit = 1; + // Current on which worker + common.ActorLocation location = 1; // Current state ActorState state = 2; } @@ -105,16 +105,6 @@ message TableFragments { TableParallelism parallelism = 7; } -/// Parallel unit mapping with fragment id, used for notification. -message FragmentParallelUnitMapping { - uint32 fragment_id = 1; - common.ParallelUnitMapping mapping = 2; -} - -message FragmentParallelUnitMappings { - repeated FragmentParallelUnitMapping mappings = 1; -} - /// Worker slot mapping with fragment id, used for notification. message FragmentWorkerSlotMapping { uint32 fragment_id = 1; diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index d78d902fe7efb..a462acb291853 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -12,21 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap}; use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; -use std::ops::{Index, Sub}; +use std::ops::Index; use educe::Educe; use itertools::Itertools; -use risingwave_pb::common::{ - ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerSlotMapping, -}; +use risingwave_pb::common::PbWorkerSlotMapping; use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto; use super::bitmap::VnodeBitmapExt; -use super::vnode::{ParallelUnitId, VirtualNode}; use crate::bitmap::{Bitmap, BitmapBuilder}; +use crate::hash::VirtualNode; use crate::util::compress::compress_data; use crate::util::iter_util::ZipEqDebug; @@ -78,7 +76,7 @@ impl Debug for WorkerSlotId { pub trait VnodeMappingItem { /// The type of the item. /// - /// Currently, there are two types of items: [`ParallelUnitId`] and [`ActorId`]. We don't use + /// Currently, there are two types of items: [`WorkerSlotId`] and [`ActorId`]. We don't use /// them directly as the generic parameter because they're the same type aliases. type Item: Copy + Ord + Hash + Debug; } @@ -292,12 +290,6 @@ pub mod marker { type Item = ActorId; } - /// A marker type for items of [`ParallelUnitId`]. - pub struct ParallelUnit; - impl VnodeMappingItem for ParallelUnit { - type Item = ParallelUnitId; - } - /// A marker type for items of [`WorkerSlotId`]. pub struct WorkerSlot; impl VnodeMappingItem for WorkerSlot { @@ -310,25 +302,12 @@ pub type ActorMapping = VnodeMapping; /// An expanded mapping from [`VirtualNode`] to [`ActorId`]. pub type ExpandedActorMapping = ExpandedMapping; -/// A mapping from [`VirtualNode`] to [`ParallelUnitId`]. -pub type ParallelUnitMapping = VnodeMapping; -/// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`]. -pub type ExpandedParallelUnitMapping = ExpandedMapping; - /// A mapping from [`VirtualNode`] to [`WorkerSlotId`]. pub type WorkerSlotMapping = VnodeMapping; /// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`]. pub type ExpandedWorkerSlotMapping = ExpandedMapping; impl ActorMapping { - /// Transform this actor mapping to a parallel unit mapping, essentially `transform`. - pub fn to_parallel_unit(&self, to_map: &M) -> ParallelUnitMapping - where - M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>, - { - self.transform(to_map) - } - /// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker. pub fn to_worker_slot(&self, actor_to_worker: &HashMap) -> WorkerSlotMapping { let mut worker_actors = HashMap::new(); @@ -372,12 +351,6 @@ impl ActorMapping { } } -#[derive(thiserror::Error, Debug)] -pub enum ParallelUnitError { - #[error("parallel units {0:?} are not covered by the worker slot mapping")] - NotCovered(HashSet), -} - impl WorkerSlotMapping { /// Create a uniform worker mapping from the given worker ids pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self { @@ -402,76 +375,6 @@ impl WorkerSlotMapping { } } -impl ParallelUnitMapping { - /// Create a uniform parallel unit mapping from the given parallel units, essentially - /// `new_uniform`. - pub fn build(parallel_units: &[ParallelUnit]) -> Self { - Self::new_uniform(parallel_units.iter().map(|pu| pu.id)) - } - - /// Create a uniform parallel unit mapping from the given parallel units ids - pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self { - Self::new_uniform(parallel_unit_ids.iter().cloned()) - } - - /// Transform this parallel unit mapping to an actor mapping, essentially `transform`. - pub fn to_actor(&self, to_map: &HashMap) -> ActorMapping { - self.transform(to_map) - } - - /// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`. - pub fn to_worker_slot( - &self, - to_map: &HashMap, - ) -> Result { - let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new(); - for (parallel_unit_id, worker_id) in to_map { - worker_to_parallel_units - .entry(*worker_id) - .or_default() - .insert(*parallel_unit_id); - } - - let mut parallel_unit_to_worker_slot = HashMap::with_capacity(to_map.len()); - - for (worker_id, parallel_unit_ids) in worker_to_parallel_units { - for (index, ¶llel_unit_id) in parallel_unit_ids.iter().enumerate() { - parallel_unit_to_worker_slot - .insert(parallel_unit_id, WorkerSlotId::new(worker_id, index)); - } - } - - let available_parallel_unit_ids: HashSet<_> = - parallel_unit_to_worker_slot.keys().copied().collect(); - - let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect(); - - let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids); - if sub_set.is_empty() { - Ok(self.transform(¶llel_unit_to_worker_slot)) - } else { - Err(ParallelUnitError::NotCovered(sub_set)) - } - } - - /// Create a parallel unit mapping from the protobuf representation. - pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self { - assert_eq!(proto.original_indices.len(), proto.data.len()); - Self { - original_indices: proto.original_indices.clone(), - data: proto.data.clone(), - } - } - - /// Convert this parallel unit mapping to the protobuf representation. - pub fn to_protobuf(&self) -> ParallelUnitMappingProto { - ParallelUnitMappingProto { - original_indices: self.original_indices.clone(), - data: self.data.clone(), - } - } -} - impl WorkerSlotMapping { /// Transform this worker slot mapping to an actor mapping, essentially `transform`. pub fn to_actor(&self, to_map: &HashMap) -> ActorMapping { diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index fc6dac0978adf..f528544689f31 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -22,10 +22,6 @@ use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl}; use crate::util::hash_util::Crc32FastBuilder; use crate::util::row_id::extract_vnode_id_from_row_id; -/// Parallel unit is the minimal scheduling unit. -// TODO: make it a newtype -pub type ParallelUnitId = u32; - /// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for /// consistent hashing. #[repr(transparent)] diff --git a/src/ctl/src/cmd_impl/meta/cluster_info.rs b/src/ctl/src/cmd_impl/meta/cluster_info.rs index 387746f106cb8..cbc21ca6ec610 100644 --- a/src/ctl/src/cmd_impl/meta/cluster_info.rs +++ b/src/ctl/src/cmd_impl/meta/cluster_info.rs @@ -100,9 +100,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> { .actor_status .get(&actor.actor_id) .unwrap() - .get_parallel_unit() - .unwrap() - .get_worker_node_id(); + .worker_id(); fragments .entry(id) diff --git a/src/ctl/src/cmd_impl/meta/reschedule.rs b/src/ctl/src/cmd_impl/meta/reschedule.rs index fffbc46f079ea..505343fc95db8 100644 --- a/src/ctl/src/cmd_impl/meta/reschedule.rs +++ b/src/ctl/src/cmd_impl/meta/reschedule.rs @@ -21,7 +21,6 @@ use itertools::Itertools; use regex::Regex; use risingwave_meta::manager::WorkerId; use risingwave_pb::common::WorkerNode; -use risingwave_pb::meta::table_fragments::ActorStatus; use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule}; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; @@ -265,9 +264,8 @@ pub async fn unregister_workers( table_fragments .actor_status .get(&actor.actor_id) - .and_then(|ActorStatus { parallel_unit, .. }| parallel_unit.clone()) + .map(|actor_status| actor_status.worker_id()) .unwrap() - .worker_node_id }) .collect(); diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index efbd681f8cae5..7714b60a6f3ec 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -405,16 +405,16 @@ enum MetaCommands { SourceSplitInfo, /// Reschedule the parallel unit in the stream graph /// - /// The format is `fragment_id-[removed]+[added]` - /// You can provide either `removed` only or `added` only, but `removed` should be preceded by + /// The format is `fragment_id-[worker_id:count]+[worker_id:count]` + /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by /// `added` when both are provided. /// - /// For example, for plan `100-[1,2,3]+[4,5]` the follow request will be generated: + /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated: /// ```text /// { - /// 100: Reschedule { - /// added_parallel_units: [4,5], - /// removed_parallel_units: [1,2,3], + /// 100: WorkerReschedule { + /// increased_actor_count: { 1: 1 }, + /// decreased_actor_count: { 4: 1 }, /// } /// } /// ``` diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index 99f1b6eebda1e..2b5015629284f 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -566,7 +566,7 @@ impl LocalQueryExecution { self.get_fragment_id(&side_table_desc.table_id.into())?, )?; - // TODO: should we use `pb::ParallelUnitMapping` here? + // TODO: should we use `pb::WorkerSlotMapping` here? node.inner_side_vnode_mapping = mapping.to_expanded().into_iter().map(u64::from).collect(); node.worker_nodes = self.worker_node_manager.manager.list_worker_nodes(); diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 20c886a5ce531..23f97e7778519 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -397,11 +397,6 @@ derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer); derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping); derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext); -derive_from_blob!( - FragmentVnodeMapping, - risingwave_pb::common::ParallelUnitMapping -); - derive_array_from_blob!( HummockVersionDeltaArray, risingwave_pb::hummock::PbHummockVersionDelta, diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 15a2157a0642d..d50a088972eeb 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -374,7 +374,7 @@ impl StreamManagerService for StreamServiceImpl { actor_id, fragment_id: actor_to_fragment[&actor_id], state: status.state, - worker_id: status.parallel_unit.as_ref().unwrap().worker_node_id, + worker_id: status.worker_id(), } }) }) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 4cfc21fd8f1c4..f394a953b997f 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -120,9 +120,7 @@ impl ReplaceTablePlan { .actor_status .get(&actor.actor_id) .expect("should exist") - .get_parallel_unit() - .expect("should set") - .worker_node_id, + .worker_id(), ) }) .collect(), diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 28c46012d66e1..78b3da81c1aee 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -29,7 +29,7 @@ use risingwave_meta_model_v2::{ ConnectorSplits, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; -use risingwave_pb::common::PbParallelUnit; +use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; @@ -244,7 +244,7 @@ impl CatalogController { ) })?; - let worker_id = status.parallel_unit.clone().unwrap().get_worker_node_id() as WorkerId; + let worker_id = status.worker_id() as _; assert_eq!( pb_upstream_actor_id @@ -430,10 +430,7 @@ impl CatalogController { pb_actor_status.insert( actor_id as _, PbActorStatus { - parallel_unit: Some(PbParallelUnit { - id: u32::MAX, - worker_node_id: worker_id as _, - }), + location: PbActorLocation::from_worker(worker_id as u32), state: PbActorState::from(status) as _, }, ); @@ -1059,10 +1056,7 @@ impl CatalogController { let (table_fragments, actor_status, _) = Self::compose_fragment(fragment, actors, dispatcher_info)?; for actor in table_fragments.actors { - let node_id = actor_status[&actor.actor_id] - .get_parallel_unit() - .unwrap() - .worker_node_id as WorkerId; + let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId; node_actors .entry(node_id) .or_insert_with(Vec::new) @@ -1429,7 +1423,7 @@ mod tests { actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap, }; - use risingwave_pb::common::ParallelUnit; + use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment}; @@ -1554,10 +1548,7 @@ mod tests { ( actor_id, PbActorStatus { - parallel_unit: Some(ParallelUnit { - id: u32::MAX, - worker_node_id: 0, - }), + location: PbActorLocation::from_worker(0), state: PbActorState::Running as _, }, ) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 8de621abc97b1..ce86ca96ea507 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -39,7 +39,6 @@ use risingwave_pb::meta::relation::{PbRelationInfo, RelationInfo}; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; -use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::{ PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, PbTableFragments, Relation, RelationGroup, @@ -1426,11 +1425,7 @@ impl CatalogController { expr_context, .. }, - // actor_status - PbActorStatus { - parallel_unit, - state: _, - }, + actor_status, ) in newly_created_actors { let mut actor_upstreams = BTreeMap::>::new(); @@ -1467,7 +1462,6 @@ impl CatalogController { ); let actor_upstreams = ActorUpstreamActors(actor_upstreams); - let parallel_unit = parallel_unit.unwrap(); let splits = actor_splits .get(&actor_id) @@ -1478,7 +1472,7 @@ impl CatalogController { fragment_id: Set(fragment_id as _), status: Set(ActorStatus::Running), splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())), - worker_id: Set(parallel_unit.worker_node_id as _), + worker_id: Set(actor_status.worker_id() as _), upstream_actor_ids: Set(actor_upstreams), vnode_bitmap: Set(vnode_bitmap.as_ref().map(|bitmap| bitmap.into())), expr_context: Set(expr_context.as_ref().unwrap().into()), diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 9d3d558ac4839..8aeaed2f9c5a8 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -13,7 +13,6 @@ // limitations under the License. use risingwave_common::error::BoxedError; -use risingwave_common::hash::ParallelUnitError; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -126,13 +125,6 @@ pub enum MetaErrorInner { // Indicates that recovery was triggered manually. #[error("adhoc recovery triggered")] AdhocRecovery, - - #[error("ParallelUnit error: {0}")] - ParallelUnit( - #[from] - #[backtrace] - ParallelUnitError, - ), } impl MetaError { diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 36cc97e2665cf..76b4bbaac2f01 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -27,7 +27,7 @@ use risingwave_common::util::stream_graph_visitor::{ use risingwave_common::util::worker_util::WorkerNodeId; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; -use risingwave_pb::common::ParallelUnit; +use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; @@ -87,7 +87,7 @@ impl FragmentManagerCore { .exactly_one() .expect("single actor"); let status = table_fragment.actor_status.get(&actor.actor_id).unwrap(); - let worker_id = status.get_parallel_unit().unwrap().get_worker_node_id(); + let worker_id = status.worker_id(); result.push(FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, mapping: Some( @@ -110,7 +110,7 @@ impl FragmentManagerCore { ActorState::Running => {} } - let worker_id = status.get_parallel_unit().unwrap().get_worker_node_id(); + let worker_id = status.worker_id(); actor_bitmaps.insert( actor.actor_id as ActorId, Bitmap::from(actor.vnode_bitmap.as_ref().unwrap()), @@ -789,13 +789,7 @@ impl FragmentManager { .get(&actor.actor_id) .expect("should exist"); if status.state == ActorState::Running as i32 { - Some(( - actor.actor_id, - status - .get_parallel_unit() - .expect("should set") - .worker_node_id, - )) + Some((actor.actor_id, status.worker_id())) } else { None } @@ -835,9 +829,7 @@ impl FragmentManager { .actor_status .get(&actor.actor_id) .unwrap() - .get_parallel_unit() - .unwrap() - .get_worker_node_id(); + .worker_id(); let fragment_ref = worker_fragment_map.entry(worker).or_insert(HashMap::new()); fragment_ref @@ -863,10 +855,7 @@ impl FragmentManager { .actor_status .get_mut(&actor) .expect("should exist"); - status.parallel_unit = Some(ParallelUnit { - id: u32::MAX, - worker_node_id: target.worker_id(), - }) + status.location = PbActorLocation::from_worker(target.worker_id()); } } @@ -901,9 +890,7 @@ impl FragmentManager { .values() .filter(|tf| { for status in tf.actor_status.values() { - if expired_workers - .contains(&status.get_parallel_unit().unwrap().get_worker_node_id()) - { + if expired_workers.contains(&status.worker_id()) { return true; } } @@ -933,9 +920,7 @@ impl FragmentManager { .actor_status .get(&actor.actor_id) .unwrap() - .get_parallel_unit() - .unwrap() - .get_worker_node_id(); + .worker_id(); let fragment_ref = worker_fragment_map.entry(worker).or_insert(HashMap::new()); *fragment_ref.entry(fragment.fragment_id).or_insert(0) += 1; @@ -986,10 +971,8 @@ impl FragmentManager { let map = &self.core.read().await.table_fragments; for fragments in map.values() { for actor_status in fragments.actor_status.values() { - if let Some(pu) = &actor_status.parallel_unit { - let e = actor_count.entry(pu.worker_node_id).or_insert(0); - *e += 1; - } + let e = actor_count.entry(actor_status.worker_id()).or_insert(0); + *e += 1; } } @@ -1361,7 +1344,7 @@ impl FragmentManager { let mut actor_to_vnode_bitmap = HashMap::with_capacity(fragment.actors.len()); for actor in &fragment.actors { let actor_status = &actor_status[&actor.actor_id]; - let worker_id = actor_status.parallel_unit.as_ref().unwrap().worker_node_id; + let worker_id = actor_status.worker_id(); actor_to_worker.insert(actor.actor_id, worker_id); if let Some(vnode_bitmap) = &actor.vnode_bitmap { @@ -1593,10 +1576,7 @@ impl FragmentManager { .actor_status .iter() .for_each(|(actor_id, status)| { - actor_locations.insert( - *actor_id, - status.get_parallel_unit().unwrap().worker_node_id, - ); + actor_locations.insert(*actor_id, status.worker_id()); }); } @@ -1656,10 +1636,7 @@ impl FragmentManager { .actor_status .iter() .for_each(|(actor_id, status)| { - actor_locations.insert( - *actor_id, - status.get_parallel_unit().unwrap().worker_node_id, - ); + actor_locations.insert(*actor_id, status.worker_id()); }); }); diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index e791bd98b95c3..41e1ae02a21d0 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -945,13 +945,13 @@ mod tests { } async fn assert_cluster_manager(cluster_manager: &ClusterManager, parallel_count: usize) { - let parallel_units: usize = cluster_manager + let parallelism: usize = cluster_manager .list_active_serving_compute_nodes() .await .into_iter() .map(|w| w.parallelism as usize) .sum(); - assert_eq!(parallel_units, parallel_count); + assert_eq!(parallelism, parallel_count); } // This test takes seconds because the TTL is measured in seconds. diff --git a/src/meta/src/manager/id.rs b/src/meta/src/manager/id.rs index 023483116fdc8..95aa4f1935b58 100644 --- a/src/meta/src/manager/id.rs +++ b/src/meta/src/manager/id.rs @@ -128,7 +128,6 @@ pub mod IdCategory { pub const Actor: IdCategoryType = 6; pub const Backup: IdCategoryType = 7; pub const HummockSstableId: IdCategoryType = 8; - pub const ParallelUnit: IdCategoryType = 9; pub const _Source: IdCategoryType = 10; pub const HummockCompactionTask: IdCategoryType = 11; pub const User: IdCategoryType = 12; @@ -159,7 +158,6 @@ pub struct IdGeneratorManager { backup: Arc, hummock_ss_table_id: Arc, hummock_compaction_task: Arc, - parallel_unit: Arc, compaction_group: Arc, connection: Arc, secret: Arc, @@ -198,9 +196,6 @@ impl IdGeneratorManager { StoredIdGenerator::new(meta_store.clone(), "hummock_compaction_task", Some(1)) .await, ), - parallel_unit: Arc::new( - StoredIdGenerator::new(meta_store.clone(), "parallel_unit", None).await, - ), compaction_group: Arc::new( StoredIdGenerator::new( meta_store.clone(), @@ -230,7 +225,6 @@ impl IdGeneratorManager { IdCategory::Backup => &self.backup, IdCategory::Worker => &self.worker, IdCategory::HummockSstableId => &self.hummock_ss_table_id, - IdCategory::ParallelUnit => &self.parallel_unit, IdCategory::HummockCompactionTask => &self.hummock_compaction_task, IdCategory::CompactionGroup => &self.compaction_group, IdCategory::Connection => &self.connection, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 368adaec52ead..3ef81dcda7a68 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::WorkerSlotId; use risingwave_connector::source::SplitImpl; -use risingwave_pb::common::ParallelUnit; +use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; use risingwave_pb::meta::table_parallelism::{ @@ -174,9 +174,11 @@ impl MetadataModel for TableFragments { parallelism: Some(Parallelism::Custom(PbCustomParallelism {})), }; + let state = prost.state(); + Self { table_id: TableId::new(prost.table_id), - state: prost.state(), + state, fragments: prost.fragments.into_iter().collect(), actor_status: prost.actor_status.into_iter().collect(), actor_splits: build_actor_split_impls(&prost.actor_splits), @@ -217,10 +219,7 @@ impl TableFragments { ( actor_id, ActorStatus { - parallel_unit: Some(ParallelUnit { - id: u32::MAX, - worker_node_id: worker_slot_id.worker_id(), - }), + location: PbActorLocation::from_worker(worker_slot_id.worker_id()), state: ActorState::Inactive as i32, }, ) @@ -478,7 +477,7 @@ impl TableFragments { pub fn worker_actor_states(&self) -> BTreeMap> { let mut map = BTreeMap::default(); for (&actor_id, actor_status) in &self.actor_status { - let node_id = actor_status.get_parallel_unit().unwrap().worker_node_id as WorkerId; + let node_id = actor_status.worker_id() as WorkerId; map.entry(node_id) .or_insert_with(Vec::new) .push((actor_id, actor_status.state())); @@ -490,7 +489,7 @@ impl TableFragments { pub fn worker_actor_ids(&self) -> BTreeMap> { let mut map = BTreeMap::default(); for (&actor_id, actor_status) in &self.actor_status { - let node_id = actor_status.get_parallel_unit().unwrap().worker_node_id as WorkerId; + let node_id = actor_status.worker_id() as WorkerId; map.entry(node_id).or_insert_with(Vec::new).push(actor_id); } map @@ -501,10 +500,7 @@ impl TableFragments { let mut actors = BTreeMap::default(); for fragment in self.fragments.values() { for actor in &fragment.actors { - let node_id = self.actor_status[&actor.actor_id] - .get_parallel_unit() - .unwrap() - .worker_node_id as WorkerId; + let node_id = self.actor_status[&actor.actor_id].worker_id() as WorkerId; if !include_inactive && self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 { diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index 6fe0df07bb359..79d34c6904be2 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -966,17 +966,11 @@ pub fn start_fragment_info_monitor( if let Some(actor_status) = table_fragments.actor_status.get(&actor.actor_id) { - if let Some(pu) = &actor_status.parallel_unit { - if let Some(address) = workers.get(&pu.worker_node_id) { - meta_metrics - .actor_info - .with_label_values(&[ - &actor_id_str, - &fragment_id_str, - address, - ]) - .set(1); - } + if let Some(address) = workers.get(&actor_status.worker_id()) { + meta_metrics + .actor_info + .with_label_values(&[&actor_id_str, &fragment_id_str, address]) + .set(1); } } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 679dde8e81bc5..7d8f8049d5c4d 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -31,7 +31,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnit, WorkerNode, WorkerType}; +use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::{ @@ -530,10 +530,7 @@ impl ScaleController { } for (actor_id, status) in &table_fragments.actor_status { - actor_status.insert( - *actor_id, - status.get_parallel_unit().unwrap().get_worker_node_id(), - ); + actor_status.insert(*actor_id, status.worker_id()); } fragment_to_table.extend( @@ -861,7 +858,7 @@ impl ScaleController { } } - // Index for fragment -> { actor -> parallel_unit } after reschedule. + // Index for fragment -> { actor -> worker_id } after reschedule. // Since we need to organize the upstream and downstream relationships of NoShuffle, // we need to organize the actor distribution after a scaling. let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len()); @@ -1455,10 +1452,7 @@ impl ScaleController { ( actor, ActorStatus { - parallel_unit: Some(ParallelUnit { - id: u32::MAX, - worker_node_id: *worker_id, - }), + location: PbActorLocation::from_worker(*worker_id), state: ActorState::Inactive as i32, }, ), @@ -1930,10 +1924,7 @@ impl ScaleController { } for (actor_id, status) in &table_fragments.actor_status { - actor_location.insert( - *actor_id, - status.get_parallel_unit().unwrap().get_worker_node_id(), - ); + actor_location.insert(*actor_id, status.worker_id()); } } diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 254c4d0470cd3..0a12d8ab5f9b8 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -530,7 +530,7 @@ impl ActorGraphBuildStateInner { DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => { // Add dispatchers for the upstream actors. let dispatcher = if let DispatcherType::Hash = dt { - // Transform the `ParallelUnitMapping` from the downstream distribution to the + // Transform the `WorkerSlotMapping` from the downstream distribution to the // `ActorMapping`, used for the `HashDispatcher` for the upstream actors. let downstream_locations: HashMap = downstream .actor_ids diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index 636ca8e1af5e8..0dc0bced84005 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -19,14 +19,13 @@ mod tests { use itertools::Itertools; use maplit::btreeset; use risingwave_common::bitmap::Bitmap; - use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping, VirtualNode}; - use risingwave_pb::common::ParallelUnit; + use risingwave_common::hash::{ActorMapping, VirtualNode}; use crate::model::ActorId; use crate::stream::scale::rebalance_actor_vnode; use crate::stream::CustomActorInfo; - fn simulated_parallel_unit_nums(min: Option, max: Option) -> Vec { + fn simulated_parallelism(min: Option, max: Option) -> Vec { let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT]; if let Some(min) = min { raw.retain(|n| *n > min); @@ -39,31 +38,20 @@ mod tests { raw } - fn build_fake_actors(info: &[(ActorId, ParallelUnitId)]) -> Vec { - let parallel_units = generate_parallel_units(info); - - let vnode_bitmaps = ParallelUnitMapping::build(¶llel_units).to_bitmaps(); - - info.iter() - .map(|(actor_id, parallel_unit_id)| CustomActorInfo { + fn build_fake_actors(actor_ids: Vec) -> Vec { + let actor_bitmaps = ActorMapping::new_uniform(actor_ids.clone().into_iter()).to_bitmaps(); + actor_ids + .iter() + .map(|actor_id| CustomActorInfo { actor_id: *actor_id, - vnode_bitmap: vnode_bitmaps - .get(parallel_unit_id) + vnode_bitmap: actor_bitmaps + .get(actor_id) .map(|bitmap| bitmap.to_protobuf()), ..Default::default() }) .collect() } - fn generate_parallel_units(info: &[(ActorId, ParallelUnitId)]) -> Vec { - info.iter() - .map(|(_, parallel_unit_id)| ParallelUnit { - id: *parallel_unit_id, - ..Default::default() - }) - .collect_vec() - } - fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) { let prev_bitmap = Bitmap::from(actor.vnode_bitmap.as_ref().unwrap()); @@ -98,22 +86,19 @@ mod tests { } #[test] - fn test_build_vnode_mapping() { - for parallel_units_num in simulated_parallel_unit_nums(None, None) { - let info = (0..parallel_units_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(); - let parallel_units = generate_parallel_units(&info); - let vnode_mapping = ParallelUnitMapping::build(¶llel_units); + fn test_build_actor_mapping() { + for parallelism in simulated_parallelism(None, None) { + let actor_ids = (0..parallelism as ActorId).collect_vec(); + let actor_mapping = ActorMapping::new_uniform(actor_ids.into_iter()); - assert_eq!(vnode_mapping.len(), VirtualNode::COUNT); + assert_eq!(actor_mapping.len(), VirtualNode::COUNT); let mut check: HashMap> = HashMap::new(); - for (vnode, parallel_unit_id) in vnode_mapping.iter_with_vnode() { - check.entry(parallel_unit_id).or_default().push(vnode); + for (vnode, actor_id) in actor_mapping.iter_with_vnode() { + check.entry(actor_id).or_default().push(vnode); } - assert_eq!(check.len(), parallel_units_num); + assert_eq!(check.len(), parallelism); let (min, max) = check .values() @@ -126,46 +111,9 @@ mod tests { } } - #[test] - fn test_vnode_mapping_to_bitmaps() { - for parallel_units_num in simulated_parallel_unit_nums(None, None) { - let info = (0..parallel_units_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(); - let parallel_units = generate_parallel_units(&info); - let bitmaps = ParallelUnitMapping::build(¶llel_units).to_bitmaps(); - check_bitmaps(&bitmaps); - } - } - - #[test] - fn test_mapping_convert() { - for parallel_unit_num in simulated_parallel_unit_nums(None, None) { - let (actor_mapping, _) = generate_actor_mapping(parallel_unit_num); - - let actor_to_parallel_unit_map: HashMap<_, _> = (0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect(); - let parallel_unit_mapping = actor_mapping.to_parallel_unit(&actor_to_parallel_unit_map); - - let parallel_unit_to_actor_map: HashMap<_, _> = actor_to_parallel_unit_map - .into_iter() - .map(|(k, v)| (v, k)) - .collect(); - - let new_actor_mapping = parallel_unit_mapping.to_actor(¶llel_unit_to_actor_map); - - assert_eq!(actor_mapping, new_actor_mapping) - } - } - - fn generate_actor_mapping( - parallel_unit_num: usize, - ) -> (ActorMapping, HashMap) { - let parallel_units = (0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(); - let actors = build_fake_actors(¶llel_units); + fn generate_actor_mapping(parallelism: usize) -> (ActorMapping, HashMap) { + let actor_ids = (0..parallelism).map(|i| i as ActorId).collect_vec(); + let actors = build_fake_actors(actor_ids); let bitmaps: HashMap<_, _> = actors .into_iter() @@ -182,8 +130,8 @@ mod tests { #[test] fn test_actor_mapping_from_bitmaps() { - for parallel_unit_num in simulated_parallel_unit_nums(None, None) { - let (actor_mapping, bitmaps) = generate_actor_mapping(parallel_unit_num); + for parallelism in simulated_parallelism(None, None) { + let (actor_mapping, bitmaps) = generate_actor_mapping(parallelism); check_bitmaps(&bitmaps); for (actor_id, bitmap) in &bitmaps { @@ -198,7 +146,7 @@ mod tests { #[test] fn test_rebalance_empty() { - let actors = build_fake_actors(&(0..3).map(|i| (i, i)).collect_vec()); + let actors = build_fake_actors((0..3 as ActorId).collect_vec()); // empty input let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &BTreeSet::new()); @@ -207,12 +155,8 @@ mod tests { #[test] fn test_rebalance_scale_in() { - for parallel_unit_num in simulated_parallel_unit_nums(Some(3), None) { - let actors = build_fake_actors( - &(0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(), - ); + for parallelism in simulated_parallelism(Some(3), None) { + let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // remove 1 let actors_to_remove = btreeset! {0}; @@ -222,7 +166,7 @@ mod tests { check_affinity_for_scale_in(result.get(&(1 as ActorId)).unwrap(), &actors[1]); // remove n-1 - let actors_to_remove = (1..parallel_unit_num as ActorId).collect(); + let actors_to_remove = (1..parallelism as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &actors_to_remove, &BTreeSet::new()); assert_eq!(result.len(), 1); check_bitmaps(&result); @@ -234,28 +178,19 @@ mod tests { #[test] fn test_rebalance_scale_out() { - for parallel_unit_num in simulated_parallel_unit_nums(Some(3), Some(VirtualNode::COUNT - 1)) - { - let actors = build_fake_actors( - &(0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(), - ); + for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT - 1)) { + let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // add 1 - let actors_to_add = btreeset! {parallel_unit_num as ActorId}; + let actors_to_add = btreeset! {parallelism as ActorId}; let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add); assert_eq!(result.len(), actors.len() + actors_to_add.len()); check_bitmaps(&result); - let actors = build_fake_actors( - &(0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(), - ); + let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); + // add to VirtualNode::COUNT - let actors_to_add = - (parallel_unit_num as ActorId..VirtualNode::COUNT as ActorId).collect(); + let actors_to_add = (parallelism as ActorId..VirtualNode::COUNT as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add); assert_eq!(result.len(), actors.len() + actors_to_add.len()); check_bitmaps(&result); @@ -264,16 +199,12 @@ mod tests { #[test] fn test_rebalance_migration() { - for parallel_unit_num in simulated_parallel_unit_nums(Some(3), None) { - let actors = build_fake_actors( - &(0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(), - ); + for parallelism in simulated_parallelism(Some(3), None) { + let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); - for idx in 0..parallel_unit_num { + for idx in 0..parallelism { let actors_to_remove = btreeset! {idx as ActorId}; - let actors_to_add = btreeset! {parallel_unit_num as ActorId}; + let actors_to_add = btreeset! {parallelism as ActorId}; let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add); assert_eq!( @@ -293,18 +224,12 @@ mod tests { assert!(prev_bitmap.eq(target_bitmap)); } } + let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); - let actors = build_fake_actors( - &(0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(), - ); - - for migration_count in 1..parallel_unit_num { + for migration_count in 1..parallelism { let actors_to_remove = (0..migration_count as ActorId).collect(); - let actors_to_add = (parallel_unit_num as ActorId - ..(parallel_unit_num + migration_count) as ActorId) - .collect(); + let actors_to_add = + (parallelism as ActorId..(parallelism + migration_count) as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add); assert_eq!( @@ -319,16 +244,13 @@ mod tests { #[test] fn test_rebalance_scale() { - for parallel_unit_num in simulated_parallel_unit_nums(Some(3), None) { - let actors = build_fake_actors( - &(0..parallel_unit_num) - .map(|i| (i as ActorId, i as ParallelUnitId)) - .collect_vec(), - ); + for parallelism in simulated_parallelism(Some(3), None) { + let actor_ids = (0..parallelism as ActorId).collect_vec(); + let actors = build_fake_actors(actor_ids); - let parallel_unit_num = parallel_unit_num as ActorId; + let parallelism = parallelism as ActorId; let actors_to_remove = btreeset! {0}; - let actors_to_add = btreeset! {parallel_unit_num, parallel_unit_num+1}; + let actors_to_add = btreeset! {parallelism, parallelism+1}; let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add); assert_eq!( @@ -338,7 +260,7 @@ mod tests { check_bitmaps(&result); let actors_to_remove = btreeset! {0, 1}; - let actors_to_add = btreeset! {parallel_unit_num}; + let actors_to_add = btreeset! {parallelism}; let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add); assert_eq!( @@ -353,11 +275,8 @@ mod tests { #[test] fn test_rebalance_scale_real() { - let parallel_units = (0..(VirtualNode::COUNT - 1) as ActorId) - .map(|i| (i, i)) - .collect_vec(); - let actors = build_fake_actors(¶llel_units); - + let actor_ids = (0..(VirtualNode::COUNT - 1) as ActorId).collect_vec(); + let actors = build_fake_actors(actor_ids); let actors_to_remove = btreeset! {0, 1}; let actors_to_add = btreeset! {255}; let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add); diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index d1958dc74e42f..38e03db30a23f 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -224,6 +224,21 @@ impl stream_plan::SourceNode { } } +impl meta::table_fragments::ActorStatus { + pub fn worker_id(&self) -> u32 { + self.location + .as_ref() + .expect("actor location should be exist") + .worker_node_id + } +} + +impl common::ActorLocation { + pub fn from_worker(worker_node_id: u32) -> Option { + Some(Self { worker_node_id }) + } +} + impl stream_plan::StreamNode { /// Find the external stream source info inside the stream node, if any. /// diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index 0703bb448d677..ac9fb2f9dbf1a 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -26,7 +26,7 @@ use itertools::Itertools; use rand::seq::{IteratorRandom, SliceRandom}; use rand::{thread_rng, Rng}; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{ParallelUnitId, WorkerSlotId}; +use risingwave_common::hash::WorkerSlotId; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; @@ -249,12 +249,9 @@ impl Fragment { .table_fragments .iter() .flat_map(|tf| { - tf.actor_status.iter().map(|(&actor_id, status)| { - ( - actor_id, - status.get_parallel_unit().unwrap().get_worker_node_id(), - ) - }) + tf.actor_status + .iter() + .map(|(&actor_id, status)| (actor_id, status.worker_id())) }) .collect();