From 0c63f01ae6bf784f49703424532cfd098c49ae6f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 21 Aug 2024 12:25:23 +0800 Subject: [PATCH 1/6] read env var for vnode count Signed-off-by: Bugen Zhao --- src/common/src/config.rs | 4 +- .../src/hash/consistent_hash/mapping.rs | 20 ++--- src/common/src/hash/consistent_hash/vnode.rs | 77 +++++++++++++------ src/common/src/hash/table_distribution.rs | 4 +- src/common/src/util/row_id.rs | 2 +- .../src/vnode_mapping/vnode_placement.rs | 19 +++-- src/expr/impl/src/scalar/vnode.rs | 4 +- src/frontend/src/handler/alter_parallelism.rs | 10 +-- src/frontend/src/scheduler/plan_fragmenter.rs | 2 +- src/jni_core/src/lib.rs | 2 +- .../sink_coordination/coordinator_worker.rs | 4 +- .../src/manager/sink_coordination/manager.rs | 24 +++--- src/meta/src/rpc/ddl_controller.rs | 2 +- src/meta/src/stream/scale.rs | 16 ++-- src/meta/src/stream/test_scale.rs | 14 ++-- src/storage/benches/bench_table_watermarks.rs | 10 +-- src/storage/hummock_sdk/src/key.rs | 8 +- .../hummock_sdk/src/table_watermark.rs | 30 ++++---- .../hummock_test/src/compactor_tests.rs | 12 +-- .../src/hummock_read_version_tests.rs | 4 +- .../hummock_test/src/hummock_storage_tests.rs | 4 +- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/hummock_trace/src/opts.rs | 2 +- .../event_handler/hummock_event_handler.rs | 2 +- .../src/hummock/event_handler/uploader/mod.rs | 4 +- .../shared_buffer/shared_buffer_batch.rs | 4 +- .../src/hummock/sstable/multi_builder.rs | 12 +-- src/storage/src/store.rs | 2 +- .../common/log_store_impl/kv_log_store/mod.rs | 8 +- .../log_store_impl/kv_log_store/serde.rs | 12 +-- .../log_store_impl/kv_log_store/test_utils.rs | 2 +- .../log_store_impl/kv_log_store/writer.rs | 2 +- src/stream/src/executor/dispatch.rs | 10 +-- src/stream/src/executor/row_id_gen.rs | 2 +- .../scale/streaming_parallelism.rs | 6 +- 35 files changed, 187 insertions(+), 155 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 50e1ce853ebe6..9b1a846ca3c86 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -427,10 +427,10 @@ impl<'de> Deserialize<'de> for DefaultParallelism { ))) } } - Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::COUNT { + Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::DEFAULT_COUNT { Err(serde::de::Error::custom(format!( "default parallelism should be not great than {}", - VirtualNode::COUNT + VirtualNode::DEFAULT_COUNT )))? } else { NonZeroUsize::new(i).ok_or_else(|| { diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index a462acb291853..e59a9260475ca 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -108,13 +108,13 @@ impl VnodeMapping { pub fn new_uniform(items: impl ExactSizeIterator) -> Self { // If the number of items is greater than the total vnode count, no vnode will be mapped to // some items and the mapping will be invalid. 
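        // Illustrative aside (not part of the patch itself): a quick worked example of the
        // uniform split computed just below. With the default vnode count of 256 and
        // 3 items, `hash_shard_size` = 256 / 3 = 85 and `one_more_count` = 256 % 3 = 1,
        // so one item is assigned 86 vnodes and the remaining two get 85 each.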
- assert!(items.len() <= VirtualNode::COUNT); + assert!(items.len() <= VirtualNode::DEFAULT_COUNT); let mut original_indices = Vec::with_capacity(items.len()); let mut data = Vec::with_capacity(items.len()); - let hash_shard_size = VirtualNode::COUNT / items.len(); - let mut one_more_count = VirtualNode::COUNT % items.len(); + let hash_shard_size = VirtualNode::DEFAULT_COUNT / items.len(); + let mut one_more_count = VirtualNode::DEFAULT_COUNT % items.len(); let mut init_bound = 0; for item in items { @@ -209,7 +209,7 @@ impl VnodeMapping { for (vnode, item) in self.iter_with_vnode() { vnode_bitmaps .entry(item) - .or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT)) .set(vnode.to_index(), true); } @@ -222,10 +222,10 @@ impl VnodeMapping { /// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap /// represents the vnodes mapped to the item. pub fn from_bitmaps(bitmaps: &HashMap) -> Self { - let mut items = vec![None; VirtualNode::COUNT]; + let mut items = vec![None; VirtualNode::DEFAULT_COUNT]; for (&item, bitmap) in bitmaps { - assert_eq!(bitmap.len(), VirtualNode::COUNT); + assert_eq!(bitmap.len(), VirtualNode::DEFAULT_COUNT); for idx in bitmap.iter_ones() { if let Some(prev) = items[idx].replace(item) { panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`"); @@ -243,7 +243,7 @@ impl VnodeMapping { /// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::COUNT`]. pub fn from_expanded(items: &[T::Item]) -> Self { - assert_eq!(items.len(), VirtualNode::COUNT); + assert_eq!(items.len(), VirtualNode::DEFAULT_COUNT); let (original_indices, data) = compress_data(items); Self { original_indices, @@ -403,7 +403,7 @@ mod tests { type TestMapping = VnodeMapping; type Test2Mapping = VnodeMapping; - const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT]; + const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::DEFAULT_COUNT]; fn uniforms() -> impl Iterator { COUNTS @@ -414,7 +414,7 @@ mod tests { fn randoms() -> impl Iterator { COUNTS.iter().map(|&count| { let raw = repeat_with(|| rand::thread_rng().gen_range(0..count as u32)) - .take(VirtualNode::COUNT) + .take(VirtualNode::DEFAULT_COUNT) .collect_vec(); TestMapping::from_expanded(&raw) }) @@ -427,7 +427,7 @@ mod tests { #[test] fn test_uniform() { for vnode_mapping in uniforms() { - assert_eq!(vnode_mapping.len(), VirtualNode::COUNT); + assert_eq!(vnode_mapping.len(), VirtualNode::DEFAULT_COUNT); let item_count = vnode_mapping.iter_unique().count(); let mut check: HashMap> = HashMap::new(); diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index f528544689f31..cd64749b5ae2b 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::LazyLock; + use itertools::Itertools; use parse_display::Display; @@ -31,44 +33,71 @@ pub struct VirtualNode(VirtualNodeInner); /// The internal representation of a virtual node id. type VirtualNodeInner = u16; -static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32); +// static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32); impl From for VirtualNode { fn from(hash_code: Crc32HashCode) -> Self { // Take the least significant bits of the hash code. 
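        // For illustration (example values are mine, not from the patch): with a vnode count
        // of 256 the mapping below is simply `hash % 256`, so a CRC32 hash of 0x1234_5678
        // lands on vnode 0x78 = 120.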
// TODO: should we use the most significant bits? - let inner = (hash_code.value() % Self::COUNT as u64) as VirtualNodeInner; + let inner = (hash_code.value() % Self::count() as u64) as VirtualNodeInner; VirtualNode(inner) } } impl VirtualNode { - /// The number of bits used to represent a virtual node. - /// - /// Note: Not all bits of the inner representation are used. One should rely on this constant - /// to determine the count of virtual nodes. - pub const BITS: usize = 8; - /// The total count of virtual nodes. - pub const COUNT: usize = 1 << Self::BITS; + pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; + /// We may use `VirtualNode` as a datum in a stream, or store it as a column. + /// Hence this reifies it as a RW datatype. + pub const RW_TYPE: DataType = DataType::Int16; /// The size of a virtual node in bytes, in memory or serialized representation. pub const SIZE: usize = std::mem::size_of::(); + /// The minimum (zero) value of the virtual node. + pub const ZERO: VirtualNode = unsafe { VirtualNode::from_index_unchecked(0) }; +} + +impl VirtualNode { + /// The total count of virtual nodes. + pub const DEFAULT_COUNT: usize = 1 << 8; + /// The maximum value of the virtual node. + pub const DEFAULT_MAX: VirtualNode = + unsafe { VirtualNode::from_index_unchecked(Self::DEFAULT_COUNT - 1) }; +} + +impl VirtualNode { + pub fn count() -> usize { + static COUNT: LazyLock = LazyLock::new(|| { + if let Ok(count) = std::env::var("RW_VNODE_COUNT") { + let count: usize = count.parse().expect("RW_VNODE_COUNT must be a number"); + assert!( + count <= VirtualNode::MAX_COUNT, + "vnode count must be less than {}", + VirtualNode::MAX_COUNT + ); + count + } else { + VirtualNode::DEFAULT_COUNT + } + }); + + *COUNT + } + + pub fn max() -> VirtualNode { + VirtualNode::from_index(Self::count() - 1) + } } /// An iterator over all virtual nodes. pub type AllVirtualNodeIter = std::iter::Map, fn(usize) -> VirtualNode>; impl VirtualNode { - /// The maximum value of the virtual node. - pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); - /// We may use `VirtualNode` as a datum in a stream, or store it as a column. - /// Hence this reifies it as a RW datatype. - pub const RW_TYPE: DataType = DataType::Int16; - /// The minimum (zero) value of the virtual node. - pub const ZERO: VirtualNode = VirtualNode::from_index(0); - /// Creates a virtual node from the `usize` index. - pub const fn from_index(index: usize) -> Self { - debug_assert!(index < Self::COUNT); + pub fn from_index(index: usize) -> Self { + debug_assert!(index < Self::count()); + Self(index as _) + } + + pub const unsafe fn from_index_unchecked(index: usize) -> Self { Self(index as _) } @@ -78,8 +107,8 @@ impl VirtualNode { } /// Creates a virtual node from the given scalar representation. Used by `VNODE` expression. - pub const fn from_scalar(scalar: i16) -> Self { - debug_assert!((scalar as usize) < Self::COUNT); + pub fn from_scalar(scalar: i16) -> Self { + debug_assert!((scalar as usize) < Self::count()); Self(scalar as _) } @@ -97,9 +126,9 @@ impl VirtualNode { } /// Creates a virtual node from the given big-endian bytes representation. - pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { + pub fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { let inner = VirtualNodeInner::from_be_bytes(bytes); - debug_assert!((inner as usize) < Self::COUNT); + debug_assert!((inner as usize) < Self::count()); Self(inner) } @@ -110,7 +139,7 @@ impl VirtualNode { /// Iterates over all virtual nodes. 
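    // Illustrative note (wording is mine, not part of the patch): with this change the
    // vnode count is resolved lazily, once per process, from the `RW_VNODE_COUNT`
    // environment variable, e.g. a node started with `RW_VNODE_COUNT=512` uses 512
    // vnodes, while an unset variable falls back to `DEFAULT_COUNT` = 1 << 8 = 256.
    // Values above `MAX_COUNT` (1 << 16, the capacity of the u16 inner type) are
    // rejected by the assert.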
pub fn all() -> AllVirtualNodeIter { - (0..Self::COUNT).map(Self::from_index) + (0..Self::count()).map(Self::from_index) } } diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 9be9cd2abafb2..766a381add3cd 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -108,7 +108,7 @@ impl TableDistribution { pub fn singleton_vnode_bitmap_ref() -> &'static Arc { /// A bitmap that only the default vnode is set. static SINGLETON_VNODES: LazyLock> = LazyLock::new(|| { - let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnodes = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); vnodes.set(SINGLETON_VNODE.to_index(), true); vnodes.finish().into() }); @@ -123,7 +123,7 @@ impl TableDistribution { pub fn all_vnodes_ref() -> &'static Arc { /// A bitmap that all vnodes are set. static ALL_VNODES: LazyLock> = - LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into()); + LazyLock::new(|| Bitmap::ones(VirtualNode::DEFAULT_COUNT).into()); &ALL_VNODES } diff --git a/src/common/src/util/row_id.rs b/src/common/src/util/row_id.rs index 508f418903413..1950b4f6358af 100644 --- a/src/common/src/util/row_id.rs +++ b/src/common/src/util/row_id.rs @@ -25,7 +25,7 @@ const VNODE_ID_SHIFT_BITS: u8 = 12; const SEQUENCE_UPPER_BOUND: u16 = 1 << 12; const VNODE_ID_UPPER_BOUND: u32 = 1 << 10; -const_assert!(VNODE_ID_UPPER_BOUND >= VirtualNode::COUNT as u32); +const_assert!(VNODE_ID_UPPER_BOUND >= VirtualNode::DEFAULT_COUNT as u32); /// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format: /// diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 5619ffc6e0f96..23fbcdda43008 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -44,7 +44,10 @@ pub fn place_vnode( // `max_parallelism` and total number of virtual nodes. let serving_parallelism = std::cmp::min( worker_slots.iter().map(|slots| slots.len()).sum(), - std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT), + std::cmp::min( + max_parallelism.unwrap_or(usize::MAX), + VirtualNode::DEFAULT_COUNT, + ), ); // Select `serving_parallelism` worker slots in a round-robin fashion, to distribute workload @@ -79,14 +82,14 @@ pub fn place_vnode( is_temp: bool, } - let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_slots.len()); + let (expected, mut remain) = VirtualNode::DEFAULT_COUNT.div_rem(&selected_slots.len()); let mut balances: HashMap = HashMap::default(); for slot in &selected_slots { let mut balance = Balance { slot: *slot, balance: -(expected as i32), - builder: BitmapBuilder::zeroed(VirtualNode::COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT), is_temp: false, }; @@ -102,7 +105,7 @@ pub fn place_vnode( let mut temp_slot = Balance { slot: WorkerSlotId::new(0u32, usize::MAX), /* This id doesn't matter for `temp_slot`. It's distinguishable via `is_temp`. 
*/ balance: 0, - builder: BitmapBuilder::zeroed(VirtualNode::COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT), is_temp: true, }; match hint_worker_slot_mapping { @@ -158,7 +161,7 @@ pub fn place_vnode( let mut dst = balances.pop_back().unwrap(); let n = std::cmp::min(src.balance.abs(), dst.balance.abs()); let mut moved = 0; - for idx in 0..VirtualNode::COUNT { + for idx in 0..VirtualNode::DEFAULT_COUNT { if moved >= n { break; } @@ -189,7 +192,7 @@ pub fn place_vnode( for (worker_slot, bitmap) in results { worker_result .entry(worker_slot) - .or_insert(BitmapBuilder::zeroed(VirtualNode::COUNT).finish()) + .or_insert(BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT).finish()) .bitor_assign(&bitmap); } @@ -207,7 +210,7 @@ mod tests { use crate::vnode_mapping::vnode_placement::place_vnode; #[test] fn test_place_vnode() { - assert_eq!(VirtualNode::COUNT, 256); + assert_eq!(VirtualNode::DEFAULT_COUNT, 256); let serving_property = Property { is_unschedulable: false, @@ -220,7 +223,7 @@ mod tests { assert_eq!(wm1.len(), 256); assert_eq!(wm2.len(), 256); let mut count: usize = 0; - for idx in 0..VirtualNode::COUNT { + for idx in 0..VirtualNode::DEFAULT_COUNT { let vnode = VirtualNode::from_index(idx); if wm1.get(vnode) == wm2.get(vnode) { count += 1; diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index e544c39f62499..9b689002b6a1c 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -81,14 +81,14 @@ mod tests { let output = expr.eval(&input).await.unwrap(); for vnode in output.iter() { let vnode = vnode.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..VirtualNode::DEFAULT_COUNT as i16).contains(&vnode)); } // test eval_row for row in input.rows() { let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); let vnode = result.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..VirtualNode::DEFAULT_COUNT as i16).contains(&vnode)); } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 3c6ab52f51e39..f981b7ba6fabd 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -108,16 +108,16 @@ pub async fn handle_alter_parallelism( match &target_parallelism.parallelism { Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { - if available_parallelism > VirtualNode::COUNT as u32 { - builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::COUNT)); + if available_parallelism > VirtualNode::DEFAULT_COUNT as u32 { + builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::DEFAULT_COUNT)); } } Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { - if *parallelism > VirtualNode::COUNT as u32 { - builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::COUNT)); + if *parallelism > VirtualNode::DEFAULT_COUNT as u32 { + builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::DEFAULT_COUNT)); target_parallelism = PbTableParallelism { parallelism: Some(PbParallelism::Fixed(FixedParallelism { - parallelism: VirtualNode::COUNT as u32, + parallelism: 
VirtualNode::DEFAULT_COUNT as u32, })), }; } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 65bfbe09c54b0..51194ea7f4142 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -910,7 +910,7 @@ impl BatchPlanFragmenter { .take(1) .update(|(_, info)| { info.vnode_bitmap = - Bitmap::ones(VirtualNode::COUNT).to_protobuf(); + Bitmap::ones(VirtualNode::DEFAULT_COUNT).to_protobuf(); }) .collect(); } diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 419f4ffd21cb5..af0c067354e68 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -320,7 +320,7 @@ impl<'a> Deref for JavaBindingIterator<'a> { #[no_mangle] extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - VirtualNode::COUNT as jint + VirtualNode::DEFAULT_COUNT as jint } #[cfg_or_panic(not(madsim))] diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 8409e714852c2..2338d170437ad 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -160,8 +160,8 @@ impl CoordinatorWorker { } async fn wait_for_writers(&mut self, first_vnode_bitmap: Bitmap) -> anyhow::Result<()> { - let mut remaining_count = VirtualNode::COUNT; - let mut registered_vnode = HashSet::with_capacity(VirtualNode::COUNT); + let mut remaining_count = VirtualNode::DEFAULT_COUNT; + let mut registered_vnode = HashSet::with_capacity(VirtualNode::DEFAULT_COUNT); for vnode in first_vnode_bitmap.iter_vnodes() { remaining_count -= 1; diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index fd2b986be28e7..c4f2fa529f006 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -414,11 +414,11 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::DEFAULT_COUNT / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for i in indexes { builder.set(*i, true); } @@ -584,9 +584,9 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for i in indexes { builder.set(*i, true); } @@ -705,7 +705,7 @@ mod tests { let mut build_client_future1 = pin!(CoordinatorStreamHandle::new_with_init_stream( param.to_proto(), - Bitmap::zeros(VirtualNode::COUNT), + Bitmap::zeros(VirtualNode::DEFAULT_COUNT), |rx| async { Ok(tonic::Response::new( manager @@ -742,11 +742,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = 
all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::DEFAULT_COUNT / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for i in indexes { builder.set(*i, true); } @@ -821,11 +821,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::DEFAULT_COUNT / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for i in indexes { builder.set(*i, true); } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 8062675156fe4..3c6f88319ca04 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1610,7 +1610,7 @@ impl DdlController { let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; - const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap(); + const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::DEFAULT_COUNT).unwrap(); let parallelism_limited = parallelism > MAX_PARALLELISM; if parallelism_limited { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 42ed98b372c7d..1d3a3d50eb0c0 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -258,7 +258,7 @@ pub fn rebalance_actor_vnode( balance: i32, builder: BitmapBuilder, } - let (expected, mut remain) = VirtualNode::COUNT.div_rem(&target_actor_count); + let (expected, mut remain) = VirtualNode::DEFAULT_COUNT.div_rem(&target_actor_count); tracing::debug!( "expected {}, remain {}, prev actors {}, target actors {}", @@ -289,7 +289,7 @@ pub fn rebalance_actor_vnode( builder }; - let (prev_expected, _) = VirtualNode::COUNT.div_rem(&actors.len()); + let (prev_expected, _) = VirtualNode::DEFAULT_COUNT.div_rem(&actors.len()); let prev_remain = removed .iter() @@ -322,7 +322,7 @@ pub fn rebalance_actor_vnode( .map(|actor_id| Balance { actor_id: *actor_id, balance: -(expected as i32), - builder: BitmapBuilder::zeroed(VirtualNode::COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT), }) .collect_vec(); @@ -384,7 +384,7 @@ pub fn rebalance_actor_vnode( let n = min(abs(src.balance), abs(dst.balance)); let mut moved = 0; - for idx in (0..VirtualNode::COUNT).rev() { + for idx in (0..VirtualNode::DEFAULT_COUNT).rev() { if moved >= n { break; } @@ -2238,12 +2238,12 @@ impl ScaleController { } FragmentDistributionType::Hash => match parallelism { TableParallelism::Adaptive => { - if all_available_slots > VirtualNode::COUNT { + if all_available_slots > VirtualNode::DEFAULT_COUNT { tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); // force limit to VirtualNode::COUNT let target_worker_slots = schedule_units_for_slots( &schedulable_worker_slots, - VirtualNode::COUNT, + VirtualNode::DEFAULT_COUNT, table_id, )?; @@ -2265,10 +2265,10 @@ impl ScaleController { } } TableParallelism::Fixed(mut n) => { - if n > VirtualNode::COUNT { + if n > VirtualNode::DEFAULT_COUNT { // This should be unreachable, but we still 
intercept it to prevent accidental modifications. tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); - n = VirtualNode::COUNT + n = VirtualNode::DEFAULT_COUNT } let target_worker_slots = diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index 0dc0bced84005..c030237fe29d9 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -26,7 +26,7 @@ mod tests { use crate::stream::CustomActorInfo; fn simulated_parallelism(min: Option, max: Option) -> Vec { - let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT]; + let mut raw = vec![1, 3, 12, 42, VirtualNode::DEFAULT_COUNT]; if let Some(min) = min { raw.retain(|n| *n > min); raw.push(min); @@ -55,7 +55,7 @@ mod tests { fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) { let prev_bitmap = Bitmap::from(actor.vnode_bitmap.as_ref().unwrap()); - for idx in 0..VirtualNode::COUNT { + for idx in 0..VirtualNode::DEFAULT_COUNT { if prev_bitmap.is_set(idx) { assert!(bitmap.is_set(idx)); } @@ -63,7 +63,7 @@ mod tests { } fn check_bitmaps(bitmaps: &HashMap) { - let mut target = (0..VirtualNode::COUNT).map(|_| false).collect_vec(); + let mut target = (0..VirtualNode::DEFAULT_COUNT).map(|_| false).collect_vec(); for bitmap in bitmaps.values() { for (idx, pos) in target.iter_mut().enumerate() { @@ -91,7 +91,7 @@ mod tests { let actor_ids = (0..parallelism as ActorId).collect_vec(); let actor_mapping = ActorMapping::new_uniform(actor_ids.into_iter()); - assert_eq!(actor_mapping.len(), VirtualNode::COUNT); + assert_eq!(actor_mapping.len(), VirtualNode::DEFAULT_COUNT); let mut check: HashMap> = HashMap::new(); for (vnode, actor_id) in actor_mapping.iter_with_vnode() { @@ -178,7 +178,7 @@ mod tests { #[test] fn test_rebalance_scale_out() { - for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT - 1)) { + for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::DEFAULT_COUNT - 1)) { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // add 1 @@ -190,7 +190,7 @@ mod tests { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // add to VirtualNode::COUNT - let actors_to_add = (parallelism as ActorId..VirtualNode::COUNT as ActorId).collect(); + let actors_to_add = (parallelism as ActorId..VirtualNode::DEFAULT_COUNT as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add); assert_eq!(result.len(), actors.len() + actors_to_add.len()); check_bitmaps(&result); @@ -275,7 +275,7 @@ mod tests { #[test] fn test_rebalance_scale_real() { - let actor_ids = (0..(VirtualNode::COUNT - 1) as ActorId).collect_vec(); + let actor_ids = (0..(VirtualNode::DEFAULT_COUNT - 1) as ActorId).collect_vec(); let actors = build_fake_actors(actor_ids); let actors_to_remove = btreeset! {0, 1}; let actors_to_add = btreeset! 
{255}; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index 4a9e1c5edda0b..57934ec71a04f 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -37,18 +37,18 @@ use tokio::sync::mpsc::unbounded_channel; fn vnode_bitmaps(part_count: usize) -> impl Iterator> { static BITMAP_CACHE: LazyLock>>>> = LazyLock::new(|| Mutex::new(HashMap::new())); - assert_eq!(VirtualNode::COUNT % part_count, 0); + assert_eq!(VirtualNode::DEFAULT_COUNT % part_count, 0); let mut cache = BITMAP_CACHE.lock(); match cache.entry(part_count) { Entry::Occupied(entry) => entry.get().clone().into_iter(), Entry::Vacant(entry) => entry .insert({ - let part_size = VirtualNode::COUNT / part_count; + let part_size = VirtualNode::DEFAULT_COUNT / part_count; (0..part_count) .map(move |part_idx| { let start = part_idx * part_size; let end = part_idx * part_size + part_size; - let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut bitmap = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for i in start..end { bitmap.set(i, true); } @@ -253,7 +253,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read latest watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::DEFAULT_COUNT { let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i)); } }) @@ -261,7 +261,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read committed watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::DEFAULT_COUNT { let _ = table_watermarks.read_watermark( VirtualNode::from_index(i), test_epoch(committed_epoch_idx as u64), diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index 6a33d1ff1a09b..bfd23f2916128 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -74,7 +74,7 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) { vnode.to_index() + 1 } } - Unbounded => VirtualNode::COUNT, + Unbounded => VirtualNode::DEFAULT_COUNT, }; (left, right) } @@ -321,7 +321,7 @@ pub fn prev_full_key(full_key: &[u8]) -> Vec { } pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound { - if vnode == VirtualNode::MAX { + if vnode == VirtualNode::DEFAULT_MAX { Unbounded } else { let end_bound_index = vnode.to_index() + 1; @@ -1299,7 +1299,7 @@ mod tests { Excluded(TableKey(concat(234, b""))) ) ); - let max_vnode = VirtualNode::COUNT - 1; + let max_vnode = VirtualNode::DEFAULT_COUNT - 1; assert_eq!( prefixed_range_with_vnode( (Bound::::Unbounded, Bound::::Unbounded), @@ -1332,7 +1332,7 @@ mod tests { Excluded(b"1".as_slice()), Unbounded, ]; - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::DEFAULT_COUNT { for left in &left_bound { for right in &right_bound { assert_eq!( diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 250e9014a1d36..55a1a4fca9d20 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -176,7 +176,7 @@ impl TableWatermarksIndex { prev_watermark ); regress_vnodes - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT)) .set(vnode.to_index(), true); } } @@ -187,7 +187,7 @@ impl TableWatermarksIndex { let vnode_index = vnode.to_index(); if !regress_vnodes.is_set(vnode_index) { bitmap_builder - 
.get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT)) .set(vnode_index, true); } } @@ -220,7 +220,7 @@ impl TableWatermarksIndex { self.latest_epoch = epoch; #[cfg(debug_assertions)] { - let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for vnode_watermark in vnode_watermark_list.as_ref() { for vnode in vnode_watermark.vnode_bitmap.iter_ones() { assert!(!vnode_is_set.is_set(vnode)); @@ -507,7 +507,7 @@ impl TableWatermarks { } debug!("clear stale table watermark below epoch {}", safe_epoch); let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len()); - let mut unset_vnode: HashSet = (0..VirtualNode::COUNT) + let mut unset_vnode: HashSet = (0..VirtualNode::DEFAULT_COUNT) .map(VirtualNode::from_index) .collect(); while let Some((epoch, _)) = self.watermarks.last() { @@ -535,7 +535,7 @@ impl TableWatermarks { } } if !set_vnode.is_empty() { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for vnode in set_vnode { builder.set(vnode.to_index(), true); } @@ -706,7 +706,7 @@ mod tests { use crate::version::HummockVersion; fn build_bitmap(vnodes: impl IntoIterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for vnode in vnodes { builder.set(vnode, true); } @@ -746,7 +746,7 @@ mod tests { let mut second_table_watermark = TableWatermarks::single_epoch( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::DEFAULT_COUNT), watermark3.clone(), )], direction, @@ -754,7 +754,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::DEFAULT_COUNT), watermark3.clone(), )] .into(), @@ -815,7 +815,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::DEFAULT_COUNT), watermark3.clone(), )] .into(), @@ -853,7 +853,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::DEFAULT_COUNT), watermark3.clone(), )] .into() @@ -879,7 +879,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::DEFAULT_COUNT), watermark3.clone(), )] .into() @@ -905,7 +905,7 @@ mod tests { ( epoch4, vec![VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::DEFAULT_COUNT)), watermark3.clone() )] .into() @@ -932,7 +932,7 @@ mod tests { vec![ VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()), VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::DEFAULT_COUNT)), watermark3.clone() ) ] @@ -1164,7 +1164,7 @@ mod tests { EPOCH1, vec![VnodeWatermark { watermark: watermark1.clone(), - vnode_bitmap: build_bitmap(0..VirtualNode::COUNT), + vnode_bitmap: build_bitmap(0..VirtualNode::DEFAULT_COUNT), }] .into(), )], @@ -1182,7 +1182,7 @@ mod tests { ); assert_eq!(EPOCH1, index.committed_epoch.unwrap()); assert_eq!(EPOCH2, index.latest_epoch); - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::DEFAULT_COUNT { let vnode = 
VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 79b00d0f9b8f2..bd17aad8f91c3 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1836,8 +1836,8 @@ pub(crate) mod tests { Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), None, ); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - for vnode_id in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::DEFAULT_COUNT * 2; + for vnode_id in 0..VirtualNode::DEFAULT_COUNT / 2 { let mut last_k: u64 = 1; let init_epoch = test_epoch(100 * object_id); let mut last_epoch = init_epoch; @@ -1880,9 +1880,9 @@ pub(crate) mod tests { let target_file_size = max_sst_file_size / 4; let mut table_watermarks = BTreeMap::default(); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for i in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::DEFAULT_COUNT * 2; + let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + for i in 0..VirtualNode::DEFAULT_COUNT / 2 { if i % 2 == 0 { vnode_builder.set(i, true); } else { @@ -1947,7 +1947,7 @@ pub(crate) mod tests { direction: WatermarkDirection::Ascending, vnode_watermarks: BTreeMap::default(), }; - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::DEFAULT_COUNT { if i % 2 == 0 { watermark .vnode_watermarks diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 67c3aa8b059bd..551bca19716c3 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -50,7 +50,7 @@ async fn test_read_version_basic() { let mut epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)); let mut read_version = HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, @@ -276,7 +276,7 @@ async fn test_read_filter_basic() { let epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index a1f74949b7313..c642621ebe91e 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2025,13 +2025,13 @@ async fn test_table_watermark() { let vnode1 = VirtualNode::from_index(1); let vnode_bitmap1 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); builder.set(1, true); builder.finish() }); let vnode2 = VirtualNode::from_index(2); let vnode_bitmap2 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); builder.set(2, true); builder.finish() }); diff --git a/src/storage/hummock_test/src/state_store_tests.rs 
b/src/storage/hummock_test/src/state_store_tests.rs index 1df58074e22d0..7f3acda536920 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1377,7 +1377,7 @@ async fn test_replicated_local_hummock_storage() { TableOption { retention_seconds: None, }, - Arc::new(Bitmap::ones(VirtualNode::COUNT)), + Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)), )) .await; diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 2a5b62f046d32..1326b3d3e5f1c 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -184,7 +184,7 @@ impl TracedNewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT)), + vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::DEFAULT_COUNT)), } } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index ee87177923e9b..c91f06f65110a 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -1149,7 +1149,7 @@ mod tests { table_id: TEST_TABLE_ID, new_read_version_sender: tx, is_replicated: false, - vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::COUNT).finish()), + vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::DEFAULT_COUNT).finish()), }); rx.await.unwrap() }; diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 88351c34b6210..5ebced5fcca4b 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -370,14 +370,14 @@ impl TableUnsyncData { prev_watermarks.extend(table_watermarks); } Entry::Vacant(entry) => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); entry.insert((table_watermarks, vnode_bitmap)); } } } None => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); self.table_watermarks = Some(( direction, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 1f8b17fb6c662..06d19d4b9b528 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -533,9 +533,9 @@ impl SharedBufferBatch { } pub fn collect_vnodes(&self) -> Vec { - let mut vnodes = Vec::with_capacity(VirtualNode::COUNT); + let mut vnodes = Vec::with_capacity(VirtualNode::DEFAULT_COUNT); let mut next_vnode_id = 0; - while next_vnode_id < VirtualNode::COUNT { + while next_vnode_id < VirtualNode::DEFAULT_COUNT { let seek_key = TableKey( VirtualNode::from_index(next_vnode_id) .to_be_bytes() diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 97b448faec8d7..2e185c6cf3e81 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -100,7 +100,7 @@ where last_table_id: 0, table_partition_vnode, split_weight_by_vnode: 0, - largest_vnode_in_current_partition: 
VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::DEFAULT_MAX.to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count, } @@ -116,7 +116,7 @@ where last_table_id: 0, table_partition_vnode: BTreeMap::default(), split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::DEFAULT_MAX.to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count: None, } @@ -229,14 +229,14 @@ where switch_builder = true; if self.split_weight_by_vnode > 1 { self.largest_vnode_in_current_partition = - VirtualNode::COUNT / (self.split_weight_by_vnode as usize) - 1; + VirtualNode::DEFAULT_COUNT / (self.split_weight_by_vnode as usize) - 1; } else { // default - self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index(); + self.largest_vnode_in_current_partition = VirtualNode::DEFAULT_MAX.to_index(); } } } - if self.largest_vnode_in_current_partition != VirtualNode::MAX.to_index() { + if self.largest_vnode_in_current_partition != VirtualNode::DEFAULT_MAX.to_index() { let key_vnode = user_key.get_vnode_id(); if key_vnode > self.largest_vnode_in_current_partition { // vnode partition change @@ -244,7 +244,7 @@ where // SAFETY: `self.split_weight_by_vnode > 1` here. let (basic, remainder) = - VirtualNode::COUNT.div_rem(&(self.split_weight_by_vnode as usize)); + VirtualNode::DEFAULT_COUNT.div_rem(&(self.split_weight_by_vnode as usize)); let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder); self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area { (key_vnode / basic + 1) * basic diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index f382bf5fc2d5d..49ae6892c88ce 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -733,7 +733,7 @@ impl NewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), + vnodes: Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)), } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 63ee6762cfb30..bf33362284436 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -1000,15 +1000,15 @@ mod tests { test_env.register_table(table.clone()).await; fn build_bitmap(indexes: impl Iterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for i in indexes { builder.set(i, true); } Arc::new(builder.finish()) } - let vnodes1 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 0)); - let vnodes2 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 1)); + let vnodes1 = build_bitmap((0..VirtualNode::DEFAULT_COUNT).filter(|i| i % 2 == 0)); + let vnodes2 = build_bitmap((0..VirtualNode::DEFAULT_COUNT).filter(|i| i % 2 == 1)); let factory1 = KvLogStoreFactory::new( test_env.storage.clone(), @@ -1150,7 +1150,7 @@ mod tests { .clear_shared_buffer(test_env.manager.get_current_version().await.id) .await; - let vnodes = build_bitmap(0..VirtualNode::COUNT); + let vnodes = build_bitmap(0..VirtualNode::DEFAULT_COUNT); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs 
b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 92a3caf4cd2e3..41162022b06c2 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -336,7 +336,7 @@ impl LogStoreRowSerde { ) -> Bytes { let (epoch, seq_id) = offset; Bytes::from(next_key(&serialize_pk( - (self.pk_info.compute_pk)(VirtualNode::MAX, Self::encode_epoch(epoch), seq_id), + (self.pk_info.compute_pk)(VirtualNode::DEFAULT_MAX, Self::encode_epoch(epoch), seq_id), &self.pk_serde, ))) } @@ -981,7 +981,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), pk_info, ); @@ -1124,7 +1124,7 @@ mod tests { let table = gen_test_log_store_table(pk_info); let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), pk_info, ); let (ops, rows) = gen_test_data(0); @@ -1283,7 +1283,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), pk_info, ); @@ -1428,7 +1428,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), pk_info, ); @@ -1538,7 +1538,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), pk_info, ); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs index 5fc10cd0cc58a..d96602aedd888 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs @@ -177,7 +177,7 @@ pub(crate) fn gen_test_log_store_table(pk_info: &'static KvLogStorePkInfo) -> Pb pub(crate) fn calculate_vnode_bitmap<'a>( test_data: impl Iterator)>, ) -> Bitmap { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); for vnode in test_data.map(|(_, row)| VirtualNode::compute_row(row, &[TEST_SCHEMA_DIST_KEY_INDEX])) { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 120a49e8e7ee3..7abae4d36d400 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -110,7 +110,7 @@ impl LogWriter for KvLogStoreWriter { { // When enter this branch, the chunk cannot be added directly, and should be add to // state store and flush - let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); let mut flush_info = FlushInfo::new(); for (i, (op, row)) in chunk.rows().enumerate() { let seq_id = start_seq_id + (i as SeqIdType); diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 9f452dc1863b0..5f49ed3dc6f28 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1116,9 +1116,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) + .flat_map(|id| 
vec![id as ActorId; VirtualNode::DEFAULT_COUNT / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::DEFAULT_COUNT, num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1372,9 +1372,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) + .flat_map(|id| vec![id as ActorId; VirtualNode::DEFAULT_COUNT / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::DEFAULT_COUNT, num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1408,7 +1408,7 @@ mod tests { hasher.update(&bytes); } let output_idx = - hash_mapping[hasher.finish() as usize % VirtualNode::COUNT] as usize - 1; + hash_mapping[hasher.finish() as usize % VirtualNode::DEFAULT_COUNT] as usize - 1; for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) { builder.append(Some(*val)); } diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 1fcb85c26f88e..b423a78c041d9 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -140,7 +140,7 @@ mod tests { ]); let pk_indices = vec![0]; let row_id_index = 0; - let row_id_generator = Bitmap::ones(VirtualNode::COUNT); + let row_id_generator = Bitmap::ones(VirtualNode::DEFAULT_COUNT); let (mut tx, upstream) = MockSource::channel(); let upstream = upstream.into_executor(schema.clone(), pk_indices.clone()); diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index bef6b1ca1f1c2..804f751b8430d 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::DEFAULT_COUNT; let mut configuration = Configuration::for_auto_parallelism( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, true, @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::DEFAULT_COUNT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; @@ -202,7 +202,7 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::DEFAULT_COUNT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; From 69b6d7e4a9a275094bf0b0b64557223622412571 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 21 Aug 2024 12:40:21 +0800 Subject: [PATCH 2/6] use function everywhere Signed-off-by: Bugen Zhao --- src/common/src/config.rs | 4 +-- .../src/hash/consistent_hash/mapping.rs | 20 ++++++------- 
src/common/src/hash/consistent_hash/vnode.rs | 26 ++++++++-------- src/common/src/hash/table_distribution.rs | 4 +-- src/common/src/util/row_id.rs | 15 ++++++---- .../src/vnode_mapping/vnode_placement.rs | 19 +++++------- src/expr/impl/src/scalar/vnode.rs | 4 +-- src/frontend/src/handler/alter_parallelism.rs | 10 +++---- src/frontend/src/scheduler/plan_fragmenter.rs | 2 +- src/jni_core/src/lib.rs | 2 +- .../sink_coordination/coordinator_worker.rs | 4 +-- .../src/manager/sink_coordination/manager.rs | 24 +++++++-------- src/meta/src/rpc/ddl_controller.rs | 2 +- src/meta/src/stream/scale.rs | 16 +++++----- src/meta/src/stream/test_scale.rs | 14 ++++----- src/storage/benches/bench_table_watermarks.rs | 10 +++---- src/storage/hummock_sdk/src/key.rs | 8 ++--- .../hummock_sdk/src/table_watermark.rs | 30 +++++++++---------- .../hummock_test/src/compactor_tests.rs | 12 ++++---- .../src/hummock_read_version_tests.rs | 4 +-- .../hummock_test/src/hummock_storage_tests.rs | 4 +-- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/hummock_trace/src/opts.rs | 2 +- .../event_handler/hummock_event_handler.rs | 2 +- .../src/hummock/event_handler/uploader/mod.rs | 4 +-- .../shared_buffer/shared_buffer_batch.rs | 4 +-- .../src/hummock/sstable/multi_builder.rs | 12 ++++---- src/storage/src/store.rs | 2 +- .../common/log_store_impl/kv_log_store/mod.rs | 8 ++--- .../log_store_impl/kv_log_store/serde.rs | 12 ++++---- .../log_store_impl/kv_log_store/test_utils.rs | 2 +- .../log_store_impl/kv_log_store/writer.rs | 2 +- src/stream/src/executor/dispatch.rs | 10 +++---- src/stream/src/executor/row_id_gen.rs | 2 +- .../scale/streaming_parallelism.rs | 6 ++-- 35 files changed, 151 insertions(+), 153 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 9b1a846ca3c86..baebdbf21170c 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -427,10 +427,10 @@ impl<'de> Deserialize<'de> for DefaultParallelism { ))) } } - Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::DEFAULT_COUNT { + Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::count() { Err(serde::de::Error::custom(format!( "default parallelism should be not great than {}", - VirtualNode::DEFAULT_COUNT + VirtualNode::count() )))? } else { NonZeroUsize::new(i).ok_or_else(|| { diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index e59a9260475ca..97b034cc12a3d 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -108,13 +108,13 @@ impl VnodeMapping { pub fn new_uniform(items: impl ExactSizeIterator) -> Self { // If the number of items is greater than the total vnode count, no vnode will be mapped to // some items and the mapping will be invalid. 
- assert!(items.len() <= VirtualNode::DEFAULT_COUNT); + assert!(items.len() <= VirtualNode::count()); let mut original_indices = Vec::with_capacity(items.len()); let mut data = Vec::with_capacity(items.len()); - let hash_shard_size = VirtualNode::DEFAULT_COUNT / items.len(); - let mut one_more_count = VirtualNode::DEFAULT_COUNT % items.len(); + let hash_shard_size = VirtualNode::count() / items.len(); + let mut one_more_count = VirtualNode::count() % items.len(); let mut init_bound = 0; for item in items { @@ -209,7 +209,7 @@ impl VnodeMapping { for (vnode, item) in self.iter_with_vnode() { vnode_bitmaps .entry(item) - .or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT)) + .or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::count())) .set(vnode.to_index(), true); } @@ -222,10 +222,10 @@ impl VnodeMapping { /// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap /// represents the vnodes mapped to the item. pub fn from_bitmaps(bitmaps: &HashMap) -> Self { - let mut items = vec![None; VirtualNode::DEFAULT_COUNT]; + let mut items = vec![None; VirtualNode::count()]; for (&item, bitmap) in bitmaps { - assert_eq!(bitmap.len(), VirtualNode::DEFAULT_COUNT); + assert_eq!(bitmap.len(), VirtualNode::count()); for idx in bitmap.iter_ones() { if let Some(prev) = items[idx].replace(item) { panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`"); @@ -243,7 +243,7 @@ impl VnodeMapping { /// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::COUNT`]. pub fn from_expanded(items: &[T::Item]) -> Self { - assert_eq!(items.len(), VirtualNode::DEFAULT_COUNT); + assert_eq!(items.len(), VirtualNode::count()); let (original_indices, data) = compress_data(items); Self { original_indices, @@ -403,7 +403,7 @@ mod tests { type TestMapping = VnodeMapping; type Test2Mapping = VnodeMapping; - const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::DEFAULT_COUNT]; + const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::count()]; fn uniforms() -> impl Iterator { COUNTS @@ -414,7 +414,7 @@ mod tests { fn randoms() -> impl Iterator { COUNTS.iter().map(|&count| { let raw = repeat_with(|| rand::thread_rng().gen_range(0..count as u32)) - .take(VirtualNode::DEFAULT_COUNT) + .take(VirtualNode::count()) .collect_vec(); TestMapping::from_expanded(&raw) }) @@ -427,7 +427,7 @@ mod tests { #[test] fn test_uniform() { for vnode_mapping in uniforms() { - assert_eq!(vnode_mapping.len(), VirtualNode::DEFAULT_COUNT); + assert_eq!(vnode_mapping.len(), VirtualNode::count()); let item_count = vnode_mapping.iter_unique().count(); let mut check: HashMap> = HashMap::new(); diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index cd64749b5ae2b..ec733ccbb44a8 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZero; use std::sync::LazyLock; use itertools::Itertools; @@ -45,7 +46,6 @@ impl From for VirtualNode { } impl VirtualNode { - pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; /// We may use `VirtualNode` as a datum in a stream, or store it as a column. /// Hence this reifies it as a RW datatype. 
pub const RW_TYPE: DataType = DataType::Int16; @@ -55,27 +55,25 @@ impl VirtualNode { pub const ZERO: VirtualNode = unsafe { VirtualNode::from_index_unchecked(0) }; } -impl VirtualNode { - /// The total count of virtual nodes. - pub const DEFAULT_COUNT: usize = 1 << 8; - /// The maximum value of the virtual node. - pub const DEFAULT_MAX: VirtualNode = - unsafe { VirtualNode::from_index_unchecked(Self::DEFAULT_COUNT - 1) }; -} - impl VirtualNode { pub fn count() -> usize { + const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; + const DEFAULT_COUNT: usize = 1 << 8; + static COUNT: LazyLock = LazyLock::new(|| { if let Ok(count) = std::env::var("RW_VNODE_COUNT") { - let count: usize = count.parse().expect("RW_VNODE_COUNT must be a number"); + let count = count + .parse::>() + .expect("`RW_VNODE_COUNT` must be a positive integer") + .get(); assert!( - count <= VirtualNode::MAX_COUNT, - "vnode count must be less than {}", - VirtualNode::MAX_COUNT + count <= MAX_COUNT, + "`RW_VNODE_COUNT` should not exceed maximum value {}", + MAX_COUNT ); count } else { - VirtualNode::DEFAULT_COUNT + DEFAULT_COUNT } }); diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 766a381add3cd..e7403dfcf72d6 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -108,7 +108,7 @@ impl TableDistribution { pub fn singleton_vnode_bitmap_ref() -> &'static Arc { /// A bitmap that only the default vnode is set. static SINGLETON_VNODES: LazyLock> = LazyLock::new(|| { - let mut vnodes = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut vnodes = BitmapBuilder::zeroed(VirtualNode::count()); vnodes.set(SINGLETON_VNODE.to_index(), true); vnodes.finish().into() }); @@ -123,7 +123,7 @@ impl TableDistribution { pub fn all_vnodes_ref() -> &'static Arc { /// A bitmap that all vnodes are set. static ALL_VNODES: LazyLock> = - LazyLock::new(|| Bitmap::ones(VirtualNode::DEFAULT_COUNT).into()); + LazyLock::new(|| Bitmap::ones(VirtualNode::count()).into()); &ALL_VNODES } diff --git a/src/common/src/util/row_id.rs b/src/common/src/util/row_id.rs index 1950b4f6358af..c3eaa722b1914 100644 --- a/src/common/src/util/row_id.rs +++ b/src/common/src/util/row_id.rs @@ -15,17 +15,14 @@ use std::cmp::Ordering; use std::time::SystemTime; -use static_assertions::const_assert; - use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH; use crate::hash::VirtualNode; +// TODO(var-vnode): should fit vnode count up to 16 bits const TIMESTAMP_SHIFT_BITS: u8 = 22; const VNODE_ID_SHIFT_BITS: u8 = 12; -const SEQUENCE_UPPER_BOUND: u16 = 1 << 12; -const VNODE_ID_UPPER_BOUND: u32 = 1 << 10; - -const_assert!(VNODE_ID_UPPER_BOUND >= VirtualNode::DEFAULT_COUNT as u32); +const SEQUENCE_UPPER_BOUND: u16 = 1 << VNODE_ID_SHIFT_BITS; +const VNODE_ID_UPPER_BOUND: u32 = 1 << (TIMESTAMP_SHIFT_BITS - VNODE_ID_SHIFT_BITS); /// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format: /// @@ -62,6 +59,12 @@ pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode { impl RowIdGenerator { /// Create a new `RowIdGenerator` with given virtual nodes. 
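The row-id changes above derive both bounds from the shift constants instead of hard-coding them: with a timestamp shift of 22 and a vnode shift of 12, the vnode id occupies 10 bits (upper bound `1 << (22 - 12)` = 1024) and the per-vnode sequence occupies the low 12 bits, which is exactly what the new assertion in `RowIdGenerator::new` guards once the vnode count becomes configurable. A hedged sketch of the packing these constants imply (field layout inferred from the shift values; the real generator also anchors the timestamp to the RisingWave epoch and tracks per-vnode sequences):

```rust
const TIMESTAMP_SHIFT_BITS: u8 = 22;
const VNODE_ID_SHIFT_BITS: u8 = 12;

/// Pack a millisecond timestamp, a vnode id and a sequence number into one row id.
fn pack_row_id(timestamp_ms: i64, vnode_id: u32, sequence: u16) -> i64 {
    debug_assert!(vnode_id < 1 << (TIMESTAMP_SHIFT_BITS - VNODE_ID_SHIFT_BITS)); // < 1024
    debug_assert!(sequence < 1 << VNODE_ID_SHIFT_BITS); // < 4096
    (timestamp_ms << TIMESTAMP_SHIFT_BITS)
        | ((vnode_id as i64) << VNODE_ID_SHIFT_BITS)
        | sequence as i64
}

/// Recover the vnode id, mirroring what `extract_vnode_id_from_row_id` needs to do.
fn extract_vnode_id(row_id: i64) -> u32 {
    let mask = (1 << (TIMESTAMP_SHIFT_BITS - VNODE_ID_SHIFT_BITS)) - 1;
    ((row_id >> VNODE_ID_SHIFT_BITS) & mask) as u32
}

fn main() {
    let id = pack_row_id(1_000_000, 42, 7);
    assert_eq!(extract_vnode_id(id), 42);
}
```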
pub fn new(vnodes: impl IntoIterator) -> Self { + assert!( + VirtualNode::count() <= VNODE_ID_UPPER_BOUND as usize, + "vnode count should not exceed {} due to limitation of row id format", + VNODE_ID_UPPER_BOUND + ); + let base = *UNIX_RISINGWAVE_DATE_EPOCH; Self { base, diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 23fbcdda43008..322e88a1edf17 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -44,10 +44,7 @@ pub fn place_vnode( // `max_parallelism` and total number of virtual nodes. let serving_parallelism = std::cmp::min( worker_slots.iter().map(|slots| slots.len()).sum(), - std::cmp::min( - max_parallelism.unwrap_or(usize::MAX), - VirtualNode::DEFAULT_COUNT, - ), + std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::count()), ); // Select `serving_parallelism` worker slots in a round-robin fashion, to distribute workload @@ -82,14 +79,14 @@ pub fn place_vnode( is_temp: bool, } - let (expected, mut remain) = VirtualNode::DEFAULT_COUNT.div_rem(&selected_slots.len()); + let (expected, mut remain) = VirtualNode::count().div_rem(&selected_slots.len()); let mut balances: HashMap = HashMap::default(); for slot in &selected_slots { let mut balance = Balance { slot: *slot, balance: -(expected as i32), - builder: BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::count()), is_temp: false, }; @@ -105,7 +102,7 @@ pub fn place_vnode( let mut temp_slot = Balance { slot: WorkerSlotId::new(0u32, usize::MAX), /* This id doesn't matter for `temp_slot`. It's distinguishable via `is_temp`. */ balance: 0, - builder: BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::count()), is_temp: true, }; match hint_worker_slot_mapping { @@ -161,7 +158,7 @@ pub fn place_vnode( let mut dst = balances.pop_back().unwrap(); let n = std::cmp::min(src.balance.abs(), dst.balance.abs()); let mut moved = 0; - for idx in 0..VirtualNode::DEFAULT_COUNT { + for idx in 0..VirtualNode::count() { if moved >= n { break; } @@ -192,7 +189,7 @@ pub fn place_vnode( for (worker_slot, bitmap) in results { worker_result .entry(worker_slot) - .or_insert(BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT).finish()) + .or_insert(BitmapBuilder::zeroed(VirtualNode::count()).finish()) .bitor_assign(&bitmap); } @@ -210,7 +207,7 @@ mod tests { use crate::vnode_mapping::vnode_placement::place_vnode; #[test] fn test_place_vnode() { - assert_eq!(VirtualNode::DEFAULT_COUNT, 256); + assert_eq!(VirtualNode::count(), 256); let serving_property = Property { is_unschedulable: false, @@ -223,7 +220,7 @@ mod tests { assert_eq!(wm1.len(), 256); assert_eq!(wm2.len(), 256); let mut count: usize = 0; - for idx in 0..VirtualNode::DEFAULT_COUNT { + for idx in 0..VirtualNode::count() { let vnode = VirtualNode::from_index(idx); if wm1.get(vnode) == wm2.get(vnode) { count += 1; diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 9b689002b6a1c..0658010ea5c0a 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -81,14 +81,14 @@ mod tests { let output = expr.eval(&input).await.unwrap(); for vnode in output.iter() { let vnode = vnode.unwrap().into_int16(); - assert!((0..VirtualNode::DEFAULT_COUNT as i16).contains(&vnode)); + assert!((0..VirtualNode::count() as i16).contains(&vnode)); } // test eval_row for row in input.rows() { let result = 
expr.eval_row(&row.to_owned_row()).await.unwrap(); let vnode = result.unwrap().into_int16(); - assert!((0..VirtualNode::DEFAULT_COUNT as i16).contains(&vnode)); + assert!((0..VirtualNode::count() as i16).contains(&vnode)); } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index f981b7ba6fabd..f069856acc33e 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -108,16 +108,16 @@ pub async fn handle_alter_parallelism( match &target_parallelism.parallelism { Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { - if available_parallelism > VirtualNode::DEFAULT_COUNT as u32 { - builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::DEFAULT_COUNT)); + if available_parallelism > VirtualNode::count() as u32 { + builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::count())); } } Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { - if *parallelism > VirtualNode::DEFAULT_COUNT as u32 { - builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::DEFAULT_COUNT)); + if *parallelism > VirtualNode::count() as u32 { + builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::count())); target_parallelism = PbTableParallelism { parallelism: Some(PbParallelism::Fixed(FixedParallelism { - parallelism: VirtualNode::DEFAULT_COUNT as u32, + parallelism: VirtualNode::count() as u32, })), }; } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 51194ea7f4142..cf3f02c2bdd70 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -910,7 +910,7 @@ impl BatchPlanFragmenter { .take(1) .update(|(_, info)| { info.vnode_bitmap = - Bitmap::ones(VirtualNode::DEFAULT_COUNT).to_protobuf(); + Bitmap::ones(VirtualNode::count()).to_protobuf(); }) .collect(); } diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index af0c067354e68..9a3e6d07b734c 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -320,7 +320,7 @@ impl<'a> Deref for JavaBindingIterator<'a> { #[no_mangle] extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - VirtualNode::DEFAULT_COUNT as jint + VirtualNode::count() as jint } #[cfg_or_panic(not(madsim))] diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 2338d170437ad..e4becf1759351 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -160,8 +160,8 @@ impl CoordinatorWorker { } async fn wait_for_writers(&mut self, first_vnode_bitmap: Bitmap) -> anyhow::Result<()> { - let mut remaining_count = VirtualNode::DEFAULT_COUNT; - let mut registered_vnode = HashSet::with_capacity(VirtualNode::DEFAULT_COUNT); + let mut remaining_count = VirtualNode::count(); + let mut registered_vnode = HashSet::with_capacity(VirtualNode::count()); for vnode in first_vnode_bitmap.iter_vnodes() { remaining_count -= 1; diff --git a/src/meta/src/manager/sink_coordination/manager.rs 
b/src/meta/src/manager/sink_coordination/manager.rs index c4f2fa529f006..62912274a57d8 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -414,11 +414,11 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let mut all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::count()).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::DEFAULT_COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::count() / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } @@ -584,9 +584,9 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); + let all_vnode = (0..VirtualNode::count()).collect_vec(); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } @@ -705,7 +705,7 @@ mod tests { let mut build_client_future1 = pin!(CoordinatorStreamHandle::new_with_init_stream( param.to_proto(), - Bitmap::zeros(VirtualNode::DEFAULT_COUNT), + Bitmap::zeros(VirtualNode::count()), |rx| async { Ok(tonic::Response::new( manager @@ -742,11 +742,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::count()).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::DEFAULT_COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::count() / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } @@ -821,11 +821,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::DEFAULT_COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::count()).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::DEFAULT_COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::count() / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(*i, true); } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 3c6f88319ca04..645ef8f5c31fd 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1610,7 +1610,7 @@ impl DdlController { let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; - const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::DEFAULT_COUNT).unwrap(); + const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::count()).unwrap(); let parallelism_limited = parallelism > MAX_PARALLELISM; if parallelism_limited { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 1d3a3d50eb0c0..00febe1bc31af 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -258,7 +258,7 @@ pub fn rebalance_actor_vnode( balance: i32, builder: 
BitmapBuilder, } - let (expected, mut remain) = VirtualNode::DEFAULT_COUNT.div_rem(&target_actor_count); + let (expected, mut remain) = VirtualNode::count().div_rem(&target_actor_count); tracing::debug!( "expected {}, remain {}, prev actors {}, target actors {}", @@ -289,7 +289,7 @@ pub fn rebalance_actor_vnode( builder }; - let (prev_expected, _) = VirtualNode::DEFAULT_COUNT.div_rem(&actors.len()); + let (prev_expected, _) = VirtualNode::count().div_rem(&actors.len()); let prev_remain = removed .iter() @@ -322,7 +322,7 @@ pub fn rebalance_actor_vnode( .map(|actor_id| Balance { actor_id: *actor_id, balance: -(expected as i32), - builder: BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT), + builder: BitmapBuilder::zeroed(VirtualNode::count()), }) .collect_vec(); @@ -384,7 +384,7 @@ pub fn rebalance_actor_vnode( let n = min(abs(src.balance), abs(dst.balance)); let mut moved = 0; - for idx in (0..VirtualNode::DEFAULT_COUNT).rev() { + for idx in (0..VirtualNode::count()).rev() { if moved >= n { break; } @@ -2238,12 +2238,12 @@ impl ScaleController { } FragmentDistributionType::Hash => match parallelism { TableParallelism::Adaptive => { - if all_available_slots > VirtualNode::DEFAULT_COUNT { + if all_available_slots > VirtualNode::count() { tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); // force limit to VirtualNode::COUNT let target_worker_slots = schedule_units_for_slots( &schedulable_worker_slots, - VirtualNode::DEFAULT_COUNT, + VirtualNode::count(), table_id, )?; @@ -2265,10 +2265,10 @@ impl ScaleController { } } TableParallelism::Fixed(mut n) => { - if n > VirtualNode::DEFAULT_COUNT { + if n > VirtualNode::count() { // This should be unreachable, but we still intercept it to prevent accidental modifications. 
tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); - n = VirtualNode::DEFAULT_COUNT + n = VirtualNode::count() } let target_worker_slots = diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index c030237fe29d9..c08f892276a5d 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -26,7 +26,7 @@ mod tests { use crate::stream::CustomActorInfo; fn simulated_parallelism(min: Option, max: Option) -> Vec { - let mut raw = vec![1, 3, 12, 42, VirtualNode::DEFAULT_COUNT]; + let mut raw = vec![1, 3, 12, 42, VirtualNode::count()]; if let Some(min) = min { raw.retain(|n| *n > min); raw.push(min); @@ -55,7 +55,7 @@ mod tests { fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) { let prev_bitmap = Bitmap::from(actor.vnode_bitmap.as_ref().unwrap()); - for idx in 0..VirtualNode::DEFAULT_COUNT { + for idx in 0..VirtualNode::count() { if prev_bitmap.is_set(idx) { assert!(bitmap.is_set(idx)); } @@ -63,7 +63,7 @@ mod tests { } fn check_bitmaps(bitmaps: &HashMap) { - let mut target = (0..VirtualNode::DEFAULT_COUNT).map(|_| false).collect_vec(); + let mut target = (0..VirtualNode::count()).map(|_| false).collect_vec(); for bitmap in bitmaps.values() { for (idx, pos) in target.iter_mut().enumerate() { @@ -91,7 +91,7 @@ mod tests { let actor_ids = (0..parallelism as ActorId).collect_vec(); let actor_mapping = ActorMapping::new_uniform(actor_ids.into_iter()); - assert_eq!(actor_mapping.len(), VirtualNode::DEFAULT_COUNT); + assert_eq!(actor_mapping.len(), VirtualNode::count()); let mut check: HashMap> = HashMap::new(); for (vnode, actor_id) in actor_mapping.iter_with_vnode() { @@ -178,7 +178,7 @@ mod tests { #[test] fn test_rebalance_scale_out() { - for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::DEFAULT_COUNT - 1)) { + for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::count() - 1)) { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // add 1 @@ -190,7 +190,7 @@ mod tests { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); // add to VirtualNode::COUNT - let actors_to_add = (parallelism as ActorId..VirtualNode::DEFAULT_COUNT as ActorId).collect(); + let actors_to_add = (parallelism as ActorId..VirtualNode::count() as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add); assert_eq!(result.len(), actors.len() + actors_to_add.len()); check_bitmaps(&result); @@ -275,7 +275,7 @@ mod tests { #[test] fn test_rebalance_scale_real() { - let actor_ids = (0..(VirtualNode::DEFAULT_COUNT - 1) as ActorId).collect_vec(); + let actor_ids = (0..(VirtualNode::count() - 1) as ActorId).collect_vec(); let actors = build_fake_actors(actor_ids); let actors_to_remove = btreeset! {0, 1}; let actors_to_add = btreeset! 
{255}; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index 57934ec71a04f..1ea2722869bcb 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -37,18 +37,18 @@ use tokio::sync::mpsc::unbounded_channel; fn vnode_bitmaps(part_count: usize) -> impl Iterator> { static BITMAP_CACHE: LazyLock>>>> = LazyLock::new(|| Mutex::new(HashMap::new())); - assert_eq!(VirtualNode::DEFAULT_COUNT % part_count, 0); + assert_eq!(VirtualNode::count() % part_count, 0); let mut cache = BITMAP_CACHE.lock(); match cache.entry(part_count) { Entry::Occupied(entry) => entry.get().clone().into_iter(), Entry::Vacant(entry) => entry .insert({ - let part_size = VirtualNode::DEFAULT_COUNT / part_count; + let part_size = VirtualNode::count() / part_count; (0..part_count) .map(move |part_idx| { let start = part_idx * part_size; let end = part_idx * part_size + part_size; - let mut bitmap = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut bitmap = BitmapBuilder::zeroed(VirtualNode::count()); for i in start..end { bitmap.set(i, true); } @@ -253,7 +253,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read latest watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::DEFAULT_COUNT { + for i in 0..VirtualNode::count() { let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i)); } }) @@ -261,7 +261,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read committed watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::DEFAULT_COUNT { + for i in 0..VirtualNode::count() { let _ = table_watermarks.read_watermark( VirtualNode::from_index(i), test_epoch(committed_epoch_idx as u64), diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index bfd23f2916128..8bfa57e19ac58 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -74,7 +74,7 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) { vnode.to_index() + 1 } } - Unbounded => VirtualNode::DEFAULT_COUNT, + Unbounded => VirtualNode::count(), }; (left, right) } @@ -321,7 +321,7 @@ pub fn prev_full_key(full_key: &[u8]) -> Vec { } pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound { - if vnode == VirtualNode::DEFAULT_MAX { + if vnode == VirtualNode::max() { Unbounded } else { let end_bound_index = vnode.to_index() + 1; @@ -1299,7 +1299,7 @@ mod tests { Excluded(TableKey(concat(234, b""))) ) ); - let max_vnode = VirtualNode::DEFAULT_COUNT - 1; + let max_vnode = VirtualNode::count() - 1; assert_eq!( prefixed_range_with_vnode( (Bound::::Unbounded, Bound::::Unbounded), @@ -1332,7 +1332,7 @@ mod tests { Excluded(b"1".as_slice()), Unbounded, ]; - for vnode in 0..VirtualNode::DEFAULT_COUNT { + for vnode in 0..VirtualNode::count() { for left in &left_bound { for right in &right_bound { assert_eq!( diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 55a1a4fca9d20..2305d62389a57 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -176,7 +176,7 @@ impl TableWatermarksIndex { prev_watermark ); regress_vnodes - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::count())) .set(vnode.to_index(), true); } } @@ -187,7 +187,7 @@ impl TableWatermarksIndex { let vnode_index = vnode.to_index(); if !regress_vnodes.is_set(vnode_index) { 
bitmap_builder - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::count())) .set(vnode_index, true); } } @@ -220,7 +220,7 @@ impl TableWatermarksIndex { self.latest_epoch = epoch; #[cfg(debug_assertions)] { - let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::count()); for vnode_watermark in vnode_watermark_list.as_ref() { for vnode in vnode_watermark.vnode_bitmap.iter_ones() { assert!(!vnode_is_set.is_set(vnode)); @@ -507,7 +507,7 @@ impl TableWatermarks { } debug!("clear stale table watermark below epoch {}", safe_epoch); let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len()); - let mut unset_vnode: HashSet = (0..VirtualNode::DEFAULT_COUNT) + let mut unset_vnode: HashSet = (0..VirtualNode::count()) .map(VirtualNode::from_index) .collect(); while let Some((epoch, _)) = self.watermarks.last() { @@ -535,7 +535,7 @@ impl TableWatermarks { } } if !set_vnode.is_empty() { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for vnode in set_vnode { builder.set(vnode.to_index(), true); } @@ -706,7 +706,7 @@ mod tests { use crate::version::HummockVersion; fn build_bitmap(vnodes: impl IntoIterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for vnode in vnodes { builder.set(vnode, true); } @@ -746,7 +746,7 @@ mod tests { let mut second_table_watermark = TableWatermarks::single_epoch( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::DEFAULT_COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )], direction, @@ -754,7 +754,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::DEFAULT_COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into(), @@ -815,7 +815,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::DEFAULT_COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into(), @@ -853,7 +853,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::DEFAULT_COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into() @@ -879,7 +879,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::DEFAULT_COUNT), + build_bitmap(0..VirtualNode::count()), watermark3.clone(), )] .into() @@ -905,7 +905,7 @@ mod tests { ( epoch4, vec![VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::DEFAULT_COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::count())), watermark3.clone() )] .into() @@ -932,7 +932,7 @@ mod tests { vec![ VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()), VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::DEFAULT_COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::count())), watermark3.clone() ) ] @@ -1164,7 +1164,7 @@ mod tests { EPOCH1, vec![VnodeWatermark { watermark: watermark1.clone(), - vnode_bitmap: build_bitmap(0..VirtualNode::DEFAULT_COUNT), + vnode_bitmap: build_bitmap(0..VirtualNode::count()), }] .into(), )], @@ -1182,7 +1182,7 @@ mod tests { ); assert_eq!(EPOCH1, index.committed_epoch.unwrap()); assert_eq!(EPOCH2, index.latest_epoch); - for vnode in 0..VirtualNode::DEFAULT_COUNT { + for 
vnode in 0..VirtualNode::count() { let vnode = VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index bd17aad8f91c3..20b82c5c9b312 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1836,8 +1836,8 @@ pub(crate) mod tests { Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), None, ); - let key_count = KEY_COUNT / VirtualNode::DEFAULT_COUNT * 2; - for vnode_id in 0..VirtualNode::DEFAULT_COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::count() * 2; + for vnode_id in 0..VirtualNode::count() / 2 { let mut last_k: u64 = 1; let init_epoch = test_epoch(100 * object_id); let mut last_epoch = init_epoch; @@ -1880,9 +1880,9 @@ pub(crate) mod tests { let target_file_size = max_sst_file_size / 4; let mut table_watermarks = BTreeMap::default(); - let key_count = KEY_COUNT / VirtualNode::DEFAULT_COUNT * 2; - let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); - for i in 0..VirtualNode::DEFAULT_COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::count() * 2; + let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::count()); + for i in 0..VirtualNode::count() / 2 { if i % 2 == 0 { vnode_builder.set(i, true); } else { @@ -1947,7 +1947,7 @@ pub(crate) mod tests { direction: WatermarkDirection::Ascending, vnode_watermarks: BTreeMap::default(), }; - for i in 0..VirtualNode::DEFAULT_COUNT { + for i in 0..VirtualNode::count() { if i % 2 == 0 { watermark .vnode_watermarks diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 551bca19716c3..2b27d010ed8ac 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -50,7 +50,7 @@ async fn test_read_version_basic() { let mut epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::count())); let mut read_version = HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, @@ -276,7 +276,7 @@ async fn test_read_filter_basic() { let epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::count())); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index c642621ebe91e..0dc75f936cdbd 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2025,13 +2025,13 @@ async fn test_table_watermark() { let vnode1 = VirtualNode::from_index(1); let vnode_bitmap1 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); builder.set(1, true); builder.finish() }); let vnode2 = VirtualNode::from_index(2); let vnode_bitmap2 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); builder.set(2, true); builder.finish() }); diff --git 
a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 7f3acda536920..8c7c8c261d4c6 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1377,7 +1377,7 @@ async fn test_replicated_local_hummock_storage() { TableOption { retention_seconds: None, }, - Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)), + Arc::new(Bitmap::ones(VirtualNode::count())), )) .await; diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 1326b3d3e5f1c..ad163041a2f44 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -184,7 +184,7 @@ impl TracedNewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::DEFAULT_COUNT)), + vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::count())), } } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index c91f06f65110a..60633b9480af5 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -1149,7 +1149,7 @@ mod tests { table_id: TEST_TABLE_ID, new_read_version_sender: tx, is_replicated: false, - vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::DEFAULT_COUNT).finish()), + vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::count()).finish()), }); rx.await.unwrap() }; diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 5ebced5fcca4b..f3e1ea5ea03e4 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -370,14 +370,14 @@ impl TableUnsyncData { prev_watermarks.extend(table_watermarks); } Entry::Vacant(entry) => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::count()); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); entry.insert((table_watermarks, vnode_bitmap)); } } } None => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::count()); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); self.table_watermarks = Some(( direction, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 06d19d4b9b528..f7cf5d9c09990 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -533,9 +533,9 @@ impl SharedBufferBatch { } pub fn collect_vnodes(&self) -> Vec { - let mut vnodes = Vec::with_capacity(VirtualNode::DEFAULT_COUNT); + let mut vnodes = Vec::with_capacity(VirtualNode::count()); let mut next_vnode_id = 0; - while next_vnode_id < VirtualNode::DEFAULT_COUNT { + while next_vnode_id < VirtualNode::count() { let seek_key = TableKey( VirtualNode::from_index(next_vnode_id) .to_be_bytes() diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 2e185c6cf3e81..91d8dd226d53d 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -100,7 +100,7 @@ where last_table_id: 0, table_partition_vnode, 
split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::DEFAULT_MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::max().to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count, } @@ -116,7 +116,7 @@ where last_table_id: 0, table_partition_vnode: BTreeMap::default(), split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::DEFAULT_MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::max().to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count: None, } @@ -229,14 +229,14 @@ where switch_builder = true; if self.split_weight_by_vnode > 1 { self.largest_vnode_in_current_partition = - VirtualNode::DEFAULT_COUNT / (self.split_weight_by_vnode as usize) - 1; + VirtualNode::count() / (self.split_weight_by_vnode as usize) - 1; } else { // default - self.largest_vnode_in_current_partition = VirtualNode::DEFAULT_MAX.to_index(); + self.largest_vnode_in_current_partition = VirtualNode::max().to_index(); } } } - if self.largest_vnode_in_current_partition != VirtualNode::DEFAULT_MAX.to_index() { + if self.largest_vnode_in_current_partition != VirtualNode::max().to_index() { let key_vnode = user_key.get_vnode_id(); if key_vnode > self.largest_vnode_in_current_partition { // vnode partition change @@ -244,7 +244,7 @@ where // SAFETY: `self.split_weight_by_vnode > 1` here. let (basic, remainder) = - VirtualNode::DEFAULT_COUNT.div_rem(&(self.split_weight_by_vnode as usize)); + VirtualNode::count().div_rem(&(self.split_weight_by_vnode as usize)); let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder); self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area { (key_vnode / basic + 1) * basic diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 49ae6892c88ce..e6c6d83111c87 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -733,7 +733,7 @@ impl NewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT)), + vnodes: Arc::new(Bitmap::ones(VirtualNode::count())), } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index bf33362284436..1e14ea6d71977 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -1000,15 +1000,15 @@ mod tests { test_env.register_table(table.clone()).await; fn build_bitmap(indexes: impl Iterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for i in indexes { builder.set(i, true); } Arc::new(builder.finish()) } - let vnodes1 = build_bitmap((0..VirtualNode::DEFAULT_COUNT).filter(|i| i % 2 == 0)); - let vnodes2 = build_bitmap((0..VirtualNode::DEFAULT_COUNT).filter(|i| i % 2 == 1)); + let vnodes1 = build_bitmap((0..VirtualNode::count()).filter(|i| i % 2 == 0)); + let vnodes2 = build_bitmap((0..VirtualNode::count()).filter(|i| i % 2 == 1)); let factory1 = KvLogStoreFactory::new( test_env.storage.clone(), @@ -1150,7 +1150,7 @@ mod tests { .clear_shared_buffer(test_env.manager.get_current_version().await.id) .await; - let vnodes = build_bitmap(0..VirtualNode::DEFAULT_COUNT); + let vnodes = build_bitmap(0..VirtualNode::count()); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), diff --git 
a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 41162022b06c2..f2bedacac2dbc 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -336,7 +336,7 @@ impl LogStoreRowSerde { ) -> Bytes { let (epoch, seq_id) = offset; Bytes::from(next_key(&serialize_pk( - (self.pk_info.compute_pk)(VirtualNode::DEFAULT_MAX, Self::encode_epoch(epoch), seq_id), + (self.pk_info.compute_pk)(VirtualNode::max(), Self::encode_epoch(epoch), seq_id), &self.pk_serde, ))) } @@ -981,7 +981,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); @@ -1124,7 +1124,7 @@ mod tests { let table = gen_test_log_store_table(pk_info); let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); let (ops, rows) = gen_test_data(0); @@ -1283,7 +1283,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); @@ -1428,7 +1428,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); @@ -1538,7 +1538,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::DEFAULT_COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::count()))), pk_info, ); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs index d96602aedd888..fd691e236d394 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs @@ -177,7 +177,7 @@ pub(crate) fn gen_test_log_store_table(pk_info: &'static KvLogStorePkInfo) -> Pb pub(crate) fn calculate_vnode_bitmap<'a>( test_data: impl Iterator)>, ) -> Bitmap { - let mut builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::count()); for vnode in test_data.map(|(_, row)| VirtualNode::compute_row(row, &[TEST_SCHEMA_DIST_KEY_INDEX])) { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 7abae4d36d400..e7b57bdc93af2 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -110,7 +110,7 @@ impl LogWriter for KvLogStoreWriter { { // When enter this branch, the chunk cannot be added directly, and should be add to // state store and flush - let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::DEFAULT_COUNT); + let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::count()); let mut flush_info = FlushInfo::new(); for (i, (op, row)) in chunk.rows().enumerate() { let seq_id = start_seq_id + (i as SeqIdType); diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 5f49ed3dc6f28..77fb4a6191e8e 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1116,9 +1116,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - 
.flat_map(|id| vec![id as ActorId; VirtualNode::DEFAULT_COUNT / num_outputs]) + .flat_map(|id| vec![id as ActorId; VirtualNode::count() / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::DEFAULT_COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::count(), num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1372,9 +1372,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - .flat_map(|id| vec![id as ActorId; VirtualNode::DEFAULT_COUNT / num_outputs]) + .flat_map(|id| vec![id as ActorId; VirtualNode::count() / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::DEFAULT_COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::count(), num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1408,7 +1408,7 @@ mod tests { hasher.update(&bytes); } let output_idx = - hash_mapping[hasher.finish() as usize % VirtualNode::DEFAULT_COUNT] as usize - 1; + hash_mapping[hasher.finish() as usize % VirtualNode::count()] as usize - 1; for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) { builder.append(Some(*val)); } diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index b423a78c041d9..e69b682e1ba5b 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -140,7 +140,7 @@ mod tests { ]); let pk_indices = vec![0]; let row_id_index = 0; - let row_id_generator = Bitmap::ones(VirtualNode::DEFAULT_COUNT); + let row_id_generator = Bitmap::ones(VirtualNode::count()); let (mut tx, upstream) = MockSource::channel(); let upstream = upstream.into_executor(schema.clone(), pk_indices.clone()); diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index 804f751b8430d..1660c381e69df 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { - let vnode_max = VirtualNode::DEFAULT_COUNT; + let vnode_max = VirtualNode::count(); let mut configuration = Configuration::for_auto_parallelism( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, true, @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { - let vnode_max = VirtualNode::DEFAULT_COUNT; + let vnode_max = VirtualNode::count(); let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; @@ -202,7 +202,7 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { - let vnode_max = VirtualNode::DEFAULT_COUNT; + let vnode_max = VirtualNode::count(); let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; From 560745c90247b620dd9dbf5d986d004605726741 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 21 Aug 2024 12:50:00 +0800 Subject: [PATCH 3/6] add docs Signed-off-by: Bugen Zhao --- 
src/common/src/hash/consistent_hash/vnode.rs | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index ec733ccbb44a8..c09841f50a2c9 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -56,10 +56,16 @@ impl VirtualNode { } impl VirtualNode { + /// The default count of virtual nodes. + const DEFAULT_COUNT: usize = 1 << 8; + /// The maximum count of virtual nodes, limited by the size of the inner type [`VirtualNodeInner`]. + const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; + + /// The total count of virtual nodes. + /// + /// It can be customized by the environment variable `RW_VNODE_COUNT`, or defaults to [`Self::DEFAULT_COUNT`]. pub fn count() -> usize { - const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; - const DEFAULT_COUNT: usize = 1 << 8; - + // Cache the value to avoid repeated env lookups and parsing. static COUNT: LazyLock = LazyLock::new(|| { if let Ok(count) = std::env::var("RW_VNODE_COUNT") { let count = count @@ -67,19 +73,21 @@ impl VirtualNode { .expect("`RW_VNODE_COUNT` must be a positive integer") .get(); assert!( - count <= MAX_COUNT, + count <= VirtualNode::MAX_COUNT, "`RW_VNODE_COUNT` should not exceed maximum value {}", - MAX_COUNT + VirtualNode::MAX_COUNT ); + // TODO(var-vnode): shall we enforce it to be a power of 2? count } else { - DEFAULT_COUNT + VirtualNode::DEFAULT_COUNT } }); *COUNT } + /// The last virtual node in the range. It's derived from [`Self::count`]. pub fn max() -> VirtualNode { VirtualNode::from_index(Self::count() - 1) } @@ -95,6 +103,7 @@ impl VirtualNode { Self(index as _) } + /// Creates a virtual node from the `usize` index without bounds checking. 
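Pulled out of the diff, the pattern documented in this patch — a process-wide count that defaults to a constant, can be overridden once via `RW_VNODE_COUNT`, and is parsed and bounds-checked exactly once — looks roughly like the consolidation below (names shortened; the 256 default and the `u16`-derived maximum are the values from this series, the rest is an illustrative restatement of the hunks above):

```rust
use std::num::NonZeroUsize;
use std::sync::LazyLock;

/// Total number of virtual nodes, resolved once per process.
pub fn vnode_count() -> usize {
    static COUNT: LazyLock<usize> = LazyLock::new(|| match std::env::var("RW_VNODE_COUNT") {
        Ok(v) => {
            let count = v
                .parse::<NonZeroUsize>()
                .expect("`RW_VNODE_COUNT` must be a positive integer")
                .get();
            assert!(
                count <= 1 << u16::BITS, // limited by the `u16` inner representation
                "`RW_VNODE_COUNT` should not exceed {}",
                1 << u16::BITS
            );
            count
        }
        Err(_) => 1 << 8, // default: 256 vnodes
    });
    *COUNT
}
```

Callers always go through the function (for example `Bitmap::ones(vnode_count())`), so an operator could start a node with `RW_VNODE_COUNT=512` to widen the hash space without recompiling; whether the value must additionally be a power of two is left as an open question in the patch's TODO.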
pub const unsafe fn from_index_unchecked(index: usize) -> Self { Self(index as _) } From f708314ad74e67d947a22cd35542360caaf56c4c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 21 Aug 2024 13:14:49 +0800 Subject: [PATCH 4/6] fix compiling Signed-off-by: Bugen Zhao --- src/meta/src/rpc/ddl_controller.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 645ef8f5c31fd..ff01a115886f3 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1610,14 +1610,14 @@ impl DdlController { let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; - const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::count()).unwrap(); + let max_parallelism: NonZeroUsize = NonZeroUsize::new(VirtualNode::count()).unwrap(); - let parallelism_limited = parallelism > MAX_PARALLELISM; + let parallelism_limited = parallelism > max_parallelism; if parallelism_limited { - tracing::warn!("Too many parallelism, use {} instead", MAX_PARALLELISM); + tracing::warn!("Too many parallelism, use {} instead", max_parallelism); } - let parallelism = parallelism.min(MAX_PARALLELISM); + let parallelism = parallelism.min(max_parallelism); let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; From f3dc33686bfb09fcd8311a5272873f24b1e7be38 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 21 Aug 2024 15:10:19 +0800 Subject: [PATCH 5/6] add safety doc and fix tests Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/vnode.rs | 5 +++++ src/stream/src/executor/dispatch.rs | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index c09841f50a2c9..220389e2b5854 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -104,6 +104,11 @@ impl VirtualNode { } /// Creates a virtual node from the `usize` index without bounds checking. + /// + /// # Safety + /// + /// The caller must ensure that the index is within the range of virtual nodes, + /// i.e., less than [`Self::count`]. pub const unsafe fn from_index_unchecked(index: usize) -> Self { Self(index as _) } diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 77fb4a6191e8e..aa1311143a5e7 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1101,7 +1101,7 @@ mod tests { async fn test_hash_dispatcher_complex_inner() { // This test only works when VirtualNode::COUNT is 256. 
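Patches 4 and 5 are direct fallout of the same switch: because `VirtualNode::count()` now performs a runtime environment lookup, it can no longer feed `const` items or `static_assertions::const_assert_eq!`, so those uses become ordinary runtime values and asserts, as the replacement just below also shows. A minimal self-contained sketch of that before/after (with a stand-in for `count()`):

```rust
use std::num::NonZeroUsize;

/// Stand-in for `VirtualNode::count()`; the real function reads `RW_VNODE_COUNT`
/// at runtime, which is precisely why it cannot be `const`.
fn vnode_count() -> usize {
    256
}

fn main() {
    // Before: `const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(...).unwrap();`
    // After: the bound is computed when needed.
    let max_parallelism = NonZeroUsize::new(vnode_count()).expect("vnode count is non-zero");

    // Before: `static_assertions::const_assert_eq!(VirtualNode::COUNT, 256);`
    // After: a plain runtime assertion, valid only under the default configuration.
    assert_eq!(vnode_count(), 256);
    assert!(max_parallelism.get() <= vnode_count());
}
```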
- static_assertions::const_assert_eq!(VirtualNode::COUNT, 256); + assert_eq!(VirtualNode::count(), 256); let num_outputs = 2; // actor id ranges from 1 to 2 let key_indices = &[0, 2]; From 3a871321df6be94bbabc07f1aa22dd1d9b5d8fa4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 22 Aug 2024 12:09:22 +0800 Subject: [PATCH 6/6] update docs and tests Signed-off-by: Bugen Zhao --- src/common/src/config.rs | 4 ++-- src/common/src/hash/consistent_hash/mapping.rs | 14 ++++++++------ src/config/docs.md | 4 ++-- src/meta/src/stream/scale.rs | 6 +++--- src/meta/src/stream/test_scale.rs | 2 +- src/stream/src/executor/dispatch.rs | 2 +- 6 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index baebdbf21170c..0c36eb1bf9d6e 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -319,7 +319,7 @@ pub struct MetaConfig { pub do_not_config_object_storage_lifecycle: bool, /// Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. - /// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `VirtualNode::count() / partition_vnode_count` consecutive virtual-nodes of one state table. #[serde(default = "default::meta::partition_vnode_count")] pub partition_vnode_count: u32, @@ -348,7 +348,7 @@ pub struct MetaConfig { /// Count of partitions of tables in default group and materialized view group. /// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. - /// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `VirtualNode::count() / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. /// Set it zero to disable this feature. #[serde(default = "default::meta::hybrid_partition_vnode_count")] pub hybrid_partition_vnode_count: u32, diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 97b034cc12a3d..6e8603528d514 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -144,7 +144,7 @@ impl VnodeMapping { Self::new_uniform(std::iter::once(item)) } - /// The length of the vnode in this mapping, typically [`VirtualNode::COUNT`]. + /// The length of the vnode in this mapping, typically [`VirtualNode::count`]. pub fn len(&self) -> usize { self.original_indices .last() @@ -241,7 +241,7 @@ impl VnodeMapping { Self::from_expanded(&items) } - /// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::COUNT`]. + /// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::count`]. pub fn from_expanded(items: &[T::Item]) -> Self { assert_eq!(items.len(), VirtualNode::count()); let (original_indices, data) = compress_data(items); @@ -251,7 +251,7 @@ impl VnodeMapping { } } - /// Convert this vnode mapping to a expanded vector of items with length [`VirtualNode::COUNT`]. + /// Convert this vnode mapping to a expanded vector of items with length [`VirtualNode::count`]. 
pub fn to_expanded(&self) -> ExpandedMapping { self.iter().collect() } @@ -403,16 +403,18 @@ mod tests { type TestMapping = VnodeMapping; type Test2Mapping = VnodeMapping; - const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::count()]; + fn counts() -> &[usize] { + &[1, 3, 12, 42, VirtualNode::count()] + } fn uniforms() -> impl Iterator { - COUNTS + counts() .iter() .map(|&count| TestMapping::new_uniform(0..count as u32)) } fn randoms() -> impl Iterator { - COUNTS.iter().map(|&count| { + counts().iter().map(|&count| { let raw = repeat_with(|| rand::thread_rng().gen_range(0..count as u32)) .take(VirtualNode::count()) .collect_vec(); diff --git a/src/config/docs.md b/src/config/docs.md index 47905d71e5e0c..4646f0cc2280a 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config` | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | -| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | +| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::count() / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | | meta_leader_lease_secs | | 30 | | min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 | @@ -52,7 +52,7 @@ This page is automatically generated by `./risedev generate-example-config` | parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 | | parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 | | parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 | -| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | +| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::count() / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | | periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. 
| 60 | | periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 | | periodic_split_compact_group_interval_sec | | 10 | diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 00febe1bc31af..af86513dd5fd3 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2239,8 +2239,8 @@ impl ScaleController { FragmentDistributionType::Hash => match parallelism { TableParallelism::Adaptive => { if all_available_slots > VirtualNode::count() { - tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); - // force limit to VirtualNode::COUNT + tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::count(), force limit to VirtualNode::count()"); + // force limit to VirtualNode::count() let target_worker_slots = schedule_units_for_slots( &schedulable_worker_slots, VirtualNode::count(), @@ -2267,7 +2267,7 @@ impl ScaleController { TableParallelism::Fixed(mut n) => { if n > VirtualNode::count() { // This should be unreachable, but we still intercept it to prevent accidental modifications. - tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT"); + tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::count(), force limit to VirtualNode::count()"); n = VirtualNode::count() } diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index c08f892276a5d..a822a145ab25e 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -189,7 +189,7 @@ mod tests { let actors = build_fake_actors((0..parallelism as ActorId).collect_vec()); - // add to VirtualNode::COUNT + // add to VirtualNode::count() let actors_to_add = (parallelism as ActorId..VirtualNode::count() as ActorId).collect(); let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add); assert_eq!(result.len(), actors.len() + actors_to_add.len()); diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index aa1311143a5e7..73bdbdd1dc3b1 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1100,7 +1100,7 @@ mod tests { } async fn test_hash_dispatcher_complex_inner() { - // This test only works when VirtualNode::COUNT is 256. + // This test only works when VirtualNode::count() is 256. assert_eq!(VirtualNode::count(), 256); let num_outputs = 2; // actor id ranges from 1 to 2
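The dispatcher tests touched across these patches all exercise the same row-to-actor path, now parameterized by the runtime vnode count: hash the distribution key, reduce it modulo `VirtualNode::count()` to pick a vnode, then look that vnode up in the expanded actor mapping. A simplified sketch of that path (using the standard `DefaultHasher` in place of RisingWave's CRC32-based hash, and a plain `Vec<u32>` in place of `ActorMapping`):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a distribution key to an actor id through a vnode, given an expanded
/// vnode-to-actor mapping whose length is the configured vnode count.
fn key_to_actor<K: Hash>(key: &K, vnode_to_actor: &[u32]) -> u32 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    let vnode = hasher.finish() as usize % vnode_to_actor.len();
    vnode_to_actor[vnode]
}

fn main() {
    // Two actors (ids 1 and 2), each owning half of the default 256-vnode space,
    // mirroring the `hash_mapping` built in the dispatcher tests.
    let mapping: Vec<u32> = (0..256u32).map(|v| if v < 128 { 1 } else { 2 }).collect();
    let actor = key_to_actor(&42i64, &mapping);
    assert!(actor == 1 || actor == 2);
}
```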