Skip to content

Commit

Permalink
feat: variable vnode count support in table distribution (#18373)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Sep 5, 2024
1 parent 0dd06ff commit 670a94f
Show file tree
Hide file tree
Showing 17 changed files with 127 additions and 122 deletions.
8 changes: 5 additions & 3 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use std::mem::swap;

use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode};
use risingwave_common::memory::MemoryContext;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
Expand All @@ -30,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{TableDistribution, TableIter};
use risingwave_storage::table::TableIter;
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::Result;
Expand Down Expand Up @@ -194,7 +195,8 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.collect();

// Lookup Join always contains distribution key, so we don't need vnode bitmap
let vnodes = Some(TableDistribution::all_vnodes());
// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
let inner_side_builder = InnerSideExecutorBuilder::new(
Expand Down
9 changes: 4 additions & 5 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::marker::PhantomData;

use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
Expand Down Expand Up @@ -408,12 +408,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
})
.collect();

// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
Some(TableDistribution::all_vnodes()),
table_desc,
),
table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
vnode_mapping,
outer_side_key_types,
inner_side_schema,
Expand Down
6 changes: 4 additions & 2 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
Expand Down Expand Up @@ -106,7 +107,8 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(TableDistribution::all_vnodes()),
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
};

let chunk_size = source.context.get_config().developer.chunk_size as u32;
Expand Down
5 changes: 3 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand All @@ -32,7 +33,6 @@ use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::TableDistribution;
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -210,7 +210,8 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(TableDistribution::all_vnodes()),
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
};

let scan_ranges = {
Expand Down
14 changes: 14 additions & 0 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::ops::RangeInclusive;

use crate::bitmap::Bitmap;
use crate::hash::table_distribution::SINGLETON_VNODE;
use crate::hash::VirtualNode;

/// An extension trait for `Bitmap` to support virtual node operations.
Expand All @@ -36,4 +37,17 @@ impl Bitmap {
self.high_ranges()
.map(|r| (VirtualNode::from_index(*r.start())..=VirtualNode::from_index(*r.end())))
}

/// Returns whether only the [`SINGLETON_VNODE`] is set in the bitmap.
///
/// Note that this method returning `true` does not imply that the bitmap was created by
/// [`VnodeBitmapExt::singleton`], or that the bitmap has length 1.
pub fn is_singleton(&self) -> bool {
self.count_ones() == 1 && self.iter_vnodes().next().unwrap() == SINGLETON_VNODE
}

/// Creates a bitmap with length 1 and the single bit set.
pub fn singleton() -> Self {
Self::ones(1)
}
}
5 changes: 5 additions & 0 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ impl VirtualNode {
}
}

impl VirtualNode {
pub const COUNT_FOR_TEST: usize = Self::COUNT;
pub const MAX_FOR_TEST: VirtualNode = Self::MAX;
}

impl VirtualNode {
// `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
// chunk. When only one column is provided and its type is `Serial`, we consider the column to
Expand Down
119 changes: 51 additions & 68 deletions src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,34 @@
// limitations under the License.

use std::mem::replace;
use std::ops::Deref;
use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::warn;

use crate::array::{Array, DataChunk, PrimitiveArray};
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::bitmap::Bitmap;
use crate::hash::VirtualNode;
use crate::row::Row;
use crate::util::iter_util::ZipEqFast;

/// For tables without distribution (singleton), the `DEFAULT_VNODE` is encoded.
pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO;
/// For tables without distribution (singleton), the `SINGLETON_VNODE` is encoded.
pub const SINGLETON_VNODE: VirtualNode = VirtualNode::ZERO;

use super::VnodeBitmapExt;

#[derive(Debug, Clone)]
enum ComputeVnode {
Singleton,
DistKeyIndices {
/// Virtual nodes that the table is partitioned into.
vnodes: Arc<Bitmap>,
/// Indices of distribution key for computing vnode, based on the pk columns of the table.
dist_key_in_pk_indices: Vec<usize>,
},
VnodeColumnIndex {
/// Virtual nodes that the table is partitioned into.
vnodes: Arc<Bitmap>,
/// Index of vnode column.
vnode_col_idx_in_pk: usize,
},
Expand All @@ -47,13 +51,8 @@ enum ComputeVnode {
pub struct TableDistribution {
/// The way to compute vnode provided primary key
compute_vnode: ComputeVnode,

/// Virtual nodes that the table is partitioned into.
vnodes: Arc<Bitmap>,
}

pub const SINGLETON_VNODE: VirtualNode = DEFAULT_VNODE;

impl TableDistribution {
pub fn new_from_storage_table_desc(
vnodes: Option<Arc<Bitmap>>,
Expand All @@ -75,125 +74,107 @@ impl TableDistribution {
) -> Self {
let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk {
ComputeVnode::VnodeColumnIndex {
vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton().into()),
vnode_col_idx_in_pk,
}
} else if !dist_key_in_pk_indices.is_empty() {
ComputeVnode::DistKeyIndices {
vnodes: vnodes.expect("vnodes must be `Some` as dist key indices are set"),
dist_key_in_pk_indices,
}
} else {
ComputeVnode::Singleton
};

let vnodes = vnodes.unwrap_or_else(Self::singleton_vnode_bitmap);
if let ComputeVnode::Singleton = &compute_vnode {
if &vnodes != Self::singleton_vnode_bitmap_ref() && &vnodes != Self::all_vnodes_ref() {
warn!(
?vnodes,
"singleton distribution get non-singleton vnode bitmap"
);
}
}

Self {
compute_vnode,
vnodes,
}
Self { compute_vnode }
}

pub fn is_singleton(&self) -> bool {
matches!(&self.compute_vnode, ComputeVnode::Singleton)
}

pub fn singleton_vnode_bitmap_ref() -> &'static Arc<Bitmap> {
/// A bitmap that only the default vnode is set.
static SINGLETON_VNODES: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT);
vnodes.set(SINGLETON_VNODE.to_index(), true);
vnodes.finish().into()
});

SINGLETON_VNODES.deref()
}

pub fn singleton_vnode_bitmap() -> Arc<Bitmap> {
Self::singleton_vnode_bitmap_ref().clone()
}

pub fn all_vnodes_ref() -> &'static Arc<Bitmap> {
/// A bitmap that all vnodes are set.
static ALL_VNODES: LazyLock<Arc<Bitmap>> =
LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
&ALL_VNODES
}

pub fn all_vnodes() -> Arc<Bitmap> {
Self::all_vnodes_ref().clone()
}

/// Distribution that accesses all vnodes, mainly used for tests.
pub fn all(dist_key_in_pk_indices: Vec<usize>) -> Self {
pub fn all(dist_key_in_pk_indices: Vec<usize>, vnode_count: usize) -> Self {
Self {
compute_vnode: ComputeVnode::DistKeyIndices {
vnodes: Bitmap::ones(vnode_count).into(),
dist_key_in_pk_indices,
},
vnodes: Self::all_vnodes(),
}
}

/// Fallback distribution for singleton or tests.
pub fn singleton() -> Self {
Self {
compute_vnode: ComputeVnode::Singleton,
vnodes: Self::singleton_vnode_bitmap(),
}
}

pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
if self.is_singleton() && &new_vnodes != Self::singleton_vnode_bitmap_ref() {
warn!(?new_vnodes, "update vnode on singleton distribution");
match &mut self.compute_vnode {
ComputeVnode::Singleton => {
if !new_vnodes.is_singleton() {
panic!(
"update vnode bitmap on singleton distribution to non-singleton: {:?}",
new_vnodes
);
}
self.vnodes().clone() // not updated
}

ComputeVnode::DistKeyIndices { vnodes, .. }
| ComputeVnode::VnodeColumnIndex { vnodes, .. } => {
assert_eq!(vnodes.len(), new_vnodes.len());
replace(vnodes, new_vnodes)
}
}
assert_eq!(self.vnodes.len(), new_vnodes.len());
replace(&mut self.vnodes, new_vnodes)
}

/// Get vnode bitmap if distributed, or a dummy [`Bitmap::singleton()`] if singleton.
pub fn vnodes(&self) -> &Arc<Bitmap> {
&self.vnodes
static SINGLETON_VNODES: LazyLock<Arc<Bitmap>> =
LazyLock::new(|| Bitmap::singleton().into());

match &self.compute_vnode {
ComputeVnode::DistKeyIndices { vnodes, .. } => vnodes,
ComputeVnode::VnodeColumnIndex { vnodes, .. } => vnodes,
ComputeVnode::Singleton => &SINGLETON_VNODES,
}
}

/// Get vnode value with given primary key.
pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
match &self.compute_vnode {
ComputeVnode::Singleton => SINGLETON_VNODE,
ComputeVnode::DistKeyIndices {
vnodes,
dist_key_in_pk_indices,
} => compute_vnode(pk, dist_key_in_pk_indices, &self.vnodes),
} => compute_vnode(pk, dist_key_in_pk_indices, vnodes),
ComputeVnode::VnodeColumnIndex {
vnodes,
vnode_col_idx_in_pk,
} => get_vnode_from_row(pk, *vnode_col_idx_in_pk, &self.vnodes),
} => get_vnode_from_row(pk, *vnode_col_idx_in_pk, vnodes),
}
}

pub fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option<VirtualNode> {
match &self.compute_vnode {
ComputeVnode::Singleton => Some(SINGLETON_VNODE),
ComputeVnode::DistKeyIndices {
vnodes,
dist_key_in_pk_indices,
} => dist_key_in_pk_indices
.iter()
.all(|&d| d < pk_prefix.len())
.then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, &self.vnodes)),
.then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, vnodes)),
ComputeVnode::VnodeColumnIndex {
vnodes,
vnode_col_idx_in_pk,
} => {
if *vnode_col_idx_in_pk >= pk_prefix.len() {
None
} else {
Some(get_vnode_from_row(
pk_prefix,
*vnode_col_idx_in_pk,
&self.vnodes,
))
Some(get_vnode_from_row(pk_prefix, *vnode_col_idx_in_pk, vnodes))
}
}
}
Expand Down Expand Up @@ -230,6 +211,7 @@ impl TableDistribution {
vec![SINGLETON_VNODE; chunk.capacity()]
}
ComputeVnode::DistKeyIndices {
vnodes,
dist_key_in_pk_indices,
} => {
let dist_key_indices = dist_key_in_pk_indices
Expand All @@ -243,13 +225,14 @@ impl TableDistribution {
.map(|(vnode, vis)| {
// Ignore the invisible rows.
if vis {
check_vnode_is_set(vnode, &self.vnodes);
check_vnode_is_set(vnode, vnodes);
}
vnode
})
.collect()
}
ComputeVnode::VnodeColumnIndex {
vnodes,
vnode_col_idx_in_pk,
} => {
let array: &PrimitiveArray<i16> =
Expand All @@ -262,7 +245,7 @@ impl TableDistribution {
let vnode = VirtualNode::from_scalar(vnode);
if vis {
assert!(exist);
check_vnode_is_set(vnode, &self.vnodes);
check_vnode_is_set(vnode, vnodes);
}
vnode
})
Expand Down
Loading

0 comments on commit 670a94f

Please sign in to comment.