diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 50e1ce853ebe6..0c36eb1bf9d6e 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -319,7 +319,7 @@ pub struct MetaConfig { pub do_not_config_object_storage_lifecycle: bool, /// Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. - /// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `VirtualNode::count() / partition_vnode_count` consecutive virtual-nodes of one state table. #[serde(default = "default::meta::partition_vnode_count")] pub partition_vnode_count: u32, @@ -348,7 +348,7 @@ pub struct MetaConfig { /// Count of partitions of tables in default group and materialized view group. /// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. - /// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `VirtualNode::count() / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. /// Set it zero to disable this feature. #[serde(default = "default::meta::hybrid_partition_vnode_count")] pub hybrid_partition_vnode_count: u32, @@ -427,10 +427,10 @@ impl<'de> Deserialize<'de> for DefaultParallelism { ))) } } - Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::COUNT { + Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::count() { Err(serde::de::Error::custom(format!( "default parallelism should be not great than {}", - VirtualNode::COUNT + VirtualNode::count() )))? } else { NonZeroUsize::new(i).ok_or_else(|| { diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index a462acb291853..6e8603528d514 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -108,13 +108,13 @@ impl VnodeMapping { pub fn new_uniform(items: impl ExactSizeIterator) -> Self { // If the number of items is greater than the total vnode count, no vnode will be mapped to // some items and the mapping will be invalid. - assert!(items.len() <= VirtualNode::COUNT); + assert!(items.len() <= VirtualNode::count()); let mut original_indices = Vec::with_capacity(items.len()); let mut data = Vec::with_capacity(items.len()); - let hash_shard_size = VirtualNode::COUNT / items.len(); - let mut one_more_count = VirtualNode::COUNT % items.len(); + let hash_shard_size = VirtualNode::count() / items.len(); + let mut one_more_count = VirtualNode::count() % items.len(); let mut init_bound = 0; for item in items { @@ -144,7 +144,7 @@ impl VnodeMapping { Self::new_uniform(std::iter::once(item)) } - /// The length of the vnode in this mapping, typically [`VirtualNode::COUNT`]. + /// The length of the vnode in this mapping, typically [`VirtualNode::count`]. pub fn len(&self) -> usize { self.original_indices .last() @@ -209,7 +209,7 @@ impl VnodeMapping { for (vnode, item) in self.iter_with_vnode() { vnode_bitmaps .entry(item) - .or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::count())) .set(vnode.to_index(), true); } @@ -222,10 +222,10 @@ impl VnodeMapping { /// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap /// represents the vnodes mapped to the item. pub fn from_bitmaps(bitmaps: &HashMap) -> Self { - let mut items = vec![None; VirtualNode::COUNT]; + let mut items = vec![None; VirtualNode::count()]; for (&item, bitmap) in bitmaps { - assert_eq!(bitmap.len(), VirtualNode::COUNT); + assert_eq!(bitmap.len(), VirtualNode::count()); for idx in bitmap.iter_ones() { if let Some(prev) = items[idx].replace(item) { panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`"); @@ -241,9 +241,9 @@ impl VnodeMapping { Self::from_expanded(&items) } - /// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::COUNT`]. + /// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::count`]. pub fn from_expanded(items: &[T::Item]) -> Self { - assert_eq!(items.len(), VirtualNode::COUNT); + assert_eq!(items.len(), VirtualNode::count()); let (original_indices, data) = compress_data(items); Self { original_indices, @@ -251,7 +251,7 @@ impl VnodeMapping { } } - /// Convert this vnode mapping to a expanded vector of items with length [`VirtualNode::COUNT`]. + /// Convert this vnode mapping to a expanded vector of items with length [`VirtualNode::count`]. pub fn to_expanded(&self) -> ExpandedMapping { self.iter().collect() } @@ -403,18 +403,20 @@ mod tests { type TestMapping = VnodeMapping; type Test2Mapping = VnodeMapping; - const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT]; + fn counts() -> &[usize] { + &[1, 3, 12, 42, VirtualNode::count()] + } fn uniforms() -> impl Iterator { - COUNTS + counts() .iter() .map(|&count| TestMapping::new_uniform(0..count as u32)) } fn randoms() -> impl Iterator { - COUNTS.iter().map(|&count| { + counts().iter().map(|&count| { let raw = repeat_with(|| rand::thread_rng().gen_range(0..count as u32)) - .take(VirtualNode::COUNT) + .take(VirtualNode::count()) .collect_vec(); TestMapping::from_expanded(&raw) }) @@ -427,7 +429,7 @@ mod tests { #[test] fn test_uniform() { for vnode_mapping in uniforms() { - assert_eq!(vnode_mapping.len(), VirtualNode::COUNT); + assert_eq!(vnode_mapping.len(), VirtualNode::count()); let item_count = vnode_mapping.iter_unique().count(); let mut check: HashMap> = HashMap::new(); diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index f528544689f31..220389e2b5854 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZero; +use std::sync::LazyLock; + use itertools::Itertools; use parse_display::Display; @@ -31,44 +34,82 @@ pub struct VirtualNode(VirtualNodeInner); /// The internal representation of a virtual node id. type VirtualNodeInner = u16; -static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32); +// static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32); impl From for VirtualNode { fn from(hash_code: Crc32HashCode) -> Self { // Take the least significant bits of the hash code. // TODO: should we use the most significant bits? - let inner = (hash_code.value() % Self::COUNT as u64) as VirtualNodeInner; + let inner = (hash_code.value() % Self::count() as u64) as VirtualNodeInner; VirtualNode(inner) } } impl VirtualNode { - /// The number of bits used to represent a virtual node. - /// - /// Note: Not all bits of the inner representation are used. One should rely on this constant - /// to determine the count of virtual nodes. - pub const BITS: usize = 8; - /// The total count of virtual nodes. - pub const COUNT: usize = 1 << Self::BITS; + /// We may use `VirtualNode` as a datum in a stream, or store it as a column. + /// Hence this reifies it as a RW datatype. + pub const RW_TYPE: DataType = DataType::Int16; /// The size of a virtual node in bytes, in memory or serialized representation. pub const SIZE: usize = std::mem::size_of::(); + /// The minimum (zero) value of the virtual node. + pub const ZERO: VirtualNode = unsafe { VirtualNode::from_index_unchecked(0) }; +} + +impl VirtualNode { + /// The default count of virtual nodes. + const DEFAULT_COUNT: usize = 1 << 8; + /// The maximum count of virtual nodes, limited by the size of the inner type [`VirtualNodeInner`]. + const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; + + /// The total count of virtual nodes. + /// + /// It can be customized by the environment variable `RW_VNODE_COUNT`, or defaults to [`Self::DEFAULT_COUNT`]. + pub fn count() -> usize { + // Cache the value to avoid repeated env lookups and parsing. + static COUNT: LazyLock = LazyLock::new(|| { + if let Ok(count) = std::env::var("RW_VNODE_COUNT") { + let count = count + .parse::>() + .expect("`RW_VNODE_COUNT` must be a positive integer") + .get(); + assert!( + count <= VirtualNode::MAX_COUNT, + "`RW_VNODE_COUNT` should not exceed maximum value {}", + VirtualNode::MAX_COUNT + ); + // TODO(var-vnode): shall we enforce it to be a power of 2? + count + } else { + VirtualNode::DEFAULT_COUNT + } + }); + + *COUNT + } + + /// The last virtual node in the range. It's derived from [`Self::count`]. + pub fn max() -> VirtualNode { + VirtualNode::from_index(Self::count() - 1) + } } /// An iterator over all virtual nodes. pub type AllVirtualNodeIter = std::iter::Map, fn(usize) -> VirtualNode>; impl VirtualNode { - /// The maximum value of the virtual node. - pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); - /// We may use `VirtualNode` as a datum in a stream, or store it as a column. - /// Hence this reifies it as a RW datatype. - pub const RW_TYPE: DataType = DataType::Int16; - /// The minimum (zero) value of the virtual node. - pub const ZERO: VirtualNode = VirtualNode::from_index(0); - /// Creates a virtual node from the `usize` index. - pub const fn from_index(index: usize) -> Self { - debug_assert!(index < Self::COUNT); + pub fn from_index(index: usize) -> Self { + debug_assert!(index < Self::count()); + Self(index as _) + } + + /// Creates a virtual node from the `usize` index without bounds checking. + /// + /// # Safety + /// + /// The caller must ensure that the index is within the range of virtual nodes, + /// i.e., less than [`Self::count`]. + pub const unsafe fn from_index_unchecked(index: usize) -> Self { Self(index as _) } @@ -78,8 +119,8 @@ impl VirtualNode { } /// Creates a virtual node from the given scalar representation. Used by `VNODE` expression. - pub const fn from_scalar(scalar: i16) -> Self { - debug_assert!((scalar as usize) < Self::COUNT); + pub fn from_scalar(scalar: i16) -> Self { + debug_assert!((scalar as usize) < Self::count()); Self(scalar as _) } @@ -97,9 +138,9 @@ impl VirtualNode { } /// Creates a virtual node from the given big-endian bytes representation. - pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { + pub fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { let inner = VirtualNodeInner::from_be_bytes(bytes); - debug_assert!((inner as usize) < Self::COUNT); + debug_assert!((inner as usize) < Self::count()); Self(inner) } @@ -110,7 +151,7 @@ impl VirtualNode { /// Iterates over all virtual nodes. pub fn all() -> AllVirtualNodeIter { - (0..Self::COUNT).map(Self::from_index) + (0..Self::count()).map(Self::from_index) } } diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 9be9cd2abafb2..e7403dfcf72d6 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -108,7 +108,7 @@ impl TableDistribution { pub fn singleton_vnode_bitmap_ref() -> &'static Arc { /// A bitmap that only the default vnode is set. static SINGLETON_VNODES: LazyLock> = LazyLock::new(|| { - let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnodes = BitmapBuilder::zeroed(VirtualNode::count()); vnodes.set(SINGLETON_VNODE.to_index(), true); vnodes.finish().into() }); @@ -123,7 +123,7 @@ impl TableDistribution { pub fn all_vnodes_ref() -> &'static Arc { /// A bitmap that all vnodes are set. static ALL_VNODES: LazyLock> = - LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into()); + LazyLock::new(|| Bitmap::ones(VirtualNode::count()).into()); &ALL_VNODES } diff --git a/src/common/src/util/row_id.rs b/src/common/src/util/row_id.rs index 508f418903413..c3eaa722b1914 100644 --- a/src/common/src/util/row_id.rs +++ b/src/common/src/util/row_id.rs @@ -15,17 +15,14 @@ use std::cmp::Ordering; use std::time::SystemTime; -use static_assertions::const_assert; - use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH; use crate::hash::VirtualNode; +// TODO(var-vnode): should fit vnode count up to 16 bits const TIMESTAMP_SHIFT_BITS: u8 = 22; const VNODE_ID_SHIFT_BITS: u8 = 12; -const SEQUENCE_UPPER_BOUND: u16 = 1 << 12; -const VNODE_ID_UPPER_BOUND: u32 = 1 << 10; - -const_assert!(VNODE_ID_UPPER_BOUND >= VirtualNode::COUNT as u32); +const SEQUENCE_UPPER_BOUND: u16 = 1 << VNODE_ID_SHIFT_BITS; +const VNODE_ID_UPPER_BOUND: u32 = 1 << (TIMESTAMP_SHIFT_BITS - VNODE_ID_SHIFT_BITS); /// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format: /// @@ -62,6 +59,12 @@ pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode { impl RowIdGenerator { /// Create a new `RowIdGenerator` with given virtual nodes. pub fn new(vnodes: impl IntoIterator) -> Self { + assert!( + VirtualNode::count() <= VNODE_ID_UPPER_BOUND as usize, + "vnode count should not exceed {} due to limitation of row id format", + VNODE_ID_UPPER_BOUND + ); + let base = *UNIX_RISINGWAVE_DATE_EPOCH; Self { base, diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 5619ffc6e0f96..322e88a1edf17 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -44,7 +44,7 @@ pub fn place_vnode( // `max_parallelism` and total number of virtual nodes. let serving_parallelism = std::cmp::min( worker_slots.iter().map(|slots| slots.len()).sum(), - std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT), + std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::count()), ); // Select `serving_parallelism` worker slots in a round-robin fashion, to distribute workload @@ -79,14 +79,14 @@ pub fn place_vnode( is_temp: bool, } - let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_slots.len()); + let (expected, mut remain) = VirtualNode::count().div_rem(&selected_slots.len()); let mut balances: HashMap = HashMap::default(); for slot in &selected_slots { let mut balance = Balance { slot: *slot, balance: -(expected as i32), - builder: BitmapBuilder::zeroed(VirtualNode::COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::count()), is_temp: false, }; @@ -102,7 +102,7 @@ pub fn place_vnode( let mut temp_slot = Balance { slot: WorkerSlotId::new(0u32, usize::MAX), /* This id doesn't matter for `temp_slot`. It's distinguishable via `is_temp`. */ balance: 0, - builder: BitmapBuilder::zeroed(VirtualNode::COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::count()), is_temp: true, }; match hint_worker_slot_mapping { @@ -158,7 +158,7 @@ pub fn place_vnode( let mut dst = balances.pop_back().unwrap(); let n = std::cmp::min(src.balance.abs(), dst.balance.abs()); let mut moved = 0; - for idx in 0..VirtualNode::COUNT { + for idx in 0..VirtualNode::count() { if moved >= n { break; } @@ -189,7 +189,7 @@ pub fn place_vnode( for (worker_slot, bitmap) in results { worker_result .entry(worker_slot) - .or_insert(BitmapBuilder::zeroed(VirtualNode::COUNT).finish()) + .or_insert(BitmapBuilder::zeroed(VirtualNode::count()).finish()) .bitor_assign(&bitmap); } @@ -207,7 +207,7 @@ mod tests { use crate::vnode_mapping::vnode_placement::place_vnode; #[test] fn test_place_vnode() { - assert_eq!(VirtualNode::COUNT, 256); + assert_eq!(VirtualNode::count(), 256); let serving_property = Property { is_unschedulable: false, @@ -220,7 +220,7 @@ mod tests { assert_eq!(wm1.len(), 256); assert_eq!(wm2.len(), 256); let mut count: usize = 0; - for idx in 0..VirtualNode::COUNT { + for idx in 0..VirtualNode::count() { let vnode = VirtualNode::from_index(idx); if wm1.get(vnode) == wm2.get(vnode) { count += 1; diff --git a/src/config/docs.md b/src/config/docs.md index 47905d71e5e0c..4646f0cc2280a 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config` | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | -| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | +| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::count() / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | | meta_leader_lease_secs | | 30 | | min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 | @@ -52,7 +52,7 @@ This page is automatically generated by `./risedev generate-example-config` | parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 | | parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 | | parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 | -| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | +| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::count() / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | | periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 | | periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 | | periodic_split_compact_group_interval_sec | | 10 | diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index e544c39f62499..0658010ea5c0a 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -81,14 +81,14 @@ mod tests { let output = expr.eval(&input).await.unwrap(); for vnode in output.iter() { let vnode = vnode.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..VirtualNode::count() as i16).contains(&vnode)); } // test eval_row for row in input.rows() { let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); let vnode = result.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..VirtualNode::count() as i16).contains(&vnode)); } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 3c6ab52f51e39..f069856acc33e 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -108,16 +108,16 @@ pub async fn handle_alter_parallelism( match &target_parallelism.parallelism { Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { - if available_parallelism > VirtualNode::COUNT as u32 { - builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::COUNT)); + if available_parallelism > VirtualNode::count() as u32 { + builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::count())); } } Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { - if *parallelism > VirtualNode::COUNT as u32 { - builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::COUNT)); + if *parallelism > VirtualNode::count() as u32 { + builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::count())); target_parallelism = PbTableParallelism { parallelism: Some(PbParallelism::Fixed(FixedParallelism { - parallelism: VirtualNode::COUNT as u32, + parallelism: VirtualNode::count() as u32, })), }; } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 65bfbe09c54b0..cf3f02c2bdd70 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -910,7 +910,7 @@ impl BatchPlanFragmenter { .take(1) .update(|(_, info)| { info.vnode_bitmap = - Bitmap::ones(VirtualNode::COUNT).to_protobuf(); + Bitmap::ones(VirtualNode::count()).to_protobuf(); }) .collect(); } diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 419f4ffd21cb5..9a3e6d07b734c 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -320,7 +320,7 @@ impl<'a> Deref for JavaBindingIterator<'a> { #[no_mangle] extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - VirtualNode::COUNT as jint + VirtualNode::count() as jint } #[cfg_or_panic(not(madsim))] diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 8409e714852c2..e4becf1759351 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -160,8 +160,8 @@ impl CoordinatorWorker { } async fn wait_for_writers(&mut self, first_vnode_bitmap: Bitmap) -> anyhow::Result<()> { - let mut remaining_count = VirtualNode::COUNT; - let mut registered_vnode = HashSet::with_capacity(VirtualNode::COUNT); + let mut remaining_count = VirtualNode::count(); + let mut registered_vnode = HashSet::with_capacity(VirtualNode::count()); for vnode in first_vnode_bitmap.iter_vnodes() { remaining_count -= 1; diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index fd2b986be28e7..62912274a57d8 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -414,11 +414,11 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::count()).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::count() / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } @@ -584,9 +584,9 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let all_vnode = (0..VirtualNode::count()).collect_vec(); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } @@ -705,7 +705,7 @@ mod tests { let mut build_client_future1 = pin!(CoordinatorStreamHandle::new_with_init_stream( param.to_proto(), - Bitmap::zeros(VirtualNode::COUNT), + Bitmap::zeros(VirtualNode::count()), |rx| async { Ok(tonic::Response::new( manager @@ -742,11 +742,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::count()).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::count() / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } @@ -821,11 +821,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::count()).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::count() / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 8062675156fe4..ff01a115886f3 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1610,14 +1610,14 @@ impl DdlController { let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; - const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap(); + let max_parallelism: NonZeroUsize = NonZeroUsize::new(VirtualNode::count()).unwrap(); - let parallelism_limited = parallelism > MAX_PARALLELISM; + let parallelism_limited = parallelism > max_parallelism; if parallelism_limited { - tracing::warn!("Too many parallelism, use {} instead", MAX_PARALLELISM); + tracing::warn!("Too many parallelism, use {} instead", max_parallelism); } - let parallelism = parallelism.min(MAX_PARALLELISM); + let parallelism = parallelism.min(max_parallelism); let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 42ed98b372c7d..af86513dd5fd3 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -258,7 +258,7 @@ pub fn rebalance_actor_vnode( balance: i32, builder: BitmapBuilder, } - let (expected, mut remain) = VirtualNode::COUNT.div_rem(&target_actor_count); + let (expected, mut remain) = VirtualNode::count().div_rem(&target_actor_count); tracing::debug!( "expected {}, remain {}, prev actors {}, target actors {}", @@ -289,7 +289,7 @@ pub fn rebalance_actor_vnode( builder }; - let (prev_expected, _) = VirtualNode::COUNT.div_rem(&actors.len()); + let (prev_expected, _) = VirtualNode::count().div_rem(&actors.len()); let prev_remain = removed .iter() @@ -322,7 +322,7 @@ pub fn rebalance_actor_vnode( .map(|actor_id| Balance { actor_id: *actor_id, balance: -(expected as i32), - builder: BitmapBuilder::zeroed(VirtualNode::COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::count()), }) .collect_vec(); @@ -384,7 +384,7 @@ pub fn rebalance_actor_vnode( let n = min(abs(src.balance), abs(dst.balance)); let mut moved = 0; - for idx in (0..VirtualNode::COUNT).rev() { + for idx in (0..VirtualNode::count()).rev() { if moved >= n { break; } @@ -2238,12 +2238,12 @@ impl ScaleController { } FragmentDistributionType::Hash => match parallelism { TableParallelism::Adaptive => { - if all_available_slots > VirtualNode::COUNT { - tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); - // force limit to VirtualNode::COUNT + if all_available_slots > VirtualNode::count() { + tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::count(), force limit to VirtualNode::count()"); + // force limit to VirtualNode::count() let target_worker_slots = schedule_units_for_slots( &schedulable_worker_slots, - VirtualNode::COUNT, + VirtualNode::count(), table_id, )?; @@ -2265,10 +2265,10 @@ impl ScaleController { } } TableParallelism::Fixed(mut n) => { - if n > VirtualNode::COUNT { + if n > VirtualNode::count() { // This should be unreachable, but we still intercept it to prevent accidental modifications. - tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); - n = VirtualNode::COUNT + tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::count(), force limit to VirtualNode::count()"); + n = VirtualNode::count() } let target_worker_slots = diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index 0dc0bced84005..a822a145ab25e 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -26,7 +26,7 @@ mod tests { use crate::stream::CustomActorInfo; fn simulated_parallelism(min: Option, max: Option) -> Vec { - let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT]; + let mut raw = vec![1, 3, 12, 42, VirtualNode::count()]; if let Some(min) = min { raw.retain(|n| *n > min); raw.push(min); @@ -55,7 +55,7 @@ mod tests { fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) { let prev_bitmap = Bitmap::from(actor.vnode_bitmap.as_ref().unwrap()); - for idx in 0..VirtualNode::COUNT { + for idx in 0..VirtualNode::count() { if prev_bitmap.is_set(idx) { assert!(bitmap.is_set(idx)); } @@ -63,7 +63,7 @@ mod tests { } fn check_bitmaps(bitmaps: &HashMap) { - let mut target = (0..VirtualNode::COUNT).map(|_| false).collect_vec(); + let mut target = (0..VirtualNode::count()).map(|_| false).collect_vec(); for bitmap in bitmaps.values() { for (idx, pos) in target.iter_mut().enumerate() { @@ -91,7 +91,7 @@ mod tests { let actor_ids = (0..parallelism as ActorId).collect_vec(); let actor_mapping = ActorMapping::new_uniform(actor_ids.into_iter()); - assert_eq!(actor_mapping.len(), VirtualNode::COUNT); + assert_eq!(actor_mapping.len(), VirtualNode::count()); let mut check: HashMap> = HashMap::new(); for (vnode, actor_id) in actor_mapping.iter_with_vnode() { @@ -178,7 +178,7 @@ mod tests { #[test] fn test_rebalance_scale_out() { - for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT - 1)) { + for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::count() - 1)) { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // add 1 @@ -189,8 +189,8 @@ mod tests { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); - // add to VirtualNode::COUNT - let actors_to_add = (parallelism as ActorId..VirtualNode::COUNT as ActorId).collect(); + // add to VirtualNode::count() + let actors_to_add = (parallelism as ActorId..VirtualNode::count() as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add); assert_eq!(result.len(), actors.len() + actors_to_add.len()); check_bitmaps(&result); @@ -275,7 +275,7 @@ mod tests { #[test] fn test_rebalance_scale_real() { - let actor_ids = (0..(VirtualNode::COUNT - 1) as ActorId).collect_vec(); + let actor_ids = (0..(VirtualNode::count() - 1) as ActorId).collect_vec(); let actors = build_fake_actors(actor_ids); let actors_to_remove = btreeset! {0, 1}; let actors_to_add = btreeset! {255}; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index 4a9e1c5edda0b..1ea2722869bcb 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -37,18 +37,18 @@ use tokio::sync::mpsc::unbounded_channel; fn vnode_bitmaps(part_count: usize) -> impl Iterator> { static BITMAP_CACHE: LazyLock>>>> = LazyLock::new(|| Mutex::new(HashMap::new())); - assert_eq!(VirtualNode::COUNT % part_count, 0); + assert_eq!(VirtualNode::count() % part_count, 0); let mut cache = BITMAP_CACHE.lock(); match cache.entry(part_count) { Entry::Occupied(entry) => entry.get().clone().into_iter(), Entry::Vacant(entry) => entry .insert({ - let part_size = VirtualNode::COUNT / part_count; + let part_size = VirtualNode::count() / part_count; (0..part_count) .map(move |part_idx| { let start = part_idx * part_size; let end = part_idx * part_size + part_size; - let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut bitmap = BitmapBuilder::zeroed(VirtualNode::count()); for i in start..end { bitmap.set(i, true); } @@ -253,7 +253,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read latest watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::count() { let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i)); } }) @@ -261,7 +261,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read committed watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::count() { let _ = table_watermarks.read_watermark( VirtualNode::from_index(i), test_epoch(committed_epoch_idx as u64), diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index 6a33d1ff1a09b..8bfa57e19ac58 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -74,7 +74,7 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) { vnode.to_index() + 1 } } - Unbounded => VirtualNode::COUNT, + Unbounded => VirtualNode::count(), }; (left, right) } @@ -321,7 +321,7 @@ pub fn prev_full_key(full_key: &[u8]) -> Vec { } pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound { - if vnode == VirtualNode::MAX { + if vnode == VirtualNode::max() { Unbounded } else { let end_bound_index = vnode.to_index() + 1; @@ -1299,7 +1299,7 @@ mod tests { Excluded(TableKey(concat(234, b""))) ) ); - let max_vnode = VirtualNode::COUNT - 1; + let max_vnode = VirtualNode::count() - 1; assert_eq!( prefixed_range_with_vnode( (Bound::::Unbounded, Bound::::Unbounded), @@ -1332,7 +1332,7 @@ mod tests { Excluded(b"1".as_slice()), Unbounded, ]; - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::count() { for left in &left_bound { for right in &right_bound { assert_eq!( diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 250e9014a1d36..2305d62389a57 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -176,7 +176,7 @@ impl TableWatermarksIndex { prev_watermark ); regress_vnodes - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::count())) .set(vnode.to_index(), true); } } @@ -187,7 +187,7 @@ impl TableWatermarksIndex { let vnode_index = vnode.to_index(); if !regress_vnodes.is_set(vnode_index) { bitmap_builder - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::count())) .set(vnode_index, true); } } @@ -220,7 +220,7 @@ impl TableWatermarksIndex { self.latest_epoch = epoch; #[cfg(debug_assertions)] { - let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::count()); for vnode_watermark in vnode_watermark_list.as_ref() { for vnode in vnode_watermark.vnode_bitmap.iter_ones() { assert!(!vnode_is_set.is_set(vnode)); @@ -507,7 +507,7 @@ impl TableWatermarks { } debug!("clear stale table watermark below epoch {}", safe_epoch); let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len()); - let mut unset_vnode: HashSet = (0..VirtualNode::COUNT) + let mut unset_vnode: HashSet = (0..VirtualNode::count()) .map(VirtualNode::from_index) .collect(); while let Some((epoch, _)) = self.watermarks.last() { @@ -535,7 +535,7 @@ impl TableWatermarks { } } if !set_vnode.is_empty() { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for vnode in set_vnode { builder.set(vnode.to_index(), true); } @@ -706,7 +706,7 @@ mod tests { use crate::version::HummockVersion; fn build_bitmap(vnodes: impl IntoIterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for vnode in vnodes { builder.set(vnode, true); } @@ -746,7 +746,7 @@ mod tests { let mut second_table_watermark = TableWatermarks::single_epoch( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )], direction, @@ -754,7 +754,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into(), @@ -815,7 +815,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into(), @@ -853,7 +853,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into() @@ -879,7 +879,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into() @@ -905,7 +905,7 @@ mod tests { ( epoch4, vec![VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::count())), watermark3.clone() )] .into() @@ -932,7 +932,7 @@ mod tests { vec![ VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()), VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::count())), watermark3.clone() ) ] @@ -1164,7 +1164,7 @@ mod tests { EPOCH1, vec![VnodeWatermark { watermark: watermark1.clone(), - vnode_bitmap: build_bitmap(0..VirtualNode::COUNT), + vnode_bitmap: build_bitmap(0..VirtualNode::count()), }] .into(), )], @@ -1182,7 +1182,7 @@ mod tests { ); assert_eq!(EPOCH1, index.committed_epoch.unwrap()); assert_eq!(EPOCH2, index.latest_epoch); - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::count() { let vnode = VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 79b00d0f9b8f2..20b82c5c9b312 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1836,8 +1836,8 @@ pub(crate) mod tests { Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), None, ); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - for vnode_id in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::count() * 2; + for vnode_id in 0..VirtualNode::count() / 2 { let mut last_k: u64 = 1; let init_epoch = test_epoch(100 * object_id); let mut last_epoch = init_epoch; @@ -1880,9 +1880,9 @@ pub(crate) mod tests { let target_file_size = max_sst_file_size / 4; let mut table_watermarks = BTreeMap::default(); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for i in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::count() * 2; + let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::count()); + for i in 0..VirtualNode::count() / 2 { if i % 2 == 0 { vnode_builder.set(i, true); } else { @@ -1947,7 +1947,7 @@ pub(crate) mod tests { direction: WatermarkDirection::Ascending, vnode_watermarks: BTreeMap::default(), }; - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::count() { if i % 2 == 0 { watermark .vnode_watermarks diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 67c3aa8b059bd..2b27d010ed8ac 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -50,7 +50,7 @@ async fn test_read_version_basic() { let mut epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::count())); let mut read_version = HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, @@ -276,7 +276,7 @@ async fn test_read_filter_basic() { let epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::count())); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index a1f74949b7313..0dc75f936cdbd 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2025,13 +2025,13 @@ async fn test_table_watermark() { let vnode1 = VirtualNode::from_index(1); let vnode_bitmap1 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); builder.set(1, true); builder.finish() }); let vnode2 = VirtualNode::from_index(2); let vnode_bitmap2 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); builder.set(2, true); builder.finish() }); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 1df58074e22d0..8c7c8c261d4c6 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1377,7 +1377,7 @@ async fn test_replicated_local_hummock_storage() { TableOption { retention_seconds: None, }, - Arc::new(Bitmap::ones(VirtualNode::COUNT)), + Arc::new(Bitmap::ones(VirtualNode::count())), )) .await; diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 2a5b62f046d32..ad163041a2f44 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -184,7 +184,7 @@ impl TracedNewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT)), + vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::count())), } } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index ee87177923e9b..60633b9480af5 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -1149,7 +1149,7 @@ mod tests { table_id: TEST_TABLE_ID, new_read_version_sender: tx, is_replicated: false, - vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::COUNT).finish()), + vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::count()).finish()), }); rx.await.unwrap() }; diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 88351c34b6210..f3e1ea5ea03e4 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -370,14 +370,14 @@ impl TableUnsyncData { prev_watermarks.extend(table_watermarks); } Entry::Vacant(entry) => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::count()); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); entry.insert((table_watermarks, vnode_bitmap)); } } } None => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::count()); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); self.table_watermarks = Some(( direction, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 1f8b17fb6c662..f7cf5d9c09990 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -533,9 +533,9 @@ impl SharedBufferBatch { } pub fn collect_vnodes(&self) -> Vec { - let mut vnodes = Vec::with_capacity(VirtualNode::COUNT); + let mut vnodes = Vec::with_capacity(VirtualNode::count()); let mut next_vnode_id = 0; - while next_vnode_id < VirtualNode::COUNT { + while next_vnode_id < VirtualNode::count() { let seek_key = TableKey( VirtualNode::from_index(next_vnode_id) .to_be_bytes() diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 97b448faec8d7..91d8dd226d53d 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -100,7 +100,7 @@ where last_table_id: 0, table_partition_vnode, split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::max().to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count, } @@ -116,7 +116,7 @@ where last_table_id: 0, table_partition_vnode: BTreeMap::default(), split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::max().to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count: None, } @@ -229,14 +229,14 @@ where switch_builder = true; if self.split_weight_by_vnode > 1 { self.largest_vnode_in_current_partition = - VirtualNode::COUNT / (self.split_weight_by_vnode as usize) - 1; + VirtualNode::count() / (self.split_weight_by_vnode as usize) - 1; } else { // default - self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index(); + self.largest_vnode_in_current_partition = VirtualNode::max().to_index(); } } } - if self.largest_vnode_in_current_partition != VirtualNode::MAX.to_index() { + if self.largest_vnode_in_current_partition != VirtualNode::max().to_index() { let key_vnode = user_key.get_vnode_id(); if key_vnode > self.largest_vnode_in_current_partition { // vnode partition change @@ -244,7 +244,7 @@ where // SAFETY: `self.split_weight_by_vnode > 1` here. let (basic, remainder) = - VirtualNode::COUNT.div_rem(&(self.split_weight_by_vnode as usize)); + VirtualNode::count().div_rem(&(self.split_weight_by_vnode as usize)); let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder); self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area { (key_vnode / basic + 1) * basic diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index f382bf5fc2d5d..e6c6d83111c87 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -733,7 +733,7 @@ impl NewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), + vnodes: Arc::new(Bitmap::ones(VirtualNode::count())), } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 63ee6762cfb30..1e14ea6d71977 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -1000,15 +1000,15 @@ mod tests { test_env.register_table(table.clone()).await; fn build_bitmap(indexes: impl Iterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(i, true); } Arc::new(builder.finish()) } - let vnodes1 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 0)); - let vnodes2 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 1)); + let vnodes1 = build_bitmap((0..VirtualNode::count()).filter(|i| i % 2 == 0)); + let vnodes2 = build_bitmap((0..VirtualNode::count()).filter(|i| i % 2 == 1)); let factory1 = KvLogStoreFactory::new( test_env.storage.clone(), @@ -1150,7 +1150,7 @@ mod tests { .clear_shared_buffer(test_env.manager.get_current_version().await.id) .await; - let vnodes = build_bitmap(0..VirtualNode::COUNT); + let vnodes = build_bitmap(0..VirtualNode::count()); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 92a3caf4cd2e3..f2bedacac2dbc 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -336,7 +336,7 @@ impl LogStoreRowSerde { ) -> Bytes { let (epoch, seq_id) = offset; Bytes::from(next_key(&serialize_pk( - (self.pk_info.compute_pk)(VirtualNode::MAX, Self::encode_epoch(epoch), seq_id), + (self.pk_info.compute_pk)(VirtualNode::max(), Self::encode_epoch(epoch), seq_id), &self.pk_serde, ))) } @@ -981,7 +981,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); @@ -1124,7 +1124,7 @@ mod tests { let table = gen_test_log_store_table(pk_info); let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); let (ops, rows) = gen_test_data(0); @@ -1283,7 +1283,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); @@ -1428,7 +1428,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); @@ -1538,7 +1538,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs index 5fc10cd0cc58a..fd691e236d394 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs @@ -177,7 +177,7 @@ pub(crate) fn gen_test_log_store_table(pk_info: &'static KvLogStorePkInfo) -> Pb pub(crate) fn calculate_vnode_bitmap<'a>( test_data: impl Iterator)>, ) -> Bitmap { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for vnode in test_data.map(|(_, row)| VirtualNode::compute_row(row, &[TEST_SCHEMA_DIST_KEY_INDEX])) { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 120a49e8e7ee3..e7b57bdc93af2 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -110,7 +110,7 @@ impl LogWriter for KvLogStoreWriter { { // When enter this branch, the chunk cannot be added directly, and should be add to // state store and flush - let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::count()); let mut flush_info = FlushInfo::new(); for (i, (op, row)) in chunk.rows().enumerate() { let seq_id = start_seq_id + (i as SeqIdType); diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 9f452dc1863b0..73bdbdd1dc3b1 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1100,8 +1100,8 @@ mod tests { } async fn test_hash_dispatcher_complex_inner() { - // This test only works when VirtualNode::COUNT is 256. - static_assertions::const_assert_eq!(VirtualNode::COUNT, 256); + // This test only works when VirtualNode::count() is 256. + assert_eq!(VirtualNode::count(), 256); let num_outputs = 2; // actor id ranges from 1 to 2 let key_indices = &[0, 2]; @@ -1116,9 +1116,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) + .flat_map(|id| vec![id as ActorId; VirtualNode::count() / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::count(), num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1372,9 +1372,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) + .flat_map(|id| vec![id as ActorId; VirtualNode::count() / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::count(), num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1408,7 +1408,7 @@ mod tests { hasher.update(&bytes); } let output_idx = - hash_mapping[hasher.finish() as usize % VirtualNode::COUNT] as usize - 1; + hash_mapping[hasher.finish() as usize % VirtualNode::count()] as usize - 1; for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) { builder.append(Some(*val)); } diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 1fcb85c26f88e..e69b682e1ba5b 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -140,7 +140,7 @@ mod tests { ]); let pk_indices = vec![0]; let row_id_index = 0; - let row_id_generator = Bitmap::ones(VirtualNode::COUNT); + let row_id_generator = Bitmap::ones(VirtualNode::count()); let (mut tx, upstream) = MockSource::channel(); let upstream = upstream.into_executor(schema.clone(), pk_indices.clone()); diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index bef6b1ca1f1c2..1660c381e69df 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::count(); let mut configuration = Configuration::for_auto_parallelism( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, true, @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::count(); let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; @@ -202,7 +202,7 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::count(); let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100;