Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: unify compute vnode logic #14526

Merged
merged 8 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{Distribution, TableIter};
use risingwave_storage::table::{TableDistribution, TableIter};
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::Result;
Expand Down Expand Up @@ -176,7 +176,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.collect();

// Lookup Join always contains distribution key, so we don't need vnode bitmap
let vnodes = Some(Distribution::all_vnodes());
let vnodes = Some(TableDistribution::all_vnodes());
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, Distribution};
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -183,7 +183,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(Distribution::all_vnodes()),
None => Some(TableDistribution::all_vnodes()),
};

let scan_ranges = {
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ pub trait Array:
///
/// The raw iterator simply iterates values without checking the null bitmap.
/// The returned value for NULL values is undefined.
fn raw_iter(&self) -> impl DoubleEndedIterator<Item = Self::RefItem<'_>> {
fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
(0..self.len()).map(|i| unsafe { self.raw_value_at_unchecked(i) })
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl<T: PrimitiveArrayItemType> Array for PrimitiveArray<T> {
*self.data.get_unchecked(idx)
}

fn raw_iter(&self) -> impl DoubleEndedIterator<Item = Self::RefItem<'_>> {
fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
self.data.iter().cloned()
}

Expand Down
6 changes: 5 additions & 1 deletion src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use parse_display::Display;
use crate::array::{Array, ArrayImpl, DataChunk};
use crate::hash::Crc32HashCode;
use crate::row::{Row, RowExt};
use crate::types::{DataType, ScalarRefImpl};
use crate::types::{DataType, DatumRef, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

Expand Down Expand Up @@ -87,6 +87,10 @@ impl VirtualNode {
Self(scalar as _)
}

pub fn from_datum(datum: DatumRef<'_>) -> Self {
Self::from_scalar(datum.expect("should not be none").into_int16())
}

/// Returns the scalar representation of the virtual node. Used by `VNODE` expression.
pub const fn to_scalar(self) -> i16 {
self.0 as _
Expand Down
6 changes: 3 additions & 3 deletions src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_storage::hummock::HummockStorage;
use risingwave_storage::monitor::MonitoredStateStore;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::Distribution;
use risingwave_storage::table::TableDistribution;
use risingwave_storage::StateStore;
use risingwave_stream::common::table::state_table::StateTable;

Expand Down Expand Up @@ -63,7 +63,7 @@ pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -
.collect(),
table.pk().iter().map(|x| x.order_type).collect(),
table.pk().iter().map(|x| x.column_index).collect(),
Distribution::all(table.distribution_key().to_vec()), // scan all vnodes
TableDistribution::all(table.distribution_key().to_vec()), // scan all vnodes
Some(table.value_indices.clone()),
)
.await
Expand All @@ -78,7 +78,7 @@ pub fn make_storage_table<S: StateStore>(hummock: S, table: &TableCatalog) -> St
StorageTable::new_partial(
hummock,
output_columns_ids,
Some(Distribution::all_vnodes()),
Some(TableDistribution::all_vnodes()),
&table.table_desc().to_protobuf(),
)
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/expr/type_inference/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use itertools::Itertools as _;
use num_integer::Integer as _;
use risingwave_common::bail_no_function;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::AggKind;
Expand Down Expand Up @@ -578,7 +579,7 @@ fn infer_type_for_special(
}
ExprType::Vnode => {
ensure_arity!("vnode", 1 <= | inputs |);
Ok(Some(DataType::Int16))
Ok(Some(VirtualNode::RW_TYPE))
}
ExprType::Greatest | ExprType::Least => {
ensure_arity!("greatest/least", 1 <= | inputs |);
Expand Down
71 changes: 19 additions & 52 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{find_columns_by_ids, ColumnMapping};
use crate::store::{PrefetchOptions, ReadOptions};
use crate::table::merge_sort::merge_sort;
use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter};
use crate::table::{KeyedRow, TableDistribution, TableIter};
use crate::StateStore;

/// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with
Expand Down Expand Up @@ -89,16 +89,7 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
// FIXME: revisit constructions and usages.
pk_indices: Vec<usize>,

/// Indices of distribution key for computing vnode.
/// Note that the index is based on the primary key columns by `pk_indices`.
dist_key_in_pk_indices: Vec<usize>,

/// Virtual nodes that the table is partitioned into.
///
/// Only the rows whose vnode of the primary key is in this set will be visible to the
/// executor. For READ_WRITE instances, the table will also check whether the written rows
/// confirm to this partition.
vnodes: Arc<Bitmap>,
distribution: TableDistribution,

/// Used for catalog table_properties
table_option: TableOption,
Expand Down Expand Up @@ -168,20 +159,12 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
.collect_vec();
let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
let versioned = table_desc.versioned;
let distribution = match vnodes {
None => Distribution::fallback(),
Some(vnodes) => {
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
Distribution {
dist_key_in_pk_indices,
vnodes,
}
}
};
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, None);

Self::new_inner(
store,
Expand Down Expand Up @@ -214,7 +197,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
output_column_ids,
order_types,
pk_indices,
Distribution::fallback(),
TableDistribution::singleton(),
Default::default(),
value_indices,
0,
Expand Down Expand Up @@ -250,10 +233,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
output_column_ids: Vec<ColumnId>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
Distribution {
dist_key_in_pk_indices,
vnodes,
}: Distribution,
distribution: TableDistribution,
table_option: TableOption,
value_indices: Vec<usize>,
read_prefix_len_hint: usize,
Expand Down Expand Up @@ -316,8 +296,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
mapping: Arc::new(mapping),
row_serde: Arc::new(row_serde),
pk_indices,
dist_key_in_pk_indices,
vnodes,
distribution,
table_option,
read_prefix_len_hint,
}
Expand Down Expand Up @@ -357,20 +336,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
}
/// Point get
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
/// Get vnode value with given primary key.
fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes)
}

/// Try getting vnode value with given primary key prefix, used for `vnode_hint` in iterators.
/// Return `None` if the provided columns are not enough.
fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option<VirtualNode> {
self.dist_key_in_pk_indices
.iter()
.all(|&d| d < pk_prefix.len())
.then(|| compute_vnode(pk_prefix, &self.dist_key_in_pk_indices, &self.vnodes))
}

/// Get a single row by point get
pub async fn get_row(
&self,
Expand All @@ -380,8 +345,11 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
let epoch = wait_epoch.get_epoch();
let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
self.store.try_wait_epoch(wait_epoch).await?;
let serialized_pk =
serialize_pk_with_vnode(&pk, &self.pk_serializer, self.compute_vnode_by_pk(&pk));
let serialized_pk = serialize_pk_with_vnode(
&pk,
&self.pk_serializer,
self.distribution.compute_vnode_by_pk(&pk),
);
assert!(pk.len() <= self.pk_indices.len());

let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
Expand Down Expand Up @@ -444,8 +412,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
/// Update the vnode bitmap of the storage table, returns the previous vnode bitmap.
#[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
assert_eq!(self.vnodes.len(), new_vnodes.len());
std::mem::replace(&mut self.vnodes, new_vnodes)
self.distribution.update_vnode_bitmap(new_vnodes)
}
}

Expand Down Expand Up @@ -494,7 +461,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
// If `vnode_hint` is set, we can only access this single vnode.
Some(vnode) => Either::Left(std::iter::once(vnode)),
// Otherwise, we need to access all vnodes of this table.
None => Either::Right(self.vnodes.iter_vnodes()),
None => Either::Right(self.distribution.vnodes().iter_vnodes()),
};
vnodes.map(|vnode| prefixed_range_with_vnode(encoded_key_range.clone(), vnode))
};
Expand Down Expand Up @@ -667,7 +634,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
prefix_hint,
(start_key, end_key),
epoch,
self.try_compute_vnode_by_pk_prefix(pk_prefix),
self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
ordered,
prefetch_options,
)
Expand Down
Loading
Loading