From e1ad5ea35ba4ec5c1e32be74b2a3bd54da7bfff2 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 3 Sep 2024 15:55:59 +0800 Subject: [PATCH] vnode mapping --- .../task/consistent_hash_shuffle_channel.rs | 1 + .../src/hash/consistent_hash/mapping.rs | 42 +++++----- src/common/src/hash/consistent_hash/vnode.rs | 83 ++++++++++++------- src/common/src/hash/table_distribution.rs | 4 +- src/common/src/util/row_id.rs | 1 + src/common/src/util/scan_range.rs | 4 +- .../src/vnode_mapping/vnode_placement.rs | 2 +- src/expr/impl/src/scalar/vnode.rs | 6 +- src/meta/src/controller/fragment.rs | 16 ++-- src/meta/src/stream/stream_graph/schedule.rs | 5 +- src/meta/src/stream/stream_manager.rs | 17 ++-- src/meta/src/stream/test_scale.rs | 26 +++--- .../log_store_impl/kv_log_store/test_utils.rs | 8 +- src/stream/src/executor/dispatch.rs | 3 +- 14 files changed, 131 insertions(+), 87 deletions(-) diff --git a/src/batch/src/task/consistent_hash_shuffle_channel.rs b/src/batch/src/task/consistent_hash_shuffle_channel.rs index ad0fdbaa8b70a..32d91a7acc09b 100644 --- a/src/batch/src/task/consistent_hash_shuffle_channel.rs +++ b/src/batch/src/task/consistent_hash_shuffle_channel.rs @@ -59,6 +59,7 @@ fn generate_hash_values( .iter() .map(|idx| *idx as usize) .collect::>(), + consistent_hash_info.vmap.len(), ); let hash_values = vnodes diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index a462acb291853..0ab8f9e18fd2e 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -105,26 +105,26 @@ impl VnodeMapping { /// /// For example, if `items` is `[0, 1, 2]`, and the total vnode count is 10, we'll generate /// mapping like `[0, 0, 0, 0, 1, 1, 1, 2, 2, 2]`. - pub fn new_uniform(items: impl ExactSizeIterator) -> Self { + pub fn new_uniform(items: impl ExactSizeIterator, vnode_count: usize) -> Self { // If the number of items is greater than the total vnode count, no vnode will be mapped to // some items and the mapping will be invalid. - assert!(items.len() <= VirtualNode::COUNT); + assert!(items.len() <= vnode_count); let mut original_indices = Vec::with_capacity(items.len()); let mut data = Vec::with_capacity(items.len()); - let hash_shard_size = VirtualNode::COUNT / items.len(); - let mut one_more_count = VirtualNode::COUNT % items.len(); + let hash_shard_size = vnode_count / items.len(); + let mut one_more_count = vnode_count % items.len(); let mut init_bound = 0; for item in items { - let vnode_count = if one_more_count > 0 { + let count = if one_more_count > 0 { one_more_count -= 1; hash_shard_size + 1 } else { hash_shard_size }; - init_bound += vnode_count; + init_bound += count; original_indices.push(init_bound as u32 - 1); data.push(item); @@ -141,10 +141,11 @@ impl VnodeMapping { /// Create a vnode mapping where all vnodes are mapped to the same single item. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item)) + // TODO(var-vnode): always 1 correct? + Self::new_uniform(std::iter::once(item), 1) } - /// The length of the vnode in this mapping, typically [`VirtualNode::COUNT`]. + /// The length (or count) of the vnode in this mapping. pub fn len(&self) -> usize { self.original_indices .last() @@ -204,12 +205,13 @@ impl VnodeMapping { /// Convert this vnode mapping to a mapping from items to bitmaps, where each bitmap represents /// the vnodes mapped to the item. 
pub fn to_bitmaps(&self) -> HashMap { + let vnode_count = self.len(); let mut vnode_bitmaps = HashMap::new(); for (vnode, item) in self.iter_with_vnode() { vnode_bitmaps .entry(item) - .or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .or_insert_with(|| BitmapBuilder::zeroed(vnode_count)) .set(vnode.to_index(), true); } @@ -222,10 +224,11 @@ impl VnodeMapping { /// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap /// represents the vnodes mapped to the item. pub fn from_bitmaps(bitmaps: &HashMap) -> Self { - let mut items = vec![None; VirtualNode::COUNT]; + let vnode_count = bitmaps.values().next().expect("empty bitmaps").len(); + let mut items = vec![None; vnode_count]; for (&item, bitmap) in bitmaps { - assert_eq!(bitmap.len(), VirtualNode::COUNT); + assert_eq!(bitmap.len(), vnode_count); for idx in bitmap.iter_ones() { if let Some(prev) = items[idx].replace(item) { panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`"); @@ -241,9 +244,8 @@ impl VnodeMapping { Self::from_expanded(&items) } - /// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::COUNT`]. + /// Create a vnode mapping from the expanded slice of items. pub fn from_expanded(items: &[T::Item]) -> Self { - assert_eq!(items.len(), VirtualNode::COUNT); let (original_indices, data) = compress_data(items); Self { original_indices, @@ -251,7 +253,7 @@ impl VnodeMapping { } } - /// Convert this vnode mapping to a expanded vector of items with length [`VirtualNode::COUNT`]. + /// Convert this vnode mapping to a expanded vector of items. pub fn to_expanded(&self) -> ExpandedMapping { self.iter().collect() } @@ -353,8 +355,8 @@ impl ActorMapping { impl WorkerSlotMapping { /// Create a uniform worker mapping from the given worker ids - pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self { - Self::new_uniform(worker_slot_ids.iter().cloned()) + pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self { + Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count) } /// Create a worker mapping from the protobuf representation. 
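[Reviewer note] The `new_uniform` hunk above parameterizes the uniform distribution by an explicit `vnode_count` instead of the global `VirtualNode::COUNT`. As a sanity check, here is a minimal standalone sketch of that distribution rule in plain Rust (not the crate's `VnodeMapping` type; `uniform_expanded` is an illustrative name), reproducing the doc-comment example of 3 items spread over 10 vnodes:

    // Sketch of the rule used by `VnodeMapping::new_uniform(items, vnode_count)`:
    // each item gets `vnode_count / items.len()` vnodes, and the first
    // `vnode_count % items.len()` items receive one extra vnode.
    fn uniform_expanded(items: &[u32], vnode_count: usize) -> Vec<u32> {
        assert!(items.len() <= vnode_count);
        let shard = vnode_count / items.len();
        let mut one_more = vnode_count % items.len();
        let mut expanded = Vec::with_capacity(vnode_count);
        for &item in items {
            let count = if one_more > 0 {
                one_more -= 1;
                shard + 1
            } else {
                shard
            };
            // Assign `count` consecutive vnodes to this item.
            expanded.extend(std::iter::repeat(item).take(count));
        }
        expanded
    }

    fn main() {
        // Mirrors the doc-comment example: items [0, 1, 2] over 10 vnodes.
        assert_eq!(
            uniform_expanded(&[0, 1, 2], 10),
            vec![0, 0, 0, 0, 1, 1, 1, 2, 2, 2]
        );
    }

With this in mind, `len()` returning the mapping's own vnode count (last original index + 1) is what lets `to_bitmaps`/`from_bitmaps` below round-trip without ever consulting `VirtualNode::COUNT`.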
@@ -403,18 +405,18 @@ mod tests { type TestMapping = VnodeMapping; type Test2Mapping = VnodeMapping; - const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT]; + const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST]; fn uniforms() -> impl Iterator { COUNTS .iter() - .map(|&count| TestMapping::new_uniform(0..count as u32)) + .map(|&count| TestMapping::new_uniform(0..count as u32, VirtualNode::COUNT_FOR_TEST)) } fn randoms() -> impl Iterator { COUNTS.iter().map(|&count| { let raw = repeat_with(|| rand::thread_rng().gen_range(0..count as u32)) - .take(VirtualNode::COUNT) + .take(VirtualNode::COUNT_FOR_TEST) .collect_vec(); TestMapping::from_expanded(&raw) }) @@ -427,7 +429,7 @@ mod tests { #[test] fn test_uniform() { for vnode_mapping in uniforms() { - assert_eq!(vnode_mapping.len(), VirtualNode::COUNT); + assert_eq!(vnode_mapping.len(), VirtualNode::COUNT_FOR_TEST); let item_count = vnode_mapping.iter_unique().count(); let mut check: HashMap> = HashMap::new(); diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index dd4095535fdf3..a87ce4fc1b673 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -30,26 +30,44 @@ use crate::util::row_id::extract_vnode_id_from_row_id; pub struct VirtualNode(VirtualNodeInner); /// The internal representation of a virtual node id. +/// +/// Note: not all bits of the inner representation are used. type VirtualNodeInner = u16; -static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32); -impl From for VirtualNode { - fn from(hash_code: Crc32HashCode) -> Self { +/// `vnode_count` must be provided to convert a hash code to a virtual node. +/// +/// Use [`Crc32HashCodeToVnodeExt::to_vnode`] instead. +impl !From for VirtualNode {} + +#[easy_ext::ext(Crc32HashCodeToVnodeExt)] +impl Crc32HashCode { + fn to_vnode(self, vnode_count: usize) -> VirtualNode { // Take the least significant bits of the hash code. // TODO: should we use the most significant bits? - let inner = (hash_code.value() % Self::COUNT as u64) as VirtualNodeInner; + let inner = (self.value() % vnode_count as u64) as VirtualNodeInner; VirtualNode(inner) } } impl VirtualNode { - /// The number of bits used to represent a virtual node. - /// - /// Note: Not all bits of the inner representation are used. One should rely on this constant - /// to determine the count of virtual nodes. - pub const BITS: usize = 8; /// The total count of virtual nodes. - pub const COUNT: usize = 1 << Self::BITS; + // TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST` + pub const COUNT: usize = 1 << 8; + /// The maximum value of the virtual node. + // TODO(var-vnode): remove this and only keep `MAX_FOR_TEST` + pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); +} + +impl VirtualNode { + /// The total count of virtual nodes, for testing purposes. + pub const COUNT_FOR_TEST: usize = Self::COUNT; + /// The maximum value of the virtual node, for testing purposes. + pub const MAX_FOR_TEST: VirtualNode = Self::MAX; +} + +impl VirtualNode { + /// The maximum count of virtual nodes that fits in [`VirtualNodeInner`]. + pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; /// The size of a virtual node in bytes, in memory or serialized representation. 
pub const SIZE: usize = std::mem::size_of::(); } @@ -58,8 +76,6 @@ impl VirtualNode { pub type AllVirtualNodeIter = std::iter::Map, fn(usize) -> VirtualNode>; impl VirtualNode { - /// The maximum value of the virtual node. - pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); /// We may use `VirtualNode` as a datum in a stream, or store it as a column. /// Hence this reifies it as a RW datatype. pub const RW_TYPE: DataType = DataType::Int16; @@ -68,7 +84,7 @@ impl VirtualNode { /// Creates a virtual node from the `usize` index. pub const fn from_index(index: usize) -> Self { - debug_assert!(index < Self::COUNT); + debug_assert!(index < Self::MAX_COUNT); Self(index as _) } @@ -79,7 +95,6 @@ impl VirtualNode { /// Creates a virtual node from the given scalar representation. Used by `VNODE` expression. pub const fn from_scalar(scalar: i16) -> Self { - debug_assert!((scalar as usize) < Self::COUNT); Self(scalar as _) } @@ -99,7 +114,6 @@ impl VirtualNode { /// Creates a virtual node from the given big-endian bytes representation. pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { let inner = VirtualNodeInner::from_be_bytes(bytes); - debug_assert!((inner as usize) < Self::COUNT); Self(inner) } @@ -109,22 +123,21 @@ impl VirtualNode { } /// Iterates over all virtual nodes. - pub fn all() -> AllVirtualNodeIter { - (0..Self::COUNT).map(Self::from_index) + pub fn all(vnode_count: usize) -> AllVirtualNodeIter { + (0..vnode_count).map(Self::from_index) } } -impl VirtualNode { - pub const COUNT_FOR_TEST: usize = Self::COUNT; - pub const MAX_FOR_TEST: VirtualNode = Self::MAX; -} - impl VirtualNode { // `compute_chunk` is used to calculate the `VirtualNode` for the columns in the // chunk. When only one column is provided and its type is `Serial`, we consider the column to // be the one that contains RowId, and use a special method to skip the calculation of Hash // and directly extract the `VirtualNode` from `RowId`. - pub fn compute_chunk(data_chunk: &DataChunk, keys: &[usize]) -> Vec { + pub fn compute_chunk( + data_chunk: &DataChunk, + keys: &[usize], + vnode_count: usize, + ) -> Vec { if let Ok(idx) = keys.iter().exactly_one() && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx) { @@ -140,7 +153,7 @@ impl VirtualNode { // This process doesn’t guarantee the order of rows, producing indeterminate results in some cases, // such as when `distinct on` is used without an `order by`. let (row, _) = data_chunk.row_at(idx); - row.hash(Crc32FastBuilder).into() + row.hash(Crc32FastBuilder).to_vnode(vnode_count) } }) .collect(); @@ -149,19 +162,29 @@ impl VirtualNode { data_chunk .get_hash_values(keys, Crc32FastBuilder) .into_iter() - .map(|hash| hash.into()) + .map(|hash| hash.to_vnode(vnode_count)) .collect() } + /// Equivalent to [`Self::compute_chunk`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count. + pub fn compute_chunk_for_test(data_chunk: &DataChunk, keys: &[usize]) -> Vec { + Self::compute_chunk(data_chunk, keys, Self::COUNT_FOR_TEST) + } + // `compute_row` is used to calculate the `VirtualNode` for the corresponding columns in a // `Row`. Similar to `compute_chunk`, it also contains special handling for serial columns. 
- pub fn compute_row(row: impl Row, indices: &[usize]) -> VirtualNode { + pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode { let project = row.project(indices); if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() { return extract_vnode_id_from_row_id(s.as_row_id()); } - project.hash(Crc32FastBuilder).into() + project.hash(Crc32FastBuilder).to_vnode(vnode_count) + } + + /// Equivalent to [`Self::compute_row`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count. + pub fn compute_row_for_test(row: impl Row, indices: &[usize]) -> VirtualNode { + Self::compute_row(row, indices, Self::COUNT_FOR_TEST) } } @@ -184,7 +207,7 @@ mod tests { ); let chunk = DataChunk::from_pretty(chunk.as_str()); - let vnodes = VirtualNode::compute_chunk(&chunk, &[0]); + let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]); assert_eq!( vnodes.as_slice(), @@ -200,7 +223,7 @@ mod tests { Some(ScalarImpl::Int64(12345)), ]); - let vnode = VirtualNode::compute_row(&row, &[0]); + let vnode = VirtualNode::compute_row_for_test(&row, &[0]); assert_eq!(vnode, VirtualNode::from_index(100)); } @@ -221,7 +244,7 @@ mod tests { ); let chunk = DataChunk::from_pretty(chunk.as_str()); - let vnodes = VirtualNode::compute_chunk(&chunk, &[0]); + let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]); assert_eq!( vnodes.as_slice(), diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 480483bc96a5d..5275aca04adb3 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -184,7 +184,7 @@ impl TableDistribution { /// Get vnode value with `indices` on the given `row`. pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode { assert!(!indices.is_empty()); - let vnode = VirtualNode::compute_row(&row, indices); + let vnode = VirtualNode::compute_row(&row, indices, vnodes.len()); check_vnode_is_set(vnode, vnodes); tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode); @@ -219,7 +219,7 @@ impl TableDistribution { .map(|idx| pk_indices[*idx]) .collect_vec(); - VirtualNode::compute_chunk(chunk, &dist_key_indices) + VirtualNode::compute_chunk(chunk, &dist_key_indices, vnodes.len()) .into_iter() .zip_eq_fast(chunk.visibility().iter()) .map(|(vnode, vis)| { diff --git a/src/common/src/util/row_id.rs b/src/common/src/util/row_id.rs index 508f418903413..7f22c17e925e4 100644 --- a/src/common/src/util/row_id.rs +++ b/src/common/src/util/row_id.rs @@ -52,6 +52,7 @@ pub struct RowIdGenerator { pub type RowId = i64; +// TODO(var-vnode): how should we handle this for different virtual node counts? 
#[inline] pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode { let vnode_id = ((id >> VNODE_ID_SHIFT_BITS) & (VNODE_ID_UPPER_BOUND as i64 - 1)) as u32; diff --git a/src/common/src/util/scan_range.rs b/src/common/src/util/scan_range.rs index 5d5e84ed32085..cfe209cf2c22a 100644 --- a/src/common/src/util/scan_range.rs +++ b/src/common/src/util/scan_range.rs @@ -173,7 +173,7 @@ mod tests { Some(ScalarImpl::from(514)), ]); - let vnode = VirtualNode::compute_row(&row, &[0, 1]); + let vnode = VirtualNode::compute_row_for_test(&row, &[0, 1]); assert_eq!(scan_range.try_compute_vnode(&dist), Some(vnode)); } @@ -203,7 +203,7 @@ mod tests { Some(ScalarImpl::from(114514)), ]); - let vnode = VirtualNode::compute_row(&row, &[2, 1]); + let vnode = VirtualNode::compute_row_for_test(&row, &[2, 1]); assert_eq!(scan_range.try_compute_vnode(&dist), Some(vnode)); } diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 5619ffc6e0f96..ccaf67db6a3c8 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -123,7 +123,7 @@ pub fn place_vnode( } None => { // No hint is provided, assign all vnodes to `temp_pu`. - for vnode in VirtualNode::all() { + for vnode in VirtualNode::all(VirtualNode::COUNT) { temp_slot.balance += 1; temp_slot.builder.set(vnode.to_index(), true); } diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index e544c39f62499..edd4caa39970e 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -43,7 +43,8 @@ impl Expression for VnodeExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices); + // TODO(var-vnode): get vnode count from context + let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, VirtualNode::COUNT); let mut builder = I16ArrayBuilder::new(input.capacity()); vnodes .into_iter() @@ -52,8 +53,9 @@ impl Expression for VnodeExpression { } async fn eval_row(&self, input: &OwnedRow) -> Result { + // TODO(var-vnode): get vnode count from context Ok(Some( - VirtualNode::compute_row(input, &self.dist_key_indices) + VirtualNode::compute_row(input, &self.dist_key_indices, VirtualNode::COUNT) .to_scalar() .into(), )) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 16228a06d0a9a..31575e72804f9 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1411,7 +1411,7 @@ mod tests { use std::collections::{BTreeMap, HashMap}; use itertools::Itertools; - use risingwave_common::hash::ActorMapping; + use risingwave_common::hash::{ActorMapping, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; @@ -1497,8 +1497,11 @@ mod tests { }) .collect(); - let actor_bitmaps = - ActorMapping::new_uniform((0..actor_count).map(|i| i as _)).to_bitmaps(); + let actor_bitmaps = ActorMapping::new_uniform( + (0..actor_count).map(|i| i as _), + VirtualNode::COUNT_FOR_TEST, + ) + .to_bitmaps(); let pb_actors = (0..actor_count) .map(|actor_id| { @@ -1610,8 +1613,11 @@ mod tests { }) .collect(); - let mut actor_bitmaps = - ActorMapping::new_uniform((0..actor_count).map(|i| i as _)).to_bitmaps(); + let mut actor_bitmaps = ActorMapping::new_uniform( + (0..actor_count).map(|i| i as _), + VirtualNode::COUNT_FOR_TEST, + ) + 
.to_bitmaps(); let actors = (0..actor_count) .map(|actor_id| { diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 0f9e473c26486..f338dd27725ca 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -25,7 +25,7 @@ use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::{bail, hash}; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::table_fragments::fragment::{ @@ -235,7 +235,8 @@ impl Scheduler { assert_eq!(scheduled_worker_slots.len(), parallelism); // Build the default hash mapping uniformly. - let default_hash_mapping = WorkerSlotMapping::build_from_ids(&scheduled_worker_slots); + let default_hash_mapping = + WorkerSlotMapping::build_from_ids(&scheduled_worker_slots, VirtualNode::COUNT); let single_scheduled = schedule_units_for_slots(&slots, 1, streaming_job_id)?; let default_single_worker_id = single_scheduled.keys().exactly_one().cloned().unwrap(); diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index a8e8bc47752a5..d8b1dc131fec1 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -764,8 +764,7 @@ mod tests { use std::time::Duration; use futures::{Stream, TryStreamExt}; - use risingwave_common::hash; - use risingwave_common::hash::{ActorMapping, WorkerSlotId}; + use risingwave_common::hash::{self, ActorMapping, VirtualNode, WorkerSlotId}; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property; @@ -1137,12 +1136,14 @@ mod tests { } fn make_mview_stream_actors(table_id: &TableId, count: usize) -> Vec { - let mut actor_bitmaps: HashMap<_, _> = - ActorMapping::new_uniform((0..count).map(|i| i as hash::ActorId)) - .to_bitmaps() - .into_iter() - .map(|(actor_id, bitmap)| (actor_id, bitmap.to_protobuf())) - .collect(); + let mut actor_bitmaps: HashMap<_, _> = ActorMapping::new_uniform( + (0..count).map(|i| i as hash::ActorId), + VirtualNode::COUNT_FOR_TEST, + ) + .to_bitmaps() + .into_iter() + .map(|(actor_id, bitmap)| (actor_id, bitmap.to_protobuf())) + .collect(); (0..count) .map(|i| StreamActor { diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index 0dc0bced84005..54e619e473cd0 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -26,7 +26,7 @@ mod tests { use crate::stream::CustomActorInfo; fn simulated_parallelism(min: Option, max: Option) -> Vec { - let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT]; + let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST]; if let Some(min) = min { raw.retain(|n| *n > min); raw.push(min); @@ -39,7 +39,9 @@ mod tests { } fn build_fake_actors(actor_ids: Vec) -> Vec { - let actor_bitmaps = ActorMapping::new_uniform(actor_ids.clone().into_iter()).to_bitmaps(); + let actor_bitmaps = + ActorMapping::new_uniform(actor_ids.clone().into_iter(), VirtualNode::COUNT_FOR_TEST) + .to_bitmaps(); actor_ids .iter() .map(|actor_id| CustomActorInfo { @@ -55,7 +57,7 @@ mod tests { fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) { let prev_bitmap = 
Bitmap::from(actor.vnode_bitmap.as_ref().unwrap()); - for idx in 0..VirtualNode::COUNT { + for idx in 0..VirtualNode::COUNT_FOR_TEST { if prev_bitmap.is_set(idx) { assert!(bitmap.is_set(idx)); } @@ -63,7 +65,9 @@ mod tests { } fn check_bitmaps(bitmaps: &HashMap) { - let mut target = (0..VirtualNode::COUNT).map(|_| false).collect_vec(); + let mut target = (0..VirtualNode::COUNT_FOR_TEST) + .map(|_| false) + .collect_vec(); for bitmap in bitmaps.values() { for (idx, pos) in target.iter_mut().enumerate() { @@ -89,9 +93,10 @@ mod tests { fn test_build_actor_mapping() { for parallelism in simulated_parallelism(None, None) { let actor_ids = (0..parallelism as ActorId).collect_vec(); - let actor_mapping = ActorMapping::new_uniform(actor_ids.into_iter()); + let actor_mapping = + ActorMapping::new_uniform(actor_ids.into_iter(), VirtualNode::COUNT_FOR_TEST); - assert_eq!(actor_mapping.len(), VirtualNode::COUNT); + assert_eq!(actor_mapping.len(), VirtualNode::COUNT_FOR_TEST); let mut check: HashMap> = HashMap::new(); for (vnode, actor_id) in actor_mapping.iter_with_vnode() { @@ -178,7 +183,7 @@ mod tests { #[test] fn test_rebalance_scale_out() { - for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT - 1)) { + for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT_FOR_TEST - 1)) { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // add 1 @@ -189,8 +194,9 @@ mod tests { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); - // add to VirtualNode::COUNT - let actors_to_add = (parallelism as ActorId..VirtualNode::COUNT as ActorId).collect(); + // add to VirtualNode::COUNT_FOR_TEST + let actors_to_add = + (parallelism as ActorId..VirtualNode::COUNT_FOR_TEST as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add); assert_eq!(result.len(), actors.len() + actors_to_add.len()); check_bitmaps(&result); @@ -275,7 +281,7 @@ mod tests { #[test] fn test_rebalance_scale_real() { - let actor_ids = (0..(VirtualNode::COUNT - 1) as ActorId).collect_vec(); + let actor_ids = (0..(VirtualNode::COUNT_FOR_TEST - 1) as ActorId).collect_vec(); let actors = build_fake_actors(actor_ids); let actors_to_remove = btreeset! {0, 1}; let actors_to_add = btreeset! 
{255}; diff --git a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs index 5fc10cd0cc58a..3114c22e63323 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs @@ -143,7 +143,7 @@ pub(crate) fn gen_multi_vnode_stream_chunks( .collect_vec(); let (ops, rows) = gen_sized_test_data(base, max_count); for (op, row) in zip_eq(ops, rows) { - let vnode = VirtualNode::compute_row(&row, &[TEST_SCHEMA_DIST_KEY_INDEX]); + let vnode = VirtualNode::compute_row_for_test(&row, &[TEST_SCHEMA_DIST_KEY_INDEX]); let (ops, builder) = &mut data_builder[vnode.to_index() % MOD_COUNT]; ops.push(op); assert!(builder.append_one_row(row).is_none()); @@ -177,9 +177,9 @@ pub(crate) fn gen_test_log_store_table(pk_info: &'static KvLogStorePkInfo) -> Pb pub(crate) fn calculate_vnode_bitmap<'a>( test_data: impl Iterator)>, ) -> Bitmap { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for vnode in - test_data.map(|(_, row)| VirtualNode::compute_row(row, &[TEST_SCHEMA_DIST_KEY_INDEX])) + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); + for vnode in test_data + .map(|(_, row)| VirtualNode::compute_row_for_test(row, &[TEST_SCHEMA_DIST_KEY_INDEX])) { builder.set(vnode.to_index(), true); } diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 82d11db49513b..4a43ff618ebf7 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -755,7 +755,8 @@ impl Dispatcher for HashDataDispatcher { let num_outputs = self.outputs.len(); // get hash value of every line by its key - let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys); + let vnode_count = self.hash_mapping.len(); + let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count); tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
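[Reviewer note] To summarize the pattern this patch threads through `compute_chunk`, `compute_row`, and the `HashDataDispatcher` change above: the vnode is the key's hash modulo an explicitly passed vnode count, and callers now derive that count from their local mapping or bitmap (`self.hash_mapping.len()`, `vnodes.len()`) rather than the global `VirtualNode::COUNT`. A minimal standalone sketch of that idea, with `std`'s `DefaultHasher` standing in for the CRC32 hasher used in the real code and `to_vnode` as an illustrative free function:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Hash a distribution key and map it to a vnode under an explicit count,
    // i.e. the shape of `Crc32HashCodeToVnodeExt::to_vnode(self, vnode_count)`.
    fn to_vnode<K: Hash>(key: &K, vnode_count: usize) -> u16 {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        // Take the hash modulo the caller-provided vnode count.
        (hasher.finish() % vnode_count as u64) as u16
    }

    fn main() {
        // e.g. the length of a per-fragment vnode bitmap or hash mapping.
        let vnode_count = 16;
        let vnode = to_vnode(&("user_42", 7i64), vnode_count);
        assert!((vnode as usize) < vnode_count);
        println!("routed to vnode {vnode} of {vnode_count}");
    }

The remaining hard-coded sites (`VnodeExpression`, `place_vnode`, `extract_vnode_id_from_row_id`) are flagged with `TODO(var-vnode)` comments, so the default count is still fixed until those callers can obtain the count from their context.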