diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 6a254332fc71..8a196eecf502 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -225,7 +225,7 @@ message TableFunctionNode { message TaskId { string query_id = 1; uint32 stage_id = 2; - uint32 task_id = 3; + uint64 task_id = 3; } // Every task will create N buffers (channels) for parent operators to fetch results from, @@ -233,7 +233,7 @@ message TaskId { message TaskOutputId { TaskId task_id = 1; // The id of output channel to fetch from - uint32 output_id = 2; + uint64 output_id = 2; } message LocalExecutePlan { @@ -270,7 +270,7 @@ message LocalLookupJoinNode { repeated uint32 inner_side_key = 4; uint32 lookup_prefix_len = 5; plan_common.StorageTableDesc inner_side_table_desc = 6; - repeated uint32 inner_side_vnode_mapping = 7; + repeated uint64 inner_side_vnode_mapping = 7; repeated int32 inner_side_column_ids = 8; repeated uint32 output_indices = 9; repeated common.WorkerNode worker_nodes = 10; diff --git a/proto/common.proto b/proto/common.proto index 4f0d56b4823a..164150379c48 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -92,6 +92,12 @@ message ParallelUnitMapping { repeated uint32 data = 2; } +// Vnode mapping for stream fragments. Stores mapping from virtual node to (worker id, slot index). +message WorkerSlotMapping { + repeated uint32 original_indices = 1; + repeated uint64 data = 2; +} + message BatchQueryEpoch { oneof epoch { uint64 committed = 1; diff --git a/proto/meta.proto b/proto/meta.proto index 149b2940e2a2..df90d99f6e9e 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -111,6 +111,16 @@ message FragmentParallelUnitMappings { repeated FragmentParallelUnitMapping mappings = 1; } +/// Worker slot mapping with fragment id, used for notification. 
+message FragmentWorkerSlotMapping { + uint32 fragment_id = 1; + common.WorkerSlotMapping mapping = 2; +} + +message FragmentWorkerSlotMappings { + repeated FragmentWorkerSlotMapping mappings = 1; +} + // TODO: remove this when dashboard refactored. message ActorLocation { common.WorkerNode node = 1; @@ -378,8 +388,10 @@ message SubscribeRequest { message MetaSnapshot { message SnapshotVersion { uint64 catalog_version = 1; - uint64 parallel_unit_mapping_version = 2; + reserved 2; + reserved "parallel_unit_mapping_version"; uint64 worker_node_version = 3; + uint64 streaming_worker_slot_mapping_version = 4; } repeated catalog.Database databases = 1; repeated catalog.Schema schemas = 2; @@ -392,16 +404,20 @@ message MetaSnapshot { repeated catalog.Connection connections = 17; repeated catalog.Subscription subscriptions = 19; repeated user.UserInfo users = 8; + reserved 9; + reserved "parallel_unit_mappings"; GetSessionParamsResponse session_params = 20; - // for streaming - repeated FragmentParallelUnitMapping parallel_unit_mappings = 9; repeated common.WorkerNode nodes = 10; hummock.HummockSnapshot hummock_snapshot = 11; hummock.HummockVersion hummock_version = 12; backup_service.MetaBackupManifestId meta_backup_manifest_id = 14; hummock.WriteLimits hummock_write_limits = 16; - // for serving - repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18; + reserved 18; + reserved "serving_parallel_unit_mappings"; + + // for streaming + repeated FragmentWorkerSlotMapping streaming_worker_slot_mappings = 21; + repeated FragmentWorkerSlotMapping serving_worker_slot_mappings = 22; SnapshotVersion version = 13; } @@ -440,8 +456,6 @@ message SubscribeResponse { catalog.Function function = 6; user.UserInfo user = 11; SetSessionParamRequest session_param = 26; - // for streaming - FragmentParallelUnitMapping parallel_unit_mapping = 12; common.WorkerNode node = 13; hummock.HummockSnapshot hummock_snapshot = 14; hummock.HummockVersionDeltas hummock_version_deltas 
= 15; @@ -451,10 +465,15 @@ message SubscribeResponse { hummock.WriteLimits hummock_write_limits = 20; RelationGroup relation_group = 21; catalog.Connection connection = 22; - FragmentParallelUnitMappings serving_parallel_unit_mappings = 23; hummock.HummockVersionStats hummock_stats = 24; Recovery recovery = 25; + FragmentWorkerSlotMapping streaming_worker_slot_mapping = 27; + FragmentWorkerSlotMappings serving_worker_slot_mappings = 28; } + reserved 12; + reserved "parallel_unit_mapping"; + reserved 23; + reserved "serving_parallel_unit_mappings"; } service NotificationService { @@ -629,8 +648,10 @@ service SessionParamService { message GetServingVnodeMappingsRequest {} message GetServingVnodeMappingsResponse { - repeated FragmentParallelUnitMapping mappings = 1; + reserved 1; + reserved "mappings"; map fragment_to_table = 2; + repeated FragmentWorkerSlotMapping worker_slot_mappings = 3; } service ServingService { diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 17b257106fb5..638fbf5b96b6 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -21,7 +21,7 @@ use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::{ - ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, + ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, WorkerSlotId, }; use risingwave_common::memory::MemoryContext; use risingwave_common::types::{DataType, Datum}; @@ -29,7 +29,6 @@ use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::scan_range::ScanRange; use risingwave_common::util::tracing::TracingContext; -use risingwave_common::util::worker_util::get_pu_to_worker_mapping; use 
risingwave_expr::expr::{build_from_prost, BoxedExpression}; use risingwave_pb::batch_plan::exchange_info::DistributionMode; use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan; @@ -52,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; struct InnerSideExecutorBuilder { table_desc: StorageTableDesc, table_distribution: TableDistribution, - vnode_mapping: ExpandedParallelUnitMapping, + vnode_mapping: ExpandedWorkerSlotMapping, outer_side_key_types: Vec, inner_side_schema: Schema, inner_side_column_ids: Vec, @@ -61,8 +60,8 @@ struct InnerSideExecutorBuilder { context: C, task_id: TaskId, epoch: BatchQueryEpoch, - pu_to_worker_mapping: HashMap, - pu_to_scan_range_mapping: HashMap>, + worker_slot_mapping: HashMap, + worker_slot_to_scan_range_mapping: HashMap>, chunk_size: usize, shutdown_rx: ShutdownToken, next_stage_id: usize, @@ -91,8 +90,8 @@ impl InnerSideExecutorBuilder { /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table /// based on the passed `scan_range` and virtual node. - fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result { - let list = self.pu_to_scan_range_mapping.get(id).unwrap(); + fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result { + let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap(); let mut scan_ranges = vec![]; let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len()); @@ -114,11 +113,11 @@ impl InnerSideExecutorBuilder { } /// Creates the `PbExchangeSource` using the given `id`. 
- fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result { + fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result { let worker = self - .pu_to_worker_mapping + .worker_slot_mapping .get(id) - .context("No worker node found for the given parallel unit id.")?; + .context("No worker node found for the given worker slot id.")?; let local_execute_plan = LocalExecutePlan { plan: Some(PlanFragment { @@ -145,7 +144,7 @@ impl InnerSideExecutorBuilder { // conflict. query_id: self.task_id.query_id.clone(), stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32, - task_id: *id, + task_id: (*id).into(), }), output_id: 0, }), @@ -160,7 +159,7 @@ impl InnerSideExecutorBuilder { #[async_trait::async_trait] impl LookupExecutorBuilder for InnerSideExecutorBuilder { fn reset(&mut self) { - self.pu_to_scan_range_mapping = HashMap::new(); + self.worker_slot_to_scan_range_mapping = HashMap::new(); } /// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id @@ -191,11 +190,11 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder } let vnode = self.get_virtual_node(&scan_range)?; - let parallel_unit_id = self.vnode_mapping[vnode.to_index()]; + let worker_slot_id = self.vnode_mapping[vnode.to_index()]; let list = self - .pu_to_scan_range_mapping - .entry(parallel_unit_id) + .worker_slot_to_scan_range_mapping + .entry(worker_slot_id) .or_default(); list.push((scan_range, vnode)); @@ -207,7 +206,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder async fn build_executor(&mut self) -> Result { self.next_stage_id += 1; let mut sources = vec![]; - for id in self.pu_to_scan_range_mapping.keys() { + for id in self.worker_slot_to_scan_range_mapping.keys() { sources.push(self.build_prost_exchange_source(id)?); } @@ -368,11 +367,26 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { let null_safe = lookup_join_node.get_null_safe().to_vec(); - let vnode_mapping = 
lookup_join_node.get_inner_side_vnode_mapping().to_vec(); + let vnode_mapping = lookup_join_node + .get_inner_side_vnode_mapping() + .iter() + .copied() + .map(WorkerSlotId::from) + .collect_vec(); + assert!(!vnode_mapping.is_empty()); let chunk_size = source.context.get_config().developer.chunk_size; + let worker_nodes = lookup_join_node.get_worker_nodes(); + let worker_slot_mapping: HashMap = worker_nodes + .iter() + .flat_map(|worker| { + (0..(worker.parallel_units.len())) + .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone())) + }) + .collect(); + let inner_side_builder = InnerSideExecutorBuilder { table_desc: table_desc.clone(), table_distribution: TableDistribution::new_from_storage_table_desc( @@ -388,11 +402,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { context: source.context().clone(), task_id: source.task_id.clone(), epoch: source.epoch(), - pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()), - pu_to_scan_range_mapping: HashMap::new(), + worker_slot_to_scan_range_mapping: HashMap::new(), chunk_size, shutdown_rx: source.shutdown_rx.clone(), next_stage_id: 0, + worker_slot_mapping, }; let identity = source.plan_node().get_identity().clone(); diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 87d94bc1d90b..b4ad51da3bb4 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -79,7 +79,7 @@ impl StateReporter { #[derive(PartialEq, Eq, Hash, Clone, Debug, Default)] pub struct TaskId { - pub task_id: u32, + pub task_id: u64, pub stage_id: u32, pub query_id: String, } @@ -87,7 +87,7 @@ pub struct TaskId { #[derive(PartialEq, Eq, Hash, Clone, Default)] pub struct TaskOutputId { pub task_id: TaskId, - pub output_id: u32, + pub output_id: u64, } /// More compact formatter compared to derived `fmt::Debug`. 
diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index 5b0813186fd1..8b7dcc42b565 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -18,8 +18,7 @@ use std::time::Duration; use rand::seq::SliceRandom; use risingwave_common::bail; -use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; -use risingwave_common::util::worker_util::get_pu_to_worker_mapping; +use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; @@ -36,12 +35,10 @@ pub struct WorkerNodeManager { struct WorkerNodeManagerInner { worker_nodes: Vec, - /// A cache for parallel units to worker nodes. It should be consistent with `worker_nodes`. - pu_to_worker: HashMap, /// fragment vnode mapping info for streaming - streaming_fragment_vnode_mapping: HashMap, + streaming_fragment_vnode_mapping: HashMap, /// fragment vnode mapping info for serving - serving_fragment_vnode_mapping: HashMap, + serving_fragment_vnode_mapping: HashMap, } pub type WorkerNodeManagerRef = Arc; @@ -57,7 +54,6 @@ impl WorkerNodeManager { Self { inner: RwLock::new(WorkerNodeManagerInner { worker_nodes: Default::default(), - pu_to_worker: Default::default(), streaming_fragment_vnode_mapping: Default::default(), serving_fragment_vnode_mapping: Default::default(), }), @@ -68,7 +64,6 @@ impl WorkerNodeManager { /// Used in tests. 
pub fn mock(worker_nodes: Vec) -> Self { let inner = RwLock::new(WorkerNodeManagerInner { - pu_to_worker: get_pu_to_worker_mapping(&worker_nodes), worker_nodes, streaming_fragment_vnode_mapping: HashMap::new(), serving_fragment_vnode_mapping: HashMap::new(), @@ -120,23 +115,18 @@ impl WorkerNodeManager { *w = node; } } - // Update `pu_to_worker` - write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } pub fn remove_worker_node(&self, node: WorkerNode) { let mut write_guard = self.inner.write().unwrap(); write_guard.worker_nodes.retain(|x| x.id != node.id); - - // Update `pu_to_worker` - write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } pub fn refresh( &self, nodes: Vec, - streaming_mapping: HashMap, - serving_mapping: HashMap, + streaming_mapping: HashMap, + serving_mapping: HashMap, ) { let mut write_guard = self.inner.write().unwrap(); tracing::debug!("Refresh worker nodes {:?}.", nodes); @@ -149,42 +139,51 @@ impl WorkerNodeManager { serving_mapping.keys() ); write_guard.worker_nodes = nodes; - // Update `pu_to_worker` - write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); write_guard.streaming_fragment_vnode_mapping = streaming_mapping; write_guard.serving_fragment_vnode_mapping = serving_mapping; } - /// If parallel unit ids is empty, the scheduler may fail to schedule any task and stuck at + /// If worker slot ids is empty, the scheduler may fail to schedule any task and stuck at /// schedule next stage. If we do not return error in this case, needs more complex control /// logic above. Report in this function makes the schedule root fail reason more clear. 
- pub fn get_workers_by_parallel_unit_ids( + pub fn get_workers_by_worker_slot_ids( &self, - parallel_unit_ids: &[ParallelUnitId], + worker_slot_ids: &[WorkerSlotId], ) -> Result> { - if parallel_unit_ids.is_empty() { + if worker_slot_ids.is_empty() { return Err(BatchError::EmptyWorkerNodes); } let guard = self.inner.read().unwrap(); - let mut workers = Vec::with_capacity(parallel_unit_ids.len()); - for parallel_unit_id in parallel_unit_ids { - match guard.pu_to_worker.get(parallel_unit_id) { - Some(worker) => workers.push(worker.clone()), + let worker_slot_index: HashMap<_, _> = guard + .worker_nodes + .iter() + .flat_map(|worker| { + (0..worker.parallel_units.len()) + .map(move |i| (WorkerSlotId::new(worker.id, i), worker)) + }) + .collect(); + + let mut workers = Vec::with_capacity(worker_slot_ids.len()); + + for worker_slot_id in worker_slot_ids { + match worker_slot_index.get(worker_slot_id) { + Some(worker) => workers.push((*worker).clone()), None => bail!( - "No worker node found for parallel unit id: {}", - parallel_unit_id + "No worker node found for worker slot id: {}", + worker_slot_id ), } } + Ok(workers) } pub fn get_streaming_fragment_mapping( &self, fragment_id: &FragmentId, - ) -> Result { + ) -> Result { self.inner .read() .unwrap() @@ -197,7 +196,7 @@ impl WorkerNodeManager { pub fn insert_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: ParallelUnitMapping, + vnode_mapping: WorkerSlotMapping, ) { self.inner .write() @@ -210,7 +209,7 @@ impl WorkerNodeManager { pub fn update_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: ParallelUnitMapping, + vnode_mapping: WorkerSlotMapping, ) { let mut guard = self.inner.write().unwrap(); guard @@ -228,7 +227,7 @@ impl WorkerNodeManager { } /// Returns fragment's vnode mapping for serving. 
- fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { + fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { self.inner .read() .unwrap() @@ -236,7 +235,7 @@ impl WorkerNodeManager { .ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id)) } - pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { + pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Set serving vnode mapping for fragments {:?}", @@ -247,7 +246,7 @@ impl WorkerNodeManager { pub fn upsert_serving_fragment_mapping( &self, - mappings: HashMap, + mappings: HashMap, ) { let mut guard = self.inner.write().unwrap(); tracing::debug!( @@ -299,7 +298,7 @@ impl WorkerNodeManager { } impl WorkerNodeManagerInner { - fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { + fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { self.serving_fragment_vnode_mapping .get(&fragment_id) .cloned() @@ -342,7 +341,7 @@ impl WorkerNodeSelector { .sum() } - pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { + pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { if self.enable_barrier_read { self.manager.get_streaming_fragment_mapping(&fragment_id) } else { diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 3f18be769752..a611c40aebc6 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -145,9 +145,6 @@ where | Info::Function(_) => { notification.version > info.version.as_ref().unwrap().catalog_version } - Info::ParallelUnitMapping(_) => { - notification.version > info.version.as_ref().unwrap().parallel_unit_mapping_version - } Info::Node(_) => { notification.version > info.version.as_ref().unwrap().worker_node_version } @@ -157,10 +154,18 @@ where Info::HummockSnapshot(_) => true, 
Info::MetaBackupManifestId(_) => true, Info::SystemParams(_) | Info::SessionParam(_) => true, - Info::ServingParallelUnitMappings(_) => true, Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(), Info::HummockStats(_) => true, Info::Recovery(_) => true, + Info::StreamingWorkerSlotMapping(_) => { + notification.version + > info + .version + .as_ref() + .unwrap() + .streaming_worker_slot_mapping_version + } + Info::ServingWorkerSlotMappings(_) => true, }); self.observer_states diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 07e62b7eac27..c057cf847c7d 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -12,14 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::fmt::Debug; +use std::collections::{BTreeSet, HashMap}; +use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; use std::ops::Index; use educe::Educe; use itertools::Itertools; -use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto}; +use risingwave_pb::common::{ + ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerSlotMapping, +}; use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto; use super::bitmap::VnodeBitmapExt; @@ -31,6 +33,41 @@ use crate::util::iter_util::ZipEqDebug; // TODO: find a better place for this. 
pub type ActorId = u32; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub struct WorkerSlotId(u64); + +impl WorkerSlotId { + pub fn worker_id(&self) -> u32 { + (self.0 >> 32) as u32 + } + + pub fn slot_idx(&self) -> u32 { + self.0 as u32 + } + + pub fn new(worker_id: u32, slot_idx: usize) -> Self { + Self((worker_id as u64) << 32 | slot_idx as u64) + } +} + +impl From for u64 { + fn from(id: WorkerSlotId) -> Self { + id.0 + } +} + +impl From for WorkerSlotId { + fn from(id: u64) -> Self { + Self(id) + } +} + +impl Display for WorkerSlotId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx())) + } +} + /// Trait for items that can be used as keys in [`VnodeMapping`]. pub trait VnodeMappingItem { /// The type of the item. @@ -254,6 +291,12 @@ pub mod marker { impl VnodeMappingItem for ParallelUnit { type Item = ParallelUnitId; } + + /// A marker type for items of [`WorkerSlotId`]. + pub struct WorkerSlot; + impl VnodeMappingItem for WorkerSlot { + type Item = WorkerSlotId; + } } /// A mapping from [`VirtualNode`] to [`ActorId`]. @@ -266,6 +309,11 @@ pub type ParallelUnitMapping = VnodeMapping; /// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`]. pub type ExpandedParallelUnitMapping = ExpandedMapping; +/// A mapping from [`VirtualNode`] to [`WorkerSlotId`]. +pub type WorkerSlotMapping = VnodeMapping; +/// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`]. +pub type ExpandedWorkerSlotMapping = ExpandedMapping; + impl ActorMapping { /// Transform this actor mapping to a parallel unit mapping, essentially `transform`. 
pub fn to_parallel_unit(&self, to_map: &M) -> ParallelUnitMapping @@ -293,6 +341,30 @@ impl ActorMapping { } } +impl WorkerSlotMapping { + /// Create a uniform worker mapping from the given worker ids + pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self { + Self::new_uniform(worker_slot_ids.iter().cloned()) + } + + /// Create a worker mapping from the protobuf representation. + pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self { + assert_eq!(proto.original_indices.len(), proto.data.len()); + Self { + original_indices: proto.original_indices.clone(), + data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(), + } + } + + /// Convert this worker mapping to the protobuf representation. + pub fn to_protobuf(&self) -> PbWorkerSlotMapping { + PbWorkerSlotMapping { + original_indices: self.original_indices.clone(), + data: self.data.iter().map(|id| id.0).collect(), + } + } +} + impl ParallelUnitMapping { /// Create a uniform parallel unit mapping from the given parallel units, essentially /// `new_uniform`. @@ -310,6 +382,28 @@ impl ParallelUnitMapping { self.transform(to_map) } + /// Transform this parallel unit mapping to an worker mapping, essentially `transform`. + pub fn to_worker_slot(&self, to_map: &HashMap) -> WorkerSlotMapping { + let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new(); + for (parallel_unit_id, worker_id) in to_map { + worker_to_parallel_units + .entry(*worker_id) + .or_default() + .insert(*parallel_unit_id); + } + + let mut parallel_unit_to_worker_slot = HashMap::with_capacity(to_map.len()); + + for (worker_id, parallel_unit_ids) in worker_to_parallel_units { + for (index, ¶llel_unit_id) in parallel_unit_ids.iter().enumerate() { + parallel_unit_to_worker_slot + .insert(parallel_unit_id, WorkerSlotId::new(worker_id, index)); + } + } + + self.transform(¶llel_unit_to_worker_slot) + } + /// Create a parallel unit mapping from the protobuf representation. 
pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self { assert_eq!(proto.original_indices.len(), proto.data.len()); diff --git a/src/common/src/util/worker_util.rs b/src/common/src/util/worker_util.rs index 893cd95ecbbb..80ecd3b82253 100644 --- a/src/common/src/util/worker_util.rs +++ b/src/common/src/util/worker_util.rs @@ -12,23 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - -use risingwave_pb::common::WorkerNode; - -use crate::hash::ParallelUnitId; - pub type WorkerNodeId = u32; - -pub fn get_pu_to_worker_mapping(nodes: &[WorkerNode]) -> HashMap { - let mut pu_to_worker = HashMap::new(); - - for node in nodes { - for pu in &node.parallel_units { - let res = pu_to_worker.insert(pu.id, node.clone()); - assert!(res.is_none(), "duplicate parallel unit id"); - } - } - - pu_to_worker -} diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 49f45d66512e..036cfebe792b 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -13,46 +13,48 @@ // limitations under the License. use std::collections::{HashMap, HashSet, LinkedList, VecDeque}; +use std::ops::BitOrAssign; use itertools::Itertools; use num_integer::Integer; +use risingwave_common::hash::WorkerSlotId; use risingwave_pb::common::WorkerNode; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; +use crate::hash::{VirtualNode, WorkerSlotMapping}; /// Calculate a new vnode mapping, keeping locality and balance on a best effort basis. /// The strategy is similar to `rebalance_actor_vnode` used in meta node, but is modified to /// consider `max_parallelism` too. 
pub fn place_vnode( - hint_pu_mapping: Option<&ParallelUnitMapping>, - new_workers: &[WorkerNode], + hint_worker_slot_mapping: Option<&WorkerSlotMapping>, + workers: &[WorkerNode], max_parallelism: Option, -) -> Option { - // Get all serving parallel units from all available workers, grouped by worker id and ordered - // by parallel unit id in each group. - let mut new_pus: LinkedList<_> = new_workers +) -> Option { + // Get all serving worker slots from all available workers, grouped by worker id and ordered + // by worker slot id in each group. + let mut worker_slots: LinkedList<_> = workers .iter() .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving)) .sorted_by_key(|w| w.id) - .map(|w| w.parallel_units.clone().into_iter().sorted_by_key(|p| p.id)) + .map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlotId::new(w.id, idx))) .collect(); - // Set serving parallelism to the minimum of total number of parallel units, specified + // Set serving parallelism to the minimum of total number of worker slots, specified // `max_parallelism` and total number of virtual nodes. let serving_parallelism = std::cmp::min( - new_pus.iter().map(|pus| pus.len()).sum(), + worker_slots.iter().map(|slots| slots.len()).sum(), std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT), ); - // Select `serving_parallelism` parallel units in a round-robin fashion, to distribute workload + // Select `serving_parallelism` worker slots in a round-robin fashion, to distribute workload // evenly among workers. 
- let mut selected_pu_ids = Vec::new(); - while !new_pus.is_empty() { - new_pus - .extract_if(|ps| { - if let Some(p) = ps.next() { - selected_pu_ids.push(p.id); + let mut selected_slots = Vec::new(); + while !worker_slots.is_empty() { + worker_slots + .extract_if(|slots| { + if let Some(slot) = slots.next() { + selected_slots.push(slot); false } else { true @@ -60,57 +62,61 @@ pub fn place_vnode( }) .for_each(drop); } - selected_pu_ids.drain(serving_parallelism..); - let selected_pu_id_set: HashSet = selected_pu_ids.iter().cloned().collect(); - if selected_pu_id_set.is_empty() { + selected_slots.drain(serving_parallelism..); + let selected_slots_set: HashSet = selected_slots.iter().cloned().collect(); + if selected_slots_set.is_empty() { return None; } - // Calculate balance for each selected parallel unit. Initially, each parallel unit is assigned + // Calculate balance for each selected worker slot. Initially, each worker slot is assigned // no vnodes. Thus its negative balance means that many vnodes should be assigned to it later. - // `is_temp` is a mark for a special temporary parallel unit, only to simplify implementation. + // `is_temp` is a mark for a special temporary worker slot, only to simplify implementation. 
#[derive(Debug)] struct Balance { - pu_id: ParallelUnitId, + slot: WorkerSlotId, balance: i32, builder: BitmapBuilder, is_temp: bool, } - let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_pu_ids.len()); - let mut balances: HashMap = HashMap::default(); - for pu_id in &selected_pu_ids { + + let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_slots.len()); + let mut balances: HashMap = HashMap::default(); + + for slot in &selected_slots { let mut balance = Balance { - pu_id: *pu_id, + slot: *slot, balance: -(expected as i32), builder: BitmapBuilder::zeroed(VirtualNode::COUNT), is_temp: false, }; + if remain > 0 { balance.balance -= 1; remain -= 1; } - balances.insert(*pu_id, balance); + balances.insert(*slot, balance); } - // Now to maintain affinity, if a hint has been provided via `hint_pu_mapping`, follow + // Now to maintain affinity, if a hint has been provided via `hint_worker_slot_mapping`, follow // that mapping to adjust balances. - let mut temp_pu = Balance { - pu_id: 0, // This id doesn't matter for `temp_pu`. It's distinguishable via `is_temp`. + let mut temp_slot = Balance { + slot: WorkerSlotId::new(0u32, usize::MAX), /* This id doesn't matter for `temp_slot`. It's distinguishable via `is_temp`. */ balance: 0, builder: BitmapBuilder::zeroed(VirtualNode::COUNT), is_temp: true, }; - match hint_pu_mapping { - Some(hint_pu_mapping) => { - for (vnode, pu_id) in hint_pu_mapping.iter_with_vnode() { - let b = if selected_pu_id_set.contains(&pu_id) { - // Assign vnode to the same parallel unit as hint. - balances.get_mut(&pu_id).unwrap() + match hint_worker_slot_mapping { + Some(hint_worker_slot_mapping) => { + for (vnode, worker_slot) in hint_worker_slot_mapping.iter_with_vnode() { + let b = if selected_slots_set.contains(&worker_slot) { + // Assign vnode to the same worker slot as hint. 
+ balances.get_mut(&worker_slot).unwrap() } else { - // Assign vnode that doesn't belong to any parallel unit to `temp_pu` + // Assign vnode that doesn't belong to any worker slot to `temp_slot` // temporarily. They will be reassigned later. - &mut temp_pu + &mut temp_slot }; + b.balance += 1; b.builder.set(vnode.to_index(), true); } @@ -118,31 +124,33 @@ pub fn place_vnode( None => { // No hint is provided, assign all vnodes to `temp_pu`. for vnode in VirtualNode::all() { - temp_pu.balance += 1; - temp_pu.builder.set(vnode.to_index(), true); + temp_slot.balance += 1; + temp_slot.builder.set(vnode.to_index(), true); } } } - // The final step is to move vnodes from parallel units with positive balance to parallel units - // with negative balance, until all parallel units are of 0 balance. - // A double-ended queue with parallel units ordered by balance in descending order is consumed: - // 1. Peek 2 parallel units from front and back. + // The final step is to move vnodes from worker slots with positive balance to worker slots + // with negative balance, until all worker slots are of 0 balance. + // A double-ended queue with worker slots ordered by balance in descending order is consumed: + // 1. Peek 2 worker slots from front and back. // 2. It any of them is of 0 balance, pop it and go to step 1. // 3. Otherwise, move vnodes from front to back. 
let mut balances: VecDeque<_> = balances .into_values() - .chain(std::iter::once(temp_pu)) + .chain(std::iter::once(temp_slot)) .sorted_by_key(|b| b.balance) .rev() .collect(); - let mut results: HashMap = HashMap::default(); + + let mut results: HashMap = HashMap::default(); + while !balances.is_empty() { if balances.len() == 1 { let single = balances.pop_front().unwrap(); assert_eq!(single.balance, 0); if !single.is_temp { - results.insert(single.pu_id, single.builder.finish()); + results.insert(single.slot, single.builder.finish()); } break; } @@ -166,32 +174,42 @@ pub fn place_vnode( if src.balance != 0 { balances.push_front(src); } else if !src.is_temp { - results.insert(src.pu_id, src.builder.finish()); + results.insert(src.slot, src.builder.finish()); } if dst.balance != 0 { balances.push_back(dst); } else if !dst.is_temp { - results.insert(dst.pu_id, dst.builder.finish()); + results.insert(dst.slot, dst.builder.finish()); } } - Some(ParallelUnitMapping::from_bitmaps(&results)) + let mut worker_result = HashMap::new(); + + for (worker_slot, bitmap) in results { + worker_result + .entry(worker_slot) + .or_insert(BitmapBuilder::zeroed(VirtualNode::COUNT).finish()) + .bitor_assign(&bitmap); + } + + Some(WorkerSlotMapping::from_bitmaps(&worker_result)) } #[cfg(test)] mod tests { use std::collections::HashMap; + use risingwave_common::hash::WorkerSlotMapping; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{ParallelUnit, WorkerNode}; - use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; + use crate::hash::{ParallelUnitId, VirtualNode}; use crate::vnode_mapping::vnode_placement::place_vnode; - #[test] fn test_place_vnode() { assert_eq!(VirtualNode::COUNT, 256); + let mut pu_id_counter: ParallelUnitId = 0; let mut pu_to_worker: HashMap = Default::default(); let serving_property = Property { @@ -216,13 +234,13 @@ mod tests { results }; - let count_same_vnode_mapping = |pm1: &ParallelUnitMapping, pm2: 
&ParallelUnitMapping| { - assert_eq!(pm1.len(), 256); - assert_eq!(pm2.len(), 256); + let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| { + assert_eq!(wm1.len(), 256); + assert_eq!(wm2.len(), 256); let mut count: usize = 0; for idx in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(idx); - if pm1.get(vnode) == pm2.get(vnode) { + if wm1.get(vnode) == wm2.get(vnode) { count += 1; } } @@ -235,29 +253,32 @@ mod tests { property: Some(serving_property.clone()), ..Default::default() }; + assert!( place_vnode(None, &[worker_1.clone()], Some(0)).is_none(), "max_parallelism should >= 0" ); - let re_pu_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap(); - assert_eq!(re_pu_mapping_2.iter_unique().count(), 1); + let re_worker_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap(); + assert_eq!(re_worker_mapping_2.iter_unique().count(), 1); + let worker_2 = WorkerNode { id: 2, parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker), property: Some(serving_property.clone()), ..Default::default() }; - let re_pu_mapping = place_vnode( - Some(&re_pu_mapping_2), + + let re_worker_mapping = place_vnode( + Some(&re_worker_mapping_2), &[worker_1.clone(), worker_2.clone()], None, ) .unwrap(); - assert_eq!(re_pu_mapping.iter_unique().count(), 51); + assert_eq!(re_worker_mapping.iter_unique().count(), 51); // 1 * 256 + 0 -> 51 * 5 + 1 - let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); + let score = count_same_vnode_mapping(&re_worker_mapping_2, &re_worker_mapping); assert!(score >= 5); let worker_3 = WorkerNode { @@ -267,7 +288,7 @@ mod tests { ..Default::default() }; let re_pu_mapping_2 = place_vnode( - Some(&re_pu_mapping), + Some(&re_worker_mapping), &[worker_1.clone(), worker_2.clone(), worker_3.clone()], None, ) @@ -276,7 +297,7 @@ mod tests { // limited by total pu number assert_eq!(re_pu_mapping_2.iter_unique().count(), 111); // 51 * 5 + 1 -> 111 * 2 + 34 - let score = 
count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); + let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_worker_mapping); assert!(score >= (2 + 50 * 2)); let re_pu_mapping = place_vnode( Some(&re_pu_mapping_2), diff --git a/src/ctl/src/cmd_impl/meta/serving.rs b/src/ctl/src/cmd_impl/meta/serving.rs index c6c5d3cf8198..cd8b1be50e20 100644 --- a/src/ctl/src/cmd_impl/meta/serving.rs +++ b/src/ctl/src/cmd_impl/meta/serving.rs @@ -16,30 +16,26 @@ use std::collections::HashMap; use comfy_table::{Row, Table}; use itertools::Itertools; -use risingwave_common::hash::{ParallelUnitId, VirtualNode}; -use risingwave_pb::common::{WorkerNode, WorkerType}; +use risingwave_common::hash::VirtualNode; +use risingwave_pb::common::WorkerType; use crate::CtlContext; pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let mappings = meta_client.list_serving_vnode_mappings().await?; - let workers = meta_client + let workers: HashMap<_, _> = meta_client .list_worker_nodes(Some(WorkerType::ComputeNode)) - .await?; - let mut pu_to_worker: HashMap = HashMap::new(); - for w in &workers { - for pu in &w.parallel_units { - pu_to_worker.insert(pu.id, w); - } - } + .await? 
+ .into_iter() + .map(|worker| (worker.id, worker)) + .collect(); let mut table = Table::new(); table.set_header({ let mut row = Row::new(); row.add_cell("Table Id".into()); row.add_cell("Fragment Id".into()); - row.add_cell("Parallel Unit Id".into()); row.add_cell("Virtual Node".into()); row.add_cell("Worker".into()); row @@ -48,28 +44,25 @@ pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Res let rows = mappings .iter() .flat_map(|(fragment_id, (table_id, mapping))| { - let mut pu_vnodes: HashMap> = HashMap::new(); - for (vnode, pu) in mapping.iter_with_vnode() { - pu_vnodes.entry(pu).or_default().push(vnode); + let mut worker_nodes: HashMap> = HashMap::new(); + for (vnode, worker_slot_id) in mapping.iter_with_vnode() { + worker_nodes + .entry(worker_slot_id.worker_id()) + .or_default() + .push(vnode); } - pu_vnodes.into_iter().map(|(pu_id, vnodes)| { - ( - *table_id, - *fragment_id, - pu_id, - vnodes, - pu_to_worker.get(&pu_id), - ) + worker_nodes.into_iter().map(|(worker_id, vnodes)| { + (*table_id, *fragment_id, vnodes, workers.get(&worker_id)) }) }) .collect_vec(); - for (table_id, fragment_id, pu_id, vnodes, worker) in - rows.into_iter().sorted_by_key(|(t, f, p, ..)| (*t, *f, *p)) + + for (table_id, fragment_id, vnodes, worker) in + rows.into_iter().sorted_by_key(|(t, f, ..)| (*t, *f)) { let mut row = Row::new(); row.add_cell(table_id.into()); row.add_cell(fragment_id.into()); - row.add_cell(pu_id.into()); row.add_cell( format!( "{} in total: {}", diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index ddf6ca489bf0..f864f9608bdb 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use parking_lot::RwLock; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; use risingwave_common::catalog::CatalogVersion; -use 
risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend}; @@ -27,7 +27,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{FragmentParallelUnitMapping, MetaSnapshot, SubscribeResponse}; +use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse}; use risingwave_rpc_client::ComputeClientPoolRef; use tokio::sync::watch::Sender; @@ -72,7 +72,6 @@ impl ObserverState for FrontendObserverNode { Info::User(_) => { self.handle_user_notification(resp); } - Info::ParallelUnitMapping(_) => self.handle_fragment_mapping_notification(resp), Info::Snapshot(_) => { panic!( "receiving a snapshot in the middle is unsupported now {:?}", @@ -103,8 +102,9 @@ impl ObserverState for FrontendObserverNode { Info::HummockStats(stats) => { self.handle_table_stats_notification(stats); } - Info::ServingParallelUnitMappings(m) => { - self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()); + Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp), + Info::ServingWorkerSlotMappings(m) => { + self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()) } Info::Recovery(_) => { self.compute_client_pool.invalidate_all(); @@ -133,13 +133,13 @@ impl ObserverState for FrontendObserverNode { functions, connections, users, - parallel_unit_mappings, - serving_parallel_unit_mappings, nodes, hummock_snapshot, hummock_version: _, meta_backup_manifest_id: _, hummock_write_limits: _, + streaming_worker_slot_mappings, + serving_worker_slot_mappings, session_params, version, } = 
snapshot; @@ -177,10 +177,11 @@ impl ObserverState for FrontendObserverNode { for user in users { user_guard.create_user(user) } + self.worker_node_manager.refresh( nodes, - convert_pu_mapping(¶llel_unit_mappings), - convert_pu_mapping(&serving_parallel_unit_mappings), + convert_worker_slot_mapping(&streaming_worker_slot_mappings), + convert_worker_slot_mapping(&serving_worker_slot_mappings), ); self.hummock_snapshot_manager .update(hummock_snapshot.unwrap()); @@ -387,11 +388,11 @@ impl FrontendObserverNode { return; }; match info { - Info::ParallelUnitMapping(parallel_unit_mapping) => { - let fragment_id = parallel_unit_mapping.fragment_id; + Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => { + let fragment_id = streaming_worker_slot_mapping.fragment_id; let mapping = || { - ParallelUnitMapping::from_protobuf( - parallel_unit_mapping.mapping.as_ref().unwrap(), + WorkerSlotMapping::from_protobuf( + streaming_worker_slot_mapping.mapping.as_ref().unwrap(), ) }; @@ -417,20 +418,20 @@ impl FrontendObserverNode { fn handle_fragment_serving_mapping_notification( &mut self, - mappings: Vec, + mappings: Vec, op: Operation, ) { match op { Operation::Add | Operation::Update => { self.worker_node_manager - .upsert_serving_fragment_mapping(convert_pu_mapping(&mappings)); + .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings)); } Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping( &mappings.into_iter().map(|m| m.fragment_id).collect_vec(), ), Operation::Snapshot => { self.worker_node_manager - .set_serving_fragment_mapping(convert_pu_mapping(&mappings)); + .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings)); } _ => panic!("receive an unsupported notify {:?}", op), } @@ -470,17 +471,17 @@ impl FrontendObserverNode { } } -fn convert_pu_mapping( - parallel_unit_mappings: &[FragmentParallelUnitMapping], -) -> HashMap { - parallel_unit_mappings +fn convert_worker_slot_mapping( + worker_slot_mappings: 
&[FragmentWorkerSlotMapping], +) -> HashMap { + worker_slot_mappings .iter() .map( - |FragmentParallelUnitMapping { + |FragmentWorkerSlotMapping { fragment_id, mapping, }| { - let mapping = ParallelUnitMapping::from_protobuf(mapping.as_ref().unwrap()); + let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap()); (*fragment_id, mapping) }, ) diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index 358ad934a9cc..cf02daac47d8 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -51,7 +51,7 @@ use generic::PhysicalPlanRef; use itertools::Itertools; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::catalog::{FieldDisplay, Schema, TableId}; -use risingwave_common::hash::ParallelUnitId; +use risingwave_common::hash::WorkerSlotId; use risingwave_pb::batch_plan::exchange_info::{ ConsistentHashInfo, Distribution as DistributionPb, DistributionMode, HashInfo, }; @@ -148,14 +148,17 @@ impl Distribution { let vnode_mapping = worker_node_manager .fragment_mapping(Self::get_fragment_id(catalog_reader, table_id)?)?; - let pu2id_map: HashMap = vnode_mapping + let worker_slot_to_id_map: HashMap = vnode_mapping .iter_unique() .enumerate() - .map(|(i, pu)| (pu, i as u32)) + .map(|(i, worker_slot_id)| (worker_slot_id, i as u32)) .collect(); Some(DistributionPb::ConsistentHashInfo(ConsistentHashInfo { - vmap: vnode_mapping.iter().map(|x| pu2id_map[&x]).collect_vec(), + vmap: vnode_mapping + .iter() + .map(|id| worker_slot_to_id_map[&id]) + .collect_vec(), key: key.iter().map(|num| *num as u32).collect(), })) } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index b556bc3af6c8..165bdcee6476 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -476,7 +476,7 
@@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::ParallelUnitMapping; + use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType}; @@ -721,10 +721,12 @@ pub(crate) mod tests { let workers = vec![worker1, worker2, worker3]; let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers)); let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false); - worker_node_manager - .insert_streaming_fragment_mapping(0, ParallelUnitMapping::new_single(0)); + worker_node_manager.insert_streaming_fragment_mapping( + 0, + WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)), + ); worker_node_manager.set_serving_fragment_mapping( - vec![(0, ParallelUnitMapping::new_single(0))] + vec![(0, WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)))] .into_iter() .collect(), ); diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 59294169220e..c727d2e61b1f 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -32,7 +32,7 @@ use risingwave_batch::executor::ExecutorBuilder; use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::source::SplitMetaData; @@ -172,7 +172,7 @@ impl StageExecution { ctx: ExecutionContextRef, ) -> Self { let tasks = (0..stage.parallelism.unwrap()) - 
.map(|task_id| (task_id, TaskStatusHolder::new(task_id))) + .map(|task_id| (task_id as u64, TaskStatusHolder::new(task_id as u64))) .collect(); Self { @@ -289,7 +289,7 @@ impl StageExecution { /// /// When this method is called, all tasks should have been scheduled, and their `worker_node` /// should have been set. - pub fn all_exchange_sources_for(&self, output_id: u32) -> Vec { + pub fn all_exchange_sources_for(&self, output_id: u64) -> Vec { self.tasks .iter() .map(|(task_id, status_holder)| { @@ -353,12 +353,13 @@ impl StageRunner { // We let each task read one partition by setting the `vnode_ranges` of the scan node in // the task. // We schedule the task to the worker node that owns the data partition. - let parallel_unit_ids = vnode_bitmaps.keys().cloned().collect_vec(); + let worker_slot_ids = vnode_bitmaps.keys().cloned().collect_vec(); let workers = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; - for (i, (parallel_unit_id, worker)) in parallel_unit_ids + .get_workers_by_worker_slot_ids(&worker_slot_ids)?; + + for (i, (worker_slot_id, worker)) in worker_slot_ids .into_iter() .zip_eq_fast(workers.into_iter()) .enumerate() @@ -366,11 +367,11 @@ impl StageRunner { let task_id = TaskIdPb { query_id: self.stage.query_id.id.clone(), stage_id: self.stage.id, - task_id: i as u32, + task_id: i as u64, }; - let vnode_ranges = vnode_bitmaps[¶llel_unit_id].clone(); + let vnode_ranges = vnode_bitmaps[&worker_slot_id].clone(); let plan_fragment = - self.create_plan_fragment(i as u32, Some(PartitionInfo::Table(vnode_ranges))); + self.create_plan_fragment(i as u64, Some(PartitionInfo::Table(vnode_ranges))); futures.push(self.schedule_task( task_id, plan_fragment, @@ -391,10 +392,10 @@ impl StageRunner { let task_id = TaskIdPb { query_id: self.stage.query_id.id.clone(), stage_id: self.stage.id, - task_id: id as u32, + task_id: id as u64, }; let plan_fragment = self - .create_plan_fragment(id as u32, 
Some(PartitionInfo::Source(split.to_vec()))); + .create_plan_fragment(id as u64, Some(PartitionInfo::Source(split.to_vec()))); let worker = self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?; futures.push(self.schedule_task( @@ -409,9 +410,9 @@ impl StageRunner { let task_id = TaskIdPb { query_id: self.stage.query_id.id.clone(), stage_id: self.stage.id, - task_id: id, + task_id: id as u64, }; - let plan_fragment = self.create_plan_fragment(id, None); + let plan_fragment = self.create_plan_fragment(id as u64, None); let worker = self.choose_worker(&plan_fragment, id, self.stage.dml_table_id)?; futures.push(self.schedule_task( task_id, @@ -682,7 +683,7 @@ impl StageRunner { fn get_table_dml_vnode_mapping( &self, table_id: &TableId, - ) -> SchedulerResult { + ) -> SchedulerResult { let guard = self.catalog_reader.read_guard(); let table = guard @@ -711,11 +712,11 @@ impl StageRunner { if let Some(table_id) = dml_table_id { let vnode_mapping = self.get_table_dml_vnode_mapping(&table_id)?; - let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec(); + let worker_ids = vnode_mapping.iter_unique().collect_vec(); let candidates = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + .get_workers_by_worker_slot_ids(&worker_ids)?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } @@ -741,17 +742,17 @@ impl StageRunner { .table_id .into(), )?; - let id2pu_vec = self + let id_to_worker_slots = self .worker_node_manager .fragment_mapping(fragment_id)? 
.iter_unique() .collect_vec(); - let pu = id2pu_vec[task_id as usize]; + let worker_slot_id = id_to_worker_slots[task_id as usize]; let candidates = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(&[pu])?; + .get_workers_by_worker_slot_ids(&[worker_slot_id])?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index 0ff7540a79d7..89104cc895f7 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -30,7 +30,7 @@ use risingwave_batch::task::{ShutdownToken, TaskId}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_common::bail; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::tracing::{InstrumentStream, TracingContext}; use risingwave_connector::source::SplitMetaData; @@ -313,12 +313,12 @@ impl LocalQueryExecution { // Similar to the distributed case (StageRunner::schedule_tasks). // Set `vnode_ranges` of the scan node in `local_execute_plan` of each // `exchange_source`. 
- let (parallel_unit_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = + let (worker_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = vnode_bitmaps.clone().into_iter().unzip(); let workers = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + .get_workers_by_worker_slot_ids(&worker_ids)?; for (idx, (worker_node, partition)) in (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate() { @@ -343,7 +343,7 @@ impl LocalQueryExecution { let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { task_id: Some(PbTaskId { - task_id: idx as u32, + task_id: idx as u64, stage_id: exchange_source_stage_id, query_id: self.query.query_id.id.clone(), }), @@ -389,7 +389,7 @@ impl LocalQueryExecution { let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { task_id: Some(PbTaskId { - task_id: id as u32, + task_id: id as u64, stage_id: exchange_source_stage_id, query_id: self.query.query_id.id.clone(), }), @@ -429,7 +429,7 @@ impl LocalQueryExecution { let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { task_id: Some(PbTaskId { - task_id: idx as u32, + task_id: idx as u64, stage_id: exchange_source_stage_id, query_id: self.query.query_id.id.clone(), }), @@ -531,7 +531,8 @@ impl LocalQueryExecution { )?; // TODO: should we use `pb::ParallelUnitMapping` here? 
- node.inner_side_vnode_mapping = mapping.to_expanded(); + node.inner_side_vnode_mapping = + mapping.to_expanded().into_iter().map(u64::from).collect(); node.worker_nodes = self.worker_node_manager.manager.list_worker_nodes(); } _ => unreachable!(), @@ -586,7 +587,7 @@ impl LocalQueryExecution { fn get_table_dml_vnode_mapping( &self, table_id: &TableId, - ) -> SchedulerResult { + ) -> SchedulerResult { let guard = self.front_env.catalog_reader().read_guard(); let table = guard @@ -610,11 +611,11 @@ impl LocalQueryExecution { // dml should use streaming vnode mapping let vnode_mapping = self.get_table_dml_vnode_mapping(table_id)?; let worker_node = { - let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec(); + let worker_ids = vnode_mapping.iter_unique().collect_vec(); let candidates = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + .get_workers_by_worker_slot_ids(&worker_ids)?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 06237c5756fb..74ade3fdab83 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -30,7 +30,7 @@ use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableDesc; use risingwave_common::hash::table_distribution::TableDistribution; -use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; +use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::scan_range::ScanRange; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; @@ -75,10 +75,10 @@ impl std::fmt::Display for QueryId { pub type StageId = u32; // Root stage always has only 
one task. -pub const ROOT_TASK_ID: u32 = 0; +pub const ROOT_TASK_ID: u64 = 0; // Root task has only one output. -pub const ROOT_TASK_OUTPUT_ID: u32 = 0; -pub type TaskId = u32; +pub const ROOT_TASK_OUTPUT_ID: u64 = 0; +pub type TaskId = u64; /// Generated by [`BatchPlanFragmenter`] and used in query execution graph. #[derive(Clone, Debug)] @@ -392,12 +392,12 @@ pub struct TableScanInfo { /// full vnode bitmap, since we need to know where to schedule the singleton scan task. /// /// `None` iff the table is a system table. - partitions: Option>, + partitions: Option>, } impl TableScanInfo { /// For normal tables, `partitions` should always be `Some`. - pub fn new(name: String, partitions: HashMap) -> Self { + pub fn new(name: String, partitions: HashMap) -> Self { Self { name, partitions: Some(partitions), @@ -416,7 +416,7 @@ impl TableScanInfo { self.name.as_ref() } - pub fn partitions(&self) -> Option<&HashMap> { + pub fn partitions(&self) -> Option<&HashMap> { self.partitions.as_ref() } } @@ -1158,10 +1158,10 @@ impl BatchPlanFragmenter { fn derive_partitions( scan_ranges: &[ScanRange], table_desc: &TableDesc, - vnode_mapping: &ParallelUnitMapping, -) -> SchedulerResult> { + vnode_mapping: &WorkerSlotMapping, +) -> SchedulerResult> { let num_vnodes = vnode_mapping.len(); - let mut partitions: HashMap)> = HashMap::new(); + let mut partitions: HashMap)> = HashMap::new(); if scan_ranges.is_empty() { return Ok(vnode_mapping @@ -1190,9 +1190,9 @@ fn derive_partitions( None => { // put this scan_range to all partitions vnode_mapping.to_bitmaps().into_iter().for_each( - |(parallel_unit_id, vnode_bitmap)| { + |(worker_slot_id, vnode_bitmap)| { let (bitmap, scan_ranges) = partitions - .entry(parallel_unit_id) + .entry(worker_slot_id) .or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![])); vnode_bitmap .iter() @@ -1204,9 +1204,9 @@ fn derive_partitions( } // scan a single partition Some(vnode) => { - let parallel_unit_id = vnode_mapping[vnode]; + let worker_slot_id 
= vnode_mapping[vnode]; let (bitmap, scan_ranges) = partitions - .entry(parallel_unit_id) + .entry(worker_slot_id) .or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![])); bitmap.set(vnode.to_index(), true); scan_ranges.push(scan_range.to_protobuf()); diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index e9a5e4a017ad..f3fec987c2bf 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -24,7 +24,7 @@ use risingwave_pb::hummock::WriteLimits; use risingwave_pb::meta::meta_snapshot::SnapshotVersion; use risingwave_pb::meta::notification_service_server::NotificationService; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest, + FragmentWorkerSlotMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest, SubscribeType, }; use risingwave_pb::user::UserInfo; @@ -138,9 +138,9 @@ impl NotificationServiceImpl { } } - async fn get_parallel_unit_mapping_snapshot( + async fn get_worker_slot_mapping_snapshot( &self, - ) -> MetaResult<(Vec, NotificationVersion)> { + ) -> MetaResult<(Vec, NotificationVersion)> { match &self.metadata_manager { MetadataManager::V1(mgr) => { let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await; @@ -161,11 +161,11 @@ impl NotificationServiceImpl { } } - fn get_serving_vnode_mappings(&self) -> Vec { + fn get_serving_vnode_mappings(&self) -> Vec { self.serving_vnode_mapping .all() .iter() - .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping { fragment_id: *fragment_id, mapping: Some(mapping.to_protobuf()), }) @@ -241,9 +241,11 @@ impl NotificationServiceImpl { users, catalog_version, ) = self.get_catalog_snapshot().await?; - let (parallel_unit_mappings, parallel_unit_mapping_version) = - self.get_parallel_unit_mapping_snapshot().await?; - let serving_parallel_unit_mappings = 
self.get_serving_vnode_mappings(); + + let (streaming_worker_slot_mappings, streaming_worker_slot_mapping_version) = + self.get_worker_slot_mapping_snapshot().await?; + let serving_worker_slot_mappings = self.get_serving_vnode_mappings(); + let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?; let hummock_snapshot = Some(self.hummock_manager.latest_snapshot()); @@ -270,15 +272,15 @@ impl NotificationServiceImpl { functions, connections, users, - parallel_unit_mappings, nodes, hummock_snapshot, - serving_parallel_unit_mappings, version: Some(SnapshotVersion { catalog_version, - parallel_unit_mapping_version, worker_node_version, + streaming_worker_slot_mapping_version, }), + serving_worker_slot_mappings, + streaming_worker_slot_mappings, session_params, ..Default::default() }) diff --git a/src/meta/service/src/serving_service.rs b/src/meta/service/src/serving_service.rs index d1b013e078e0..92c5fd9db483 100644 --- a/src/meta/service/src/serving_service.rs +++ b/src/meta/service/src/serving_service.rs @@ -16,7 +16,7 @@ use itertools::Itertools; use risingwave_meta::manager::MetadataManager; use risingwave_pb::meta::serving_service_server::ServingService; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse, + FragmentWorkerSlotMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse, }; use tonic::{Request, Response, Status}; @@ -49,7 +49,7 @@ impl ServingService for ServingServiceImpl { .serving_vnode_mapping .all() .into_iter() - .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping { fragment_id, mapping: Some(mapping.to_protobuf()), }) @@ -78,8 +78,8 @@ impl ServingService for ServingServiceImpl { } }; Ok(Response::new(GetServingVnodeMappingsResponse { - mappings, fragment_to_table, + worker_slot_mappings: mappings, })) } } diff --git a/src/meta/src/controller/catalog.rs 
b/src/meta/src/controller/catalog.rs index e6c77dfb1b1f..cb3e386c34fc 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; @@ -45,7 +46,9 @@ use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; -use risingwave_pb::meta::{PbRelation, PbRelationGroup}; +use risingwave_pb::meta::{ + FragmentParallelUnitMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, +}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::FragmentTypeFlag; use risingwave_pb::user::PbUserInfo; @@ -64,9 +67,9 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, - get_fragment_mappings_by_jobs, get_referring_objects, get_referring_objects_cascade, - get_user_privilege, list_user_info_by_ids, resolve_source_register_info_for_jobs, - PartialObject, + get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, get_referring_objects, + get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, + resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -275,8 +278,30 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; + 
+ let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; + let fragment_mappings = fragment_mappings + .into_iter() + .map( + |FragmentParallelUnitMapping { + fragment_id, + mapping, + }| { + PbFragmentWorkerSlotMapping { + fragment_id, + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.unwrap()) + .to_worker_slot(¶llel_unit_to_worker) + .to_protobuf(), + ), + } + }, + ) + .collect(); + // The schema and objects in the database will be delete cascade. let res = Object::delete_by_id(database_id).exec(&txn).await?; if res.rows_affected == 0 { @@ -296,6 +321,7 @@ impl CatalogController { }), ) .await; + self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) .await; Ok(( @@ -1973,6 +1999,7 @@ impl CatalogController { let (source_fragments, removed_actors) = resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?; @@ -1999,6 +2026,8 @@ impl CatalogController { } let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + txn.commit().await?; // notify about them. 
@@ -2073,6 +2102,26 @@ impl CatalogController { NotificationInfo::RelationGroup(PbRelationGroup { relations }), ) .await; + + let fragment_mappings = fragment_mappings + .into_iter() + .map( + |FragmentParallelUnitMapping { + fragment_id, + mapping, + }| { + PbFragmentWorkerSlotMapping { + fragment_id, + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.unwrap()) + .to_worker_slot(¶llel_unit_to_worker) + .to_protobuf(), + ), + } + }, + ) + .collect(); + self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) .await; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 7bf5b02f971a..fa5377374148 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -18,6 +18,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; @@ -34,7 +35,7 @@ use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState}; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, PbFragmentParallelUnitMapping, PbTableFragments, + FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbTableFragments, }; use risingwave_pb::source::PbConnectorSplits; use risingwave_pb::stream_plan::stream_node::NodeBody; @@ -50,7 +51,8 @@ use sea_orm::{ use crate::controller::catalog::{CatalogController, CatalogControllerInner}; use crate::controller::utils::{ - get_actor_dispatchers, FragmentDesc, PartialActorLocation, PartialFragmentStateTables, + get_actor_dispatchers, get_parallel_unit_to_worker_map, FragmentDesc, 
PartialActorLocation, + PartialFragmentStateTables, }; use crate::manager::{ActorInfos, LocalNotification}; use crate::model::TableParallelism; @@ -61,7 +63,9 @@ impl CatalogControllerInner { /// List all fragment vnode mapping info for all CREATED streaming jobs. pub async fn all_running_fragment_mappings( &self, - ) -> MetaResult + '_> { + ) -> MetaResult + '_> { + let txn = self.db.begin().await?; + let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() .join(JoinType::InnerJoin, fragment::Relation::Object.def()) .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) @@ -69,14 +73,24 @@ impl CatalogControllerInner { .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .into_tuple() - .all(&self.db) + .all(&txn) .await?; - Ok(fragment_mappings.into_iter().map(|(fragment_id, mapping)| { - FragmentParallelUnitMapping { - fragment_id: fragment_id as _, - mapping: Some(mapping.to_protobuf()), - } - })) + + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + + Ok(fragment_mappings + .into_iter() + .map(move |(fragment_id, mapping)| { + let worker_slot_mapping = + ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker_slot(¶llel_unit_to_worker) + .to_protobuf(); + + FragmentWorkerSlotMapping { + fragment_id: fragment_id as _, + mapping: Some(worker_slot_mapping), + } + })) } } @@ -84,7 +98,7 @@ impl CatalogController { pub(crate) async fn notify_fragment_mapping( &self, operation: NotificationOperation, - fragment_mappings: Vec, + fragment_mappings: Vec, ) { let fragment_ids = fragment_mappings .iter() @@ -96,7 +110,7 @@ impl CatalogController { .notification_manager() .notify_frontend( operation, - NotificationInfo::ParallelUnitMapping(fragment_mapping), + NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping), ) .await; } @@ -936,15 +950,21 @@ impl CatalogController { .await?; } + let 
parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + txn.commit().await?; self.notify_fragment_mapping( NotificationOperation::Update, fragment_mapping .into_iter() - .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping { fragment_id: fragment_id as _, - mapping: Some(mapping.to_protobuf()), + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker_slot(¶llel_unit_to_worker) + .to_protobuf(), + ), }) .collect(), ) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index db9c1d03eab4..8c597041e2f7 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -43,7 +43,7 @@ use risingwave_pb::meta::subscribe_response::{ }; use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, PbFragmentParallelUnitMapping, PbRelation, PbRelationGroup, + FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, PbTableFragments, Relation, }; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; @@ -66,7 +66,7 @@ use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ check_relation_name_duplicate, check_sink_into_table_cycle, ensure_object_id, ensure_user_id, - get_fragment_actor_ids, get_fragment_mappings, + get_fragment_actor_ids, get_fragment_mappings, get_parallel_unit_to_worker_map, }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, SinkId, StreamingJob}; @@ -803,7 +803,7 @@ impl CatalogController { dropping_sink_id: Option, txn: &DatabaseTransaction, streaming_job: StreamingJob, - ) -> MetaResult<(Vec, Vec)> { + ) -> MetaResult<(Vec, Vec)> { // Question: The source catalog should be remain unchanged? let StreamingJob::Table(_, table, ..) 
= streaming_job else { unreachable!("unexpected job: {streaming_job:?}") @@ -1007,8 +1007,7 @@ impl CatalogController { } } - let fragment_mapping: Vec = - get_fragment_mappings(txn, job_id as _).await?; + let fragment_mapping: Vec<_> = get_fragment_mappings(txn, job_id as _).await?; Ok((relations, fragment_mapping)) } @@ -1224,6 +1223,8 @@ impl CatalogController { let txn = inner.db.begin().await?; + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + let mut fragment_mapping_to_notify = vec![]; // for assert only @@ -1399,9 +1400,13 @@ impl CatalogController { fragment.vnode_mapping = Set((&vnode_mapping).into()); fragment.update(&txn).await?; - fragment_mapping_to_notify.push(FragmentParallelUnitMapping { + let worker_slot_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping) + .to_worker_slot(¶llel_unit_to_worker) + .to_protobuf(); + + fragment_mapping_to_notify.push(FragmentWorkerSlotMapping { fragment_id: fragment_id as u32, - mapping: Some(vnode_mapping), + mapping: Some(worker_slot_mapping), }); // for downstream and upstream diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 7d8f4769e826..b65405070854 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use anyhow::anyhow; use itertools::Itertools; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; @@ -24,11 +25,11 @@ use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, subscription, table, user, user_privilege, view, - ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, - PrivilegeId, SchemaId, SourceId, 
StreamNode, UserId,
+    worker_property, ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping,
+    I32Array, ObjectId, PrivilegeId, SchemaId, SourceId, StreamNode, UserId, WorkerId,
 };
 use risingwave_pb::catalog::{PbConnection, PbFunction, PbSubscription};
-use risingwave_pb::meta::PbFragmentParallelUnitMapping;
+use risingwave_pb::meta::{PbFragmentParallelUnitMapping, PbFragmentWorkerSlotMapping};
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource};
 use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject};
@@ -815,10 +816,12 @@ where
 pub async fn get_fragment_mappings<C>(
     db: &C,
     job_id: ObjectId,
-) -> MetaResult<Vec<PbFragmentParallelUnitMapping>>
+) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
 where
     C: ConnectionTrait,
 {
+    let parallel_unit_to_worker = get_parallel_unit_to_worker_map(db).await?;
+
     let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find()
         .select_only()
         .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping])
@@ -829,9 +832,13 @@ where
 
     Ok(fragment_mappings
         .into_iter()
-        .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping {
+        .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping {
             fragment_id: fragment_id as _,
-            mapping: Some(mapping.to_protobuf()),
+            mapping: Some(
+                ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
+                    .to_worker_slot(&parallel_unit_to_worker)
+                    .to_protobuf(),
+            ),
         })
         .collect())
 }
@@ -948,3 +955,30 @@ where
 
     Ok((source_fragment_ids, actors.into_iter().collect()))
 }
+
+pub(crate) async fn get_parallel_unit_to_worker_map<C>(db: &C) -> MetaResult<HashMap<u32, u32>>
+where
+    C: ConnectionTrait,
+{
+    let worker_parallel_units = WorkerProperty::find()
+        .select_only()
+        .columns([
+            worker_property::Column::WorkerId,
+            worker_property::Column::ParallelUnitIds,
+        ])
+        .into_tuple::<(WorkerId, I32Array)>()
+        .all(db)
+        .await?;
+
+    let parallel_unit_to_worker = worker_parallel_units
+        .into_iter()
+        .flat_map(|(worker_id, parallel_unit_ids)| {
+            parallel_unit_ids
+                .into_inner()
+                .into_iter()
+                .map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32))
+        })
+        .collect::<HashMap<_, _>>();
+
+    Ok(parallel_unit_to_worker)
+}
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 92c2d11c01b5..9cb0d25a09ae 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -26,10 +26,11 @@ use risingwave_common::util::stream_graph_visitor::{
 };
 use risingwave_connector::source::SplitImpl;
 use risingwave_meta_model_v2::SourceId;
+use risingwave_pb::common::{PbParallelUnitMapping, PbWorkerSlotMapping};
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
 use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
-use risingwave_pb::meta::FragmentParallelUnitMapping;
+use risingwave_pb::meta::FragmentWorkerSlotMapping;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::{
@@ -58,18 +59,21 @@ impl FragmentManagerCore {
     /// List all fragment vnode mapping info that not in `State::Initial`.
pub fn all_running_fragment_mappings( &self, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ { self.table_fragments .values() .filter(|tf| tf.state() != State::Initial) .flat_map(|table_fragments| { - table_fragments.fragments.values().map(|fragment| { - let parallel_unit_mapping = fragment.vnode_mapping.clone().unwrap(); - FragmentParallelUnitMapping { + table_fragments + .fragments + .values() + .map(move |fragment| FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, - mapping: Some(parallel_unit_mapping), - } - }) + mapping: Some(FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + )), + }) }) } @@ -194,18 +198,23 @@ impl FragmentManager { async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) { // Notify all fragment mapping to frontend nodes for fragment in table_fragment.fragments.values() { - let mapping = fragment - .vnode_mapping - .clone() - .expect("no data distribution found"); - let fragment_mapping = FragmentParallelUnitMapping { + let fragment_mapping = FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, - mapping: Some(mapping), + mapping: Some(Self::convert_mapping( + &table_fragment.actor_status, + fragment + .vnode_mapping + .as_ref() + .expect("no data distribution found"), + )), }; self.env .notification_manager() - .notify_frontend(operation, Info::ParallelUnitMapping(fragment_mapping)) + .notify_frontend( + operation, + Info::StreamingWorkerSlotMapping(fragment_mapping), + ) .await; } @@ -1264,11 +1273,14 @@ impl FragmentManager { *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone(); + let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping); + // Notify fragment mapping to frontend nodes. 
- let fragment_mapping = FragmentParallelUnitMapping { + let fragment_mapping = FragmentWorkerSlotMapping { fragment_id: *fragment_id as FragmentId, - mapping: Some(vnode_mapping), + mapping: Some(worker_slot_mapping), }; + fragment_mapping_to_notify.push(fragment_mapping); } @@ -1388,13 +1400,30 @@ impl FragmentManager { for mapping in fragment_mapping_to_notify { self.env .notification_manager() - .notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping)) + .notify_frontend(Operation::Update, Info::StreamingWorkerSlotMapping(mapping)) .await; } Ok(()) } + fn convert_mapping( + actor_status: &BTreeMap, + vnode_mapping: &PbParallelUnitMapping, + ) -> PbWorkerSlotMapping { + let parallel_unit_to_worker = actor_status + .values() + .map(|actor_status| { + let parallel_unit = actor_status.get_parallel_unit().unwrap(); + (parallel_unit.id, parallel_unit.worker_node_id) + }) + .collect(); + + ParallelUnitMapping::from_protobuf(vnode_mapping) + .to_worker_slot(¶llel_unit_to_worker) + .to_protobuf() + } + pub async fn table_node_actors( &self, table_ids: &HashSet, diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 36e7b77ccf63..69e17a978212 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -16,11 +16,11 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{FragmentParallelUnitMapping, FragmentParallelUnitMappings}; +use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; @@ -31,11 +31,11 @@ pub type ServingVnodeMappingRef = Arc; #[derive(Default)] pub struct ServingVnodeMapping { - 
serving_vnode_mappings: RwLock>, + serving_vnode_mappings: RwLock>, } impl ServingVnodeMapping { - pub fn all(&self) -> HashMap { + pub fn all(&self) -> HashMap { self.serving_vnode_mappings.read().clone() } @@ -45,9 +45,9 @@ impl ServingVnodeMapping { &self, streaming_parallelisms: HashMap, workers: &[WorkerNode], - ) -> (HashMap, Vec) { + ) -> (HashMap, Vec) { let mut serving_vnode_mappings = self.serving_vnode_mappings.write(); - let mut upserted: HashMap = HashMap::default(); + let mut upserted: HashMap = HashMap::default(); let mut failed: Vec = vec![]; for (fragment_id, streaming_parallelism) in streaming_parallelisms { let new_mapping = { @@ -81,24 +81,24 @@ impl ServingVnodeMapping { } } -pub(crate) fn to_fragment_parallel_unit_mapping( - mappings: &HashMap, -) -> Vec { +pub(crate) fn to_fragment_worker_slot_mapping( + mappings: &HashMap, +) -> Vec { mappings .iter() - .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping { fragment_id: *fragment_id, mapping: Some(mapping.to_protobuf()), }) .collect() } -pub(crate) fn to_deleted_fragment_parallel_unit_mapping( +pub(crate) fn to_deleted_fragment_worker_slot_mapping( fragment_ids: &[FragmentId], -) -> Vec { +) -> Vec { fragment_ids .iter() - .map(|fragment_id| FragmentParallelUnitMapping { + .map(|fragment_id| FragmentWorkerSlotMapping { fragment_id: *fragment_id, mapping: None, }) @@ -120,8 +120,8 @@ pub async fn on_meta_start( ); notification_manager.notify_frontend_without_version( Operation::Snapshot, - Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { - mappings: to_fragment_parallel_unit_mapping(&mappings), + Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings { + mappings: to_fragment_worker_slot_mapping(&mappings), }), ); } @@ -185,7 +185,7 @@ pub async fn start_serving_vnode_mapping_worker( let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; let (mappings, _) = 
serving_vnode_mapping.upsert(streaming_parallelisms, &workers); tracing::debug!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys()); - notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) })); + notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&mappings) })); } LocalNotification::FragmentMappingsUpsert(fragment_ids) => { if fragment_ids.is_empty() { @@ -195,11 +195,11 @@ pub async fn start_serving_vnode_mapping_worker( let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); if !upserted.is_empty() { tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys()); - notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&upserted) })); + notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) })); } if !failed.is_empty() { tracing::debug!("Fail to update serving vnode mapping for fragments {:?}.", failed); - notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&failed)})); + notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&failed)})); } } LocalNotification::FragmentMappingsDelete(fragment_ids) => { @@ -208,7 +208,7 @@ pub async fn start_serving_vnode_mapping_worker( } tracing::debug!("Delete serving vnode mapping 
for fragments {:?}.", fragment_ids); serving_vnode_mapping.remove(&fragment_ids); - notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&fragment_ids) })); + notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids) })); } _ => {} } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 8c673b4dc240..34759c09ab72 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -41,7 +41,7 @@ use risingwave_pb::meta::table_fragments::fragment::{ FragmentDistributionType, PbFragmentDistributionType, }; use risingwave_pb::meta::table_fragments::{self, ActorStatus, PbFragment, State}; -use risingwave_pb::meta::FragmentParallelUnitMappings; +use risingwave_pb::meta::FragmentWorkerSlotMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbStreamActor, StreamNode, @@ -61,8 +61,7 @@ use crate::manager::{ }; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::serving::{ - to_deleted_fragment_parallel_unit_mapping, to_fragment_parallel_unit_mapping, - ServingVnodeMapping, + to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping, ServingVnodeMapping, }; use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAULT_COLUMN_FAMILY}; use crate::stream::{GlobalStreamManager, SourceManagerRef}; @@ -1725,8 +1724,8 @@ impl ScaleController { .notification_manager() .notify_frontend_without_version( Operation::Update, - Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { - mappings: to_fragment_parallel_unit_mapping(&upserted), + Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings { 
+ mappings: to_fragment_worker_slot_mapping(&upserted), }), ); } @@ -1739,8 +1738,8 @@ impl ScaleController { .notification_manager() .notify_frontend_without_version( Operation::Delete, - Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { - mappings: to_deleted_fragment_parallel_unit_mapping(&failed), + Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings { + mappings: to_deleted_fragment_worker_slot_mapping(&failed), }), ); } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 7fd32f3b8bab..e4d6b5300378 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,7 +25,7 @@ use futures::stream::BoxStream; use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; @@ -1171,11 +1171,11 @@ impl MetaClient { pub async fn list_serving_vnode_mappings( &self, - ) -> Result> { + ) -> Result> { let req = GetServingVnodeMappingsRequest {}; let resp = self.inner.get_serving_vnode_mappings(req).await?; let mappings = resp - .mappings + .worker_slot_mappings .into_iter() .map(|p| { ( @@ -1185,7 +1185,7 @@ impl MetaClient { .get(&p.fragment_id) .cloned() .unwrap_or(0), - ParallelUnitMapping::from_protobuf(p.mapping.as_ref().unwrap()), + WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()), ), ) })