diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 2e2d9016f2531..388083b81bffd 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -20,7 +20,7 @@ use crate::hash::Crc32HashCode; use crate::row::{Row, RowExt}; use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl}; use crate::util::hash_util::Crc32FastBuilder; -use crate::util::row_id::extract_vnode_id_from_row_id; +use crate::util::row_id::compute_vnode_from_row_id; /// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for /// consistent hashing. @@ -158,7 +158,7 @@ impl VirtualNode { .enumerate() .map(|(idx, serial)| { if let Some(serial) = serial { - extract_vnode_id_from_row_id(serial.as_row_id(), vnode_count) + compute_vnode_from_row_id(serial.as_row_id(), vnode_count) } else { // NOTE: here it will hash the entire row when the `_row_id` is missing, // which could result in rows from the same chunk being allocated to different chunks. @@ -188,7 +188,7 @@ impl VirtualNode { pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode { let project = row.project(indices); if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() { - return extract_vnode_id_from_row_id(s.as_row_id(), vnode_count); + return compute_vnode_from_row_id(s.as_row_id(), vnode_count); } project.hash(Crc32FastBuilder).to_vnode(vnode_count) diff --git a/src/common/src/util/row_id.rs b/src/common/src/util/row_id.rs index 5d03d7691f432..0e044dc8f26f4 100644 --- a/src/common/src/util/row_id.rs +++ b/src/common/src/util/row_id.rs @@ -18,8 +18,12 @@ use std::time::SystemTime; use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH; use crate::hash::VirtualNode; +/// The number of bits occupied by the vnode part and the sequence part of a row id. const TIMESTAMP_SHIFT_BITS: u32 = 22; +/// The number of bits occupied by the vnode part of a row id in the previous version. +const COMPAT_VNODE_BITS: u32 = 10; + /// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format: /// /// | timestamp | vnode & sequence | @@ -27,7 +31,7 @@ const TIMESTAMP_SHIFT_BITS: u32 = 22; /// | 41 bits | 22 bits | /// /// The vnode part can occupy 10..=15 bits, which is determined by the vnode count. Thus, -/// the sequence part will occupy 7..=12 bits. See [`bit_for_vnode_count`] for more details. +/// the sequence part will occupy 7..=12 bits. See [`bit_for_vnode`] for more details. #[derive(Debug)] pub struct RowIdGenerator { /// Specific base timestamp using for generating row ids. @@ -51,36 +55,48 @@ pub struct RowIdGenerator { pub type RowId = i64; -fn bit_for_vnode_count(vnode_count: usize) -> u32 { +/// The number of bits occupied by the vnode part of a row id. +/// +/// In previous versions, this was fixed to 10 bits even if the vnode count was fixed to 256. +/// For backward compatibility, we still use 10 bits for vnode count less than or equal to 1024. +/// For larger vnode counts, we use the smallest power of 2 that fits the vnode count. +fn bit_for_vnode(vnode_count: usize) -> u32 { debug_assert!( - vnode_count <= VirtualNode::MAX_COUNT as usize, + vnode_count <= VirtualNode::MAX_COUNT, "invalid vnode count {vnode_count}" ); - if vnode_count <= 1024 { - 10 + if vnode_count <= 1 << COMPAT_VNODE_BITS { + COMPAT_VNODE_BITS } else { vnode_count.next_power_of_two().ilog2() } } +/// Compute vnode from the given row id. +/// +/// # `vnode_count` +/// +/// The given `vnode_count` determines the valid range of the returned vnode. It does not have to +/// be the same as the vnode count used when the row id was generated with [`RowIdGenerator`]. +/// +/// However, only if they are the same, the vnode retrieved here is guaranteed to be the same as +/// when it was generated. Otherwise, the vnode can be different and skewed, but the row ids +/// generated under the same vnode will still yield the same result. +/// +/// This is okay because we rely on the reversibility only if the serial type (row id) is generated +/// and persisted in the same fragment, where the vnode count is the same. In other cases, the +/// serial type is more like a normal integer type, and the algorithm to hash or compute vnode from +/// it does not matter. #[inline] -// TODO(var-vnode): rename, not `extract` but `compute` -pub fn extract_vnode_id_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode { - let vnode_bit = bit_for_vnode_count(vnode_count); +pub fn compute_vnode_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode { + let vnode_bit = bit_for_vnode(vnode_count); let sequence_bit = TIMESTAMP_SHIFT_BITS - vnode_bit; let vnode_part = ((id >> sequence_bit) & ((1 << vnode_bit) - 1)) as usize; - // TODO: update comments - // Previously, the vnode count was fixed to 256 for all jobs in all clusters. As a result, the - // `vnode_id` must reside in the range of `0..256` and the following modulo operation will be - // no-op. So this will retrieve the exact same vnode as when it was generated. - // - // In newer versions, fragments can have different vnode counts. To make sure the vnode is - // within the range, we need to apply modulo operation here. Therefore, there is no guarantee - // that the vnode retrieved here is the same as when it was generated. However, the row ids - // generated under the same vnode will still yield the same result. + // If the given `vnode_count` is the same as the one used when the row id was generated, this + // is no-op. Otherwise, we clamp the vnode to fit in the given vnode count. VirtualNode::from_index(vnode_part % vnode_count) } @@ -88,7 +104,7 @@ impl RowIdGenerator { /// Create a new `RowIdGenerator` with given virtual nodes and vnode count. pub fn new(vnodes: impl IntoIterator, vnode_count: usize) -> Self { let base = *UNIX_RISINGWAVE_DATE_EPOCH; - let vnode_bit = bit_for_vnode_count(vnode_count); + let vnode_bit = bit_for_vnode(vnode_count); Self { base, @@ -100,12 +116,6 @@ impl RowIdGenerator { } } - /// Create a new `RowIdGenerator` with given virtual nodes and [`VirtualNode::COUNT_FOR_TEST`] - /// as vnode count. - pub fn new_for_test(vnodes: impl IntoIterator) -> Self { - Self::new(vnodes, VirtualNode::COUNT_FOR_TEST) - } - /// The upper bound of the sequence part, exclusive. fn sequence_upper_bound(&self) -> u16 { 1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit) @@ -229,6 +239,7 @@ mod tests { use super::*; + #[allow(clippy::unused_async)] // `madsim::time::advance` requires to be in async context async fn test_generator_with_vnode_count(vnode_count: usize) { let mut generator = RowIdGenerator::new([VirtualNode::from_index(0)], vnode_count); let sequence_upper_bound = generator.sequence_upper_bound(); @@ -267,6 +278,7 @@ mod tests { ); } + #[allow(clippy::unused_async)] // `madsim::time::advance` requires to be in async context async fn test_generator_multiple_vnodes_with_vnode_count(vnode_count: usize) { assert!(vnode_count >= 20); @@ -275,7 +287,7 @@ mod tests { .chain((vnode_count - 10)..vnode_count) .map(VirtualNode::from_index) }; - let vnode_of = |row_id: RowId| extract_vnode_id_from_row_id(row_id, vnode_count); + let vnode_of = |row_id: RowId| compute_vnode_from_row_id(row_id, vnode_count); let mut generator = RowIdGenerator::new(vnodes(), vnode_count); let sequence_upper_bound = generator.sequence_upper_bound(); @@ -295,7 +307,7 @@ mod tests { let expected_vnodes = vnodes().cycle(); let actual_vnodes = row_ids.iter().map(|&r| vnode_of(r)); - for (expected, actual) in expected_vnodes.zip(actual_vnodes) { + for (expected, actual) in expected_vnodes.zip_eq(actual_vnodes) { assert_eq!(expected, actual); } @@ -319,7 +331,7 @@ mod tests { test!(64, test_64, test_64_mul); // less than default value test!(114, test_114, test_114_mul); // not a power of 2, less than default value test!(256, test_256, test_256_mul); // default value, backward compatibility - test!(1024, test_1024, test_1024_mul); // max value with 10 bits + test!(1 << COMPAT_VNODE_BITS, test_1024, test_1024_mul); // max value with 10 bits test!(2048, test_2048, test_2048_mul); // more than 10 bits test!(2333, test_2333, test_2333_mul); // not a power of 2, larger than default value test!(VirtualNode::MAX_COUNT, test_max, test_max_mul); // max supported