diff --git a/proto/common.proto b/proto/common.proto index fab50dcfecac2..4f0d56b4823a9 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -92,12 +92,6 @@ message ParallelUnitMapping { repeated uint32 data = 2; } -// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id. -message WorkerMapping { - repeated uint32 original_indices = 1; - repeated uint32 data = 2; -} - message BatchQueryEpoch { oneof epoch { uint64 committed = 1; diff --git a/proto/meta.proto b/proto/meta.proto index d6c3ffe709d8f..dadc5b364c623 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -111,16 +111,6 @@ message FragmentParallelUnitMappings { repeated FragmentParallelUnitMapping mappings = 1; } -/// Worker mapping with fragment id, used for notification. -message FragmentWorkerMapping { - uint32 fragment_id = 1; - common.WorkerMapping mapping = 2; -} - -message FragmentWorkerMappings { - repeated FragmentWorkerMapping mappings = 1; -} - // TODO: remove this when dashboard refactored. message ActorLocation { common.WorkerNode node = 1; @@ -387,10 +377,8 @@ message SubscribeRequest { message MetaSnapshot { message SnapshotVersion { uint64 catalog_version = 1; - reserved 2; - reserved "parallel_unit_mapping_version"; + uint64 parallel_unit_mapping_version = 2; uint64 worker_node_version = 3; - uint64 streaming_worker_mapping_version = 4; } repeated catalog.Database databases = 1; repeated catalog.Schema schemas = 2; @@ -403,20 +391,16 @@ message MetaSnapshot { repeated catalog.Connection connections = 17; repeated catalog.Subscription subscriptions = 19; repeated user.UserInfo users = 8; - reserved 9; - reserved "parallel_unit_mappings"; GetSessionParamsResponse session_params = 20; + // for streaming + repeated FragmentParallelUnitMapping parallel_unit_mappings = 9; repeated common.WorkerNode nodes = 10; hummock.HummockSnapshot hummock_snapshot = 11; hummock.HummockVersion hummock_version = 12; backup_service.MetaBackupManifestId meta_backup_manifest_id = 14; hummock.WriteLimits hummock_write_limits = 16; - reserved 18; - reserved "serving_parallel_unit_mappings"; - - // for streaming - repeated FragmentWorkerMapping streaming_worker_mappings = 21; - repeated FragmentWorkerMapping serving_worker_mappings = 22; + // for serving + repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18; SnapshotVersion version = 13; } @@ -455,6 +439,8 @@ message SubscribeResponse { catalog.Function function = 6; user.UserInfo user = 11; SetSessionParamRequest session_param = 26; + // for streaming + FragmentParallelUnitMapping parallel_unit_mapping = 12; common.WorkerNode node = 13; hummock.HummockSnapshot hummock_snapshot = 14; hummock.HummockVersionDeltas hummock_version_deltas = 15; @@ -464,15 +450,10 @@ message SubscribeResponse { hummock.WriteLimits hummock_write_limits = 20; RelationGroup relation_group = 21; catalog.Connection connection = 22; + FragmentParallelUnitMappings serving_parallel_unit_mappings = 23; hummock.HummockVersionStats hummock_stats = 24; Recovery recovery = 25; - FragmentWorkerMapping streaming_worker_mapping = 27; - FragmentWorkerMappings serving_worker_mappings = 28; } - reserved 12; - reserved "parallel_unit_mapping"; - reserved 23; - reserved "serving_parallel_unit_mappings"; } service NotificationService { @@ -647,10 +628,8 @@ service SessionParamService { message GetServingVnodeMappingsRequest {} message GetServingVnodeMappingsResponse { - reserved 1; - reserved "mappings"; + repeated FragmentParallelUnitMapping mappings = 1; map fragment_to_table = 2; - repeated FragmentWorkerMapping worker_mappings = 3; } service ServingService { diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 838e4b738e082..17b257106fb5b 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -21,7 +21,7 @@ use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::{ - ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId, + ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, }; use risingwave_common::memory::MemoryContext; use risingwave_common::types::{DataType, Datum}; @@ -29,6 +29,7 @@ use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::scan_range::ScanRange; use risingwave_common::util::tracing::TracingContext; +use risingwave_common::util::worker_util::get_pu_to_worker_mapping; use risingwave_expr::expr::{build_from_prost, BoxedExpression}; use risingwave_pb::batch_plan::exchange_info::DistributionMode; use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan; @@ -51,7 +52,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; struct InnerSideExecutorBuilder { table_desc: StorageTableDesc, table_distribution: TableDistribution, - vnode_mapping: ExpandedWorkerMapping, + vnode_mapping: ExpandedParallelUnitMapping, outer_side_key_types: Vec, inner_side_schema: Schema, inner_side_column_ids: Vec, @@ -60,8 +61,8 @@ struct InnerSideExecutorBuilder { context: C, task_id: TaskId, epoch: BatchQueryEpoch, - worker_mapping: HashMap, - worker_to_scan_range_mapping: HashMap>, + pu_to_worker_mapping: HashMap, + pu_to_scan_range_mapping: HashMap>, chunk_size: usize, shutdown_rx: ShutdownToken, next_stage_id: usize, @@ -91,7 +92,7 @@ impl InnerSideExecutorBuilder { /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table /// based on the passed `scan_range` and virtual node. fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result { - let list = self.worker_to_scan_range_mapping.get(id).unwrap(); + let list = self.pu_to_scan_range_mapping.get(id).unwrap(); let mut scan_ranges = vec![]; let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len()); @@ -113,11 +114,11 @@ impl InnerSideExecutorBuilder { } /// Creates the `PbExchangeSource` using the given `id`. - fn build_prost_exchange_source(&self, id: &WorkerId) -> Result { + fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result { let worker = self - .worker_mapping + .pu_to_worker_mapping .get(id) - .context("No worker node found for the given worker id.")?; + .context("No worker node found for the given parallel unit id.")?; let local_execute_plan = LocalExecutePlan { plan: Some(PlanFragment { @@ -159,7 +160,7 @@ impl InnerSideExecutorBuilder { #[async_trait::async_trait] impl LookupExecutorBuilder for InnerSideExecutorBuilder { fn reset(&mut self) { - self.worker_to_scan_range_mapping = HashMap::new(); + self.pu_to_scan_range_mapping = HashMap::new(); } /// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id @@ -190,11 +191,11 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder } let vnode = self.get_virtual_node(&scan_range)?; - let worker_id = self.vnode_mapping[vnode.to_index()]; + let parallel_unit_id = self.vnode_mapping[vnode.to_index()]; let list = self - .worker_to_scan_range_mapping - .entry(worker_id) + .pu_to_scan_range_mapping + .entry(parallel_unit_id) .or_default(); list.push((scan_range, vnode)); @@ -206,7 +207,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder async fn build_executor(&mut self) -> Result { self.next_stage_id += 1; let mut sources = vec![]; - for id in self.worker_to_scan_range_mapping.keys() { + for id in self.pu_to_scan_range_mapping.keys() { sources.push(self.build_prost_exchange_source(id)?); } @@ -372,14 +373,6 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { let chunk_size = source.context.get_config().developer.chunk_size; - let worker_nodes = lookup_join_node.get_worker_nodes(); - let worker_mapping: HashMap = worker_nodes - .iter() - .map(|worker| (worker.id, worker.clone())) - .collect(); - - assert_eq!(worker_mapping.len(), worker_nodes.len()); - let inner_side_builder = InnerSideExecutorBuilder { table_desc: table_desc.clone(), table_distribution: TableDistribution::new_from_storage_table_desc( @@ -395,11 +388,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { context: source.context().clone(), task_id: source.task_id.clone(), epoch: source.epoch(), - worker_to_scan_range_mapping: HashMap::new(), + pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()), + pu_to_scan_range_mapping: HashMap::new(), chunk_size, shutdown_rx: source.shutdown_rx.clone(), next_stage_id: 0, - worker_mapping, }; let identity = source.plan_node().get_identity().clone(); diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index f2a0edf135104..5b0813186fd1c 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -18,7 +18,8 @@ use std::time::Duration; use rand::seq::SliceRandom; use risingwave_common::bail; -use risingwave_common::hash::{WorkerId, WorkerMapping}; +use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; +use risingwave_common::util::worker_util::get_pu_to_worker_mapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; @@ -35,10 +36,12 @@ pub struct WorkerNodeManager { struct WorkerNodeManagerInner { worker_nodes: Vec, + /// A cache for parallel units to worker nodes. It should be consistent with `worker_nodes`. + pu_to_worker: HashMap, /// fragment vnode mapping info for streaming - streaming_fragment_vnode_mapping: HashMap, + streaming_fragment_vnode_mapping: HashMap, /// fragment vnode mapping info for serving - serving_fragment_vnode_mapping: HashMap, + serving_fragment_vnode_mapping: HashMap, } pub type WorkerNodeManagerRef = Arc; @@ -54,6 +57,7 @@ impl WorkerNodeManager { Self { inner: RwLock::new(WorkerNodeManagerInner { worker_nodes: Default::default(), + pu_to_worker: Default::default(), streaming_fragment_vnode_mapping: Default::default(), serving_fragment_vnode_mapping: Default::default(), }), @@ -64,6 +68,7 @@ impl WorkerNodeManager { /// Used in tests. pub fn mock(worker_nodes: Vec) -> Self { let inner = RwLock::new(WorkerNodeManagerInner { + pu_to_worker: get_pu_to_worker_mapping(&worker_nodes), worker_nodes, streaming_fragment_vnode_mapping: HashMap::new(), serving_fragment_vnode_mapping: HashMap::new(), @@ -115,18 +120,23 @@ impl WorkerNodeManager { *w = node; } } + // Update `pu_to_worker` + write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } pub fn remove_worker_node(&self, node: WorkerNode) { let mut write_guard = self.inner.write().unwrap(); write_guard.worker_nodes.retain(|x| x.id != node.id); + + // Update `pu_to_worker` + write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } pub fn refresh( &self, nodes: Vec, - streaming_mapping: HashMap, - serving_mapping: HashMap, + streaming_mapping: HashMap, + serving_mapping: HashMap, ) { let mut write_guard = self.inner.write().unwrap(); tracing::debug!("Refresh worker nodes {:?}.", nodes); @@ -139,43 +149,42 @@ impl WorkerNodeManager { serving_mapping.keys() ); write_guard.worker_nodes = nodes; + // Update `pu_to_worker` + write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); write_guard.streaming_fragment_vnode_mapping = streaming_mapping; write_guard.serving_fragment_vnode_mapping = serving_mapping; } - /// If worker ids is empty, the scheduler may fail to schedule any task and stuck at + /// If parallel unit ids is empty, the scheduler may fail to schedule any task and stuck at /// schedule next stage. If we do not return error in this case, needs more complex control /// logic above. Report in this function makes the schedule root fail reason more clear. - pub fn get_workers_by_worker_ids(&self, worker_ids: &[WorkerId]) -> Result> { - if worker_ids.is_empty() { + pub fn get_workers_by_parallel_unit_ids( + &self, + parallel_unit_ids: &[ParallelUnitId], + ) -> Result> { + if parallel_unit_ids.is_empty() { return Err(BatchError::EmptyWorkerNodes); } let guard = self.inner.read().unwrap(); - // TODO: Does the return order of this function need to match the order of the parameters? - let worker_index: HashMap<_, _> = guard - .worker_nodes - .iter() - .map(|worker| (worker.id, worker.clone())) - .collect(); - - let mut workers = Vec::with_capacity(worker_ids.len()); - - for worker_id in worker_ids { - match worker_index.get(worker_id) { + let mut workers = Vec::with_capacity(parallel_unit_ids.len()); + for parallel_unit_id in parallel_unit_ids { + match guard.pu_to_worker.get(parallel_unit_id) { Some(worker) => workers.push(worker.clone()), - None => bail!("No worker node found for worker id: {}", worker_id), + None => bail!( + "No worker node found for parallel unit id: {}", + parallel_unit_id + ), } } - Ok(workers) } pub fn get_streaming_fragment_mapping( &self, fragment_id: &FragmentId, - ) -> Result { + ) -> Result { self.inner .read() .unwrap() @@ -188,7 +197,7 @@ impl WorkerNodeManager { pub fn insert_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: WorkerMapping, + vnode_mapping: ParallelUnitMapping, ) { self.inner .write() @@ -201,7 +210,7 @@ impl WorkerNodeManager { pub fn update_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: WorkerMapping, + vnode_mapping: ParallelUnitMapping, ) { let mut guard = self.inner.write().unwrap(); guard @@ -219,7 +228,7 @@ impl WorkerNodeManager { } /// Returns fragment's vnode mapping for serving. - fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { + fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { self.inner .read() .unwrap() @@ -227,7 +236,7 @@ impl WorkerNodeManager { .ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id)) } - pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { + pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Set serving vnode mapping for fragments {:?}", @@ -236,7 +245,10 @@ impl WorkerNodeManager { guard.serving_fragment_vnode_mapping = mappings; } - pub fn upsert_serving_fragment_mapping(&self, mappings: HashMap) { + pub fn upsert_serving_fragment_mapping( + &self, + mappings: HashMap, + ) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Upsert serving vnode mapping for fragments {:?}", @@ -287,7 +299,7 @@ impl WorkerNodeManager { } impl WorkerNodeManagerInner { - fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { + fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { self.serving_fragment_vnode_mapping .get(&fragment_id) .cloned() @@ -330,7 +342,7 @@ impl WorkerNodeSelector { .sum() } - pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { + pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { if self.enable_barrier_read { self.manager.get_streaming_fragment_mapping(&fragment_id) } else { diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 8ff0a6f9261c4..3f18be7697520 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -145,6 +145,9 @@ where | Info::Function(_) => { notification.version > info.version.as_ref().unwrap().catalog_version } + Info::ParallelUnitMapping(_) => { + notification.version > info.version.as_ref().unwrap().parallel_unit_mapping_version + } Info::Node(_) => { notification.version > info.version.as_ref().unwrap().worker_node_version } @@ -154,18 +157,10 @@ where Info::HummockSnapshot(_) => true, Info::MetaBackupManifestId(_) => true, Info::SystemParams(_) | Info::SessionParam(_) => true, + Info::ServingParallelUnitMappings(_) => true, Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(), Info::HummockStats(_) => true, Info::Recovery(_) => true, - Info::StreamingWorkerMapping(_) => { - notification.version - > info - .version - .as_ref() - .unwrap() - .streaming_worker_mapping_version - } - Info::ServingWorkerMappings(_) => true, }); self.observer_states diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 9a41de0831c10..c542ab2050cf1 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -19,9 +19,7 @@ use std::ops::Index; use educe::Educe; use itertools::Itertools; -use risingwave_pb::common::{ - ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerMapping, -}; +use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto}; use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto; use super::bitmap::VnodeBitmapExt; @@ -32,7 +30,6 @@ use crate::util::iter_util::ZipEqDebug; // TODO: find a better place for this. pub type ActorId = u32; -pub type WorkerId = u32; /// Trait for items that can be used as keys in [`VnodeMapping`]. pub trait VnodeMappingItem { @@ -257,12 +254,6 @@ pub mod marker { impl VnodeMappingItem for ParallelUnit { type Item = ParallelUnitId; } - - /// A marker type for items of [`WorkerId`]. - pub struct Worker; - impl VnodeMappingItem for Worker { - type Item = WorkerId; - } } /// A mapping from [`VirtualNode`] to [`ActorId`]. @@ -275,11 +266,6 @@ pub type ParallelUnitMapping = VnodeMapping; /// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`]. pub type ExpandedParallelUnitMapping = ExpandedMapping; -/// A mapping from [`VirtualNode`] to [`WorkerId`]. -pub type WorkerMapping = VnodeMapping; -/// An expanded mapping from [`VirtualNode`] to [`WorkerId`]. -pub type ExpandedWorkerMapping = ExpandedMapping; - impl ActorMapping { /// Transform this actor mapping to a parallel unit mapping, essentially `transform`. pub fn to_parallel_unit(&self, to_map: &M) -> ParallelUnitMapping @@ -307,30 +293,6 @@ impl ActorMapping { } } -impl WorkerMapping { - /// Create a uniform worker mapping from the given worker ids - pub fn build_from_ids(worker_ids: &[WorkerId]) -> Self { - Self::new_uniform(worker_ids.iter().cloned()) - } - - /// Create a worker mapping from the protobuf representation. - pub fn from_protobuf(proto: &PbWorkerMapping) -> Self { - assert_eq!(proto.original_indices.len(), proto.data.len()); - Self { - original_indices: proto.original_indices.clone(), - data: proto.data.clone(), - } - } - - /// Convert this worker mapping to the protobuf representation. - pub fn to_protobuf(&self) -> PbWorkerMapping { - PbWorkerMapping { - original_indices: self.original_indices.clone(), - data: self.data.clone(), - } - } -} - impl ParallelUnitMapping { /// Create a uniform parallel unit mapping from the given parallel units, essentially /// `new_uniform`. @@ -348,11 +310,6 @@ impl ParallelUnitMapping { self.transform(to_map) } - /// Transform this parallel unit mapping to an worker mapping, essentially `transform`. - pub fn to_worker(&self, to_map: &HashMap) -> WorkerMapping { - self.transform(to_map) - } - /// Create a parallel unit mapping from the protobuf representation. pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self { assert_eq!(proto.original_indices.len(), proto.data.len()); diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 0c1086ffdb3dd..49f45d66512eb 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -13,56 +13,46 @@ // limitations under the License. use std::collections::{HashMap, HashSet, LinkedList, VecDeque}; -use std::ops::BitOrAssign; use itertools::Itertools; use num_integer::Integer; -use risingwave_common::hash::WorkerId; use risingwave_pb::common::WorkerNode; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::hash::{VirtualNode, WorkerMapping}; +use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; /// Calculate a new vnode mapping, keeping locality and balance on a best effort basis. /// The strategy is similar to `rebalance_actor_vnode` used in meta node, but is modified to /// consider `max_parallelism` too. pub fn place_vnode( - hint_worker_mapping: Option<&WorkerMapping>, - workers: &[WorkerNode], + hint_pu_mapping: Option<&ParallelUnitMapping>, + new_workers: &[WorkerNode], max_parallelism: Option, -) -> Option { - #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] - struct WorkerSlot(WorkerId, usize); - - impl WorkerSlot { - fn worker_id(&self) -> WorkerId { - self.0 - } - } - // Get all serving worker slots from all available workers, grouped by worker id and ordered - // by worker slot id in each group. - let mut worker_slots: LinkedList<_> = workers +) -> Option { + // Get all serving parallel units from all available workers, grouped by worker id and ordered + // by parallel unit id in each group. + let mut new_pus: LinkedList<_> = new_workers .iter() .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving)) .sorted_by_key(|w| w.id) - .map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlot(w.id, idx))) + .map(|w| w.parallel_units.clone().into_iter().sorted_by_key(|p| p.id)) .collect(); - // Set serving parallelism to the minimum of total number of worker slots, specified + // Set serving parallelism to the minimum of total number of parallel units, specified // `max_parallelism` and total number of virtual nodes. let serving_parallelism = std::cmp::min( - worker_slots.iter().map(|slots| slots.len()).sum(), + new_pus.iter().map(|pus| pus.len()).sum(), std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT), ); - // Select `serving_parallelism` worker slots in a round-robin fashion, to distribute workload + // Select `serving_parallelism` parallel units in a round-robin fashion, to distribute workload // evenly among workers. - let mut selected_slots = Vec::new(); - while !worker_slots.is_empty() { - worker_slots - .extract_if(|slots| { - if let Some(slot) = slots.next() { - selected_slots.push(slot); + let mut selected_pu_ids = Vec::new(); + while !new_pus.is_empty() { + new_pus + .extract_if(|ps| { + if let Some(p) = ps.next() { + selected_pu_ids.push(p.id); false } else { true @@ -70,63 +60,57 @@ pub fn place_vnode( }) .for_each(drop); } - selected_slots.drain(serving_parallelism..); - let selected_slots_set: HashSet = selected_slots.iter().cloned().collect(); - if selected_slots_set.is_empty() { + selected_pu_ids.drain(serving_parallelism..); + let selected_pu_id_set: HashSet = selected_pu_ids.iter().cloned().collect(); + if selected_pu_id_set.is_empty() { return None; } - // Calculate balance for each selected worker slot. Initially, each worker slot is assigned + // Calculate balance for each selected parallel unit. Initially, each parallel unit is assigned // no vnodes. Thus its negative balance means that many vnodes should be assigned to it later. - // `is_temp` is a mark for a special temporary worker slot, only to simplify implementation. + // `is_temp` is a mark for a special temporary parallel unit, only to simplify implementation. #[derive(Debug)] struct Balance { - slot: WorkerSlot, + pu_id: ParallelUnitId, balance: i32, builder: BitmapBuilder, is_temp: bool, } - - let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_slots.len()); - let mut balances: HashMap = HashMap::default(); - - for slot in &selected_slots { + let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_pu_ids.len()); + let mut balances: HashMap = HashMap::default(); + for pu_id in &selected_pu_ids { let mut balance = Balance { - slot: *slot, + pu_id: *pu_id, balance: -(expected as i32), builder: BitmapBuilder::zeroed(VirtualNode::COUNT), is_temp: false, }; - if remain > 0 { balance.balance -= 1; remain -= 1; } - balances.insert(*slot, balance); + balances.insert(*pu_id, balance); } // Now to maintain affinity, if a hint has been provided via `hint_pu_mapping`, follow // that mapping to adjust balances. - let mut temp_slot = Balance { - slot: WorkerSlot(0, usize::MAX), /* This id doesn't matter for `temp_pu`. It's distinguishable via `is_temp`. */ + let mut temp_pu = Balance { + pu_id: 0, // This id doesn't matter for `temp_pu`. It's distinguishable via `is_temp`. balance: 0, builder: BitmapBuilder::zeroed(VirtualNode::COUNT), is_temp: true, }; - match hint_worker_mapping { - Some(hint_worker_mapping) => { - for (vnode, worker_id) in hint_worker_mapping.iter_with_vnode() { - let worker_slot = WorkerSlot(worker_id, 0); - - let b = if selected_slots_set.contains(&worker_slot) { - // Assign vnode to the same worker slot as hint. - balances.get_mut(&worker_slot).unwrap() + match hint_pu_mapping { + Some(hint_pu_mapping) => { + for (vnode, pu_id) in hint_pu_mapping.iter_with_vnode() { + let b = if selected_pu_id_set.contains(&pu_id) { + // Assign vnode to the same parallel unit as hint. + balances.get_mut(&pu_id).unwrap() } else { - // Assign vnode that doesn't belong to any worker slot to `temp_pu` + // Assign vnode that doesn't belong to any parallel unit to `temp_pu` // temporarily. They will be reassigned later. - &mut temp_slot + &mut temp_pu }; - b.balance += 1; b.builder.set(vnode.to_index(), true); } @@ -134,33 +118,31 @@ pub fn place_vnode( None => { // No hint is provided, assign all vnodes to `temp_pu`. for vnode in VirtualNode::all() { - temp_slot.balance += 1; - temp_slot.builder.set(vnode.to_index(), true); + temp_pu.balance += 1; + temp_pu.builder.set(vnode.to_index(), true); } } } - // The final step is to move vnodes from worker slots with positive balance to worker slots - // with negative balance, until all worker slots are of 0 balance. - // A double-ended queue with worker slots ordered by balance in descending order is consumed: - // 1. Peek 2 worker slots from front and back. + // The final step is to move vnodes from parallel units with positive balance to parallel units + // with negative balance, until all parallel units are of 0 balance. + // A double-ended queue with parallel units ordered by balance in descending order is consumed: + // 1. Peek 2 parallel units from front and back. // 2. It any of them is of 0 balance, pop it and go to step 1. // 3. Otherwise, move vnodes from front to back. let mut balances: VecDeque<_> = balances .into_values() - .chain(std::iter::once(temp_slot)) + .chain(std::iter::once(temp_pu)) .sorted_by_key(|b| b.balance) .rev() .collect(); - - let mut results: HashMap = HashMap::default(); - + let mut results: HashMap = HashMap::default(); while !balances.is_empty() { if balances.len() == 1 { let single = balances.pop_front().unwrap(); assert_eq!(single.balance, 0); if !single.is_temp { - results.insert(single.slot, single.builder.finish()); + results.insert(single.pu_id, single.builder.finish()); } break; } @@ -184,43 +166,32 @@ pub fn place_vnode( if src.balance != 0 { balances.push_front(src); } else if !src.is_temp { - results.insert(src.slot, src.builder.finish()); + results.insert(src.pu_id, src.builder.finish()); } if dst.balance != 0 { balances.push_back(dst); } else if !dst.is_temp { - results.insert(dst.slot, dst.builder.finish()); + results.insert(dst.pu_id, dst.builder.finish()); } } - let mut worker_result = HashMap::new(); - - for (worker_slot, bitmap) in results { - let worker_id = worker_slot.worker_id(); - worker_result - .entry(worker_id) - .or_insert(BitmapBuilder::zeroed(VirtualNode::COUNT).finish()) - .bitor_assign(&bitmap); - } - - Some(WorkerMapping::from_bitmaps(&worker_result)) + Some(ParallelUnitMapping::from_bitmaps(&results)) } #[cfg(test)] mod tests { use std::collections::HashMap; - use risingwave_common::hash::WorkerMapping; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{ParallelUnit, WorkerNode}; - use crate::hash::{ParallelUnitId, VirtualNode}; + use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; use crate::vnode_mapping::vnode_placement::place_vnode; + #[test] fn test_place_vnode() { assert_eq!(VirtualNode::COUNT, 256); - let mut pu_id_counter: ParallelUnitId = 0; let mut pu_to_worker: HashMap = Default::default(); let serving_property = Property { @@ -245,13 +216,13 @@ mod tests { results }; - let count_same_vnode_mapping = |wm1: &WorkerMapping, wm2: &WorkerMapping| { - assert_eq!(wm1.len(), 256); - assert_eq!(wm2.len(), 256); + let count_same_vnode_mapping = |pm1: &ParallelUnitMapping, pm2: &ParallelUnitMapping| { + assert_eq!(pm1.len(), 256); + assert_eq!(pm2.len(), 256); let mut count: usize = 0; for idx in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(idx); - if wm1.get(vnode) == wm2.get(vnode) { + if pm1.get(vnode) == pm2.get(vnode) { count += 1; } } @@ -264,32 +235,29 @@ mod tests { property: Some(serving_property.clone()), ..Default::default() }; - assert!( place_vnode(None, &[worker_1.clone()], Some(0)).is_none(), "max_parallelism should >= 0" ); - let re_worker_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap(); - assert_eq!(re_worker_mapping_2.iter_unique().count(), 1); - + let re_pu_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap(); + assert_eq!(re_pu_mapping_2.iter_unique().count(), 1); let worker_2 = WorkerNode { id: 2, parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker), property: Some(serving_property.clone()), ..Default::default() }; - - let re_worker_mapping = place_vnode( - Some(&re_worker_mapping_2), + let re_pu_mapping = place_vnode( + Some(&re_pu_mapping_2), &[worker_1.clone(), worker_2.clone()], None, ) .unwrap(); - assert_eq!(re_worker_mapping.iter_unique().count(), 2); + assert_eq!(re_pu_mapping.iter_unique().count(), 51); // 1 * 256 + 0 -> 51 * 5 + 1 - let score = count_same_vnode_mapping(&re_worker_mapping_2, &re_worker_mapping); + let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); assert!(score >= 5); let worker_3 = WorkerNode { @@ -299,16 +267,16 @@ mod tests { ..Default::default() }; let re_pu_mapping_2 = place_vnode( - Some(&re_worker_mapping), + Some(&re_pu_mapping), &[worker_1.clone(), worker_2.clone(), worker_3.clone()], None, ) .unwrap(); // limited by total pu number - assert_eq!(re_pu_mapping_2.iter_unique().count(), 3); + assert_eq!(re_pu_mapping_2.iter_unique().count(), 111); // 51 * 5 + 1 -> 111 * 2 + 34 - let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_worker_mapping); + let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); assert!(score >= (2 + 50 * 2)); let re_pu_mapping = place_vnode( Some(&re_pu_mapping_2), @@ -317,7 +285,7 @@ mod tests { ) .unwrap(); // limited by max_parallelism - assert_eq!(re_pu_mapping.iter_unique().count(), 3); + assert_eq!(re_pu_mapping.iter_unique().count(), 50); // 111 * 2 + 34 -> 50 * 5 + 6 let score = count_same_vnode_mapping(&re_pu_mapping, &re_pu_mapping_2); assert!(score >= 50 * 2); @@ -327,20 +295,20 @@ mod tests { None, ) .unwrap(); - assert_eq!(re_pu_mapping_2.iter_unique().count(), 3); + assert_eq!(re_pu_mapping_2.iter_unique().count(), 111); // 50 * 5 + 6 -> 111 * 2 + 34 let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); assert!(score >= 50 * 2); let re_pu_mapping = place_vnode(Some(&re_pu_mapping_2), &[worker_1, worker_3.clone()], None).unwrap(); // limited by total pu number - assert_eq!(re_pu_mapping.iter_unique().count(), 2); + assert_eq!(re_pu_mapping.iter_unique().count(), 61); // 111 * 2 + 34 -> 61 * 4 + 12 let score = count_same_vnode_mapping(&re_pu_mapping, &re_pu_mapping_2); assert!(score >= 61 * 2); assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none()); let re_pu_mapping = place_vnode(Some(&re_pu_mapping), &[worker_3], None).unwrap(); - assert_eq!(re_pu_mapping.iter_unique().count(), 1); + assert_eq!(re_pu_mapping.iter_unique().count(), 60); assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none()); } } diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 050ae1c089f6c..ddf6ca489bf0c 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use parking_lot::RwLock; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; use risingwave_common::catalog::CatalogVersion; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend}; @@ -27,7 +27,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{FragmentWorkerMapping, MetaSnapshot, SubscribeResponse}; +use risingwave_pb::meta::{FragmentParallelUnitMapping, MetaSnapshot, SubscribeResponse}; use risingwave_rpc_client::ComputeClientPoolRef; use tokio::sync::watch::Sender; @@ -72,6 +72,7 @@ impl ObserverState for FrontendObserverNode { Info::User(_) => { self.handle_user_notification(resp); } + Info::ParallelUnitMapping(_) => self.handle_fragment_mapping_notification(resp), Info::Snapshot(_) => { panic!( "receiving a snapshot in the middle is unsupported now {:?}", @@ -102,9 +103,8 @@ impl ObserverState for FrontendObserverNode { Info::HummockStats(stats) => { self.handle_table_stats_notification(stats); } - Info::StreamingWorkerMapping(_) => self.handle_fragment_mapping_notification(resp), - Info::ServingWorkerMappings(m) => { - self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()) + Info::ServingParallelUnitMappings(m) => { + self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()); } Info::Recovery(_) => { self.compute_client_pool.invalidate_all(); @@ -133,13 +133,13 @@ impl ObserverState for FrontendObserverNode { functions, connections, users, + parallel_unit_mappings, + serving_parallel_unit_mappings, nodes, hummock_snapshot, hummock_version: _, meta_backup_manifest_id: _, hummock_write_limits: _, - streaming_worker_mappings, - serving_worker_mappings, session_params, version, } = snapshot; @@ -177,11 +177,10 @@ impl ObserverState for FrontendObserverNode { for user in users { user_guard.create_user(user) } - self.worker_node_manager.refresh( nodes, - convert_worker_mapping(&streaming_worker_mappings), - convert_worker_mapping(&serving_worker_mappings), + convert_pu_mapping(¶llel_unit_mappings), + convert_pu_mapping(&serving_parallel_unit_mappings), ); self.hummock_snapshot_manager .update(hummock_snapshot.unwrap()); @@ -388,10 +387,12 @@ impl FrontendObserverNode { return; }; match info { - Info::StreamingWorkerMapping(streaming_worker_mapping) => { - let fragment_id = streaming_worker_mapping.fragment_id; + Info::ParallelUnitMapping(parallel_unit_mapping) => { + let fragment_id = parallel_unit_mapping.fragment_id; let mapping = || { - WorkerMapping::from_protobuf(streaming_worker_mapping.mapping.as_ref().unwrap()) + ParallelUnitMapping::from_protobuf( + parallel_unit_mapping.mapping.as_ref().unwrap(), + ) }; match resp.operation() { @@ -416,20 +417,20 @@ impl FrontendObserverNode { fn handle_fragment_serving_mapping_notification( &mut self, - mappings: Vec, + mappings: Vec, op: Operation, ) { match op { Operation::Add | Operation::Update => { self.worker_node_manager - .upsert_serving_fragment_mapping(convert_worker_mapping(&mappings)); + .upsert_serving_fragment_mapping(convert_pu_mapping(&mappings)); } Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping( &mappings.into_iter().map(|m| m.fragment_id).collect_vec(), ), Operation::Snapshot => { self.worker_node_manager - .set_serving_fragment_mapping(convert_worker_mapping(&mappings)); + .set_serving_fragment_mapping(convert_pu_mapping(&mappings)); } _ => panic!("receive an unsupported notify {:?}", op), } @@ -469,17 +470,17 @@ impl FrontendObserverNode { } } -fn convert_worker_mapping( - worker_mappings: &[FragmentWorkerMapping], -) -> HashMap { - worker_mappings +fn convert_pu_mapping( + parallel_unit_mappings: &[FragmentParallelUnitMapping], +) -> HashMap { + parallel_unit_mappings .iter() .map( - |FragmentWorkerMapping { + |FragmentParallelUnitMapping { fragment_id, mapping, }| { - let mapping = WorkerMapping::from_protobuf(mapping.as_ref().unwrap()); + let mapping = ParallelUnitMapping::from_protobuf(mapping.as_ref().unwrap()); (*fragment_id, mapping) }, ) diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index d6f58aa01205b..4999e1d8630bf 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -51,7 +51,7 @@ use generic::PhysicalPlanRef; use itertools::Itertools; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::catalog::{FieldDisplay, Schema, TableId}; -use risingwave_common::hash::WorkerId; +use risingwave_common::hash::ParallelUnitId; use risingwave_pb::batch_plan::exchange_info::{ ConsistentHashInfo, Distribution as DistributionPb, DistributionMode, HashInfo, }; @@ -149,17 +149,14 @@ impl Distribution { let vnode_mapping = worker_node_manager .fragment_mapping(Self::get_fragment_id(catalog_reader, table_id)?)?; - let worker_to_id_map: HashMap = vnode_mapping + let pu2id_map: HashMap = vnode_mapping .iter_unique() .enumerate() - .map(|(i, worker_id)| (worker_id, i as u32)) + .map(|(i, pu)| (pu, i as u32)) .collect(); Some(DistributionPb::ConsistentHashInfo(ConsistentHashInfo { - vmap: vnode_mapping - .iter() - .map(|x| worker_to_id_map[&x]) - .collect_vec(), + vmap: vnode_mapping.iter().map(|x| pu2id_map[&x]).collect_vec(), key: key.iter().map(|num| *num as u32).collect(), })) } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 36c32847f4992..c6e866630067b 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -477,7 +477,7 @@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::WorkerMapping; + use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType}; @@ -722,9 +722,10 @@ pub(crate) mod tests { let workers = vec![worker1, worker2, worker3]; let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers)); let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false); - worker_node_manager.insert_streaming_fragment_mapping(0, WorkerMapping::new_single(0)); + worker_node_manager + .insert_streaming_fragment_mapping(0, ParallelUnitMapping::new_single(0)); worker_node_manager.set_serving_fragment_mapping( - vec![(0, WorkerMapping::new_single(0))] + vec![(0, ParallelUnitMapping::new_single(0))] .into_iter() .collect(), ); diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index fc83ad0b5c25a..193ee6417abdc 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -32,7 +32,7 @@ use risingwave_batch::executor::ExecutorBuilder; use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::source::SplitMetaData; @@ -353,13 +353,12 @@ impl StageRunner { // We let each task read one partition by setting the `vnode_ranges` of the scan node in // the task. // We schedule the task to the worker node that owns the data partition. - let worker_ids = vnode_bitmaps.keys().cloned().collect_vec(); + let parallel_unit_ids = vnode_bitmaps.keys().cloned().collect_vec(); let workers = self .worker_node_manager .manager - .get_workers_by_worker_ids(&worker_ids)?; - - for (i, (worker_id, worker)) in worker_ids + .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + for (i, (parallel_unit_id, worker)) in parallel_unit_ids .into_iter() .zip_eq_fast(workers.into_iter()) .enumerate() @@ -369,7 +368,7 @@ impl StageRunner { stage_id: self.stage.id, task_id: i as u32, }; - let vnode_ranges = vnode_bitmaps[&worker_id].clone(); + let vnode_ranges = vnode_bitmaps[¶llel_unit_id].clone(); let plan_fragment = self.create_plan_fragment(i as u32, Some(PartitionInfo::Table(vnode_ranges))); futures.push(self.schedule_task( @@ -680,7 +679,10 @@ impl StageRunner { } #[inline(always)] - fn get_table_dml_vnode_mapping(&self, table_id: &TableId) -> SchedulerResult { + fn get_table_dml_vnode_mapping( + &self, + table_id: &TableId, + ) -> SchedulerResult { let guard = self.catalog_reader.read_guard(); let table = guard @@ -709,11 +711,11 @@ impl StageRunner { if let Some(table_id) = dml_table_id { let vnode_mapping = self.get_table_dml_vnode_mapping(&table_id)?; - let worker_ids = vnode_mapping.iter_unique().collect_vec(); + let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec(); let candidates = self .worker_node_manager .manager - .get_workers_by_worker_ids(&worker_ids)?; + .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } @@ -739,17 +741,17 @@ impl StageRunner { .table_id .into(), )?; - let id_to_workers = self + let id2pu_vec = self .worker_node_manager .fragment_mapping(fragment_id)? .iter_unique() .collect_vec(); - let worker_id = id_to_workers[task_id as usize]; + let pu = id2pu_vec[task_id as usize]; let candidates = self .worker_node_manager .manager - .get_workers_by_worker_ids(&[worker_id])?; + .get_workers_by_parallel_unit_ids(&[pu])?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index a97de0629f5fb..7877ab658b792 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -30,7 +30,7 @@ use risingwave_batch::task::{ShutdownToken, TaskId}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_common::bail; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::tracing::{InstrumentStream, TracingContext}; use risingwave_connector::source::SplitMetaData; @@ -312,12 +312,12 @@ impl LocalQueryExecution { // Similar to the distributed case (StageRunner::schedule_tasks). // Set `vnode_ranges` of the scan node in `local_execute_plan` of each // `exchange_source`. - let (worker_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = + let (parallel_unit_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = vnode_bitmaps.clone().into_iter().unzip(); let workers = self .worker_node_manager .manager - .get_workers_by_worker_ids(&worker_ids)?; + .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; for (idx, (worker_node, partition)) in (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate() { @@ -562,7 +562,10 @@ impl LocalQueryExecution { } #[inline(always)] - fn get_table_dml_vnode_mapping(&self, table_id: &TableId) -> SchedulerResult { + fn get_table_dml_vnode_mapping( + &self, + table_id: &TableId, + ) -> SchedulerResult { let guard = self.front_env.catalog_reader().read_guard(); let table = guard @@ -586,11 +589,11 @@ impl LocalQueryExecution { // dml should use streaming vnode mapping let vnode_mapping = self.get_table_dml_vnode_mapping(table_id)?; let worker_node = { - let worker_ids = vnode_mapping.iter_unique().collect_vec(); + let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec(); let candidates = self .worker_node_manager .manager - .get_workers_by_worker_ids(&worker_ids)?; + .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 33113028e97dc..f5ed1b1205b69 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -30,7 +30,7 @@ use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableDesc; use risingwave_common::hash::table_distribution::TableDistribution; -use risingwave_common::hash::{VirtualNode, WorkerId, WorkerMapping}; +use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; use risingwave_common::util::scan_range::ScanRange; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; @@ -392,12 +392,12 @@ pub struct TableScanInfo { /// full vnode bitmap, since we need to know where to schedule the singleton scan task. /// /// `None` iff the table is a system table. - partitions: Option>, + partitions: Option>, } impl TableScanInfo { /// For normal tables, `partitions` should always be `Some`. - pub fn new(name: String, partitions: HashMap) -> Self { + pub fn new(name: String, partitions: HashMap) -> Self { Self { name, partitions: Some(partitions), @@ -1150,10 +1150,10 @@ impl BatchPlanFragmenter { fn derive_partitions( scan_ranges: &[ScanRange], table_desc: &TableDesc, - vnode_mapping: &WorkerMapping, -) -> SchedulerResult> { + vnode_mapping: &ParallelUnitMapping, +) -> SchedulerResult> { let num_vnodes = vnode_mapping.len(); - let mut partitions: HashMap)> = HashMap::new(); + let mut partitions: HashMap)> = HashMap::new(); if scan_ranges.is_empty() { return Ok(vnode_mapping @@ -1181,25 +1181,24 @@ fn derive_partitions( match vnode { None => { // put this scan_range to all partitions - vnode_mapping - .to_bitmaps() - .into_iter() - .for_each(|(worker_id, vnode_bitmap)| { + vnode_mapping.to_bitmaps().into_iter().for_each( + |(parallel_unit_id, vnode_bitmap)| { let (bitmap, scan_ranges) = partitions - .entry(worker_id) + .entry(parallel_unit_id) .or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![])); vnode_bitmap .iter() .enumerate() .for_each(|(vnode, b)| bitmap.set(vnode, b)); scan_ranges.push(scan_range.to_protobuf()); - }); + }, + ); } // scan a single partition Some(vnode) => { - let worker_id = vnode_mapping[vnode]; + let parallel_unit_id = vnode_mapping[vnode]; let (bitmap, scan_ranges) = partitions - .entry(worker_id) + .entry(parallel_unit_id) .or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![])); bitmap.set(vnode.to_index(), true); scan_ranges.push(scan_range.to_protobuf()); diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index e7495535f9100..e9a5e4a017ad4 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -24,7 +24,8 @@ use risingwave_pb::hummock::WriteLimits; use risingwave_pb::meta::meta_snapshot::SnapshotVersion; use risingwave_pb::meta::notification_service_server::NotificationService; use risingwave_pb::meta::{ - FragmentWorkerMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest, SubscribeType, + FragmentParallelUnitMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest, + SubscribeType, }; use risingwave_pb::user::UserInfo; use tokio::sync::mpsc; @@ -137,9 +138,9 @@ impl NotificationServiceImpl { } } - async fn get_worker_mapping_snapshot( + async fn get_parallel_unit_mapping_snapshot( &self, - ) -> MetaResult<(Vec, NotificationVersion)> { + ) -> MetaResult<(Vec, NotificationVersion)> { match &self.metadata_manager { MetadataManager::V1(mgr) => { let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await; @@ -160,11 +161,11 @@ impl NotificationServiceImpl { } } - fn get_serving_vnode_mappings(&self) -> Vec { + fn get_serving_vnode_mappings(&self) -> Vec { self.serving_vnode_mapping .all() .iter() - .map(|(fragment_id, mapping)| FragmentWorkerMapping { + .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { fragment_id: *fragment_id, mapping: Some(mapping.to_protobuf()), }) @@ -240,11 +241,9 @@ impl NotificationServiceImpl { users, catalog_version, ) = self.get_catalog_snapshot().await?; - - let (streaming_worker_mappings, streaming_worker_mapping_version) = - self.get_worker_mapping_snapshot().await?; - let serving_worker_mappings = self.get_serving_vnode_mappings(); - + let (parallel_unit_mappings, parallel_unit_mapping_version) = + self.get_parallel_unit_mapping_snapshot().await?; + let serving_parallel_unit_mappings = self.get_serving_vnode_mappings(); let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?; let hummock_snapshot = Some(self.hummock_manager.latest_snapshot()); @@ -271,15 +270,15 @@ impl NotificationServiceImpl { functions, connections, users, + parallel_unit_mappings, nodes, hummock_snapshot, + serving_parallel_unit_mappings, version: Some(SnapshotVersion { catalog_version, + parallel_unit_mapping_version, worker_node_version, - streaming_worker_mapping_version, }), - serving_worker_mappings, - streaming_worker_mappings, session_params, ..Default::default() }) diff --git a/src/meta/service/src/serving_service.rs b/src/meta/service/src/serving_service.rs index bf2f10ba56207..d1b013e078e0f 100644 --- a/src/meta/service/src/serving_service.rs +++ b/src/meta/service/src/serving_service.rs @@ -16,7 +16,7 @@ use itertools::Itertools; use risingwave_meta::manager::MetadataManager; use risingwave_pb::meta::serving_service_server::ServingService; use risingwave_pb::meta::{ - FragmentWorkerMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse, + FragmentParallelUnitMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse, }; use tonic::{Request, Response, Status}; @@ -49,7 +49,7 @@ impl ServingService for ServingServiceImpl { .serving_vnode_mapping .all() .into_iter() - .map(|(fragment_id, mapping)| FragmentWorkerMapping { + .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { fragment_id, mapping: Some(mapping.to_protobuf()), }) @@ -78,8 +78,8 @@ impl ServingService for ServingServiceImpl { } }; Ok(Response::new(GetServingVnodeMappingsResponse { + mappings, fragment_to_table, - worker_mappings: mappings, })) } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 1571e7eb11c00..111e8e5ab9fe3 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -21,7 +21,6 @@ use itertools::Itertools; use risingwave_common::catalog::{ is_subscription_internal_table, TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS, }; -use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; @@ -46,9 +45,7 @@ use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; -use risingwave_pb::meta::{ - FragmentParallelUnitMapping, PbFragmentWorkerMapping, PbRelation, PbRelationGroup, -}; +use risingwave_pb::meta::{PbRelation, PbRelationGroup}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::FragmentTypeFlag; use risingwave_pb::user::PbUserInfo; @@ -66,9 +63,9 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, - get_fragment_mappings, get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, - get_referring_objects, get_referring_objects_cascade, get_user_privilege, - list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject, + get_fragment_mappings, get_fragment_mappings_by_jobs, get_referring_objects, + get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, + resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -277,30 +274,8 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; - - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; - let fragment_mappings = fragment_mappings - .into_iter() - .map( - |FragmentParallelUnitMapping { - fragment_id, - mapping, - }| { - PbFragmentWorkerMapping { - fragment_id, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker(¶llel_unit_to_worker) - .to_protobuf(), - ), - } - }, - ) - .collect(); - // The schema and objects in the database will be delete cascade. let res = Object::delete_by_id(database_id).exec(&txn).await?; if res.rows_affected == 0 { @@ -320,7 +295,6 @@ impl CatalogController { }), ) .await; - self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) .await; Ok(( @@ -2089,7 +2063,6 @@ impl CatalogController { let (source_fragments, removed_actors) = resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?; @@ -2116,8 +2089,6 @@ impl CatalogController { } let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - txn.commit().await?; // notify about them. @@ -2192,26 +2163,6 @@ impl CatalogController { NotificationInfo::RelationGroup(PbRelationGroup { relations }), ) .await; - - let fragment_mappings = fragment_mappings - .into_iter() - .map( - |FragmentParallelUnitMapping { - fragment_id, - mapping, - }| { - PbFragmentWorkerMapping { - fragment_id, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker(¶llel_unit_to_worker) - .to_protobuf(), - ), - } - }, - ) - .collect(); - self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) .await; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index d07366be2ca0c..552008914d76e 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -18,7 +18,6 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; @@ -34,7 +33,9 @@ use risingwave_pb::meta::subscribe_response::{ use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState}; -use risingwave_pb::meta::{FragmentWorkerMapping, PbFragmentWorkerMapping, PbTableFragments}; +use risingwave_pb::meta::{ + FragmentParallelUnitMapping, PbFragmentParallelUnitMapping, PbTableFragments, +}; use risingwave_pb::source::PbConnectorSplits; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ @@ -49,8 +50,7 @@ use sea_orm::{ use crate::controller::catalog::{CatalogController, CatalogControllerInner}; use crate::controller::utils::{ - get_actor_dispatchers, get_parallel_unit_to_worker_map, FragmentDesc, PartialActorLocation, - PartialFragmentStateTables, + get_actor_dispatchers, FragmentDesc, PartialActorLocation, PartialFragmentStateTables, }; use crate::manager::{ActorInfos, LocalNotification}; use crate::model::TableParallelism; @@ -61,9 +61,7 @@ impl CatalogControllerInner { /// List all fragment vnode mapping info for all CREATED streaming jobs. pub async fn all_running_fragment_mappings( &self, - ) -> MetaResult + '_> { - let txn = self.db.begin().await?; - + ) -> MetaResult + '_> { let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() .join(JoinType::InnerJoin, fragment::Relation::Object.def()) .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) @@ -71,23 +69,14 @@ impl CatalogControllerInner { .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .into_tuple() - .all(&txn) + .all(&self.db) .await?; - - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - - Ok(fragment_mappings - .into_iter() - .map(move |(fragment_id, mapping)| { - let worker_mapping = ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker(¶llel_unit_to_worker) - .to_protobuf(); - - FragmentWorkerMapping { - fragment_id: fragment_id as _, - mapping: Some(worker_mapping), - } - })) + Ok(fragment_mappings.into_iter().map(|(fragment_id, mapping)| { + FragmentParallelUnitMapping { + fragment_id: fragment_id as _, + mapping: Some(mapping.to_protobuf()), + } + })) } } @@ -95,7 +84,7 @@ impl CatalogController { pub(crate) async fn notify_fragment_mapping( &self, operation: NotificationOperation, - fragment_mappings: Vec, + fragment_mappings: Vec, ) { let fragment_ids = fragment_mappings .iter() @@ -107,7 +96,7 @@ impl CatalogController { .notification_manager() .notify_frontend( operation, - NotificationInfo::StreamingWorkerMapping(fragment_mapping), + NotificationInfo::ParallelUnitMapping(fragment_mapping), ) .await; } @@ -947,21 +936,15 @@ impl CatalogController { .await?; } - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - txn.commit().await?; self.notify_fragment_mapping( NotificationOperation::Update, fragment_mapping .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerMapping { + .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { fragment_id: fragment_id as _, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker(¶llel_unit_to_worker) - .to_protobuf(), - ), + mapping: Some(mapping.to_protobuf()), }) .collect(), ) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 10e33920d222d..271860ee49561 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -42,7 +42,9 @@ use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, Operation, }; use risingwave_pb::meta::table_fragments::PbActorStatus; -use risingwave_pb::meta::{FragmentWorkerMapping, PbRelation, PbRelationGroup, PbTableFragments}; +use risingwave_pb::meta::{ + FragmentParallelUnitMapping, PbRelation, PbRelationGroup, PbTableFragments, +}; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -63,7 +65,7 @@ use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ check_relation_name_duplicate, check_sink_into_table_cycle, ensure_object_id, ensure_user_id, - get_fragment_actor_ids, get_fragment_mappings, get_parallel_unit_to_worker_map, + get_fragment_actor_ids, get_fragment_mappings, }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, SinkId, StreamingJob}; @@ -1031,8 +1033,6 @@ impl CatalogController { let txn = inner.db.begin().await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - let mut fragment_mapping_to_notify = vec![]; // for assert only @@ -1214,13 +1214,9 @@ impl CatalogController { fragment.vnode_mapping = Set((&vnode_mapping).into()); fragment.update(&txn).await?; - let worker_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping) - .to_worker(¶llel_unit_to_worker) - .to_protobuf(); - - fragment_mapping_to_notify.push(FragmentWorkerMapping { + fragment_mapping_to_notify.push(FragmentParallelUnitMapping { fragment_id: fragment_id as u32, - mapping: Some(worker_mapping), + mapping: Some(vnode_mapping), }); // for downstream and upstream diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 55fff8e4b6e8b..173634ab03cc7 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -16,7 +16,6 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::hash::ParallelUnitMapping; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; @@ -24,12 +23,12 @@ use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, - object_dependency, schema, sink, source, table, user, user_privilege, view, worker_property, - ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, - PrivilegeId, SchemaId, SourceId, StreamNode, UserId, WorkerId, + object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId, + DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, + SchemaId, SourceId, StreamNode, UserId, }; use risingwave_pb::catalog::{PbConnection, PbFunction}; -use risingwave_pb::meta::{PbFragmentParallelUnitMapping, PbFragmentWorkerMapping}; +use risingwave_pb::meta::PbFragmentParallelUnitMapping; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource}; use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; @@ -790,12 +789,10 @@ where pub async fn get_fragment_mappings( db: &C, job_id: ObjectId, -) -> MetaResult> +) -> MetaResult> where C: ConnectionTrait, { - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(db).await?; - let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() .select_only() .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) @@ -806,13 +803,9 @@ where Ok(fragment_mappings .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerMapping { + .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { fragment_id: fragment_id as _, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker(¶llel_unit_to_worker) - .to_protobuf(), - ), + mapping: Some(mapping.to_protobuf()), }) .collect()) } @@ -929,30 +922,3 @@ where Ok((source_fragment_ids, actors.into_iter().collect())) } - -pub(crate) async fn get_parallel_unit_to_worker_map(db: &C) -> MetaResult> -where - C: ConnectionTrait, -{ - let worker_parallel_units = WorkerProperty::find() - .select_only() - .columns([ - worker_property::Column::WorkerId, - worker_property::Column::ParallelUnitIds, - ]) - .into_tuple::<(WorkerId, I32Array)>() - .all(db) - .await?; - - let parallel_unit_to_worker = worker_parallel_units - .into_iter() - .flat_map(|(worker_id, parallel_unit_ids)| { - parallel_unit_ids - .into_inner() - .into_iter() - .map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32)) - }) - .collect::>(); - - Ok(parallel_unit_to_worker) -} diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index ac523fa40d5f1..feb3dc3026dbf 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -24,11 +24,10 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping} use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; -use risingwave_pb::common::{PbParallelUnitMapping, PbWorkerMapping}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; -use risingwave_pb::meta::FragmentWorkerMapping; +use risingwave_pb::meta::FragmentParallelUnitMapping; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::{ @@ -56,21 +55,18 @@ impl FragmentManagerCore { /// List all fragment vnode mapping info that not in `State::Initial`. pub fn all_running_fragment_mappings( &self, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ { self.table_fragments .values() .filter(|tf| tf.state() != State::Initial) .flat_map(|table_fragments| { - table_fragments - .fragments - .values() - .map(move |fragment| FragmentWorkerMapping { + table_fragments.fragments.values().map(|fragment| { + let parallel_unit_mapping = fragment.vnode_mapping.clone().unwrap(); + FragmentParallelUnitMapping { fragment_id: fragment.fragment_id, - mapping: Some(FragmentManager::convert_mapping( - &table_fragments.actor_status, - fragment.vnode_mapping.as_ref().unwrap(), - )), - }) + mapping: Some(parallel_unit_mapping), + } + }) }) } @@ -195,20 +191,18 @@ impl FragmentManager { async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) { // Notify all fragment mapping to frontend nodes for fragment in table_fragment.fragments.values() { - let fragment_mapping = FragmentWorkerMapping { + let mapping = fragment + .vnode_mapping + .clone() + .expect("no data distribution found"); + let fragment_mapping = FragmentParallelUnitMapping { fragment_id: fragment.fragment_id, - mapping: Some(Self::convert_mapping( - &table_fragment.actor_status, - fragment - .vnode_mapping - .as_ref() - .expect("no data distribution found"), - )), + mapping: Some(mapping), }; self.env .notification_manager() - .notify_frontend(operation, Info::StreamingWorkerMapping(fragment_mapping)) + .notify_frontend(operation, Info::ParallelUnitMapping(fragment_mapping)) .await; } @@ -1276,14 +1270,11 @@ impl FragmentManager { *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone(); - let worker_mapping = Self::convert_mapping(&actor_status, &vnode_mapping); - // Notify fragment mapping to frontend nodes. - let fragment_mapping = FragmentWorkerMapping { + let fragment_mapping = FragmentParallelUnitMapping { fragment_id: *fragment_id as FragmentId, - mapping: Some(worker_mapping), + mapping: Some(vnode_mapping), }; - fragment_mapping_to_notify.push(fragment_mapping); } @@ -1403,30 +1394,13 @@ impl FragmentManager { for mapping in fragment_mapping_to_notify { self.env .notification_manager() - .notify_frontend(Operation::Update, Info::StreamingWorkerMapping(mapping)) + .notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping)) .await; } Ok(()) } - fn convert_mapping( - actor_status: &BTreeMap, - vnode_mapping: &PbParallelUnitMapping, - ) -> PbWorkerMapping { - let parallel_unit_to_worker = actor_status - .values() - .map(|actor_status| { - let parallel_unit = actor_status.get_parallel_unit().unwrap(); - (parallel_unit.id, parallel_unit.worker_node_id) - }) - .collect(); - - ParallelUnitMapping::from_protobuf(vnode_mapping) - .to_worker(¶llel_unit_to_worker) - .to_protobuf() - } - pub async fn table_node_actors( &self, table_ids: &HashSet, diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 67926d9b6c4b7..36e7b77ccf63a 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -16,11 +16,11 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{FragmentWorkerMapping, FragmentWorkerMappings}; +use risingwave_pb::meta::{FragmentParallelUnitMapping, FragmentParallelUnitMappings}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; @@ -31,11 +31,11 @@ pub type ServingVnodeMappingRef = Arc; #[derive(Default)] pub struct ServingVnodeMapping { - serving_vnode_mappings: RwLock>, + serving_vnode_mappings: RwLock>, } impl ServingVnodeMapping { - pub fn all(&self) -> HashMap { + pub fn all(&self) -> HashMap { self.serving_vnode_mappings.read().clone() } @@ -45,9 +45,9 @@ impl ServingVnodeMapping { &self, streaming_parallelisms: HashMap, workers: &[WorkerNode], - ) -> (HashMap, Vec) { + ) -> (HashMap, Vec) { let mut serving_vnode_mappings = self.serving_vnode_mappings.write(); - let mut upserted: HashMap = HashMap::default(); + let mut upserted: HashMap = HashMap::default(); let mut failed: Vec = vec![]; for (fragment_id, streaming_parallelism) in streaming_parallelisms { let new_mapping = { @@ -81,24 +81,24 @@ impl ServingVnodeMapping { } } -pub(crate) fn to_fragment_worker_mapping( - mappings: &HashMap, -) -> Vec { +pub(crate) fn to_fragment_parallel_unit_mapping( + mappings: &HashMap, +) -> Vec { mappings .iter() - .map(|(fragment_id, mapping)| FragmentWorkerMapping { + .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { fragment_id: *fragment_id, mapping: Some(mapping.to_protobuf()), }) .collect() } -pub(crate) fn to_deleted_fragment_worker_mapping( +pub(crate) fn to_deleted_fragment_parallel_unit_mapping( fragment_ids: &[FragmentId], -) -> Vec { +) -> Vec { fragment_ids .iter() - .map(|fragment_id| FragmentWorkerMapping { + .map(|fragment_id| FragmentParallelUnitMapping { fragment_id: *fragment_id, mapping: None, }) @@ -120,8 +120,8 @@ pub async fn on_meta_start( ); notification_manager.notify_frontend_without_version( Operation::Snapshot, - Info::ServingWorkerMappings(FragmentWorkerMappings { - mappings: to_fragment_worker_mapping(&mappings), + Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { + mappings: to_fragment_parallel_unit_mapping(&mappings), }), ); } @@ -185,7 +185,7 @@ pub async fn start_serving_vnode_mapping_worker( let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; let (mappings, _) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); tracing::debug!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys()); - notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_fragment_worker_mapping(&mappings) })); + notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) })); } LocalNotification::FragmentMappingsUpsert(fragment_ids) => { if fragment_ids.is_empty() { @@ -195,11 +195,11 @@ pub async fn start_serving_vnode_mapping_worker( let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); if !upserted.is_empty() { tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys()); - notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_fragment_worker_mapping(&upserted) })); + notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&upserted) })); } if !failed.is_empty() { tracing::debug!("Fail to update serving vnode mapping for fragments {:?}.", failed); - notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_deleted_fragment_worker_mapping(&failed)})); + notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&failed)})); } } LocalNotification::FragmentMappingsDelete(fragment_ids) => { @@ -208,7 +208,7 @@ pub async fn start_serving_vnode_mapping_worker( } tracing::debug!("Delete serving vnode mapping for fragments {:?}.", fragment_ids); serving_vnode_mapping.remove(&fragment_ids); - notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_deleted_fragment_worker_mapping(&fragment_ids) })); + notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&fragment_ids) })); } _ => {} } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index f0eb1f5718157..a1c0aaa735fe1 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -41,7 +41,7 @@ use risingwave_pb::meta::table_fragments::fragment::{ FragmentDistributionType, PbFragmentDistributionType, }; use risingwave_pb::meta::table_fragments::{self, ActorStatus, PbFragment, State}; -use risingwave_pb::meta::FragmentWorkerMappings; +use risingwave_pb::meta::FragmentParallelUnitMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbStreamActor, StreamNode, @@ -59,7 +59,8 @@ use crate::manager::{ }; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::serving::{ - to_deleted_fragment_worker_mapping, to_fragment_worker_mapping, ServingVnodeMapping, + to_deleted_fragment_parallel_unit_mapping, to_fragment_parallel_unit_mapping, + ServingVnodeMapping, }; use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAULT_COLUMN_FAMILY}; use crate::stream::{GlobalStreamManager, SourceManagerRef}; @@ -1702,8 +1703,8 @@ impl ScaleController { .notification_manager() .notify_frontend_without_version( Operation::Update, - Info::ServingWorkerMappings(FragmentWorkerMappings { - mappings: to_fragment_worker_mapping(&upserted), + Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { + mappings: to_fragment_parallel_unit_mapping(&upserted), }), ); } @@ -1716,8 +1717,8 @@ impl ScaleController { .notification_manager() .notify_frontend_without_version( Operation::Delete, - Info::ServingWorkerMappings(FragmentWorkerMappings { - mappings: to_deleted_fragment_worker_mapping(&failed), + Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { + mappings: to_deleted_fragment_parallel_unit_mapping(&failed), }), ); } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index ad56eecf59d4b..fbb8dff1f5a98 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,7 +25,7 @@ use futures::stream::BoxStream; use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; @@ -1156,11 +1156,13 @@ impl MetaClient { Ok(resp.tables) } - pub async fn list_serving_vnode_mappings(&self) -> Result> { + pub async fn list_serving_vnode_mappings( + &self, + ) -> Result> { let req = GetServingVnodeMappingsRequest {}; let resp = self.inner.get_serving_vnode_mappings(req).await?; let mappings = resp - .worker_mappings + .mappings .into_iter() .map(|p| { ( @@ -1170,7 +1172,7 @@ impl MetaClient { .get(&p.fragment_id) .cloned() .unwrap_or(0), - WorkerMapping::from_protobuf(p.mapping.as_ref().unwrap()), + ParallelUnitMapping::from_protobuf(p.mapping.as_ref().unwrap()), ), ) })