Skip to content

Commit

Permalink
vnode mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Sep 5, 2024
1 parent 670a94f commit e1ad5ea
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 87 deletions.
1 change: 1 addition & 0 deletions src/batch/src/task/consistent_hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ fn generate_hash_values(
.iter()
.map(|idx| *idx as usize)
.collect::<Vec<_>>(),
consistent_hash_info.vmap.len(),
);

let hash_values = vnodes
Expand Down
42 changes: 22 additions & 20 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,26 +105,26 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
///
/// For example, if `items` is `[0, 1, 2]`, and the total vnode count is 10, we'll generate
/// mapping like `[0, 0, 0, 0, 1, 1, 1, 2, 2, 2]`.
pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>) -> Self {
pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>, vnode_count: usize) -> Self {
// If the number of items is greater than the total vnode count, no vnode will be mapped to
// some items and the mapping will be invalid.
assert!(items.len() <= VirtualNode::COUNT);
assert!(items.len() <= vnode_count);

let mut original_indices = Vec::with_capacity(items.len());
let mut data = Vec::with_capacity(items.len());

let hash_shard_size = VirtualNode::COUNT / items.len();
let mut one_more_count = VirtualNode::COUNT % items.len();
let hash_shard_size = vnode_count / items.len();
let mut one_more_count = vnode_count % items.len();
let mut init_bound = 0;

for item in items {
let vnode_count = if one_more_count > 0 {
let count = if one_more_count > 0 {
one_more_count -= 1;
hash_shard_size + 1
} else {
hash_shard_size
};
init_bound += vnode_count;
init_bound += count;

original_indices.push(init_bound as u32 - 1);
data.push(item);
Expand All @@ -141,10 +141,11 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {

/// Create a vnode mapping where all vnodes are mapped to the same single item.
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item))
// TODO(var-vnode): is a vnode count of 1 always correct here?
Self::new_uniform(std::iter::once(item), 1)
}

/// The length of the vnode in this mapping, typically [`VirtualNode::COUNT`].
/// The number of vnodes (i.e. the vnode count) covered by this mapping.
pub fn len(&self) -> usize {
self.original_indices
.last()
Expand Down Expand Up @@ -204,12 +205,13 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
/// Convert this vnode mapping to a mapping from items to bitmaps, where each bitmap represents
/// the vnodes mapped to the item.
pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
let vnode_count = self.len();
let mut vnode_bitmaps = HashMap::new();

for (vnode, item) in self.iter_with_vnode() {
vnode_bitmaps
.entry(item)
.or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT))
.or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
.set(vnode.to_index(), true);
}

Expand All @@ -222,10 +224,11 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
/// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap
/// represents the vnodes mapped to the item.
pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
let mut items = vec![None; VirtualNode::COUNT];
let vnode_count = bitmaps.values().next().expect("empty bitmaps").len();
let mut items = vec![None; vnode_count];

for (&item, bitmap) in bitmaps {
assert_eq!(bitmap.len(), VirtualNode::COUNT);
assert_eq!(bitmap.len(), vnode_count);
for idx in bitmap.iter_ones() {
if let Some(prev) = items[idx].replace(item) {
panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
Expand All @@ -241,17 +244,16 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
Self::from_expanded(&items)
}

/// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::COUNT`].
/// Create a vnode mapping from the expanded slice of items.
pub fn from_expanded(items: &[T::Item]) -> Self {
assert_eq!(items.len(), VirtualNode::COUNT);
let (original_indices, data) = compress_data(items);
Self {
original_indices,
data,
}
}

/// Convert this vnode mapping to an expanded vector of items with length [`VirtualNode::COUNT`].
/// Convert this vnode mapping to an expanded vector of items.
pub fn to_expanded(&self) -> ExpandedMapping<T> {
self.iter().collect()
}
Expand Down Expand Up @@ -353,8 +355,8 @@ impl ActorMapping {

impl WorkerSlotMapping {
/// Create a uniform worker mapping from the given worker ids
pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
Self::new_uniform(worker_slot_ids.iter().cloned())
pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self {
Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count)
}

/// Create a worker mapping from the protobuf representation.
Expand Down Expand Up @@ -403,18 +405,18 @@ mod tests {
type TestMapping = VnodeMapping<Test>;
type Test2Mapping = VnodeMapping<Test2>;

const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT];
const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];

fn uniforms() -> impl Iterator<Item = TestMapping> {
COUNTS
.iter()
.map(|&count| TestMapping::new_uniform(0..count as u32))
.map(|&count| TestMapping::new_uniform(0..count as u32, VirtualNode::COUNT_FOR_TEST))
}

fn randoms() -> impl Iterator<Item = TestMapping> {
COUNTS.iter().map(|&count| {
let raw = repeat_with(|| rand::thread_rng().gen_range(0..count as u32))
.take(VirtualNode::COUNT)
.take(VirtualNode::COUNT_FOR_TEST)
.collect_vec();
TestMapping::from_expanded(&raw)
})
Expand All @@ -427,7 +429,7 @@ mod tests {
#[test]
fn test_uniform() {
for vnode_mapping in uniforms() {
assert_eq!(vnode_mapping.len(), VirtualNode::COUNT);
assert_eq!(vnode_mapping.len(), VirtualNode::COUNT_FOR_TEST);
let item_count = vnode_mapping.iter_unique().count();

let mut check: HashMap<u32, Vec<_>> = HashMap::new();
Expand Down
83 changes: 53 additions & 30 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,44 @@ use crate::util::row_id::extract_vnode_id_from_row_id;
pub struct VirtualNode(VirtualNodeInner);

/// The internal representation of a virtual node id.
///
/// Note: not all bits of the inner representation are used.
type VirtualNodeInner = u16;
static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32);

impl From<Crc32HashCode> for VirtualNode {
fn from(hash_code: Crc32HashCode) -> Self {
/// `vnode_count` must be provided to convert a hash code to a virtual node.
///
/// Use [`Crc32HashCodeToVnodeExt::to_vnode`] instead.
impl !From<Crc32HashCode> for VirtualNode {}

#[easy_ext::ext(Crc32HashCodeToVnodeExt)]
impl Crc32HashCode {
fn to_vnode(self, vnode_count: usize) -> VirtualNode {
// Take the least significant bits of the hash code.
// TODO: should we use the most significant bits?
let inner = (hash_code.value() % Self::COUNT as u64) as VirtualNodeInner;
let inner = (self.value() % vnode_count as u64) as VirtualNodeInner;
VirtualNode(inner)
}
}

impl VirtualNode {
/// The number of bits used to represent a virtual node.
///
/// Note: Not all bits of the inner representation are used. One should rely on this constant
/// to determine the count of virtual nodes.
pub const BITS: usize = 8;
/// The total count of virtual nodes.
pub const COUNT: usize = 1 << Self::BITS;
// TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST`
pub const COUNT: usize = 1 << 8;
/// The maximum value of the virtual node.
// TODO(var-vnode): remove this and only keep `MAX_FOR_TEST`
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
}

impl VirtualNode {
/// The total count of virtual nodes, for testing purposes.
///
/// Currently defined as an alias of [`Self::COUNT`].
pub const COUNT_FOR_TEST: usize = Self::COUNT;
/// The maximum value of the virtual node, for testing purposes.
///
/// Currently defined as an alias of [`Self::MAX`].
pub const MAX_FOR_TEST: VirtualNode = Self::MAX;
}

impl VirtualNode {
/// The maximum count of virtual nodes that fits in [`VirtualNodeInner`].
///
/// Computed as `1 << VirtualNodeInner::BITS`, i.e. the number of distinct values the inner
/// integer type can represent.
pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS;
/// The size of a virtual node in bytes, in memory or serialized representation.
///
/// Taken from `size_of::<Self>()`, so it follows the width of the inner representation.
pub const SIZE: usize = std::mem::size_of::<Self>();
}
Expand All @@ -58,8 +76,6 @@ impl VirtualNode {
pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>;

impl VirtualNode {
/// The maximum value of the virtual node.
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
/// We may use `VirtualNode` as a datum in a stream, or store it as a column.
/// Hence this reifies it as a RW datatype.
pub const RW_TYPE: DataType = DataType::Int16;
Expand All @@ -68,7 +84,7 @@ impl VirtualNode {

/// Creates a virtual node from the `usize` index.
pub const fn from_index(index: usize) -> Self {
debug_assert!(index < Self::COUNT);
debug_assert!(index < Self::MAX_COUNT);
Self(index as _)
}

Expand All @@ -79,7 +95,6 @@ impl VirtualNode {

/// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
pub const fn from_scalar(scalar: i16) -> Self {
debug_assert!((scalar as usize) < Self::COUNT);
Self(scalar as _)
}

Expand All @@ -99,7 +114,6 @@ impl VirtualNode {
/// Creates a virtual node from the given big-endian bytes representation.
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
let inner = VirtualNodeInner::from_be_bytes(bytes);
debug_assert!((inner as usize) < Self::COUNT);
Self(inner)
}

Expand All @@ -109,22 +123,21 @@ impl VirtualNode {
}

/// Iterates over all virtual nodes.
pub fn all() -> AllVirtualNodeIter {
(0..Self::COUNT).map(Self::from_index)
pub fn all(vnode_count: usize) -> AllVirtualNodeIter {
(0..vnode_count).map(Self::from_index)
}
}

impl VirtualNode {
pub const COUNT_FOR_TEST: usize = Self::COUNT;
pub const MAX_FOR_TEST: VirtualNode = Self::MAX;
}

impl VirtualNode {
// `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
// chunk. When only one column is provided and its type is `Serial`, we consider the column to
// be the one that contains RowId, and use a special method to skip the calculation of Hash
// and directly extract the `VirtualNode` from `RowId`.
pub fn compute_chunk(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
pub fn compute_chunk(
data_chunk: &DataChunk,
keys: &[usize],
vnode_count: usize,
) -> Vec<VirtualNode> {
if let Ok(idx) = keys.iter().exactly_one()
&& let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
{
Expand All @@ -140,7 +153,7 @@ impl VirtualNode {
// This process doesn’t guarantee the order of rows, producing indeterminate results in some cases,
// such as when `distinct on` is used without an `order by`.
let (row, _) = data_chunk.row_at(idx);
row.hash(Crc32FastBuilder).into()
row.hash(Crc32FastBuilder).to_vnode(vnode_count)
}
})
.collect();
Expand All @@ -149,19 +162,29 @@ impl VirtualNode {
data_chunk
.get_hash_values(keys, Crc32FastBuilder)
.into_iter()
.map(|hash| hash.into())
.map(|hash| hash.to_vnode(vnode_count))
.collect()
}

/// Equivalent to [`Self::compute_chunk`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
///
/// Convenience wrapper for tests that do not care about a specific vnode count: `data_chunk`
/// and `keys` are forwarded unchanged and the result of [`Self::compute_chunk`] is returned
/// as is.
pub fn compute_chunk_for_test(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
Self::compute_chunk(data_chunk, keys, Self::COUNT_FOR_TEST)
}

// `compute_row` is used to calculate the `VirtualNode` for the corresponding columns in a
// `Row`. Similar to `compute_chunk`, it also contains special handling for serial columns.
pub fn compute_row(row: impl Row, indices: &[usize]) -> VirtualNode {
pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
let project = row.project(indices);
if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
return extract_vnode_id_from_row_id(s.as_row_id());
}

project.hash(Crc32FastBuilder).into()
project.hash(Crc32FastBuilder).to_vnode(vnode_count)
}

/// Equivalent to [`Self::compute_row`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
///
/// Convenience wrapper for tests that do not care about a specific vnode count: `row` and
/// `indices` are forwarded unchanged and the result of [`Self::compute_row`] is returned as is.
pub fn compute_row_for_test(row: impl Row, indices: &[usize]) -> VirtualNode {
Self::compute_row(row, indices, Self::COUNT_FOR_TEST)
}
}

Expand All @@ -184,7 +207,7 @@ mod tests {
);

let chunk = DataChunk::from_pretty(chunk.as_str());
let vnodes = VirtualNode::compute_chunk(&chunk, &[0]);
let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

assert_eq!(
vnodes.as_slice(),
Expand All @@ -200,7 +223,7 @@ mod tests {
Some(ScalarImpl::Int64(12345)),
]);

let vnode = VirtualNode::compute_row(&row, &[0]);
let vnode = VirtualNode::compute_row_for_test(&row, &[0]);

assert_eq!(vnode, VirtualNode::from_index(100));
}
Expand All @@ -221,7 +244,7 @@ mod tests {
);

let chunk = DataChunk::from_pretty(chunk.as_str());
let vnodes = VirtualNode::compute_chunk(&chunk, &[0]);
let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

assert_eq!(
vnodes.as_slice(),
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl TableDistribution {
/// Get vnode value with `indices` on the given `row`.
pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode {
assert!(!indices.is_empty());
let vnode = VirtualNode::compute_row(&row, indices);
let vnode = VirtualNode::compute_row(&row, indices, vnodes.len());
check_vnode_is_set(vnode, vnodes);

tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode);
Expand Down Expand Up @@ -219,7 +219,7 @@ impl TableDistribution {
.map(|idx| pk_indices[*idx])
.collect_vec();

VirtualNode::compute_chunk(chunk, &dist_key_indices)
VirtualNode::compute_chunk(chunk, &dist_key_indices, vnodes.len())
.into_iter()
.zip_eq_fast(chunk.visibility().iter())
.map(|(vnode, vis)| {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/util/row_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct RowIdGenerator {

pub type RowId = i64;

// TODO(var-vnode): how should we handle this for different virtual node counts?
#[inline]
pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode {
let vnode_id = ((id >> VNODE_ID_SHIFT_BITS) & (VNODE_ID_UPPER_BOUND as i64 - 1)) as u32;
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ mod tests {
Some(ScalarImpl::from(514)),
]);

let vnode = VirtualNode::compute_row(&row, &[0, 1]);
let vnode = VirtualNode::compute_row_for_test(&row, &[0, 1]);

assert_eq!(scan_range.try_compute_vnode(&dist), Some(vnode));
}
Expand Down Expand Up @@ -203,7 +203,7 @@ mod tests {
Some(ScalarImpl::from(114514)),
]);

let vnode = VirtualNode::compute_row(&row, &[2, 1]);
let vnode = VirtualNode::compute_row_for_test(&row, &[2, 1]);

assert_eq!(scan_range.try_compute_vnode(&dist), Some(vnode));
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn place_vnode(
}
None => {
// No hint is provided, assign all vnodes to `temp_pu`.
for vnode in VirtualNode::all() {
for vnode in VirtualNode::all(VirtualNode::COUNT) {
temp_slot.balance += 1;
temp_slot.builder.set(vnode.to_index(), true);
}
Expand Down
Loading

0 comments on commit e1ad5ea

Please sign in to comment.