Skip to content

Commit

Permalink
refine docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 16, 2024
1 parent 40d062e commit 17dbbeb
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 29 deletions.
6 changes: 3 additions & 3 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::hash::Crc32HashCode;
use crate::row::{Row, RowExt};
use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;
use crate::util::row_id::compute_vnode_from_row_id;

/// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
/// consistent hashing.
Expand Down Expand Up @@ -158,7 +158,7 @@ impl VirtualNode {
.enumerate()
.map(|(idx, serial)| {
if let Some(serial) = serial {
extract_vnode_id_from_row_id(serial.as_row_id(), vnode_count)
compute_vnode_from_row_id(serial.as_row_id(), vnode_count)
} else {
// NOTE: here it will hash the entire row when the `_row_id` is missing,
// which could result in rows from the same chunk being allocated to different chunks.
Expand Down Expand Up @@ -188,7 +188,7 @@ impl VirtualNode {
pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
let project = row.project(indices);
if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
return extract_vnode_id_from_row_id(s.as_row_id(), vnode_count);
return compute_vnode_from_row_id(s.as_row_id(), vnode_count);
}

project.hash(Crc32FastBuilder).to_vnode(vnode_count)
Expand Down
62 changes: 36 additions & 26 deletions src/common/src/util/row_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ use std::time::SystemTime;
use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH;
use crate::hash::VirtualNode;

/// The number of bits occupied by the vnode part and the sequence part of a row id.
const TIMESTAMP_SHIFT_BITS: u32 = 22;

/// The number of bits occupied by the vnode part of a row id in the previous version.
const COMPAT_VNODE_BITS: u32 = 10;

/// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format:
///
/// | timestamp | vnode & sequence |
/// |-----------|------------------|
/// | 41 bits | 22 bits |
///
/// The vnode part can occupy 10..=15 bits, which is determined by the vnode count. Thus,
/// the sequence part will occupy 7..=12 bits. See [`bit_for_vnode_count`] for more details.
/// the sequence part will occupy 7..=12 bits. See [`bit_for_vnode`] for more details.
#[derive(Debug)]
pub struct RowIdGenerator {
/// Specific base timestamp using for generating row ids.
Expand All @@ -51,44 +55,56 @@ pub struct RowIdGenerator {

pub type RowId = i64;

fn bit_for_vnode_count(vnode_count: usize) -> u32 {
/// The number of bits occupied by the vnode part of a row id.
///
/// In previous versions, this was fixed to 10 bits even if the vnode count was fixed to 256.
/// For backward compatibility, we still use 10 bits for vnode count less than or equal to 1024.
/// For larger vnode counts, we use the smallest power of 2 that fits the vnode count.
fn bit_for_vnode(vnode_count: usize) -> u32 {
debug_assert!(
vnode_count <= VirtualNode::MAX_COUNT as usize,
vnode_count <= VirtualNode::MAX_COUNT,
"invalid vnode count {vnode_count}"
);

if vnode_count <= 1024 {
10
if vnode_count <= 1 << COMPAT_VNODE_BITS {
COMPAT_VNODE_BITS
} else {
vnode_count.next_power_of_two().ilog2()
}
}

/// Compute vnode from the given row id.
///
/// # `vnode_count`
///
/// The given `vnode_count` determines the valid range of the returned vnode. It does not have to
/// be the same as the vnode count used when the row id was generated with [`RowIdGenerator`].
///
/// However, only if they are the same, the vnode retrieved here is guaranteed to be the same as
/// when it was generated. Otherwise, the vnode can be different and skewed, but the row ids
/// generated under the same vnode will still yield the same result.
///
/// This is okay because we rely on the reversibility only if the serial type (row id) is generated
/// and persisted in the same fragment, where the vnode count is the same. In other cases, the
/// serial type is more like a normal integer type, and the algorithm to hash or compute vnode from
/// it does not matter.
#[inline]
// TODO(var-vnode): rename, not `extract` but `compute`
pub fn extract_vnode_id_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode {
let vnode_bit = bit_for_vnode_count(vnode_count);
pub fn compute_vnode_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode {
let vnode_bit = bit_for_vnode(vnode_count);
let sequence_bit = TIMESTAMP_SHIFT_BITS - vnode_bit;

let vnode_part = ((id >> sequence_bit) & ((1 << vnode_bit) - 1)) as usize;

// TODO: update comments
// Previously, the vnode count was fixed to 256 for all jobs in all clusters. As a result, the
// `vnode_id` must reside in the range of `0..256` and the following modulo operation will be
// no-op. So this will retrieve the exact same vnode as when it was generated.
//
// In newer versions, fragments can have different vnode counts. To make sure the vnode is
// within the range, we need to apply modulo operation here. Therefore, there is no guarantee
// that the vnode retrieved here is the same as when it was generated. However, the row ids
// generated under the same vnode will still yield the same result.
// If the given `vnode_count` is the same as the one used when the row id was generated, this
// is no-op. Otherwise, we clamp the vnode to fit in the given vnode count.
VirtualNode::from_index(vnode_part % vnode_count)
}

impl RowIdGenerator {
/// Create a new `RowIdGenerator` with given virtual nodes and vnode count.
pub fn new(vnodes: impl IntoIterator<Item = VirtualNode>, vnode_count: usize) -> Self {
let base = *UNIX_RISINGWAVE_DATE_EPOCH;
let vnode_bit = bit_for_vnode_count(vnode_count);
let vnode_bit = bit_for_vnode(vnode_count);

Self {
base,
Expand All @@ -100,12 +116,6 @@ impl RowIdGenerator {
}
}

/// Create a new `RowIdGenerator` with given virtual nodes and [`VirtualNode::COUNT_FOR_TEST`]
/// as vnode count.
pub fn new_for_test(vnodes: impl IntoIterator<Item = VirtualNode>) -> Self {
Self::new(vnodes, VirtualNode::COUNT_FOR_TEST)
}

/// The upper bound of the sequence part, exclusive.
fn sequence_upper_bound(&self) -> u16 {
1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
Expand Down Expand Up @@ -275,7 +285,7 @@ mod tests {
.chain((vnode_count - 10)..vnode_count)
.map(VirtualNode::from_index)
};
let vnode_of = |row_id: RowId| extract_vnode_id_from_row_id(row_id, vnode_count);
let vnode_of = |row_id: RowId| compute_vnode_from_row_id(row_id, vnode_count);

let mut generator = RowIdGenerator::new(vnodes(), vnode_count);
let sequence_upper_bound = generator.sequence_upper_bound();
Expand Down Expand Up @@ -319,7 +329,7 @@ mod tests {
test!(64, test_64, test_64_mul); // less than default value
test!(114, test_114, test_114_mul); // not a power of 2, less than default value
test!(256, test_256, test_256_mul); // default value, backward compatibility
test!(1024, test_1024, test_1024_mul); // max value with 10 bits
test!(1 << COMPAT_VNODE_BITS, test_1024, test_1024_mul); // max value with 10 bits
test!(2048, test_2048, test_2048_mul); // more than 10 bits
test!(2333, test_2333, test_2333_mul); // not a power of 2, larger than default value
test!(VirtualNode::MAX_COUNT, test_max, test_max_mul); // max supported
Expand Down

0 comments on commit 17dbbeb

Please sign in to comment.