diff --git a/metadata.sqlite b/metadata.sqlite deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 6a254332fc712..8a196eecf5021 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -225,7 +225,7 @@ message TableFunctionNode { message TaskId { string query_id = 1; uint32 stage_id = 2; - uint32 task_id = 3; + uint64 task_id = 3; } // Every task will create N buffers (channels) for parent operators to fetch results from, @@ -233,7 +233,7 @@ message TaskId { message TaskOutputId { TaskId task_id = 1; // The id of output channel to fetch from - uint32 output_id = 2; + uint64 output_id = 2; } message LocalExecutePlan { @@ -270,7 +270,7 @@ message LocalLookupJoinNode { repeated uint32 inner_side_key = 4; uint32 lookup_prefix_len = 5; plan_common.StorageTableDesc inner_side_table_desc = 6; - repeated uint32 inner_side_vnode_mapping = 7; + repeated uint64 inner_side_vnode_mapping = 7; repeated int32 inner_side_column_ids = 8; repeated uint32 output_indices = 9; repeated common.WorkerNode worker_nodes = 10; diff --git a/proto/common.proto b/proto/common.proto index fab50dcfecac2..b77260c9a2174 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -93,9 +93,9 @@ message ParallelUnitMapping { } // Vnode mapping for stream fragments. Stores mapping from virtual node to worker id. -message WorkerMapping { +message WorkerSlotMapping { repeated uint32 original_indices = 1; - repeated uint32 data = 2; + repeated uint64 data = 2; } message BatchQueryEpoch { diff --git a/proto/meta.proto b/proto/meta.proto index df266b6d4eded..eaa7d3c1fe5e6 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -114,7 +114,7 @@ message FragmentParallelUnitMappings { /// Worker mapping with fragment id, used for notification. 
message FragmentWorkerMapping { uint32 fragment_id = 1; - common.WorkerMapping mapping = 2; + common.WorkerSlotMapping mapping = 2; } message FragmentWorkerMappings { diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 838e4b738e082..6a968540ca879 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -21,7 +21,7 @@ use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::{ - ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId, + ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, WorkerSlotId, }; use risingwave_common::memory::MemoryContext; use risingwave_common::types::{DataType, Datum}; @@ -51,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; struct InnerSideExecutorBuilder { table_desc: StorageTableDesc, table_distribution: TableDistribution, - vnode_mapping: ExpandedWorkerMapping, + vnode_mapping: ExpandedWorkerSlotMapping, outer_side_key_types: Vec, inner_side_schema: Schema, inner_side_column_ids: Vec, @@ -60,8 +60,8 @@ struct InnerSideExecutorBuilder { context: C, task_id: TaskId, epoch: BatchQueryEpoch, - worker_mapping: HashMap, - worker_to_scan_range_mapping: HashMap>, + worker_slot_mapping: HashMap, + worker_to_scan_range_mapping: HashMap>, chunk_size: usize, shutdown_rx: ShutdownToken, next_stage_id: usize, @@ -90,7 +90,7 @@ impl InnerSideExecutorBuilder { /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table /// based on the passed `scan_range` and virtual node. - fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result { + fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result { let list = self.worker_to_scan_range_mapping.get(id).unwrap(); let mut scan_ranges = vec![]; let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len()); @@ -113,9 +113,9 @@ impl InnerSideExecutorBuilder { } /// Creates the `PbExchangeSource` using the given `id`. - fn build_prost_exchange_source(&self, id: &WorkerId) -> Result { + fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result { let worker = self - .worker_mapping + .worker_slot_mapping .get(id) .context("No worker node found for the given worker id.")?; @@ -144,7 +144,7 @@ impl InnerSideExecutorBuilder { // conflict. 
query_id: self.task_id.query_id.clone(), stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32, - task_id: *id, + task_id: (*id).into(), }), output_id: 0, }), @@ -367,18 +367,27 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { let null_safe = lookup_join_node.get_null_safe().to_vec(); - let vnode_mapping = lookup_join_node.get_inner_side_vnode_mapping().to_vec(); + let vnode_mapping = lookup_join_node + .get_inner_side_vnode_mapping() + .iter() + .copied() + .map(WorkerSlotId::from) + .collect_vec(); + assert!(!vnode_mapping.is_empty()); let chunk_size = source.context.get_config().developer.chunk_size; let worker_nodes = lookup_join_node.get_worker_nodes(); - let worker_mapping: HashMap = worker_nodes + let worker_slot_mapping: HashMap = worker_nodes .iter() - .map(|worker| (worker.id, worker.clone())) + .flat_map(|worker| { + (0..(worker.parallel_units.len())) + .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone())) + }) .collect(); - assert_eq!(worker_mapping.len(), worker_nodes.len()); + assert_eq!(worker_slot_mapping.len(), worker_nodes.len()); let inner_side_builder = InnerSideExecutorBuilder { table_desc: table_desc.clone(), @@ -399,7 +408,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { chunk_size, shutdown_rx: source.shutdown_rx.clone(), next_stage_id: 0, - worker_mapping, + worker_slot_mapping, }; let identity = source.plan_node().get_identity().clone(); diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 87d94bc1d90b9..b4ad51da3bb41 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -79,7 +79,7 @@ impl StateReporter { #[derive(PartialEq, Eq, Hash, Clone, Debug, Default)] pub struct TaskId { - pub task_id: u32, + pub task_id: u64, pub stage_id: u32, pub query_id: String, } @@ -87,7 +87,7 @@ pub struct TaskId { #[derive(PartialEq, Eq, Hash, Clone, Default)] pub struct TaskOutputId { pub task_id: TaskId, - pub output_id: u32, + pub output_id: u64, } /// More compact formatter compared to derived `fmt::Debug`. 
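The `uint32` to `uint64` widening of `task_id` and `output_id` above exists because the local lookup join now uses a `WorkerSlotId` directly as the batch task id (`task_id: (*id).into()`), and `WorkerSlotId` packs a 32-bit worker id and a 32-bit slot index into one `u64` (see the `impl WorkerSlotId` added in `src/common/src/hash/consistent_hash/mapping.rs` later in this diff). A self-contained sketch of that packing and the round trip back to worker id and slot index; the values in `main` are illustrative only:

```rust
// Sketch of the WorkerSlotId packing introduced in this PR: the worker id
// lives in the high 32 bits, the slot index in the low 32 bits, so the whole
// id fits the widened `u64` task_id / output_id fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId(u64);

impl WorkerSlotId {
    fn new(worker_id: u32, slot_idx: usize) -> Self {
        Self((worker_id as u64) << 32 | slot_idx as u64)
    }

    fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }

    fn slot_idx(&self) -> u32 {
        self.0 as u32
    }
}

impl From<WorkerSlotId> for u64 {
    fn from(id: WorkerSlotId) -> Self {
        id.0
    }
}

fn main() {
    // Illustrative values: slot 3 on worker 7.
    let slot = WorkerSlotId::new(7, 3);
    let task_id: u64 = slot.into(); // what now lands in TaskId.task_id
    assert_eq!(WorkerSlotId(task_id).worker_id(), 7);
    assert_eq!(WorkerSlotId(task_id).slot_idx(), 3);
}
```

This also explains why `WorkerSlotMapping.data` in `common.proto` becomes `repeated uint64`: each entry is an encoded worker slot rather than a bare worker id.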
diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index f2a0edf135104..863aca6d22a05 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -18,7 +18,7 @@ use std::time::Duration; use rand::seq::SliceRandom; use risingwave_common::bail; -use risingwave_common::hash::{WorkerId, WorkerMapping}; +use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; @@ -36,9 +36,9 @@ pub struct WorkerNodeManager { struct WorkerNodeManagerInner { worker_nodes: Vec, /// fragment vnode mapping info for streaming - streaming_fragment_vnode_mapping: HashMap, + streaming_fragment_vnode_mapping: HashMap, /// fragment vnode mapping info for serving - serving_fragment_vnode_mapping: HashMap, + serving_fragment_vnode_mapping: HashMap, } pub type WorkerNodeManagerRef = Arc; @@ -125,8 +125,8 @@ impl WorkerNodeManager { pub fn refresh( &self, nodes: Vec, - streaming_mapping: HashMap, - serving_mapping: HashMap, + streaming_mapping: HashMap, + serving_mapping: HashMap, ) { let mut write_guard = self.inner.write().unwrap(); tracing::debug!("Refresh worker nodes {:?}.", nodes); @@ -146,26 +146,32 @@ impl WorkerNodeManager { /// If worker ids is empty, the scheduler may fail to schedule any task and stuck at /// schedule next stage. If we do not return error in this case, needs more complex control /// logic above. Report in this function makes the schedule root fail reason more clear. - pub fn get_workers_by_worker_ids(&self, worker_ids: &[WorkerId]) -> Result> { - if worker_ids.is_empty() { + pub fn get_workers_by_worker_ids( + &self, + worker_slot_ids: &[WorkerSlotId], + ) -> Result> { + if worker_slot_ids.is_empty() { return Err(BatchError::EmptyWorkerNodes); } let guard = self.inner.read().unwrap(); // TODO: Does the return order of this function need to match the order of the parameters? 
- let worker_index: HashMap<_, _> = guard + let worker_slot_index: HashMap<_, _> = guard .worker_nodes .iter() - .map(|worker| (worker.id, worker.clone())) + .flat_map(|worker| { + (0..worker.parallel_units.len()) + .map(move |i| (WorkerSlotId::new(worker.id, i), worker)) + }) .collect(); - let mut workers = Vec::with_capacity(worker_ids.len()); + let mut workers = Vec::with_capacity(worker_slot_ids.len()); - for worker_id in worker_ids { - match worker_index.get(worker_id) { - Some(worker) => workers.push(worker.clone()), - None => bail!("No worker node found for worker id: {}", worker_id), + for worker_slot_id in worker_slot_ids { + match worker_slot_index.get(worker_slot_id) { + Some(worker) => workers.push((*worker).clone()), + None => bail!("No worker node found for worker id: {}", worker_slot_id), } } @@ -175,7 +181,7 @@ impl WorkerNodeManager { pub fn get_streaming_fragment_mapping( &self, fragment_id: &FragmentId, - ) -> Result { + ) -> Result { self.inner .read() .unwrap() @@ -188,7 +194,7 @@ impl WorkerNodeManager { pub fn insert_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: WorkerMapping, + vnode_mapping: WorkerSlotMapping, ) { self.inner .write() @@ -201,7 +207,7 @@ impl WorkerNodeManager { pub fn update_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: WorkerMapping, + vnode_mapping: WorkerSlotMapping, ) { let mut guard = self.inner.write().unwrap(); guard @@ -219,7 +225,7 @@ impl WorkerNodeManager { } /// Returns fragment's vnode mapping for serving. - fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { + fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { self.inner .read() .unwrap() @@ -227,7 +233,7 @@ impl WorkerNodeManager { .ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id)) } - pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { + pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Set serving vnode mapping for fragments {:?}", @@ -236,7 +242,10 @@ impl WorkerNodeManager { guard.serving_fragment_vnode_mapping = mappings; } - pub fn upsert_serving_fragment_mapping(&self, mappings: HashMap) { + pub fn upsert_serving_fragment_mapping( + &self, + mappings: HashMap, + ) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Upsert serving vnode mapping for fragments {:?}", @@ -287,7 +296,7 @@ impl WorkerNodeManager { } impl WorkerNodeManagerInner { - fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { + fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { self.serving_fragment_vnode_mapping .get(&fragment_id) .cloned() @@ -330,7 +339,7 @@ impl WorkerNodeSelector { .sum() } - pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { + pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { if self.enable_barrier_read { self.manager.get_streaming_fragment_mapping(&fragment_id) } else { diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index c15491510b587..b281781d46b01 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; -use std::fmt::Debug; +use std::collections::{BTreeSet, HashMap}; +use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; use std::ops::Index; use educe::Educe; use itertools::Itertools; use risingwave_pb::common::{ - ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerMapping, + ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerSlotMapping, }; use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto; @@ -32,7 +32,43 @@ use crate::util::iter_util::ZipEqDebug; // TODO: find a better place for this. pub type ActorId = u32; -pub type WorkerId = u32; + +// pub type WorkerSlotId = u64; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub struct WorkerSlotId(u64); + +impl WorkerSlotId { + pub fn worker_id(&self) -> u32 { + (self.0 >> 32) as u32 + } + + pub fn slot_idx(&self) -> u32 { + self.0 as u32 + } + + pub fn new(worker_id: u32, slot_idx: usize) -> Self { + Self((worker_id as u64) << 32 | slot_idx as u64) + } +} + +impl From for u64 { + fn from(id: WorkerSlotId) -> Self { + id.0 + } +} + +impl From for WorkerSlotId { + fn from(id: u64) -> Self { + Self(id) + } +} + +impl Display for WorkerSlotId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx())) + } +} /// Trait for items that can be used as keys in [`VnodeMapping`]. pub trait VnodeMappingItem { @@ -258,10 +294,10 @@ pub mod marker { type Item = ParallelUnitId; } - /// A marker type for items of [`WorkerId`]. - pub struct Worker; - impl VnodeMappingItem for Worker { - type Item = WorkerId; + /// A marker type for items of [`WorkerSlotId`]. + pub struct WorkerSlot; + impl VnodeMappingItem for WorkerSlot { + type Item = WorkerSlotId; } } @@ -275,10 +311,10 @@ pub type ParallelUnitMapping = VnodeMapping; /// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`]. pub type ExpandedParallelUnitMapping = ExpandedMapping; -/// A mapping from [`VirtualNode`] to [`WorkerId`]. -pub type WorkerMapping = VnodeMapping; -/// An expanded mapping from [`VirtualNode`] to [`WorkerId`]. -pub type ExpandedWorkerMapping = ExpandedMapping; +/// A mapping from [`VirtualNode`] to [`WorkerSlotId`]. +pub type WorkerSlotMapping = VnodeMapping; +/// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`]. +pub type ExpandedWorkerSlotMapping = ExpandedMapping; impl ActorMapping { /// Transform this actor mapping to a parallel unit mapping, essentially `transform`. @@ -307,26 +343,26 @@ impl ActorMapping { } } -impl WorkerMapping { +impl WorkerSlotMapping { /// Create a uniform worker mapping from the given worker ids - pub fn build_from_ids(worker_ids: &[WorkerId]) -> Self { - Self::new_uniform(worker_ids.iter().cloned()) + pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self { + Self::new_uniform(worker_slot_ids.iter().cloned()) } /// Create a worker mapping from the protobuf representation. - pub fn from_protobuf(proto: &PbWorkerMapping) -> Self { + pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self { assert_eq!(proto.original_indices.len(), proto.data.len()); Self { original_indices: proto.original_indices.clone(), - data: proto.data.clone(), + data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(), } } /// Convert this worker mapping to the protobuf representation. 
- pub fn to_protobuf(&self) -> PbWorkerMapping { - PbWorkerMapping { + pub fn to_protobuf(&self) -> PbWorkerSlotMapping { + PbWorkerSlotMapping { original_indices: self.original_indices.clone(), - data: self.data.clone(), + data: self.data.iter().map(|id| id.0).collect(), } } } @@ -349,8 +385,25 @@ impl ParallelUnitMapping { } /// Transform this parallel unit mapping to an worker mapping, essentially `transform`. - pub fn to_worker(&self, to_map: &HashMap) -> WorkerMapping { - self.transform(to_map) + pub fn to_worker_slot(&self, to_map: &HashMap) -> WorkerSlotMapping { + let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new(); + for (parallel_unit_id, worker_id) in to_map { + worker_to_parallel_units + .entry(*worker_id) + .or_default() + .insert(*parallel_unit_id); + } + + let mut parallel_unit_to_worker_slot = HashMap::with_capacity(to_map.len()); + + for (worker_id, parallel_unit_ids) in worker_to_parallel_units { + for (index, ¶llel_unit_id) in parallel_unit_ids.iter().enumerate() { + parallel_unit_to_worker_slot + .insert(parallel_unit_id, WorkerSlotId::new(worker_id, index)); + } + } + + self.transform(¶llel_unit_to_worker_slot) } /// Create a parallel unit mapping from the protobuf representation. diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 0c1086ffdb3dd..63446d76f177d 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -17,35 +17,35 @@ use std::ops::BitOrAssign; use itertools::Itertools; use num_integer::Integer; -use risingwave_common::hash::WorkerId; +use risingwave_common::hash::WorkerSlotId; use risingwave_pb::common::WorkerNode; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::hash::{VirtualNode, WorkerMapping}; +use crate::hash::{VirtualNode, WorkerSlotMapping}; /// Calculate a new vnode mapping, keeping locality and balance on a best effort basis. /// The strategy is similar to `rebalance_actor_vnode` used in meta node, but is modified to /// consider `max_parallelism` too. pub fn place_vnode( - hint_worker_mapping: Option<&WorkerMapping>, + hint_worker_mapping: Option<&WorkerSlotMapping>, workers: &[WorkerNode], max_parallelism: Option, -) -> Option { - #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] - struct WorkerSlot(WorkerId, usize); - - impl WorkerSlot { - fn worker_id(&self) -> WorkerId { - self.0 - } - } +) -> Option { + // #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] + // struct WorkerSlot(u32, usize); + // + // impl WorkerSlot { + // fn worker_id(&self) -> u32 { + // self.0 + // } + // } // Get all serving worker slots from all available workers, grouped by worker id and ordered // by worker slot id in each group. 
let mut worker_slots: LinkedList<_> = workers .iter() .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving)) .sorted_by_key(|w| w.id) - .map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlot(w.id, idx))) + .map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlotId::new(w.id, idx))) .collect(); // Set serving parallelism to the minimum of total number of worker slots, specified @@ -71,7 +71,7 @@ pub fn place_vnode( .for_each(drop); } selected_slots.drain(serving_parallelism..); - let selected_slots_set: HashSet = selected_slots.iter().cloned().collect(); + let selected_slots_set: HashSet = selected_slots.iter().cloned().collect(); if selected_slots_set.is_empty() { return None; } @@ -81,14 +81,14 @@ pub fn place_vnode( // `is_temp` is a mark for a special temporary worker slot, only to simplify implementation. #[derive(Debug)] struct Balance { - slot: WorkerSlot, + slot: WorkerSlotId, balance: i32, builder: BitmapBuilder, is_temp: bool, } let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_slots.len()); - let mut balances: HashMap = HashMap::default(); + let mut balances: HashMap = HashMap::default(); for slot in &selected_slots { let mut balance = Balance { @@ -108,16 +108,14 @@ pub fn place_vnode( // Now to maintain affinity, if a hint has been provided via `hint_pu_mapping`, follow // that mapping to adjust balances. let mut temp_slot = Balance { - slot: WorkerSlot(0, usize::MAX), /* This id doesn't matter for `temp_pu`. It's distinguishable via `is_temp`. */ + slot: WorkerSlotId::new(0u32, usize::MAX), /* This id doesn't matter for `temp_pu`. It's distinguishable via `is_temp`. */ balance: 0, builder: BitmapBuilder::zeroed(VirtualNode::COUNT), is_temp: true, }; match hint_worker_mapping { Some(hint_worker_mapping) => { - for (vnode, worker_id) in hint_worker_mapping.iter_with_vnode() { - let worker_slot = WorkerSlot(worker_id, 0); - + for (vnode, worker_slot) in hint_worker_mapping.iter_with_vnode() { let b = if selected_slots_set.contains(&worker_slot) { // Assign vnode to the same worker slot as hint. 
balances.get_mut(&worker_slot).unwrap() @@ -153,7 +151,7 @@ pub fn place_vnode( .rev() .collect(); - let mut results: HashMap = HashMap::default(); + let mut results: HashMap = HashMap::default(); while !balances.is_empty() { if balances.len() == 1 { @@ -197,21 +195,20 @@ pub fn place_vnode( let mut worker_result = HashMap::new(); for (worker_slot, bitmap) in results { - let worker_id = worker_slot.worker_id(); worker_result - .entry(worker_id) + .entry(worker_slot) .or_insert(BitmapBuilder::zeroed(VirtualNode::COUNT).finish()) .bitor_assign(&bitmap); } - Some(WorkerMapping::from_bitmaps(&worker_result)) + Some(WorkerSlotMapping::from_bitmaps(&worker_result)) } #[cfg(test)] mod tests { use std::collections::HashMap; - use risingwave_common::hash::WorkerMapping; + use risingwave_common::hash::WorkerSlotMapping; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{ParallelUnit, WorkerNode}; @@ -245,7 +242,7 @@ mod tests { results }; - let count_same_vnode_mapping = |wm1: &WorkerMapping, wm2: &WorkerMapping| { + let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| { assert_eq!(wm1.len(), 256); assert_eq!(wm2.len(), 256); let mut count: usize = 0; diff --git a/src/ctl/src/cmd_impl/meta/serving.rs b/src/ctl/src/cmd_impl/meta/serving.rs index 4cccf825ef862..cd8b1be50e206 100644 --- a/src/ctl/src/cmd_impl/meta/serving.rs +++ b/src/ctl/src/cmd_impl/meta/serving.rs @@ -16,8 +16,8 @@ use std::collections::HashMap; use comfy_table::{Row, Table}; use itertools::Itertools; -use risingwave_common::hash::{ParallelUnitId, VirtualNode, WorkerId}; -use risingwave_pb::common::{WorkerNode, WorkerType}; +use risingwave_common::hash::VirtualNode; +use risingwave_pb::common::WorkerType; use crate::CtlContext; @@ -44,9 +44,12 @@ pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Res let rows = mappings .iter() .flat_map(|(fragment_id, (table_id, mapping))| { - let mut worker_nodes: HashMap> = HashMap::new(); - for (vnode, worker) in mapping.iter_with_vnode() { - worker_nodes.entry(worker).or_default().push(vnode); + let mut worker_nodes: HashMap> = HashMap::new(); + for (vnode, worker_slot_id) in mapping.iter_with_vnode() { + worker_nodes + .entry(worker_slot_id.worker_id()) + .or_default() + .push(vnode); } worker_nodes.into_iter().map(|(worker_id, vnodes)| { (*table_id, *fragment_id, vnodes, workers.get(&worker_id)) diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 050ae1c089f6c..62791c51eb65f 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use parking_lot::RwLock; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; use risingwave_common::catalog::CatalogVersion; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend}; @@ -391,7 +391,9 @@ impl FrontendObserverNode { Info::StreamingWorkerMapping(streaming_worker_mapping) => { let fragment_id = streaming_worker_mapping.fragment_id; let mapping = || { - WorkerMapping::from_protobuf(streaming_worker_mapping.mapping.as_ref().unwrap()) + WorkerSlotMapping::from_protobuf( + streaming_worker_mapping.mapping.as_ref().unwrap(), + ) }; match 
resp.operation() { @@ -471,7 +473,7 @@ impl FrontendObserverNode { fn convert_worker_mapping( worker_mappings: &[FragmentWorkerMapping], -) -> HashMap { +) -> HashMap { worker_mappings .iter() .map( @@ -479,7 +481,7 @@ fn convert_worker_mapping( fragment_id, mapping, }| { - let mapping = WorkerMapping::from_protobuf(mapping.as_ref().unwrap()); + let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap()); (*fragment_id, mapping) }, ) diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index 4c8eba53030f2..6ee82e921031e 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -51,7 +51,7 @@ use generic::PhysicalPlanRef; use itertools::Itertools; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::catalog::{FieldDisplay, Schema, TableId}; -use risingwave_common::hash::WorkerId; +use risingwave_common::hash::WorkerSlotId; use risingwave_pb::batch_plan::exchange_info::{ ConsistentHashInfo, Distribution as DistributionPb, DistributionMode, HashInfo, }; @@ -148,7 +148,7 @@ impl Distribution { let vnode_mapping = worker_node_manager .fragment_mapping(Self::get_fragment_id(catalog_reader, table_id)?)?; - let worker_to_id_map: HashMap = vnode_mapping + let worker_to_id_map: HashMap = vnode_mapping .iter_unique() .enumerate() .map(|(i, worker_id)| (worker_id, i as u32)) diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index f5c9f9280f9d3..0a8244086109b 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -392,12 +392,12 @@ impl QueryRunner { let root_task_id_prost = TaskIdPb { query_id: self.query.query_id.clone().id, stage_id: self.query.root_stage_id(), - task_id: ROOT_TASK_ID, + task_id: ROOT_TASK_ID as u64, }; TaskOutputIdPb { task_id: Some(root_task_id_prost), - output_id: ROOT_TASK_OUTPUT_ID, + output_id: ROOT_TASK_OUTPUT_ID as u64, } }; @@ -476,7 +476,7 @@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::WorkerMapping; + use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType}; @@ -721,9 +721,12 @@ pub(crate) mod tests { let workers = vec![worker1, worker2, worker3]; let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers)); let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false); - worker_node_manager.insert_streaming_fragment_mapping(0, WorkerMapping::new_single(0)); + worker_node_manager.insert_streaming_fragment_mapping( + 0, + WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)), + ); worker_node_manager.set_serving_fragment_mapping( - vec![(0, WorkerMapping::new_single(0))] + vec![(0, WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)))] .into_iter() .collect(), ); diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 0b597f91a3cfb..5197cea0c7a35 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -32,7 +32,7 @@ use risingwave_batch::executor::ExecutorBuilder; use risingwave_batch::task::{ShutdownMsg, ShutdownSender, 
ShutdownToken, TaskId as TaskIdBatch}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::source::SplitMetaData; @@ -172,7 +172,7 @@ impl StageExecution { ctx: ExecutionContextRef, ) -> Self { let tasks = (0..stage.parallelism.unwrap()) - .map(|task_id| (task_id, TaskStatusHolder::new(task_id))) + .map(|task_id| (task_id as u64, TaskStatusHolder::new(task_id as u64))) .collect(); Self { @@ -289,7 +289,7 @@ impl StageExecution { /// /// When this method is called, all tasks should have been scheduled, and their `worker_node` /// should have been set. - pub fn all_exchange_sources_for(&self, output_id: u32) -> Vec { + pub fn all_exchange_sources_for(&self, output_id: u64) -> Vec { self.tasks .iter() .map(|(task_id, status_holder)| { @@ -367,11 +367,11 @@ impl StageRunner { let task_id = TaskIdPb { query_id: self.stage.query_id.id.clone(), stage_id: self.stage.id, - task_id: i as u32, + task_id: i as u64, }; let vnode_ranges = vnode_bitmaps[&worker_id].clone(); let plan_fragment = - self.create_plan_fragment(i as u32, Some(PartitionInfo::Table(vnode_ranges))); + self.create_plan_fragment(i as u64, Some(PartitionInfo::Table(vnode_ranges))); futures.push(self.schedule_task( task_id, plan_fragment, @@ -392,10 +392,10 @@ impl StageRunner { let task_id = TaskIdPb { query_id: self.stage.query_id.id.clone(), stage_id: self.stage.id, - task_id: id as u32, + task_id: id as u64, }; let plan_fragment = self - .create_plan_fragment(id as u32, Some(PartitionInfo::Source(split.to_vec()))); + .create_plan_fragment(id as u64, Some(PartitionInfo::Source(split.to_vec()))); let worker = self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?; futures.push(self.schedule_task( @@ -410,9 +410,9 @@ impl StageRunner { let task_id = TaskIdPb { query_id: self.stage.query_id.id.clone(), stage_id: self.stage.id, - task_id: id, + task_id: id as u64, }; - let plan_fragment = self.create_plan_fragment(id, None); + let plan_fragment = self.create_plan_fragment(id as u64, None); let worker = self.choose_worker(&plan_fragment, id, self.stage.dml_table_id)?; futures.push(self.schedule_task( task_id, @@ -584,7 +584,7 @@ impl StageRunner { let root_stage_id = self.stage.id; // Currently, the dml or table scan should never be root fragment, so the partition is None. // And root fragment only contain one task. 
- let plan_fragment = self.create_plan_fragment(ROOT_TASK_ID, None); + let plan_fragment = self.create_plan_fragment(ROOT_TASK_ID as u64, None); let plan_node = plan_fragment.root.unwrap(); let task_id = TaskIdBatch { query_id: self.stage.query_id.id.clone(), @@ -680,7 +680,10 @@ impl StageRunner { } #[inline(always)] - fn get_table_dml_vnode_mapping(&self, table_id: &TableId) -> SchedulerResult { + fn get_table_dml_vnode_mapping( + &self, + table_id: &TableId, + ) -> SchedulerResult { let guard = self.catalog_reader.read_guard(); let table = guard diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index ec8b3a0d21184..7f24a5c27032f 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -30,7 +30,7 @@ use risingwave_batch::task::{ShutdownToken, TaskId}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_common::bail; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::tracing::{InstrumentStream, TracingContext}; use risingwave_connector::source::SplitMetaData; @@ -343,7 +343,7 @@ impl LocalQueryExecution { let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { task_id: Some(PbTaskId { - task_id: idx as u32, + task_id: idx as u64, stage_id: exchange_source_stage_id, query_id: self.query.query_id.id.clone(), }), @@ -389,7 +389,7 @@ impl LocalQueryExecution { let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { task_id: Some(PbTaskId { - task_id: id as u32, + task_id: id as u64, stage_id: exchange_source_stage_id, query_id: self.query.query_id.id.clone(), }), @@ -429,7 +429,7 @@ impl LocalQueryExecution { let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { task_id: Some(PbTaskId { - task_id: idx as u32, + task_id: idx as u64, stage_id: exchange_source_stage_id, query_id: self.query.query_id.id.clone(), }), @@ -531,7 +531,8 @@ impl LocalQueryExecution { )?; // TODO: should we use `pb::ParallelUnitMapping` here? 
- node.inner_side_vnode_mapping = mapping.to_expanded(); + node.inner_side_vnode_mapping = + mapping.to_expanded().into_iter().map(u64::from).collect(); node.worker_nodes = self.worker_node_manager.manager.list_worker_nodes(); } _ => unreachable!(), @@ -583,7 +584,10 @@ impl LocalQueryExecution { } #[inline(always)] - fn get_table_dml_vnode_mapping(&self, table_id: &TableId) -> SchedulerResult { + fn get_table_dml_vnode_mapping( + &self, + table_id: &TableId, + ) -> SchedulerResult { let guard = self.front_env.catalog_reader().read_guard(); let table = guard diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 6dfa31a845b18..fd86ed0442924 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -30,7 +30,7 @@ use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableDesc; use risingwave_common::hash::table_distribution::TableDistribution; -use risingwave_common::hash::{VirtualNode, WorkerId, WorkerMapping}; +use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::scan_range::ScanRange; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; @@ -78,7 +78,7 @@ pub type StageId = u32; pub const ROOT_TASK_ID: u32 = 0; // Root task has only one output. pub const ROOT_TASK_OUTPUT_ID: u32 = 0; -pub type TaskId = u32; +pub type TaskId = u64; /// Generated by [`BatchPlanFragmenter`] and used in query execution graph. #[derive(Clone, Debug)] @@ -392,12 +392,12 @@ pub struct TableScanInfo { /// full vnode bitmap, since we need to know where to schedule the singleton scan task. /// /// `None` iff the table is a system table. - partitions: Option>, + partitions: Option>, } impl TableScanInfo { /// For normal tables, `partitions` should always be `Some`. 
- pub fn new(name: String, partitions: HashMap) -> Self { + pub fn new(name: String, partitions: HashMap) -> Self { Self { name, partitions: Some(partitions), @@ -416,7 +416,7 @@ impl TableScanInfo { self.name.as_ref() } - pub fn partitions(&self) -> Option<&HashMap> { + pub fn partitions(&self) -> Option<&HashMap> { self.partitions.as_ref() } } @@ -1158,10 +1158,10 @@ impl BatchPlanFragmenter { fn derive_partitions( scan_ranges: &[ScanRange], table_desc: &TableDesc, - vnode_mapping: &WorkerMapping, -) -> SchedulerResult> { + vnode_mapping: &WorkerSlotMapping, +) -> SchedulerResult> { let num_vnodes = vnode_mapping.len(); - let mut partitions: HashMap)> = HashMap::new(); + let mut partitions: HashMap)> = HashMap::new(); if scan_ranges.is_empty() { return Ok(vnode_mapping diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 7e3cd8a1fe6ce..5049f32cce00e 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -294,7 +294,7 @@ impl CatalogController { fragment_id, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker) .to_protobuf(), ), } @@ -2114,7 +2114,7 @@ impl CatalogController { fragment_id, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker) .to_protobuf(), ), } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 5f3dc49570d8b..c073979a8cbdc 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -80,7 +80,7 @@ impl CatalogControllerInner { .into_iter() .map(move |(fragment_id, mapping)| { let worker_mapping = ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker) .to_protobuf(); FragmentWorkerMapping { @@ -959,7 +959,7 @@ impl CatalogController { fragment_id: fragment_id as _, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker) .to_protobuf(), ), }) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 0dcad7df5d9f1..153220d8c0edb 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1401,7 +1401,7 @@ impl CatalogController { fragment.update(&txn).await?; let worker_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping) - .to_worker(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker) .to_protobuf(); fragment_mapping_to_notify.push(FragmentWorkerMapping { diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 702a485ed3187..70793a7043d1d 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -836,7 +836,7 @@ where fragment_id: fragment_id as _, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker) .to_protobuf(), ), }) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index d12680de968cd..204ef1fb3dcc0 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -24,7 +24,7 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping} use risingwave_common::util::stream_graph_visitor::{visit_stream_node, 
visit_stream_node_cont}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; -use risingwave_pb::common::{PbParallelUnitMapping, PbWorkerMapping}; +use risingwave_pb::common::{PbParallelUnitMapping, PbWorkerSlotMapping}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; @@ -1405,7 +1405,7 @@ impl FragmentManager { fn convert_mapping( actor_status: &BTreeMap, vnode_mapping: &PbParallelUnitMapping, - ) -> PbWorkerMapping { + ) -> PbWorkerSlotMapping { let parallel_unit_to_worker = actor_status .values() .map(|actor_status| { @@ -1415,7 +1415,7 @@ impl FragmentManager { .collect(); ParallelUnitMapping::from_protobuf(vnode_mapping) - .to_worker(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker) .to_protobuf() } diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 67926d9b6c4b7..3665ea64ef9f1 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -31,11 +31,11 @@ pub type ServingVnodeMappingRef = Arc; #[derive(Default)] pub struct ServingVnodeMapping { - serving_vnode_mappings: RwLock>, + serving_vnode_mappings: RwLock>, } impl ServingVnodeMapping { - pub fn all(&self) -> HashMap { + pub fn all(&self) -> HashMap { self.serving_vnode_mappings.read().clone() } @@ -45,9 +45,9 @@ impl ServingVnodeMapping { &self, streaming_parallelisms: HashMap, workers: &[WorkerNode], - ) -> (HashMap, Vec) { + ) -> (HashMap, Vec) { let mut serving_vnode_mappings = self.serving_vnode_mappings.write(); - let mut upserted: HashMap = HashMap::default(); + let mut upserted: HashMap = HashMap::default(); let mut failed: Vec = vec![]; for (fragment_id, streaming_parallelism) in streaming_parallelisms { let new_mapping = { @@ -82,7 +82,7 @@ impl ServingVnodeMapping { } pub(crate) fn to_fragment_worker_mapping( - mappings: &HashMap, + mappings: &HashMap, ) -> Vec { mappings .iter() diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 50b6c0444041b..f6afbb299d3a2 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,7 +25,7 @@ use futures::stream::BoxStream; use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; -use risingwave_common::hash::WorkerMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; @@ -1169,7 +1169,9 @@ impl MetaClient { Ok(resp.tables) } - pub async fn list_serving_vnode_mappings(&self) -> Result> { + pub async fn list_serving_vnode_mappings( + &self, + ) -> Result> { let req = GetServingVnodeMappingsRequest {}; let resp = self.inner.get_serving_vnode_mappings(req).await?; let mappings = resp @@ -1183,7 +1185,7 @@ impl MetaClient { .get(&p.fragment_id) .cloned() .unwrap_or(0), - 
WorkerMapping::from_protobuf(p.mapping.as_ref().unwrap()), + WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()), ), ) })
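The meta-side call sites above (`controller/catalog.rs`, `controller/fragment.rs`, `controller/streaming_job.rs`, `controller/utils.rs`, `manager/catalog/fragment.rs`) all switch from `to_worker` to `to_worker_slot`, which groups each worker's parallel units in a deterministic order and renumbers them as slots `0..n` before re-targeting the vnode mapping. A condensed, self-contained sketch of that grouping step follows; the local `ParallelUnitId` alias and the demo values in `main` are illustrative, and the real method finishes by feeding the resulting map into the generic `transform` on the vnode mapping:

```rust
use std::collections::{BTreeSet, HashMap};

type ParallelUnitId = u32;

// Same packing as the WorkerSlotId added in mapping.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId(u64);

impl WorkerSlotId {
    fn new(worker_id: u32, slot_idx: usize) -> Self {
        Self((worker_id as u64) << 32 | slot_idx as u64)
    }
}

/// Grouping step inside `ParallelUnitMapping::to_worker_slot`: parallel units
/// are bucketed per worker, ordered via `BTreeSet`, and numbered 0..n within
/// each worker to become worker slots.
fn parallel_unit_to_worker_slot(
    to_map: &HashMap<ParallelUnitId, u32>, // parallel unit id -> worker id
) -> HashMap<ParallelUnitId, WorkerSlotId> {
    let mut worker_to_parallel_units = HashMap::<u32, BTreeSet<ParallelUnitId>>::new();
    for (&parallel_unit_id, &worker_id) in to_map {
        worker_to_parallel_units
            .entry(worker_id)
            .or_default()
            .insert(parallel_unit_id);
    }

    let mut result = HashMap::with_capacity(to_map.len());
    for (worker_id, parallel_unit_ids) in worker_to_parallel_units {
        for (index, &parallel_unit_id) in parallel_unit_ids.iter().enumerate() {
            result.insert(parallel_unit_id, WorkerSlotId::new(worker_id, index));
        }
    }
    result
}

fn main() {
    // Illustrative input: worker 1 hosts parallel units 10 and 11, worker 2 hosts 20.
    let to_map: HashMap<ParallelUnitId, u32> = HashMap::from([(10, 1), (11, 1), (20, 2)]);
    let slots = parallel_unit_to_worker_slot(&to_map);
    assert_eq!(slots[&10], WorkerSlotId::new(1, 0));
    assert_eq!(slots[&11], WorkerSlotId::new(1, 1));
    assert_eq!(slots[&20], WorkerSlotId::new(2, 0));
}
```

Ordering through `BTreeSet` makes the slot numbering deterministic for a given parallel-unit-to-worker assignment.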