diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index f261c22ba8626..1ff32d9631a8f 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -30,7 +30,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::common::BatchQueryEpoch; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::{Distribution, TableIter}; +use risingwave_storage::table::{TableDistribution, TableIter}; use risingwave_storage::{dispatch_state_store, StateStore}; use crate::error::Result; @@ -176,7 +176,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { .collect(); // Lookup Join always contains distribution key, so we don't need vnode bitmap - let vnodes = Some(Distribution::all_vnodes()); + let vnodes = Some(TableDistribution::all_vnodes()); dispatch_state_store!(source.context().state_store(), state_store, { let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 0e27a8d600d38..6a5ec3cdf704f 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -32,7 +32,7 @@ use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::{collect_data_chunk, Distribution}; +use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; use crate::error::{BatchError, Result}; @@ -183,7 +183,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { Some(vnodes) => Some(Bitmap::from(vnodes).into()), // This is possible for dml. vnode_bitmap is not filled by scheduler. // Or it's single distribution, e.g., distinct agg. We scan in a single executor. - None => Some(Distribution::all_vnodes()), + None => Some(TableDistribution::all_vnodes()), }; let scan_ranges = { diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 64218af99f494..ef2caa8daa26a 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -238,7 +238,7 @@ pub trait Array: /// /// The raw iterator simply iterates values without checking the null bitmap. /// The returned value for NULL values is undefined. - fn raw_iter(&self) -> impl DoubleEndedIterator> { + fn raw_iter(&self) -> impl ExactSizeIterator> { (0..self.len()).map(|i| unsafe { self.raw_value_at_unchecked(i) }) } diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs index 62ea40ac6e19f..844774f9f69e8 100644 --- a/src/common/src/array/primitive_array.rs +++ b/src/common/src/array/primitive_array.rs @@ -208,7 +208,7 @@ impl Array for PrimitiveArray { *self.data.get_unchecked(idx) } - fn raw_iter(&self) -> impl DoubleEndedIterator> { + fn raw_iter(&self) -> impl ExactSizeIterator> { self.data.iter().cloned() } diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index ac8719217e783..19b8975f775af 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -18,7 +18,7 @@ use parse_display::Display; use crate::array::{Array, ArrayImpl, DataChunk}; use crate::hash::Crc32HashCode; use crate::row::{Row, RowExt}; -use crate::types::{DataType, ScalarRefImpl}; +use crate::types::{DataType, DatumRef, ScalarRefImpl}; use crate::util::hash_util::Crc32FastBuilder; use crate::util::row_id::extract_vnode_id_from_row_id; @@ -87,6 +87,10 @@ impl VirtualNode { Self(scalar as _) } + pub fn from_datum(datum: DatumRef<'_>) -> Self { + Self::from_scalar(datum.expect("should not be none").into_int16()) + } + /// Returns the scalar representation of the virtual node. Used by `VNODE` expression. pub const fn to_scalar(self) -> i16 { self.0 as _ diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index dcca81808159e..af268e7d33193 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -21,7 +21,7 @@ use risingwave_storage::hummock::HummockStorage; use risingwave_storage::monitor::MonitoredStateStore; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::Distribution; +use risingwave_storage::table::TableDistribution; use risingwave_storage::StateStore; use risingwave_stream::common::table::state_table::StateTable; @@ -63,7 +63,7 @@ pub async fn make_state_table(hummock: S, table: &TableCatalog) - .collect(), table.pk().iter().map(|x| x.order_type).collect(), table.pk().iter().map(|x| x.column_index).collect(), - Distribution::all(table.distribution_key().to_vec()), // scan all vnodes + TableDistribution::all(table.distribution_key().to_vec()), // scan all vnodes Some(table.value_indices.clone()), ) .await @@ -78,7 +78,7 @@ pub fn make_storage_table(hummock: S, table: &TableCatalog) -> St StorageTable::new_partial( hummock, output_columns_ids, - Some(Distribution::all_vnodes()), + Some(TableDistribution::all_vnodes()), &table.table_desc().to_protobuf(), ) } diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 553bc581717b6..1ccbaa28e9da1 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -16,6 +16,7 @@ use itertools::Itertools as _; use num_integer::Integer as _; use risingwave_common::bail_no_function; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::hash::VirtualNode; use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::AggKind; @@ -578,7 +579,7 @@ fn infer_type_for_special( } ExprType::Vnode => { ensure_arity!("vnode", 1 <= | inputs |); - Ok(Some(DataType::Int16)) + Ok(Some(VirtualNode::RW_TYPE)) } ExprType::Greatest | ExprType::Least => { ensure_arity!("greatest/least", 1 <= | inputs |); diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index d3b17522bb464..5f94bea41dc91 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -47,7 +47,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; use crate::store::{PrefetchOptions, ReadOptions}; use crate::table::merge_sort::merge_sort; -use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter}; +use crate::table::{KeyedRow, TableDistribution, TableIter}; use crate::StateStore; /// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with @@ -89,16 +89,7 @@ pub struct StorageTableInner { // FIXME: revisit constructions and usages. pk_indices: Vec, - /// Indices of distribution key for computing vnode. - /// Note that the index is based on the primary key columns by `pk_indices`. - dist_key_in_pk_indices: Vec, - - /// Virtual nodes that the table is partitioned into. - /// - /// Only the rows whose vnode of the primary key is in this set will be visible to the - /// executor. For READ_WRITE instances, the table will also check whether the written rows - /// confirm to this partition. - vnodes: Arc, + distribution: TableDistribution, /// Used for catalog table_properties table_option: TableOption, @@ -168,20 +159,12 @@ impl StorageTableInner { .collect_vec(); let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize; let versioned = table_desc.versioned; - let distribution = match vnodes { - None => Distribution::fallback(), - Some(vnodes) => { - let dist_key_in_pk_indices = table_desc - .dist_key_in_pk_indices - .iter() - .map(|&k| k as usize) - .collect_vec(); - Distribution { - dist_key_in_pk_indices, - vnodes, - } - } - }; + let dist_key_in_pk_indices = table_desc + .dist_key_in_pk_indices + .iter() + .map(|&k| k as usize) + .collect_vec(); + let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, None); Self::new_inner( store, @@ -214,7 +197,7 @@ impl StorageTableInner { output_column_ids, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), Default::default(), value_indices, 0, @@ -250,10 +233,7 @@ impl StorageTableInner { output_column_ids: Vec, order_types: Vec, pk_indices: Vec, - Distribution { - dist_key_in_pk_indices, - vnodes, - }: Distribution, + distribution: TableDistribution, table_option: TableOption, value_indices: Vec, read_prefix_len_hint: usize, @@ -316,8 +296,7 @@ impl StorageTableInner { mapping: Arc::new(mapping), row_serde: Arc::new(row_serde), pk_indices, - dist_key_in_pk_indices, - vnodes, + distribution, table_option, read_prefix_len_hint, } @@ -357,20 +336,6 @@ impl StorageTableInner { } /// Point get impl StorageTableInner { - /// Get vnode value with given primary key. - fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { - compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes) - } - - /// Try getting vnode value with given primary key prefix, used for `vnode_hint` in iterators. - /// Return `None` if the provided columns are not enough. - fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option { - self.dist_key_in_pk_indices - .iter() - .all(|&d| d < pk_prefix.len()) - .then(|| compute_vnode(pk_prefix, &self.dist_key_in_pk_indices, &self.vnodes)) - } - /// Get a single row by point get pub async fn get_row( &self, @@ -380,8 +345,11 @@ impl StorageTableInner { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); self.store.try_wait_epoch(wait_epoch).await?; - let serialized_pk = - serialize_pk_with_vnode(&pk, &self.pk_serializer, self.compute_vnode_by_pk(&pk)); + let serialized_pk = serialize_pk_with_vnode( + &pk, + &self.pk_serializer, + self.distribution.compute_vnode_by_pk(&pk), + ); assert!(pk.len() <= self.pk_indices.len()); let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len() @@ -444,8 +412,7 @@ impl StorageTableInner { /// Update the vnode bitmap of the storage table, returns the previous vnode bitmap. #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"] pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { - assert_eq!(self.vnodes.len(), new_vnodes.len()); - std::mem::replace(&mut self.vnodes, new_vnodes) + self.distribution.update_vnode_bitmap(new_vnodes) } } @@ -494,7 +461,7 @@ impl StorageTableInner { // If `vnode_hint` is set, we can only access this single vnode. Some(vnode) => Either::Left(std::iter::once(vnode)), // Otherwise, we need to access all vnodes of this table. - None => Either::Right(self.vnodes.iter_vnodes()), + None => Either::Right(self.distribution.vnodes().iter_vnodes()), }; vnodes.map(|vnode| prefixed_range_with_vnode(encoded_key_range.clone(), vnode)) }; @@ -667,7 +634,7 @@ impl StorageTableInner { prefix_hint, (start_key, end_key), epoch, - self.try_compute_vnode_by_pk_prefix(pk_prefix), + self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix), ordered, prefetch_options, ) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 884915bdbbb3f..53c16ceea9885 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -15,13 +15,14 @@ pub mod batch_table; pub mod merge_sort; +use std::mem::replace; use std::ops::Deref; use std::sync::{Arc, LazyLock}; use bytes::Bytes; use futures::{Stream, StreamExt}; use itertools::Itertools; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{Array, DataChunk, PrimitiveArray}; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; use risingwave_common::hash::VirtualNode; @@ -29,46 +30,89 @@ use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::key::TableKey; +use tracing::warn; use crate::error::StorageResult; /// For tables without distribution (singleton), the `DEFAULT_VNODE` is encoded. pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO; +#[derive(Debug, Clone)] +enum ComputeVnode { + Singleton, + DistKeyIndices { + /// Indices of distribution key for computing vnode, based on the all columns of the table. + dist_key_in_pk_indices: Vec, + }, + VnodeColumnIndex { + /// Indices of vnode columns. + vnode_col_idx_in_pk: usize, + }, +} + +#[derive(Debug, Clone)] /// Represents the distribution for a specific table instance. -#[derive(Debug)] -pub struct Distribution { - /// Indices of distribution key for computing vnode, based on the all columns of the table. - pub dist_key_in_pk_indices: Vec, +pub struct TableDistribution { + /// The way to compute vnode provided primary key + compute_vnode: ComputeVnode, /// Virtual nodes that the table is partitioned into. - pub vnodes: Arc, + vnodes: Arc, } -impl Distribution { - /// Fallback distribution for singleton or tests. - pub fn fallback() -> Self { - /// A bitmap that only the default vnode is set. - static FALLBACK_VNODES: LazyLock> = LazyLock::new(|| { - let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT); - vnodes.set(DEFAULT_VNODE.to_index(), true); - vnodes.finish().into() - }); +pub const SINGLETON_VNODE: VirtualNode = DEFAULT_VNODE; + +impl TableDistribution { + pub fn new( + vnodes: Option>, + dist_key_in_pk_indices: Vec, + vnode_col_idx_in_pk: Option, + ) -> Self { + let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk { + ComputeVnode::VnodeColumnIndex { + vnode_col_idx_in_pk, + } + } else if !dist_key_in_pk_indices.is_empty() { + ComputeVnode::DistKeyIndices { + dist_key_in_pk_indices, + } + } else { + ComputeVnode::Singleton + }; + + let vnodes = vnodes.unwrap_or_else(Self::singleton_vnode_bitmap); + if let ComputeVnode::Singleton = &compute_vnode { + if &vnodes != Self::singleton_vnode_bitmap_ref() { + warn!( + ?vnodes, + "singleton distribution get non-singleton vnode bitmap" + ); + } + } + Self { - dist_key_in_pk_indices: vec![], - vnodes: FALLBACK_VNODES.clone(), + compute_vnode, + vnodes, } } - pub fn fallback_vnodes() -> Arc { + pub fn is_singleton(&self) -> bool { + matches!(&self.compute_vnode, ComputeVnode::Singleton) + } + + pub fn singleton_vnode_bitmap_ref() -> &'static Arc { /// A bitmap that only the default vnode is set. - static FALLBACK_VNODES: LazyLock> = LazyLock::new(|| { + static SINGLETON_VNODES: LazyLock> = LazyLock::new(|| { let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT); - vnodes.set(DEFAULT_VNODE.to_index(), true); + vnodes.set(SINGLETON_VNODE.to_index(), true); vnodes.finish().into() }); - FALLBACK_VNODES.clone() + SINGLETON_VNODES.deref() + } + + pub fn singleton_vnode_bitmap() -> Arc { + Self::singleton_vnode_bitmap_ref().clone() } pub fn all_vnodes() -> Arc { @@ -81,10 +125,70 @@ impl Distribution { /// Distribution that accesses all vnodes, mainly used for tests. pub fn all(dist_key_in_pk_indices: Vec) -> Self { Self { - dist_key_in_pk_indices, + compute_vnode: ComputeVnode::DistKeyIndices { + dist_key_in_pk_indices, + }, vnodes: Self::all_vnodes(), } } + + /// Fallback distribution for singleton or tests. + pub fn singleton() -> Self { + Self { + compute_vnode: ComputeVnode::Singleton, + vnodes: Self::singleton_vnode_bitmap(), + } + } + + pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { + if self.is_singleton() && &new_vnodes != Self::singleton_vnode_bitmap_ref() { + warn!(?new_vnodes, "update vnode on singleton distribution"); + } + assert_eq!(self.vnodes.len(), new_vnodes.len()); + replace(&mut self.vnodes, new_vnodes) + } + + pub fn vnodes(&self) -> &Arc { + &self.vnodes + } + + /// Get vnode value with given primary key. + pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { + match &self.compute_vnode { + ComputeVnode::Singleton => SINGLETON_VNODE, + ComputeVnode::DistKeyIndices { + dist_key_in_pk_indices, + } => compute_vnode(pk, dist_key_in_pk_indices, &self.vnodes), + ComputeVnode::VnodeColumnIndex { + vnode_col_idx_in_pk, + } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, &self.vnodes), + } + } + + pub fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option { + match &self.compute_vnode { + ComputeVnode::Singleton => Some(SINGLETON_VNODE), + ComputeVnode::DistKeyIndices { + dist_key_in_pk_indices, + } => dist_key_in_pk_indices + .iter() + .all(|&d| d < pk_prefix.len()) + .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, &self.vnodes)), + ComputeVnode::VnodeColumnIndex { + vnode_col_idx_in_pk, + } => { + if *vnode_col_idx_in_pk >= pk_prefix.len() { + None + } else { + Some(get_vnode_from_row( + pk_prefix, + *vnode_col_idx_in_pk, + &self.vnodes, + )) + } + } + } + } } // TODO: GAT-ify this trait or remove this trait @@ -159,45 +263,74 @@ pub fn get_second(arg: Result<(T, U), E>) -> Result { /// Get vnode value with `indices` on the given `row`. pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode { - let vnode = if indices.is_empty() { - DEFAULT_VNODE - } else { - let vnode = VirtualNode::compute_row(&row, indices); - check_vnode_is_set(vnode, vnodes); - vnode - }; + assert!(!indices.is_empty()); + let vnode = VirtualNode::compute_row(&row, indices); + check_vnode_is_set(vnode, vnodes); tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode); vnode } -/// Get vnode values with `indices` on the given `chunk`. -pub fn compute_chunk_vnode( - chunk: &DataChunk, - dist_key_in_pk_indices: &[usize], - pk_indices: &[usize], - vnodes: &Bitmap, -) -> Vec { - if dist_key_in_pk_indices.is_empty() { - vec![DEFAULT_VNODE; chunk.capacity()] - } else { - let dist_key_indices = dist_key_in_pk_indices - .iter() - .map(|idx| pk_indices[*idx]) - .collect_vec(); +pub fn get_vnode_from_row(row: impl Row, index: usize, _vnodes: &Bitmap) -> VirtualNode { + let vnode = VirtualNode::from_datum(row.datum_at(index)); + // TODO: enable this check when `WatermarkFilterExecutor` use `StorageTable` to read global max watermark + // check_vnode_is_set(vnode, vnodes); - VirtualNode::compute_chunk(chunk, &dist_key_indices) - .into_iter() - .zip_eq_fast(chunk.visibility().iter()) - .map(|(vnode, vis)| { - // Ignore the invisible rows. - if vis { - check_vnode_is_set(vnode, vnodes); - } - vnode - }) - .collect() + tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode); + + vnode +} + +impl TableDistribution { + /// Get vnode values with `indices` on the given `chunk`. + /// + /// Vnode of invisible rows will be included. Only the vnode of visible row check if it's accessible + pub fn compute_chunk_vnode(&self, chunk: &DataChunk, pk_indices: &[usize]) -> Vec { + match &self.compute_vnode { + ComputeVnode::Singleton => { + vec![SINGLETON_VNODE; chunk.capacity()] + } + ComputeVnode::DistKeyIndices { + dist_key_in_pk_indices, + } => { + let dist_key_indices = dist_key_in_pk_indices + .iter() + .map(|idx| pk_indices[*idx]) + .collect_vec(); + + VirtualNode::compute_chunk(chunk, &dist_key_indices) + .into_iter() + .zip_eq_fast(chunk.visibility().iter()) + .map(|(vnode, vis)| { + // Ignore the invisible rows. + if vis { + check_vnode_is_set(vnode, &self.vnodes); + } + vnode + }) + .collect() + } + ComputeVnode::VnodeColumnIndex { + vnode_col_idx_in_pk, + } => { + let array: &PrimitiveArray = + chunk.columns()[pk_indices[*vnode_col_idx_in_pk]].as_int16(); + array + .raw_iter() + .zip_eq_fast(array.null_bitmap().iter()) + .zip_eq_fast(chunk.visibility().iter()) + .map(|((vnode, exist), vis)| { + let vnode = VirtualNode::from_scalar(vnode); + if vis { + assert!(exist); + check_vnode_is_set(vnode, &self.vnodes); + } + vnode + }) + .collect_vec() + } + } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index ad2d08b42f51c..8aa75a4552da5 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -47,7 +47,7 @@ use risingwave_storage::error::StorageError; use risingwave_storage::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::StateStoreReadIterStream; -use risingwave_storage::table::{compute_vnode, Distribution}; +use risingwave_storage::table::{compute_vnode, TableDistribution, SINGLETON_VNODE}; use crate::common::log_store_impl::kv_log_store::{ KvLogStoreReadMetrics, ReaderTruncationOffsetType, RowOpCodeType, SeqIdType, @@ -106,7 +106,7 @@ pub(crate) struct LogStoreRowSerde { /// Indices of distribution key for computing vnode. /// Note that the index is based on the all columns of the table, instead of the output ones. // FIXME: revisit constructions and usages. - dist_key_indices: Vec, + dist_key_indices: Option>, /// Virtual nodes that the table is partitioned into. /// @@ -156,7 +156,7 @@ impl LogStoreRowSerde { let vnodes = match vnodes { Some(vnodes) => vnodes, - None => Distribution::fallback_vnodes(), + None => TableDistribution::singleton_vnode_bitmap(), }; // epoch and seq_id. The seq_id of barrier is set null, and therefore the second order type @@ -169,6 +169,18 @@ impl LogStoreRowSerde { let epoch_serde = OrderedRowSerde::new(vec![EPOCH_COLUMN_TYPE], vec![OrderType::ascending()]); + let dist_key_indices = if dist_key_indices.is_empty() { + if &vnodes != TableDistribution::singleton_vnode_bitmap_ref() { + warn!( + ?vnodes, + "singleton log store gets non-singleton vnode bitmap" + ); + } + None + } else { + Some(dist_key_indices) + }; + Self { pk_serde, row_serde, @@ -197,6 +209,14 @@ impl LogStoreRowSerde { } impl LogStoreRowSerde { + fn compute_vnode(&self, row: impl Row) -> VirtualNode { + if let Some(dist_key_indices) = &self.dist_key_indices { + compute_vnode(row, dist_key_indices, &self.vnodes) + } else { + SINGLETON_VNODE + } + } + pub(crate) fn serialize_data_row( &self, epoch: u64, @@ -218,7 +238,7 @@ impl LogStoreRowSerde { .clone() .chain([Some(ScalarImpl::Int16(op_code))]) .chain(row); - let vnode = compute_vnode(&extended_row, &self.dist_key_indices, &self.vnodes); + let vnode = self.compute_vnode(&extended_row); let key_bytes = serialize_pk_with_vnode(&pk, &self.pk_serde, vnode); let value_bytes = self.row_serde.serialize(extended_row).into(); (vnode, key_bytes, value_bytes) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 82697294cf29f..269e1dd0490fc 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -58,7 +58,7 @@ use risingwave_storage::store::{ ReadOptions, SealCurrentEpochOptions, StateStoreIterItemStream, }; use risingwave_storage::table::merge_sort::merge_sort; -use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution, KeyedRow}; +use risingwave_storage::table::{KeyedRow, TableDistribution}; use risingwave_storage::StateStore; use tracing::{trace, Instrument}; @@ -108,31 +108,18 @@ pub struct StateTableInner< // FIXME: revisit constructions and usages. pk_indices: Vec, - /// Indices of distribution key for computing vnode. - /// Note that the index is based on the all columns of the table, instead of the output ones. - // FIXME: revisit constructions and usages. - // dist_key_indices: Vec, - - /// Indices of distribution key for computing vnode. - /// Note that the index is based on the primary key columns by `pk_indices`. - dist_key_in_pk_indices: Vec, - - prefix_hint_len: usize, - - /// Virtual nodes that the table is partitioned into. + /// Distribution of the state table. /// - /// Only the rows whose vnode of the primary key is in this set will be visible to the + /// It holds vnode bitmap. Only the rows whose vnode of the primary key is in this set will be visible to the /// executor. The table will also check whether the written rows /// conform to this partition. - vnodes: Arc, + distribution: TableDistribution, + + prefix_hint_len: usize, /// Used for catalog table_properties table_option: TableOption, - /// An optional column index which is the vnode of each row computed by the table's consistent - /// hash distribution. - vnode_col_idx_in_pk: Option, - value_indices: Option>, /// Strategy to buffer watermark for lazy state cleaning. @@ -322,21 +309,20 @@ where .collect() }; + let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| { + let vnode_col_idx = *idx as usize; + pk_indices.iter().position(|&i| vnode_col_idx == i) + }); + + let distribution = + TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + let pk_data_types = pk_indices .iter() .map(|i| table_columns[*i].data_type.clone()) .collect(); let pk_serde = OrderedRowSerde::new(pk_data_types, order_types); - let vnodes = match vnodes { - Some(vnodes) => vnodes, - - None => Distribution::fallback_vnodes(), - }; - let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| { - let vnode_col_idx = *idx as usize; - pk_indices.iter().position(|&i| vnode_col_idx == i) - }); let input_value_indices = table_catalog .value_indices .iter() @@ -428,11 +414,9 @@ where pk_serde, row_serde, pk_indices, - dist_key_in_pk_indices, + distribution, prefix_hint_len, - vnodes, table_option, - vnode_col_idx_in_pk, value_indices, watermark_buffer_strategy: W::default(), state_clean_watermark: None, @@ -458,7 +442,7 @@ where columns, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), None, ) .await @@ -479,7 +463,7 @@ where columns, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), Some(value_indices), ) .await @@ -499,7 +483,7 @@ where columns, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), None, false, ) @@ -514,7 +498,7 @@ where table_columns: Vec, order_types: Vec, pk_indices: Vec, - distribution: Distribution, + distribution: TableDistribution, value_indices: Option>, ) -> Self { Self::new_with_distribution_inner( @@ -536,7 +520,7 @@ where table_columns: Vec, order_types: Vec, pk_indices: Vec, - distribution: Distribution, + distribution: TableDistribution, value_indices: Option>, ) -> Self { Self::new_with_distribution_inner( @@ -559,10 +543,7 @@ where table_columns: Vec, order_types: Vec, pk_indices: Vec, - Distribution { - dist_key_in_pk_indices, - vnodes, - }: Distribution, + distribution: TableDistribution, value_indices: Option>, is_consistent_op: bool, ) -> Self { @@ -612,11 +593,9 @@ where pk_serde, row_serde, pk_indices, - dist_key_in_pk_indices, + distribution, prefix_hint_len: 0, - vnodes, table_option: Default::default(), - vnode_col_idx_in_pk: None, value_indices, watermark_buffer_strategy: W::default(), state_clean_watermark: None, @@ -636,17 +615,6 @@ where self.table_id.table_id } - /// Returns whether the table is a singleton table. - fn is_singleton(&self) -> bool { - // If the table has a vnode column, it must be hash-distributed (but act like a singleton - // table). So we should return false here. Otherwise, we check the distribution key. - if self.vnode_col_idx_in_pk.is_some() { - false - } else { - self.dist_key_in_pk_indices.is_empty() - } - } - /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called pub fn epoch(&self) -> u64 { self.local_store.epoch() @@ -654,25 +622,14 @@ where /// Get the vnode value with given (prefix of) primary key fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode { - let prefix_len = pk_prefix.len(); - if let Some(vnode_col_idx_in_pk) = self.vnode_col_idx_in_pk { - let vnode = pk_prefix.datum_at(vnode_col_idx_in_pk).unwrap(); - VirtualNode::from_scalar(vnode.into_int16()) - } else { - // For streaming, the given prefix must be enough to calculate the vnode - assert!(self.dist_key_in_pk_indices.iter().all(|&d| d < prefix_len)); - compute_vnode(pk_prefix, &self.dist_key_in_pk_indices, &self.vnodes) - } + self.distribution + .try_compute_vnode_by_pk_prefix(pk_prefix) + .expect("For streaming, the given prefix must be enough to calculate the vnode") } - /// Get the vnode value of the given row - // pub fn compute_vnode(&self, row: impl Row) -> VirtualNode { - // compute_vnode(row, &self.dist_key_indices, &self.vnodes) - // } - - /// Get the vnode value of the given row + /// Get the vnode value of the given primary key pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { - compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes) + self.distribution.compute_vnode_by_pk(pk) } /// NOTE(kwannoel): This is used by backfill. @@ -696,12 +653,8 @@ where &self.pk_serde } - // pub fn dist_key_indices(&self) -> &[usize] { - // &self.dist_key_indices - // } - pub fn vnodes(&self) -> &Arc { - &self.vnodes + self.distribution.vnodes() } pub fn value_indices(&self) -> &Option> { @@ -711,10 +664,6 @@ where fn is_dirty(&self) -> bool { self.local_store.is_dirty() || self.state_clean_watermark.is_some() } - - pub fn vnode_bitmap(&self) -> &Bitmap { - &self.vnodes - } } impl StateTableInner @@ -774,7 +723,7 @@ where } let serialized_pk = - serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_prefix_vnode(&pk)); + serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk)); let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() { Some(serialized_pk.slice(VirtualNode::SIZE..)) @@ -822,15 +771,16 @@ where !self.is_dirty(), "vnode bitmap should only be updated when state table is clean" ); - if self.is_singleton() { + if self.distribution.is_singleton() { assert_eq!( - new_vnodes, self.vnodes, + &new_vnodes, + self.vnodes(), "should not update vnode bitmap for singleton table" ); } - assert_eq!(self.vnodes.len(), new_vnodes.len()); + assert_eq!(self.vnodes().len(), new_vnodes.len()); - let cache_may_stale = cache_may_stale(&self.vnodes, &new_vnodes); + let cache_may_stale = cache_may_stale(self.vnodes(), &new_vnodes); if cache_may_stale { self.state_clean_watermark = None; @@ -840,7 +790,7 @@ where } ( - std::mem::replace(&mut self.vnodes, new_vnodes), + self.distribution.update_vnode_bitmap(new_vnodes), cache_may_stale, ) } @@ -920,7 +870,7 @@ where self.watermark_cache.insert(&pk); } - let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_prefix_vnode(pk)); + let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk)); let value_bytes = self.serialize_value(value); self.insert_inner(key_bytes, value_bytes); } @@ -934,7 +884,7 @@ where self.watermark_cache.delete(&pk); } - let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_prefix_vnode(pk)); + let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk)); let value_bytes = self.serialize_value(old_value); self.delete_inner(key_bytes, value_bytes); } @@ -949,7 +899,7 @@ where ); let new_key_bytes = - serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_prefix_vnode(new_pk)); + serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk)); let old_value_bytes = self.serialize_value(old_value); let new_value_bytes = self.serialize_value(new_value); @@ -962,7 +912,7 @@ where pub fn update_without_old_value(&mut self, new_value: impl Row) { let new_pk = (&new_value).project(self.pk_indices()); let new_key_bytes = - serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_prefix_vnode(new_pk)); + serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk)); let new_value_bytes = self.serialize_value(new_value); self.update_inner(new_key_bytes, None, new_value_bytes); @@ -992,12 +942,9 @@ where }; let (chunk, op) = chunk.into_parts(); - let vnodes = compute_chunk_vnode( - &chunk, - &self.dist_key_in_pk_indices, - &self.pk_indices, - &self.vnodes, - ); + let vnodes = self + .distribution + .compute_chunk_vnode(&chunk, &self.pk_indices); let values = if let Some(ref value_indices) = self.value_indices { chunk.project(value_indices).serialize_with(&self.row_serde) @@ -1209,7 +1156,7 @@ where // Compute Delete Ranges if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix { trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{ - self.vnodes.iter_vnodes().collect_vec() + self.vnodes().iter_vnodes().collect_vec() }, "delete range"); if prefix_serializer .as_ref() @@ -1222,7 +1169,7 @@ where seal_watermark = Some(( WatermarkDirection::Ascending, VnodeWatermark::new( - self.vnodes.clone(), + self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), )); @@ -1230,7 +1177,7 @@ where seal_watermark = Some(( WatermarkDirection::Descending, VnodeWatermark::new( - self.vnodes.clone(), + self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), )); @@ -1416,10 +1363,6 @@ where .map_err(StreamExecutorError::from) } - pub fn get_vnodes(&self) -> Arc { - self.vnodes.clone() - } - /// Returns: /// false: the provided pk prefix is absent in state store. /// true: the provided pk prefix may or may not be present in state store. diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 20038c86a1b7d..1a19a3fe201fc 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -405,7 +405,7 @@ pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Optio pub(crate) async fn get_progress_per_vnode( state_table: &StateTableInner, ) -> StreamExecutorResult> { - debug_assert!(!state_table.vnode_bitmap().is_empty()); + debug_assert!(!state_table.vnodes().is_empty()); let vnodes = state_table.vnodes().iter_vnodes(); let mut result = Vec::with_capacity(state_table.vnodes().len()); let vnode_keys = vnodes.map(|vnode| { diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index c9dffddfbb7ee..1f2388f6ed4f0 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -453,7 +453,7 @@ impl DynamicFilterExecutor SortBuffer { ); let streams: Vec<_> = - futures::future::try_join_all(buffer_table.vnode_bitmap().iter_vnodes().map(|vnode| { + futures::future::try_join_all(buffer_table.vnodes().iter_vnodes().map(|vnode| { buffer_table.iter_with_vnode( vnode, &pk_range, diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 26daf633fe11d..454f69582981b 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -246,8 +246,7 @@ impl WatermarkFilterExecutor { last_checkpoint_watermark = current_watermark.clone(); // Persist the watermark when checkpoint arrives. if let Some(watermark) = current_watermark.clone() { - let vnodes = table.get_vnodes(); - for vnode in vnodes.iter_vnodes() { + for vnode in table.vnodes().clone().iter_vnodes() { let pk = Some(ScalarImpl::Int16(vnode.to_scalar())); let row = [pk, Some(watermark.clone())]; // This is an upsert. @@ -353,7 +352,7 @@ mod tests { use risingwave_common::types::Date; use risingwave_common::util::sort_util::OrderType; use risingwave_storage::memory::MemoryStateStore; - use risingwave_storage::table::Distribution; + use risingwave_storage::table::TableDistribution; use super::*; use crate::executor::test_utils::expr::build_from_pretty; @@ -383,7 +382,7 @@ mod tests { column_descs, order_types.to_vec(), pk_indices.to_vec(), - Distribution::all(vec![0]), + TableDistribution::all(vec![0]), Some(val_indices.to_vec()), ) .await