diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs
index f95d89bad0cbd..963a226b805c5 100644
--- a/src/common/src/hash/consistent_hash/mapping.rs
+++ b/src/common/src/hash/consistent_hash/mapping.rs
@@ -12,20 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeSet, HashMap};
 use std::fmt::{Debug, Display, Formatter};
 use std::hash::Hash;
-use std::ops::{Index, Sub};
+use std::ops::Index;
 
 use educe::Educe;
 use itertools::Itertools;
-use risingwave_pb::common::{
-    ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerSlotMapping,
-};
+use risingwave_pb::common::PbWorkerSlotMapping;
 use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
 
 use super::bitmap::VnodeBitmapExt;
-use super::vnode::{ParallelUnitId, VirtualNode};
 use crate::bitmap::{Bitmap, BitmapBuilder};
 use crate::util::compress::compress_data;
 use crate::util::iter_util::ZipEqDebug;
@@ -78,7 +75,7 @@ impl Debug for WorkerSlotId {
 pub trait VnodeMappingItem {
     /// The type of the item.
     ///
-    /// Currently, there are two types of items: [`ParallelUnitId`] and [`ActorId`]. We don't use
+    /// Currently, there are two types of items: [`WorkerSlotId`] and [`ActorId`]. We don't use
     /// them directly as the generic parameter because they're the same type aliases.
     type Item: Copy + Ord + Hash + Debug;
 }
@@ -292,12 +289,6 @@ pub mod marker {
         type Item = ActorId;
     }
 
-    /// A marker type for items of [`ParallelUnitId`].
-    pub struct ParallelUnit;
-    impl VnodeMappingItem for ParallelUnit {
-        type Item = ParallelUnitId;
-    }
-
     /// A marker type for items of [`WorkerSlotId`].
     pub struct WorkerSlot;
     impl VnodeMappingItem for WorkerSlot {
@@ -310,25 +301,12 @@ pub type ActorMapping = VnodeMapping<marker::Actor>;
 /// An expanded mapping from [`VirtualNode`] to [`ActorId`].
 pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;
 
-/// A mapping from [`VirtualNode`] to [`ParallelUnitId`].
-pub type ParallelUnitMapping = VnodeMapping<marker::ParallelUnit>;
-/// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`].
-pub type ExpandedParallelUnitMapping = ExpandedMapping<marker::ParallelUnit>;
-
 /// A mapping from [`VirtualNode`] to [`WorkerSlotId`].
 pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
 /// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`].
 pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;
 
 impl ActorMapping {
-    /// Transform this actor mapping to a parallel unit mapping, essentially `transform`.
-    pub fn to_parallel_unit<M>(&self, to_map: &M) -> ParallelUnitMapping
-    where
-        M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
-    {
-        self.transform(to_map)
-    }
-
     /// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
     pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
         let mut worker_actors = HashMap::new();
@@ -367,12 +345,6 @@ impl ActorMapping {
     }
 }
 
-#[derive(thiserror::Error, Debug)]
-pub enum ParallelUnitError {
-    #[error("parallel units {0:?} are not covered by the worker slot mapping")]
-    NotCovered(HashSet<ParallelUnitId>),
-}
-
 impl WorkerSlotMapping {
     /// Create a uniform worker mapping from the given worker ids
     pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
@@ -397,76 +369,6 @@ impl WorkerSlotMapping {
     }
 }
 
-impl ParallelUnitMapping {
-    /// Create a uniform parallel unit mapping from the given parallel units, essentially
-    /// `new_uniform`.
-    pub fn build(parallel_units: &[ParallelUnit]) -> Self {
-        Self::new_uniform(parallel_units.iter().map(|pu| pu.id))
-    }
-
-    /// Create a uniform parallel unit mapping from the given parallel units ids
-    pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self {
-        Self::new_uniform(parallel_unit_ids.iter().cloned())
-    }
-
-    /// Transform this parallel unit mapping to an actor mapping, essentially `transform`.
-    pub fn to_actor(&self, to_map: &HashMap<ParallelUnitId, ActorId>) -> ActorMapping {
-        self.transform(to_map)
-    }
-
-    /// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`.
-    pub fn to_worker_slot(
-        &self,
-        to_map: &HashMap<ParallelUnitId, u32>,
-    ) -> Result<WorkerSlotMapping, ParallelUnitError> {
-        let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new();
-        for (parallel_unit_id, worker_id) in to_map {
-            worker_to_parallel_units
-                .entry(*worker_id)
-                .or_default()
-                .insert(*parallel_unit_id);
-        }
-
-        let mut parallel_unit_to_worker_slot = HashMap::with_capacity(to_map.len());
-
-        for (worker_id, parallel_unit_ids) in worker_to_parallel_units {
-            for (index, &parallel_unit_id) in parallel_unit_ids.iter().enumerate() {
-                parallel_unit_to_worker_slot
-                    .insert(parallel_unit_id, WorkerSlotId::new(worker_id, index));
-            }
-        }
-
-        let available_parallel_unit_ids: HashSet<_> =
-            parallel_unit_to_worker_slot.keys().copied().collect();
-
-        let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect();
-
-        let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids);
-        if sub_set.is_empty() {
-            Ok(self.transform(&parallel_unit_to_worker_slot))
-        } else {
-            Err(ParallelUnitError::NotCovered(sub_set))
-        }
-    }
-
-    /// Create a parallel unit mapping from the protobuf representation.
-    pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self {
-        assert_eq!(proto.original_indices.len(), proto.data.len());
-        Self {
-            original_indices: proto.original_indices.clone(),
-            data: proto.data.clone(),
-        }
-    }
-
-    /// Convert this parallel unit mapping to the protobuf representation.
-    pub fn to_protobuf(&self) -> ParallelUnitMappingProto {
-        ParallelUnitMappingProto {
-            original_indices: self.original_indices.clone(),
-            data: self.data.clone(),
-        }
-    }
-}
-
 impl WorkerSlotMapping {
     /// Transform this worker slot mapping to an actor mapping, essentially `transform`.
     pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
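Note on the file above: the slot-assignment scheme survives the removal. Both the deleted `ParallelUnitMapping::to_worker_slot` and the retained `ActorMapping::to_worker_slot` group items per worker into an ordered set and then number them 0, 1, ... within each worker. A minimal, self-contained sketch of that assignment, using simplified stand-in types rather than the real `risingwave_common` definitions:

```rust
use std::collections::{BTreeSet, HashMap};

// Stand-ins for illustration only; the real `WorkerSlotId` in this diff packs
// the worker id and slot index differently.
type ActorId = u32;
type WorkerId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId {
    worker_id: WorkerId,
    slot_idx: usize,
}

fn assign_worker_slots(actor_to_worker: &HashMap<ActorId, WorkerId>) -> HashMap<ActorId, WorkerSlotId> {
    // Group actors per worker; a BTreeSet gives a deterministic order.
    let mut worker_actors: HashMap<WorkerId, BTreeSet<ActorId>> = HashMap::new();
    for (&actor, &worker) in actor_to_worker {
        worker_actors.entry(worker).or_default().insert(actor);
    }

    // Number the actors 0, 1, ... within each worker.
    let mut result = HashMap::new();
    for (worker_id, actors) in worker_actors {
        for (slot_idx, &actor) in actors.iter().enumerate() {
            result.insert(actor, WorkerSlotId { worker_id, slot_idx });
        }
    }
    result
}

fn main() {
    // Actors 1 and 3 on worker 10, actor 2 on worker 20.
    let locations = HashMap::from([(1, 10), (2, 20), (3, 10)]);
    let slots = assign_worker_slots(&locations);
    assert_eq!(slots[&1], WorkerSlotId { worker_id: 10, slot_idx: 0 });
    assert_eq!(slots[&3], WorkerSlotId { worker_id: 10, slot_idx: 1 });
    assert_eq!(slots[&2], WorkerSlotId { worker_id: 20, slot_idx: 0 });
}
```

The ordered set is what makes the slot indices stable regardless of hash-map iteration order, which both the removed and the retained conversion rely on.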
diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs
index fc6dac0978adf..f528544689f31 100644
--- a/src/common/src/hash/consistent_hash/vnode.rs
+++ b/src/common/src/hash/consistent_hash/vnode.rs
@@ -22,10 +22,6 @@ use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
 use crate::util::hash_util::Crc32FastBuilder;
 use crate::util::row_id::extract_vnode_id_from_row_id;
 
-/// Parallel unit is the minimal scheduling unit.
-// TODO: make it a newtype
-pub type ParallelUnitId = u32;
-
 /// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
 /// consistent hashing.
 #[repr(transparent)]
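With `ParallelUnitId` gone, `VirtualNode` is the only identifier left in this file. For context, the uniform constructor that survives the diff (`Self::new_uniform(...)`, called by `build_from_ids` above) splits the vnode space into near-equal contiguous runs, which is what the updated tests later check by comparing the smallest and largest vnode share. A rough, self-contained sketch of that distribution, not the real `VnodeMapping::new_uniform` (which additionally stores a compressed run-length representation):

```rust
// Assumes VirtualNode::COUNT == 256, which holds at this commit.
const VNODE_COUNT: usize = 256;

fn uniform_mapping(items: &[u32]) -> Vec<u32> {
    let n = items.len();
    assert!(n > 0 && n <= VNODE_COUNT);
    let mut data = Vec::with_capacity(VNODE_COUNT);
    for (i, &item) in items.iter().enumerate() {
        // The first VNODE_COUNT % n items get one extra vnode, so run sizes
        // differ by at most one.
        let len = VNODE_COUNT / n + usize::from(i < VNODE_COUNT % n);
        data.extend(std::iter::repeat(item).take(len));
    }
    data
}

fn main() {
    let data = uniform_mapping(&[7, 8, 9]);
    assert_eq!(data.len(), VNODE_COUNT);
    // Shares are 86, 85 and 85 vnodes.
    assert_eq!(data.iter().filter(|&&x| x == 7).count(), 86);
}
```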
diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs
index 9d3d558ac4839..8aeaed2f9c5a8 100644
--- a/src/meta/src/error.rs
+++ b/src/meta/src/error.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use risingwave_common::error::BoxedError;
-use risingwave_common::hash::ParallelUnitError;
 use risingwave_common::session_config::SessionConfigError;
 use risingwave_connector::error::ConnectorError;
 use risingwave_connector::sink::SinkError;
@@ -126,13 +125,6 @@ pub enum MetaErrorInner {
     // Indicates that recovery was triggered manually.
     #[error("adhoc recovery triggered")]
     AdhocRecovery,
-
-    #[error("ParallelUnit error: {0}")]
-    ParallelUnit(
-        #[from]
-        #[backtrace]
-        ParallelUnitError,
-    ),
 }
 
 impl MetaError {
diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs
index 636ca8e1af5e8..9cc3697cc6950 100644
--- a/src/meta/src/stream/test_scale.rs
+++ b/src/meta/src/stream/test_scale.rs
@@ -19,14 +19,14 @@ mod tests {
     use itertools::Itertools;
     use maplit::btreeset;
     use risingwave_common::bitmap::Bitmap;
-    use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping, VirtualNode};
     use risingwave_pb::common::ParallelUnit;
+    use risingwave_common::hash::{ActorMapping, VirtualNode};
 
     use crate::model::ActorId;
     use crate::stream::scale::rebalance_actor_vnode;
     use crate::stream::CustomActorInfo;
 
-    fn simulated_parallel_unit_nums(min: Option<usize>, max: Option<usize>) -> Vec<usize> {
+    fn simulated_parallelism(min: Option<usize>, max: Option<usize>) -> Vec<usize> {
         let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT];
         if let Some(min) = min {
             raw.retain(|n| *n > min);
@@ -39,31 +39,20 @@ mod tests {
         raw
     }
 
-    fn build_fake_actors(info: &[(ActorId, ParallelUnitId)]) -> Vec<CustomActorInfo> {
-        let parallel_units = generate_parallel_units(info);
-
-        let vnode_bitmaps = ParallelUnitMapping::build(&parallel_units).to_bitmaps();
-
-        info.iter()
-            .map(|(actor_id, parallel_unit_id)| CustomActorInfo {
+    fn build_fake_actors(actor_ids: Vec<ActorId>) -> Vec<CustomActorInfo> {
+        let actor_bitmaps = ActorMapping::new_uniform(actor_ids.clone().into_iter()).to_bitmaps();
+        actor_ids
+            .iter()
+            .map(|actor_id| CustomActorInfo {
                 actor_id: *actor_id,
-                vnode_bitmap: vnode_bitmaps
-                    .get(parallel_unit_id)
+                vnode_bitmap: actor_bitmaps
+                    .get(actor_id)
                     .map(|bitmap| bitmap.to_protobuf()),
                 ..Default::default()
             })
            .collect()
     }
 
-    fn generate_parallel_units(info: &[(ActorId, ParallelUnitId)]) -> Vec<ParallelUnit> {
-        info.iter()
-            .map(|(_, parallel_unit_id)| ParallelUnit {
-                id: *parallel_unit_id,
-                ..Default::default()
-            })
-            .collect_vec()
-    }
-
     fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) {
         let prev_bitmap = Bitmap::from(actor.vnode_bitmap.as_ref().unwrap());
 
@@ -98,22 +87,19 @@ mod tests {
     }
 
-    #[test]
-    fn test_build_vnode_mapping() {
-        for parallel_units_num in simulated_parallel_unit_nums(None, None) {
-            let info = (0..parallel_units_num)
-                .map(|i| (i as ActorId, i as ParallelUnitId))
-                .collect_vec();
-            let parallel_units = generate_parallel_units(&info);
-            let vnode_mapping = ParallelUnitMapping::build(&parallel_units);
+    #[test]
+    fn test_build_actor_mapping() {
+        for parallelism in simulated_parallelism(None, None) {
+            let actor_ids = (0..parallelism as ActorId).collect_vec();
+            let actor_mapping = ActorMapping::new_uniform(actor_ids.into_iter());
 
-            assert_eq!(vnode_mapping.len(), VirtualNode::COUNT);
+            assert_eq!(actor_mapping.len(), VirtualNode::COUNT);
 
             let mut check: HashMap<u32, Vec<_>> = HashMap::new();
-            for (vnode, parallel_unit_id) in vnode_mapping.iter_with_vnode() {
-                check.entry(parallel_unit_id).or_default().push(vnode);
+            for (vnode, actor_id) in actor_mapping.iter_with_vnode() {
+                check.entry(actor_id).or_default().push(vnode);
             }
 
-            assert_eq!(check.len(), parallel_units_num);
+            assert_eq!(check.len(), parallelism);
 
             let (min, max) = check
                 .values()
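The rewritten `build_fake_actors` above drops the `ParallelUnit` detour entirely: fake actors now get their vnode bitmaps straight from a uniform actor mapping. A usage sketch, assuming only the `risingwave_common::hash` API that this diff itself uses:

```rust
use risingwave_common::hash::ActorMapping;

fn main() {
    let actor_ids: Vec<u32> = vec![0, 1, 2]; // ActorId is u32 in this codebase
    // One uniform mapping over the actors themselves replaces the old
    // ParallelUnitMapping::build(..).to_bitmaps() plus id-translation dance.
    let bitmaps = ActorMapping::new_uniform(actor_ids.clone().into_iter()).to_bitmaps();
    // Every actor receives a bitmap; together the bitmaps cover all vnodes.
    assert_eq!(bitmaps.len(), actor_ids.len());
}
```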
@@ -126,46 +112,9 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_vnode_mapping_to_bitmaps() {
-        for parallel_units_num in simulated_parallel_unit_nums(None, None) {
-            let info = (0..parallel_units_num)
-                .map(|i| (i as ActorId, i as ParallelUnitId))
-                .collect_vec();
-            let parallel_units = generate_parallel_units(&info);
-            let bitmaps = ParallelUnitMapping::build(&parallel_units).to_bitmaps();
-            check_bitmaps(&bitmaps);
-        }
-    }
-
-    #[test]
-    fn test_mapping_convert() {
-        for parallel_unit_num in simulated_parallel_unit_nums(None, None) {
-            let (actor_mapping, _) = generate_actor_mapping(parallel_unit_num);
-
-            let actor_to_parallel_unit_map: HashMap<_, _> = (0..parallel_unit_num)
-                .map(|i| (i as ActorId, i as ParallelUnitId))
-                .collect();
-            let parallel_unit_mapping = actor_mapping.to_parallel_unit(&actor_to_parallel_unit_map);
-
-            let parallel_unit_to_actor_map: HashMap<_, _> = actor_to_parallel_unit_map
-                .into_iter()
-                .map(|(k, v)| (v, k))
-                .collect();
-
-            let new_actor_mapping = parallel_unit_mapping.to_actor(&parallel_unit_to_actor_map);
-
-            assert_eq!(actor_mapping, new_actor_mapping)
-        }
-    }
-
-    fn generate_actor_mapping(
-        parallel_unit_num: usize,
-    ) -> (ActorMapping, HashMap<ActorId, Bitmap>) {
-        let parallel_units = (0..parallel_unit_num)
-            .map(|i| (i as ActorId, i as ParallelUnitId))
-            .collect_vec();
-        let actors = build_fake_actors(&parallel_units);
+    fn generate_actor_mapping(parallelism: usize) -> (ActorMapping, HashMap<ActorId, Bitmap>) {
+        let actor_ids = (0..parallelism).map(|i| i as ActorId).collect_vec();
+        let actors = build_fake_actors(actor_ids);
 
         let bitmaps: HashMap<_, _> = actors
             .into_iter()
@@ -182,8 +131,8 @@ mod tests {
 
     #[test]
     fn test_actor_mapping_from_bitmaps() {
-        for parallel_unit_num in simulated_parallel_unit_nums(None, None) {
-            let (actor_mapping, bitmaps) = generate_actor_mapping(parallel_unit_num);
+        for parallelism in simulated_parallelism(None, None) {
+            let (actor_mapping, bitmaps) = generate_actor_mapping(parallelism);
             check_bitmaps(&bitmaps);
 
             for (actor_id, bitmap) in &bitmaps {
@@ -198,7 +147,7 @@ mod tests {
 
     #[test]
     fn test_rebalance_empty() {
-        let actors = build_fake_actors(&(0..3).map(|i| (i, i)).collect_vec());
+        let actors = build_fake_actors((0..3 as ActorId).collect_vec());
 
         // empty input
         let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &BTreeSet::new());
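For readers skimming the remaining hunks: `rebalance_actor_vnode` returns one bitmap per resulting actor, and `check_bitmaps`, judging from its use throughout these tests, verifies that those bitmaps partition the vnode space, i.e. every vnode is owned by exactly one actor. A self-contained sketch of that invariant, using plain `Vec<bool>` in place of `risingwave_common::bitmap::Bitmap`:

```rust
use std::collections::HashMap;

const VNODE_COUNT: usize = 256; // assumes VirtualNode::COUNT == 256 at this commit

/// Roughly the invariant check_bitmaps enforces: across all actors, every
/// vnode is set in exactly one bitmap.
fn check_partition(bitmaps: &HashMap<u32, Vec<bool>>) {
    for vnode in 0..VNODE_COUNT {
        let owners = bitmaps.values().filter(|b| b[vnode]).count();
        assert_eq!(owners, 1, "vnode {vnode} must have exactly one owner");
    }
}

fn main() {
    // Two actors splitting the vnode space in half satisfy the invariant.
    let mut bitmaps = HashMap::new();
    bitmaps.insert(0, (0..VNODE_COUNT).map(|v| v < 128).collect());
    bitmaps.insert(1, (0..VNODE_COUNT).map(|v| v >= 128).collect());
    check_partition(&bitmaps);
}
```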
@@ -207,12 +156,8 @@ mod tests {
 
     #[test]
     fn test_rebalance_scale_in() {
-        for parallel_unit_num in simulated_parallel_unit_nums(Some(3), None) {
-            let actors = build_fake_actors(
-                &(0..parallel_unit_num)
-                    .map(|i| (i as ActorId, i as ParallelUnitId))
-                    .collect_vec(),
-            );
+        for parallelism in simulated_parallelism(Some(3), None) {
+            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
 
             // remove 1
             let actors_to_remove = btreeset! {0};
@@ -222,7 +167,7 @@ mod tests {
             check_affinity_for_scale_in(result.get(&(1 as ActorId)).unwrap(), &actors[1]);
 
             // remove n-1
-            let actors_to_remove = (1..parallel_unit_num as ActorId).collect();
+            let actors_to_remove = (1..parallelism as ActorId).collect();
             let result = rebalance_actor_vnode(&actors, &actors_to_remove, &BTreeSet::new());
             assert_eq!(result.len(), 1);
             check_bitmaps(&result);
@@ -234,28 +179,19 @@ mod tests {
 
     #[test]
     fn test_rebalance_scale_out() {
-        for parallel_unit_num in simulated_parallel_unit_nums(Some(3), Some(VirtualNode::COUNT - 1))
-        {
-            let actors = build_fake_actors(
-                &(0..parallel_unit_num)
-                    .map(|i| (i as ActorId, i as ParallelUnitId))
-                    .collect_vec(),
-            );
+        for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT - 1)) {
+            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
 
             // add 1
-            let actors_to_add = btreeset! {parallel_unit_num as ActorId};
+            let actors_to_add = btreeset! {parallelism as ActorId};
             let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add);
             assert_eq!(result.len(), actors.len() + actors_to_add.len());
             check_bitmaps(&result);
 
-            let actors = build_fake_actors(
-                &(0..parallel_unit_num)
-                    .map(|i| (i as ActorId, i as ParallelUnitId))
-                    .collect_vec(),
-            );
+            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
+
             // add to VirtualNode::COUNT
-            let actors_to_add =
-                (parallel_unit_num as ActorId..VirtualNode::COUNT as ActorId).collect();
+            let actors_to_add = (parallelism as ActorId..VirtualNode::COUNT as ActorId).collect();
             let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add);
             assert_eq!(result.len(), actors.len() + actors_to_add.len());
             check_bitmaps(&result);
@@ -264,16 +200,12 @@ mod tests {
 
     #[test]
     fn test_rebalance_migration() {
-        for parallel_unit_num in simulated_parallel_unit_nums(Some(3), None) {
-            let actors = build_fake_actors(
-                &(0..parallel_unit_num)
-                    .map(|i| (i as ActorId, i as ParallelUnitId))
-                    .collect_vec(),
-            );
+        for parallelism in simulated_parallelism(Some(3), None) {
+            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
 
-            for idx in 0..parallel_unit_num {
+            for idx in 0..parallelism {
                 let actors_to_remove = btreeset! {idx as ActorId};
-                let actors_to_add = btreeset! {parallel_unit_num as ActorId};
+                let actors_to_add = btreeset! {parallelism as ActorId};
 
                 let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
                 assert_eq!(
@@ -293,18 +225,12 @@ mod tests {
                     assert!(prev_bitmap.eq(target_bitmap));
                 }
             }
+            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
 
-            let actors = build_fake_actors(
-                &(0..parallel_unit_num)
-                    .map(|i| (i as ActorId, i as ParallelUnitId))
-                    .collect_vec(),
-            );
-
-            for migration_count in 1..parallel_unit_num {
+            for migration_count in 1..parallelism {
                 let actors_to_remove = (0..migration_count as ActorId).collect();
-                let actors_to_add = (parallel_unit_num as ActorId
-                    ..(parallel_unit_num + migration_count) as ActorId)
-                    .collect();
+                let actors_to_add =
+                    (parallelism as ActorId..(parallelism + migration_count) as ActorId).collect();
 
                 let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
                 assert_eq!(
@@ -319,16 +245,13 @@ mod tests {
 
     #[test]
     fn test_rebalance_scale() {
-        for parallel_unit_num in simulated_parallel_unit_nums(Some(3), None) {
-            let actors = build_fake_actors(
-                &(0..parallel_unit_num)
-                    .map(|i| (i as ActorId, i as ParallelUnitId))
-                    .collect_vec(),
-            );
+        for parallelism in simulated_parallelism(Some(3), None) {
+            let actor_ids = (0..parallelism as ActorId).collect_vec();
+            let actors = build_fake_actors(actor_ids);
 
-            let parallel_unit_num = parallel_unit_num as ActorId;
+            let parallelism = parallelism as ActorId;
             let actors_to_remove = btreeset! {0};
-            let actors_to_add = btreeset! {parallel_unit_num, parallel_unit_num+1};
+            let actors_to_add = btreeset! {parallelism, parallelism+1};
 
             let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
             assert_eq!(
@@ -338,7 +261,7 @@ mod tests {
             check_bitmaps(&result);
 
             let actors_to_remove = btreeset! {0, 1};
-            let actors_to_add = btreeset! {parallel_unit_num};
+            let actors_to_add = btreeset! {parallelism};
 
             let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
             assert_eq!(
@@ -353,11 +276,8 @@ mod tests {
 
     #[test]
     fn test_rebalance_scale_real() {
-        let parallel_units = (0..(VirtualNode::COUNT - 1) as ActorId)
-            .map(|i| (i, i))
-            .collect_vec();
-        let actors = build_fake_actors(&parallel_units);
-
+        let actor_ids = (0..(VirtualNode::COUNT - 1) as ActorId).collect_vec();
+        let actors = build_fake_actors(actor_ids);
         let actors_to_remove = btreeset! {0, 1};
         let actors_to_add = btreeset! {255};
         let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index 4eeffada2840f..63cf9e414a0cb 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -26,7 +26,7 @@ use itertools::Itertools;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::{thread_rng, Rng};
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::{ParallelUnitId, WorkerSlotId};
+use risingwave_common::hash::WorkerSlotId;
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::meta::table_fragments::PbFragment;
 use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
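End-to-end, the replacement surface for code that previously built a uniform `ParallelUnitMapping` is visible in the first file of this diff: `WorkerSlotMapping::build_from_ids` plus `WorkerSlotId::new(worker_id, slot_index)`. A small usage sketch (hypothetical caller, not part of this diff; the 256 below assumes this commit's `VirtualNode::COUNT`):

```rust
use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping};

fn main() {
    // Hypothetical topology: two slots on worker 0, one slot on worker 1.
    let slots = [
        WorkerSlotId::new(0, 0),
        WorkerSlotId::new(0, 1),
        WorkerSlotId::new(1, 0),
    ];
    let mapping = WorkerSlotMapping::build_from_ids(&slots);
    // A uniform mapping assigns every vnode to some slot.
    assert_eq!(mapping.len(), 256); // assumes VirtualNode::COUNT == 256 here
}
```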