From f227cec76c73ff0736a4f7d0097912709e4993f6 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 12 Jan 2024 12:50:42 +0800 Subject: [PATCH 1/8] refactor: unify compute vnode logic --- .../executor/join/distributed_lookup_join.rs | 4 +- src/batch/src/executor/row_seq_scan.rs | 4 +- src/common/src/hash/consistent_hash/vnode.rs | 10 +- src/ctl/src/cmd_impl/table/scan.rs | 6 +- src/frontend/src/expr/type_inference/func.rs | 3 +- .../src/table/batch_table/storage_table.rs | 71 ++--- src/storage/src/table/mod.rs | 248 ++++++++++++++---- .../log_store_impl/kv_log_store/serde.rs | 36 ++- src/stream/src/common/table/state_table.rs | 171 +++++------- src/stream/src/executor/backfill/utils.rs | 2 +- src/stream/src/executor/dynamic_filter.rs | 2 +- src/stream/src/executor/sort_buffer.rs | 2 +- src/stream/src/executor/watermark_filter.rs | 7 +- 13 files changed, 321 insertions(+), 245 deletions(-) diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index f261c22ba8626..1ff32d9631a8f 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -30,7 +30,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::common::BatchQueryEpoch; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::{Distribution, TableIter}; +use risingwave_storage::table::{TableDistribution, TableIter}; use risingwave_storage::{dispatch_state_store, StateStore}; use crate::error::Result; @@ -176,7 +176,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { .collect(); // Lookup Join always contains distribution key, so we don't need vnode bitmap - let vnodes = Some(Distribution::all_vnodes()); + let vnodes = Some(TableDistribution::all_vnodes()); dispatch_state_store!(source.context().state_store(), state_store, { let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 0e27a8d600d38..6a5ec3cdf704f 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -32,7 +32,7 @@ use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::{collect_data_chunk, Distribution}; +use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; use crate::error::{BatchError, Result}; @@ -183,7 +183,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { Some(vnodes) => Some(Bitmap::from(vnodes).into()), // This is possible for dml. vnode_bitmap is not filled by scheduler. // Or it's single distribution, e.g., distinct agg. We scan in a single executor. - None => Some(Distribution::all_vnodes()), + None => Some(TableDistribution::all_vnodes()), }; let scan_ranges = { diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index ac8719217e783..584c6cdd39198 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -18,7 +18,7 @@ use parse_display::Display; use crate::array::{Array, ArrayImpl, DataChunk}; use crate::hash::Crc32HashCode; use crate::row::{Row, RowExt}; -use crate::types::{DataType, ScalarRefImpl}; +use crate::types::{DataType, DatumRef, ScalarRefImpl}; use crate::util::hash_util::Crc32FastBuilder; use crate::util::row_id::extract_vnode_id_from_row_id; @@ -87,6 +87,14 @@ impl VirtualNode { Self(scalar as _) } + pub const fn vnode_data_type() -> DataType { + DataType::Int16 + } + + pub fn from_datum(datum: DatumRef<'_>) -> Self { + Self::from_scalar(datum.expect("should not be none").into_int16()) + } + /// Returns the scalar representation of the virtual node. Used by `VNODE` expression. pub const fn to_scalar(self) -> i16 { self.0 as _ diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index dcca81808159e..af268e7d33193 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -21,7 +21,7 @@ use risingwave_storage::hummock::HummockStorage; use risingwave_storage::monitor::MonitoredStateStore; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::Distribution; +use risingwave_storage::table::TableDistribution; use risingwave_storage::StateStore; use risingwave_stream::common::table::state_table::StateTable; @@ -63,7 +63,7 @@ pub async fn make_state_table(hummock: S, table: &TableCatalog) - .collect(), table.pk().iter().map(|x| x.order_type).collect(), table.pk().iter().map(|x| x.column_index).collect(), - Distribution::all(table.distribution_key().to_vec()), // scan all vnodes + TableDistribution::all(table.distribution_key().to_vec()), // scan all vnodes Some(table.value_indices.clone()), ) .await @@ -78,7 +78,7 @@ pub fn make_storage_table(hummock: S, table: &TableCatalog) -> St StorageTable::new_partial( hummock, output_columns_ids, - Some(Distribution::all_vnodes()), + Some(TableDistribution::all_vnodes()), &table.table_desc().to_protobuf(), ) } diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 553bc581717b6..5619661699233 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -16,6 +16,7 @@ use itertools::Itertools as _; use num_integer::Integer as _; use risingwave_common::bail_no_function; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::hash::VirtualNode; use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::AggKind; @@ -578,7 +579,7 @@ fn infer_type_for_special( } ExprType::Vnode => { ensure_arity!("vnode", 1 <= | inputs |); - Ok(Some(DataType::Int16)) + Ok(Some(VirtualNode::vnode_data_type())) } ExprType::Greatest | ExprType::Least => { ensure_arity!("greatest/least", 1 <= | inputs |); diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index d3b17522bb464..5f94bea41dc91 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -47,7 +47,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; use crate::store::{PrefetchOptions, ReadOptions}; use crate::table::merge_sort::merge_sort; -use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter}; +use crate::table::{KeyedRow, TableDistribution, TableIter}; use crate::StateStore; /// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with @@ -89,16 +89,7 @@ pub struct StorageTableInner { // FIXME: revisit constructions and usages. pk_indices: Vec, - /// Indices of distribution key for computing vnode. - /// Note that the index is based on the primary key columns by `pk_indices`. - dist_key_in_pk_indices: Vec, - - /// Virtual nodes that the table is partitioned into. - /// - /// Only the rows whose vnode of the primary key is in this set will be visible to the - /// executor. For READ_WRITE instances, the table will also check whether the written rows - /// confirm to this partition. - vnodes: Arc, + distribution: TableDistribution, /// Used for catalog table_properties table_option: TableOption, @@ -168,20 +159,12 @@ impl StorageTableInner { .collect_vec(); let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize; let versioned = table_desc.versioned; - let distribution = match vnodes { - None => Distribution::fallback(), - Some(vnodes) => { - let dist_key_in_pk_indices = table_desc - .dist_key_in_pk_indices - .iter() - .map(|&k| k as usize) - .collect_vec(); - Distribution { - dist_key_in_pk_indices, - vnodes, - } - } - }; + let dist_key_in_pk_indices = table_desc + .dist_key_in_pk_indices + .iter() + .map(|&k| k as usize) + .collect_vec(); + let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, None); Self::new_inner( store, @@ -214,7 +197,7 @@ impl StorageTableInner { output_column_ids, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), Default::default(), value_indices, 0, @@ -250,10 +233,7 @@ impl StorageTableInner { output_column_ids: Vec, order_types: Vec, pk_indices: Vec, - Distribution { - dist_key_in_pk_indices, - vnodes, - }: Distribution, + distribution: TableDistribution, table_option: TableOption, value_indices: Vec, read_prefix_len_hint: usize, @@ -316,8 +296,7 @@ impl StorageTableInner { mapping: Arc::new(mapping), row_serde: Arc::new(row_serde), pk_indices, - dist_key_in_pk_indices, - vnodes, + distribution, table_option, read_prefix_len_hint, } @@ -357,20 +336,6 @@ impl StorageTableInner { } /// Point get impl StorageTableInner { - /// Get vnode value with given primary key. - fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { - compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes) - } - - /// Try getting vnode value with given primary key prefix, used for `vnode_hint` in iterators. - /// Return `None` if the provided columns are not enough. - fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option { - self.dist_key_in_pk_indices - .iter() - .all(|&d| d < pk_prefix.len()) - .then(|| compute_vnode(pk_prefix, &self.dist_key_in_pk_indices, &self.vnodes)) - } - /// Get a single row by point get pub async fn get_row( &self, @@ -380,8 +345,11 @@ impl StorageTableInner { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); self.store.try_wait_epoch(wait_epoch).await?; - let serialized_pk = - serialize_pk_with_vnode(&pk, &self.pk_serializer, self.compute_vnode_by_pk(&pk)); + let serialized_pk = serialize_pk_with_vnode( + &pk, + &self.pk_serializer, + self.distribution.compute_vnode_by_pk(&pk), + ); assert!(pk.len() <= self.pk_indices.len()); let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len() @@ -444,8 +412,7 @@ impl StorageTableInner { /// Update the vnode bitmap of the storage table, returns the previous vnode bitmap. #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"] pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { - assert_eq!(self.vnodes.len(), new_vnodes.len()); - std::mem::replace(&mut self.vnodes, new_vnodes) + self.distribution.update_vnode_bitmap(new_vnodes) } } @@ -494,7 +461,7 @@ impl StorageTableInner { // If `vnode_hint` is set, we can only access this single vnode. Some(vnode) => Either::Left(std::iter::once(vnode)), // Otherwise, we need to access all vnodes of this table. - None => Either::Right(self.vnodes.iter_vnodes()), + None => Either::Right(self.distribution.vnodes().iter_vnodes()), }; vnodes.map(|vnode| prefixed_range_with_vnode(encoded_key_range.clone(), vnode)) }; @@ -667,7 +634,7 @@ impl StorageTableInner { prefix_hint, (start_key, end_key), epoch, - self.try_compute_vnode_by_pk_prefix(pk_prefix), + self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix), ordered, prefetch_options, ) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 884915bdbbb3f..240e2d2c85727 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -15,6 +15,7 @@ pub mod batch_table; pub mod merge_sort; +use std::mem::replace; use std::ops::Deref; use std::sync::{Arc, LazyLock}; @@ -29,46 +30,96 @@ use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::key::TableKey; +use tracing::warn; use crate::error::StorageResult; /// For tables without distribution (singleton), the `DEFAULT_VNODE` is encoded. pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO; +#[derive(Debug, Clone)] /// Represents the distribution for a specific table instance. -#[derive(Debug)] -pub struct Distribution { - /// Indices of distribution key for computing vnode, based on the all columns of the table. - pub dist_key_in_pk_indices: Vec, - - /// Virtual nodes that the table is partitioned into. - pub vnodes: Arc, +pub enum TableDistribution { + Singleton, + DistKeyInPkIndices { + /// Indices of distribution key for computing vnode, based on the all columns of the table. + dist_key_in_pk_indices: Vec, + + /// Virtual nodes that the table is partitioned into. + vnodes: Arc, + }, + VnodeColumnIndex { + /// Indices of vnode columns. + vnode_col_idx_in_pk: usize, + + /// Virtual nodes that the table is partitioned into. + vnodes: Arc, + }, } -impl Distribution { - /// Fallback distribution for singleton or tests. - pub fn fallback() -> Self { - /// A bitmap that only the default vnode is set. - static FALLBACK_VNODES: LazyLock> = LazyLock::new(|| { - let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT); - vnodes.set(DEFAULT_VNODE.to_index(), true); - vnodes.finish().into() - }); - Self { - dist_key_in_pk_indices: vec![], - vnodes: FALLBACK_VNODES.clone(), +pub const SINGLETON_VNODE: VirtualNode = DEFAULT_VNODE; + +impl TableDistribution { + pub fn new( + vnodes: Option>, + dist_key_in_pk_indices: Vec, + vnode_col_idx_in_pk: Option, + ) -> Self { + match vnodes { + None => { + if !dist_key_in_pk_indices.is_empty() { + warn!( + ?dist_key_in_pk_indices, + "has dist key but no vnodes provided" + ); + } + if vnode_col_idx_in_pk.is_some() { + warn!( + vnode_col_idx_in_pk = vnode_col_idx_in_pk.unwrap(), + "has vnode col idx in pk but no vnodes provided" + ); + } + Self::Singleton + } + Some(vnodes) => { + if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk { + Self::VnodeColumnIndex { + vnode_col_idx_in_pk, + vnodes, + } + } else if !dist_key_in_pk_indices.is_empty() { + Self::DistKeyInPkIndices { + dist_key_in_pk_indices, + vnodes, + } + } else { + warn!( + ?vnodes, + "no dist key or vnode col idx provided but provided vnodes" + ); + Self::Singleton + } + } } } - pub fn fallback_vnodes() -> Arc { + pub fn is_singleton(&self) -> bool { + matches!(self, Self::Singleton) + } + + pub fn singleton_vnode_bitmap_ref() -> &'static Arc { /// A bitmap that only the default vnode is set. - static FALLBACK_VNODES: LazyLock> = LazyLock::new(|| { + static SINGLETON_VNODES: LazyLock> = LazyLock::new(|| { let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT); - vnodes.set(DEFAULT_VNODE.to_index(), true); + vnodes.set(SINGLETON_VNODE.to_index(), true); vnodes.finish().into() }); - FALLBACK_VNODES.clone() + SINGLETON_VNODES.deref() + } + + pub fn singleton_vnode_bitmap() -> Arc { + Self::singleton_vnode_bitmap_ref().clone() } pub fn all_vnodes() -> Arc { @@ -78,13 +129,81 @@ impl Distribution { ALL_VNODES.clone() } - /// Distribution that accesses all vnodes, mainly used for tests. + /// Distribution that accesses all vnodes pub fn all(dist_key_in_pk_indices: Vec) -> Self { - Self { + Self::DistKeyInPkIndices { dist_key_in_pk_indices, vnodes: Self::all_vnodes(), } } + + /// Fallback distribution for singleton or tests. + pub fn singleton() -> Self { + Self::Singleton + } + + pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { + match self { + TableDistribution::Singleton => { + let ret = Self::singleton_vnode_bitmap(); + if ret != new_vnodes { + warn!(?new_vnodes, "update vnode on singleton distribution"); + } + ret + } + TableDistribution::DistKeyInPkIndices { ref mut vnodes, .. } + | TableDistribution::VnodeColumnIndex { ref mut vnodes, .. } => { + assert_eq!(vnodes.len(), new_vnodes.len()); + replace(vnodes, new_vnodes) + } + } + } + + pub fn vnodes(&self) -> &Arc { + match self { + TableDistribution::Singleton => TableDistribution::singleton_vnode_bitmap_ref(), + TableDistribution::DistKeyInPkIndices { vnodes, .. } + | TableDistribution::VnodeColumnIndex { vnodes, .. } => vnodes, + } + } + + /// Get vnode value with given primary key. + pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { + match &self { + TableDistribution::Singleton => SINGLETON_VNODE, + TableDistribution::DistKeyInPkIndices { + dist_key_in_pk_indices, + vnodes, + } => compute_vnode(pk, dist_key_in_pk_indices, vnodes), + TableDistribution::VnodeColumnIndex { + vnode_col_idx_in_pk, + vnodes, + } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, vnodes), + } + } + + pub fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option { + match self { + TableDistribution::Singleton => Some(SINGLETON_VNODE), + TableDistribution::DistKeyInPkIndices { + dist_key_in_pk_indices, + vnodes, + } => dist_key_in_pk_indices + .iter() + .all(|&d| d < pk_prefix.len()) + .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, vnodes)), + TableDistribution::VnodeColumnIndex { + vnode_col_idx_in_pk, + vnodes, + } => { + if *vnode_col_idx_in_pk >= pk_prefix.len() { + None + } else { + Some(get_vnode_from_row(pk_prefix, *vnode_col_idx_in_pk, vnodes)) + } + } + } + } } // TODO: GAT-ify this trait or remove this trait @@ -159,45 +278,60 @@ pub fn get_second(arg: Result<(T, U), E>) -> Result { /// Get vnode value with `indices` on the given `row`. pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode { - let vnode = if indices.is_empty() { - DEFAULT_VNODE - } else { - let vnode = VirtualNode::compute_row(&row, indices); - check_vnode_is_set(vnode, vnodes); - vnode - }; + assert!(!indices.is_empty()); + let vnode = VirtualNode::compute_row(&row, indices); + check_vnode_is_set(vnode, vnodes); tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode); vnode } -/// Get vnode values with `indices` on the given `chunk`. -pub fn compute_chunk_vnode( - chunk: &DataChunk, - dist_key_in_pk_indices: &[usize], - pk_indices: &[usize], - vnodes: &Bitmap, -) -> Vec { - if dist_key_in_pk_indices.is_empty() { - vec![DEFAULT_VNODE; chunk.capacity()] - } else { - let dist_key_indices = dist_key_in_pk_indices - .iter() - .map(|idx| pk_indices[*idx]) - .collect_vec(); +pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode { + let vnode = VirtualNode::from_datum(row.datum_at(index)); + check_vnode_is_set(vnode, vnodes); - VirtualNode::compute_chunk(chunk, &dist_key_indices) - .into_iter() - .zip_eq_fast(chunk.visibility().iter()) - .map(|(vnode, vis)| { - // Ignore the invisible rows. - if vis { - check_vnode_is_set(vnode, vnodes); - } - vnode - }) - .collect() + tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode); + + vnode +} + +impl TableDistribution { + /// Get vnode values with `indices` on the given `chunk`. + pub fn compute_chunk_vnode(&self, chunk: &DataChunk, pk_indices: &[usize]) -> Vec { + match self { + TableDistribution::Singleton => { + vec![SINGLETON_VNODE; chunk.capacity()] + } + TableDistribution::DistKeyInPkIndices { + dist_key_in_pk_indices, + vnodes, + } => { + let dist_key_indices = dist_key_in_pk_indices + .iter() + .map(|idx| pk_indices[*idx]) + .collect_vec(); + + VirtualNode::compute_chunk(chunk, &dist_key_indices) + .into_iter() + .zip_eq_fast(chunk.visibility().iter()) + .map(|(vnode, vis)| { + // Ignore the invisible rows. + if vis { + check_vnode_is_set(vnode, vnodes); + } + vnode + }) + .collect() + } + TableDistribution::VnodeColumnIndex { + vnode_col_idx_in_pk, + vnodes, + } => chunk + .rows() + .map(|row| get_vnode_from_row(row, pk_indices[*vnode_col_idx_in_pk], vnodes)) + .collect(), + } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index ad2d08b42f51c..6024e65b98645 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -47,7 +47,7 @@ use risingwave_storage::error::StorageError; use risingwave_storage::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::StateStoreReadIterStream; -use risingwave_storage::table::{compute_vnode, Distribution}; +use risingwave_storage::table::{compute_vnode, TableDistribution, SINGLETON_VNODE}; use crate::common::log_store_impl::kv_log_store::{ KvLogStoreReadMetrics, ReaderTruncationOffsetType, RowOpCodeType, SeqIdType, @@ -113,7 +113,7 @@ pub(crate) struct LogStoreRowSerde { /// Only the rows whose vnode of the primary key is in this set will be visible to the /// executor. The table will also check whether the written rows /// conform to this partition. - vnodes: Arc, + vnodes: Option>, /// The schema of payload payload_schema: Vec, @@ -153,12 +153,6 @@ impl LogStoreRowSerde { let row_serde = BasicSerde::new(input_value_indices.into(), table_columns.into()); - let vnodes = match vnodes { - Some(vnodes) => vnodes, - - None => Distribution::fallback_vnodes(), - }; - // epoch and seq_id. The seq_id of barrier is set null, and therefore the second order type // is nulls last let pk_serde = OrderedRowSerde::new( @@ -179,12 +173,23 @@ impl LogStoreRowSerde { } } - pub(crate) fn update_vnode_bitmap(&mut self, vnodes: Arc) { - self.vnodes = vnodes; + pub(crate) fn update_vnode_bitmap(&mut self, new_vnodes: Arc) { + match &mut self.vnodes { + Some(vnodes) => { + *vnodes = new_vnodes; + } + None => { + if new_vnodes != TableDistribution::singleton_vnode_bitmap() { + warn!(?new_vnodes, "call update vnode on singletone log store"); + } + } + } } pub(crate) fn vnodes(&self) -> &Arc { - &self.vnodes + self.vnodes + .as_ref() + .unwrap_or_else(|| TableDistribution::singleton_vnode_bitmap_ref()) } pub(crate) fn encode_epoch(epoch: u64) -> i64 { @@ -197,6 +202,13 @@ impl LogStoreRowSerde { } impl LogStoreRowSerde { + fn compute_vnode(&self, row: impl Row) -> VirtualNode { + match &self.vnodes { + None => SINGLETON_VNODE, + Some(vnodes) => compute_vnode(row, &self.dist_key_indices, vnodes), + } + } + pub(crate) fn serialize_data_row( &self, epoch: u64, @@ -218,7 +230,7 @@ impl LogStoreRowSerde { .clone() .chain([Some(ScalarImpl::Int16(op_code))]) .chain(row); - let vnode = compute_vnode(&extended_row, &self.dist_key_indices, &self.vnodes); + let vnode = self.compute_vnode(&extended_row); let key_bytes = serialize_pk_with_vnode(&pk, &self.pk_serde, vnode); let value_bytes = self.row_serde.serialize(extended_row).into(); (vnode, key_bytes, value_bytes) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 82697294cf29f..6657f27188179 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -58,7 +58,7 @@ use risingwave_storage::store::{ ReadOptions, SealCurrentEpochOptions, StateStoreIterItemStream, }; use risingwave_storage::table::merge_sort::merge_sort; -use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution, KeyedRow}; +use risingwave_storage::table::{KeyedRow, TableDistribution}; use risingwave_storage::StateStore; use tracing::{trace, Instrument}; @@ -108,31 +108,18 @@ pub struct StateTableInner< // FIXME: revisit constructions and usages. pk_indices: Vec, - /// Indices of distribution key for computing vnode. - /// Note that the index is based on the all columns of the table, instead of the output ones. - // FIXME: revisit constructions and usages. - // dist_key_indices: Vec, - - /// Indices of distribution key for computing vnode. - /// Note that the index is based on the primary key columns by `pk_indices`. - dist_key_in_pk_indices: Vec, - - prefix_hint_len: usize, - - /// Virtual nodes that the table is partitioned into. + /// Distribution of the state table. /// - /// Only the rows whose vnode of the primary key is in this set will be visible to the + /// It holds vnode bitmap. Only the rows whose vnode of the primary key is in this set will be visible to the /// executor. The table will also check whether the written rows /// conform to this partition. - vnodes: Arc, + distribution: TableDistribution, + + prefix_hint_len: usize, /// Used for catalog table_properties table_option: TableOption, - /// An optional column index which is the vnode of each row computed by the table's consistent - /// hash distribution. - vnode_col_idx_in_pk: Option, - value_indices: Option>, /// Strategy to buffer watermark for lazy state cleaning. @@ -322,21 +309,20 @@ where .collect() }; + let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| { + let vnode_col_idx = *idx as usize; + pk_indices.iter().position(|&i| vnode_col_idx == i) + }); + + let distribution = + TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + let pk_data_types = pk_indices .iter() .map(|i| table_columns[*i].data_type.clone()) .collect(); let pk_serde = OrderedRowSerde::new(pk_data_types, order_types); - let vnodes = match vnodes { - Some(vnodes) => vnodes, - - None => Distribution::fallback_vnodes(), - }; - let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| { - let vnode_col_idx = *idx as usize; - pk_indices.iter().position(|&i| vnode_col_idx == i) - }); let input_value_indices = table_catalog .value_indices .iter() @@ -428,11 +414,9 @@ where pk_serde, row_serde, pk_indices, - dist_key_in_pk_indices, + distribution, prefix_hint_len, - vnodes, table_option, - vnode_col_idx_in_pk, value_indices, watermark_buffer_strategy: W::default(), state_clean_watermark: None, @@ -458,7 +442,7 @@ where columns, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), None, ) .await @@ -479,7 +463,7 @@ where columns, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), Some(value_indices), ) .await @@ -499,7 +483,7 @@ where columns, order_types, pk_indices, - Distribution::fallback(), + TableDistribution::singleton(), None, false, ) @@ -514,7 +498,7 @@ where table_columns: Vec, order_types: Vec, pk_indices: Vec, - distribution: Distribution, + distribution: TableDistribution, value_indices: Option>, ) -> Self { Self::new_with_distribution_inner( @@ -536,7 +520,7 @@ where table_columns: Vec, order_types: Vec, pk_indices: Vec, - distribution: Distribution, + distribution: TableDistribution, value_indices: Option>, ) -> Self { Self::new_with_distribution_inner( @@ -559,10 +543,7 @@ where table_columns: Vec, order_types: Vec, pk_indices: Vec, - Distribution { - dist_key_in_pk_indices, - vnodes, - }: Distribution, + distribution: TableDistribution, value_indices: Option>, is_consistent_op: bool, ) -> Self { @@ -612,11 +593,9 @@ where pk_serde, row_serde, pk_indices, - dist_key_in_pk_indices, + distribution, prefix_hint_len: 0, - vnodes, table_option: Default::default(), - vnode_col_idx_in_pk: None, value_indices, watermark_buffer_strategy: W::default(), state_clean_watermark: None, @@ -636,17 +615,6 @@ where self.table_id.table_id } - /// Returns whether the table is a singleton table. - fn is_singleton(&self) -> bool { - // If the table has a vnode column, it must be hash-distributed (but act like a singleton - // table). So we should return false here. Otherwise, we check the distribution key. - if self.vnode_col_idx_in_pk.is_some() { - false - } else { - self.dist_key_in_pk_indices.is_empty() - } - } - /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called pub fn epoch(&self) -> u64 { self.local_store.epoch() @@ -654,25 +622,9 @@ where /// Get the vnode value with given (prefix of) primary key fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode { - let prefix_len = pk_prefix.len(); - if let Some(vnode_col_idx_in_pk) = self.vnode_col_idx_in_pk { - let vnode = pk_prefix.datum_at(vnode_col_idx_in_pk).unwrap(); - VirtualNode::from_scalar(vnode.into_int16()) - } else { - // For streaming, the given prefix must be enough to calculate the vnode - assert!(self.dist_key_in_pk_indices.iter().all(|&d| d < prefix_len)); - compute_vnode(pk_prefix, &self.dist_key_in_pk_indices, &self.vnodes) - } - } - - /// Get the vnode value of the given row - // pub fn compute_vnode(&self, row: impl Row) -> VirtualNode { - // compute_vnode(row, &self.dist_key_indices, &self.vnodes) - // } - - /// Get the vnode value of the given row - pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { - compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes) + self.distribution + .try_compute_vnode_by_pk_prefix(pk_prefix) + .expect("For streaming, the given prefix must be enough to calculate the vnode") } /// NOTE(kwannoel): This is used by backfill. @@ -696,12 +648,8 @@ where &self.pk_serde } - // pub fn dist_key_indices(&self) -> &[usize] { - // &self.dist_key_indices - // } - pub fn vnodes(&self) -> &Arc { - &self.vnodes + self.distribution.vnodes() } pub fn value_indices(&self) -> &Option> { @@ -711,10 +659,6 @@ where fn is_dirty(&self) -> bool { self.local_store.is_dirty() || self.state_clean_watermark.is_some() } - - pub fn vnode_bitmap(&self) -> &Bitmap { - &self.vnodes - } } impl StateTableInner @@ -773,8 +717,11 @@ where debug_assert_eq!(self.prefix_hint_len, pk.len()); } - let serialized_pk = - serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_prefix_vnode(&pk)); + let serialized_pk = serialize_pk_with_vnode( + &pk, + &self.pk_serde, + self.distribution.compute_vnode_by_pk(&pk), + ); let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() { Some(serialized_pk.slice(VirtualNode::SIZE..)) @@ -822,15 +769,16 @@ where !self.is_dirty(), "vnode bitmap should only be updated when state table is clean" ); - if self.is_singleton() { + if self.distribution.is_singleton() { assert_eq!( - new_vnodes, self.vnodes, + &new_vnodes, + self.vnodes(), "should not update vnode bitmap for singleton table" ); } - assert_eq!(self.vnodes.len(), new_vnodes.len()); + assert_eq!(self.vnodes().len(), new_vnodes.len()); - let cache_may_stale = cache_may_stale(&self.vnodes, &new_vnodes); + let cache_may_stale = cache_may_stale(self.vnodes(), &new_vnodes); if cache_may_stale { self.state_clean_watermark = None; @@ -840,7 +788,7 @@ where } ( - std::mem::replace(&mut self.vnodes, new_vnodes), + self.distribution.update_vnode_bitmap(new_vnodes), cache_may_stale, ) } @@ -920,7 +868,11 @@ where self.watermark_cache.insert(&pk); } - let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_prefix_vnode(pk)); + let key_bytes = serialize_pk_with_vnode( + pk, + &self.pk_serde, + self.distribution.compute_vnode_by_pk(pk), + ); let value_bytes = self.serialize_value(value); self.insert_inner(key_bytes, value_bytes); } @@ -934,7 +886,11 @@ where self.watermark_cache.delete(&pk); } - let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_prefix_vnode(pk)); + let key_bytes = serialize_pk_with_vnode( + pk, + &self.pk_serde, + self.distribution.compute_vnode_by_pk(pk), + ); let value_bytes = self.serialize_value(old_value); self.delete_inner(key_bytes, value_bytes); } @@ -948,8 +904,11 @@ where "pk should not change: {old_pk:?} vs {new_pk:?}", ); - let new_key_bytes = - serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_prefix_vnode(new_pk)); + let new_key_bytes = serialize_pk_with_vnode( + new_pk, + &self.pk_serde, + self.distribution.compute_vnode_by_pk(new_pk), + ); let old_value_bytes = self.serialize_value(old_value); let new_value_bytes = self.serialize_value(new_value); @@ -961,8 +920,11 @@ where /// `op_consistency_level` should be set to `Inconsistent`. pub fn update_without_old_value(&mut self, new_value: impl Row) { let new_pk = (&new_value).project(self.pk_indices()); - let new_key_bytes = - serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_prefix_vnode(new_pk)); + let new_key_bytes = serialize_pk_with_vnode( + new_pk, + &self.pk_serde, + self.distribution.compute_vnode_by_pk(new_pk), + ); let new_value_bytes = self.serialize_value(new_value); self.update_inner(new_key_bytes, None, new_value_bytes); @@ -992,12 +954,9 @@ where }; let (chunk, op) = chunk.into_parts(); - let vnodes = compute_chunk_vnode( - &chunk, - &self.dist_key_in_pk_indices, - &self.pk_indices, - &self.vnodes, - ); + let vnodes = self + .distribution + .compute_chunk_vnode(&chunk, &self.pk_indices); let values = if let Some(ref value_indices) = self.value_indices { chunk.project(value_indices).serialize_with(&self.row_serde) @@ -1209,7 +1168,7 @@ where // Compute Delete Ranges if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix { trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{ - self.vnodes.iter_vnodes().collect_vec() + self.vnodes().iter_vnodes().collect_vec() }, "delete range"); if prefix_serializer .as_ref() @@ -1222,7 +1181,7 @@ where seal_watermark = Some(( WatermarkDirection::Ascending, VnodeWatermark::new( - self.vnodes.clone(), + self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), )); @@ -1230,7 +1189,7 @@ where seal_watermark = Some(( WatermarkDirection::Descending, VnodeWatermark::new( - self.vnodes.clone(), + self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), )); @@ -1416,10 +1375,6 @@ where .map_err(StreamExecutorError::from) } - pub fn get_vnodes(&self) -> Arc { - self.vnodes.clone() - } - /// Returns: /// false: the provided pk prefix is absent in state store. /// true: the provided pk prefix may or may not be present in state store. diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 20038c86a1b7d..1a19a3fe201fc 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -405,7 +405,7 @@ pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Optio pub(crate) async fn get_progress_per_vnode( state_table: &StateTableInner, ) -> StreamExecutorResult> { - debug_assert!(!state_table.vnode_bitmap().is_empty()); + debug_assert!(!state_table.vnodes().is_empty()); let vnodes = state_table.vnodes().iter_vnodes(); let mut result = Vec::with_capacity(state_table.vnodes().len()); let vnode_keys = vnodes.map(|vnode| { diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index c9dffddfbb7ee..1f2388f6ed4f0 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -453,7 +453,7 @@ impl DynamicFilterExecutor SortBuffer { ); let streams: Vec<_> = - futures::future::try_join_all(buffer_table.vnode_bitmap().iter_vnodes().map(|vnode| { + futures::future::try_join_all(buffer_table.vnodes().iter_vnodes().map(|vnode| { buffer_table.iter_with_vnode( vnode, &pk_range, diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 26daf633fe11d..454f69582981b 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -246,8 +246,7 @@ impl WatermarkFilterExecutor { last_checkpoint_watermark = current_watermark.clone(); // Persist the watermark when checkpoint arrives. if let Some(watermark) = current_watermark.clone() { - let vnodes = table.get_vnodes(); - for vnode in vnodes.iter_vnodes() { + for vnode in table.vnodes().clone().iter_vnodes() { let pk = Some(ScalarImpl::Int16(vnode.to_scalar())); let row = [pk, Some(watermark.clone())]; // This is an upsert. @@ -353,7 +352,7 @@ mod tests { use risingwave_common::types::Date; use risingwave_common::util::sort_util::OrderType; use risingwave_storage::memory::MemoryStateStore; - use risingwave_storage::table::Distribution; + use risingwave_storage::table::TableDistribution; use super::*; use crate::executor::test_utils::expr::build_from_pretty; @@ -383,7 +382,7 @@ mod tests { column_descs, order_types.to_vec(), pk_indices.to_vec(), - Distribution::all(vec![0]), + TableDistribution::all(vec![0]), Some(val_indices.to_vec()), ) .await From 319059fbee19c453ca910e3472b9a0278f4f269c Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 12 Jan 2024 13:36:20 +0800 Subject: [PATCH 2/8] refactor --- src/storage/src/table/mod.rs | 16 ++++++++-------- .../common/log_store_impl/kv_log_store/serde.rs | 8 +++++++- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 240e2d2c85727..a48563e7c97d5 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -41,7 +41,7 @@ pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO; /// Represents the distribution for a specific table instance. pub enum TableDistribution { Singleton, - DistKeyInPkIndices { + DistKeyIndices { /// Indices of distribution key for computing vnode, based on the all columns of the table. dist_key_in_pk_indices: Vec, @@ -88,7 +88,7 @@ impl TableDistribution { vnodes, } } else if !dist_key_in_pk_indices.is_empty() { - Self::DistKeyInPkIndices { + Self::DistKeyIndices { dist_key_in_pk_indices, vnodes, } @@ -131,7 +131,7 @@ impl TableDistribution { /// Distribution that accesses all vnodes pub fn all(dist_key_in_pk_indices: Vec) -> Self { - Self::DistKeyInPkIndices { + Self::DistKeyIndices { dist_key_in_pk_indices, vnodes: Self::all_vnodes(), } @@ -151,7 +151,7 @@ impl TableDistribution { } ret } - TableDistribution::DistKeyInPkIndices { ref mut vnodes, .. } + TableDistribution::DistKeyIndices { ref mut vnodes, .. } | TableDistribution::VnodeColumnIndex { ref mut vnodes, .. } => { assert_eq!(vnodes.len(), new_vnodes.len()); replace(vnodes, new_vnodes) @@ -162,7 +162,7 @@ impl TableDistribution { pub fn vnodes(&self) -> &Arc { match self { TableDistribution::Singleton => TableDistribution::singleton_vnode_bitmap_ref(), - TableDistribution::DistKeyInPkIndices { vnodes, .. } + TableDistribution::DistKeyIndices { vnodes, .. } | TableDistribution::VnodeColumnIndex { vnodes, .. } => vnodes, } } @@ -171,7 +171,7 @@ impl TableDistribution { pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { match &self { TableDistribution::Singleton => SINGLETON_VNODE, - TableDistribution::DistKeyInPkIndices { + TableDistribution::DistKeyIndices { dist_key_in_pk_indices, vnodes, } => compute_vnode(pk, dist_key_in_pk_indices, vnodes), @@ -185,7 +185,7 @@ impl TableDistribution { pub fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option { match self { TableDistribution::Singleton => Some(SINGLETON_VNODE), - TableDistribution::DistKeyInPkIndices { + TableDistribution::DistKeyIndices { dist_key_in_pk_indices, vnodes, } => dist_key_in_pk_indices @@ -303,7 +303,7 @@ impl TableDistribution { TableDistribution::Singleton => { vec![SINGLETON_VNODE; chunk.capacity()] } - TableDistribution::DistKeyInPkIndices { + TableDistribution::DistKeyIndices { dist_key_in_pk_indices, vnodes, } => { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 6024e65b98645..2da7754d8b780 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -205,7 +205,13 @@ impl LogStoreRowSerde { fn compute_vnode(&self, row: impl Row) -> VirtualNode { match &self.vnodes { None => SINGLETON_VNODE, - Some(vnodes) => compute_vnode(row, &self.dist_key_indices, vnodes), + Some(vnodes) => { + if self.dist_key_indices.is_empty() { + SINGLETON_VNODE + } else { + compute_vnode(row, &self.dist_key_indices, vnodes) + } + } } } From af6bc0738cec8458240e8668c0fe5e92de836bdb Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 12 Jan 2024 14:35:43 +0800 Subject: [PATCH 3/8] include invisible row in chunk vnode list --- src/common/src/array/mod.rs | 2 +- src/common/src/array/primitive_array.rs | 2 +- src/storage/src/table/mod.rs | 25 ++++++++++++++++++++----- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 64218af99f494..ef2caa8daa26a 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -238,7 +238,7 @@ pub trait Array: /// /// The raw iterator simply iterates values without checking the null bitmap. /// The returned value for NULL values is undefined. - fn raw_iter(&self) -> impl DoubleEndedIterator> { + fn raw_iter(&self) -> impl ExactSizeIterator> { (0..self.len()).map(|i| unsafe { self.raw_value_at_unchecked(i) }) } diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs index 62ea40ac6e19f..844774f9f69e8 100644 --- a/src/common/src/array/primitive_array.rs +++ b/src/common/src/array/primitive_array.rs @@ -208,7 +208,7 @@ impl Array for PrimitiveArray { *self.data.get_unchecked(idx) } - fn raw_iter(&self) -> impl DoubleEndedIterator> { + fn raw_iter(&self) -> impl ExactSizeIterator> { self.data.iter().cloned() } diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index a48563e7c97d5..918aba955502c 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -22,7 +22,7 @@ use std::sync::{Arc, LazyLock}; use bytes::Bytes; use futures::{Stream, StreamExt}; use itertools::Itertools; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{Array, DataChunk, PrimitiveArray}; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; use risingwave_common::hash::VirtualNode; @@ -298,6 +298,8 @@ pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> Virtu impl TableDistribution { /// Get vnode values with `indices` on the given `chunk`. + /// + /// Vnode of invisible rows will be included. Only the vnode of visible row check if it's accessible pub fn compute_chunk_vnode(&self, chunk: &DataChunk, pk_indices: &[usize]) -> Vec { match self { TableDistribution::Singleton => { @@ -327,10 +329,23 @@ impl TableDistribution { TableDistribution::VnodeColumnIndex { vnode_col_idx_in_pk, vnodes, - } => chunk - .rows() - .map(|row| get_vnode_from_row(row, pk_indices[*vnode_col_idx_in_pk], vnodes)) - .collect(), + } => { + let array: &PrimitiveArray = + chunk.columns()[pk_indices[*vnode_col_idx_in_pk]].as_int16(); + array + .raw_iter() + .zip_eq_fast(array.null_bitmap().iter()) + .zip_eq_fast(chunk.visibility().iter()) + .map(|((vnode, exist), vis)| { + let vnode = VirtualNode::from_scalar(vnode); + if vis { + assert!(exist); + check_vnode_is_set(vnode, vnodes); + } + vnode + }) + .collect_vec() + } } } } From aef7869f7e9adc195d3d3937534d207d9c3d74fc Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 12 Jan 2024 17:59:09 +0800 Subject: [PATCH 4/8] disable vnode check temporarily --- src/storage/src/table/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 918aba955502c..20293786d8903 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -287,9 +287,10 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu vnode } -pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode { +pub fn get_vnode_from_row(row: impl Row, index: usize, _vnodes: &Bitmap) -> VirtualNode { let vnode = VirtualNode::from_datum(row.datum_at(index)); - check_vnode_is_set(vnode, vnodes); + // TODO: enable this check when `WatermarkFilterExecutor` use `StorageTable` to read global max watermark + // check_vnode_is_set(vnode, vnodes); tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode); From e6feb44b9ae57c6e5622bf8388800c1b03f2b8de Mon Sep 17 00:00:00 2001 From: William Wen Date: Sat, 13 Jan 2024 14:16:13 +0800 Subject: [PATCH 5/8] decouple passed vnode bitmap and compute vnode method --- src/storage/src/table/mod.rs | 159 ++++++++---------- .../log_store_impl/kv_log_store/serde.rs | 52 +++--- src/stream/src/common/table/state_table.rs | 38 ++--- 3 files changed, 112 insertions(+), 137 deletions(-) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 20293786d8903..aee8fcbbd725e 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -38,25 +38,28 @@ use crate::error::StorageResult; pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO; #[derive(Debug, Clone)] -/// Represents the distribution for a specific table instance. -pub enum TableDistribution { +enum ComputeVnode { Singleton, DistKeyIndices { /// Indices of distribution key for computing vnode, based on the all columns of the table. dist_key_in_pk_indices: Vec, - - /// Virtual nodes that the table is partitioned into. - vnodes: Arc, }, VnodeColumnIndex { /// Indices of vnode columns. vnode_col_idx_in_pk: usize, - - /// Virtual nodes that the table is partitioned into. - vnodes: Arc, }, } +#[derive(Debug, Clone)] +/// Represents the distribution for a specific table instance. +pub struct TableDistribution { + /// The way to compute vnode provided primary key + compute_vnode: ComputeVnode, + + /// Virtual nodes that the table is partitioned into. + vnodes: Arc, +} + pub const SINGLETON_VNODE: VirtualNode = DEFAULT_VNODE; impl TableDistribution { @@ -65,46 +68,31 @@ impl TableDistribution { dist_key_in_pk_indices: Vec, vnode_col_idx_in_pk: Option, ) -> Self { - match vnodes { - None => { - if !dist_key_in_pk_indices.is_empty() { - warn!( - ?dist_key_in_pk_indices, - "has dist key but no vnodes provided" - ); - } - if vnode_col_idx_in_pk.is_some() { - warn!( - vnode_col_idx_in_pk = vnode_col_idx_in_pk.unwrap(), - "has vnode col idx in pk but no vnodes provided" - ); - } - Self::Singleton + let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk { + ComputeVnode::VnodeColumnIndex { + vnode_col_idx_in_pk, } - Some(vnodes) => { - if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk { - Self::VnodeColumnIndex { - vnode_col_idx_in_pk, - vnodes, - } - } else if !dist_key_in_pk_indices.is_empty() { - Self::DistKeyIndices { - dist_key_in_pk_indices, - vnodes, - } - } else { - warn!( - ?vnodes, - "no dist key or vnode col idx provided but provided vnodes" - ); - Self::Singleton - } + } else if !dist_key_in_pk_indices.is_empty() { + ComputeVnode::DistKeyIndices { + dist_key_in_pk_indices, } + } else { + ComputeVnode::Singleton + }; + + let vnodes = vnodes.unwrap_or_else(Self::singleton_vnode_bitmap); + if let ComputeVnode::Singleton = &compute_vnode { + assert!(vnodes.is_set(SINGLETON_VNODE.to_index())); + } + + Self { + compute_vnode, + vnodes, } } pub fn is_singleton(&self) -> bool { - matches!(self, Self::Singleton) + matches!(&self.compute_vnode, ComputeVnode::Singleton) } pub fn singleton_vnode_bitmap_ref() -> &'static Arc { @@ -129,77 +117,76 @@ impl TableDistribution { ALL_VNODES.clone() } - /// Distribution that accesses all vnodes + /// Distribution that accesses all vnodes, mainly used for tests. pub fn all(dist_key_in_pk_indices: Vec) -> Self { - Self::DistKeyIndices { - dist_key_in_pk_indices, + Self { + compute_vnode: ComputeVnode::DistKeyIndices { + dist_key_in_pk_indices, + }, vnodes: Self::all_vnodes(), } } /// Fallback distribution for singleton or tests. pub fn singleton() -> Self { - Self::Singleton + Self { + compute_vnode: ComputeVnode::Singleton, + vnodes: Self::singleton_vnode_bitmap(), + } } pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { - match self { - TableDistribution::Singleton => { - let ret = Self::singleton_vnode_bitmap(); - if ret != new_vnodes { - warn!(?new_vnodes, "update vnode on singleton distribution"); - } - ret - } - TableDistribution::DistKeyIndices { ref mut vnodes, .. } - | TableDistribution::VnodeColumnIndex { ref mut vnodes, .. } => { - assert_eq!(vnodes.len(), new_vnodes.len()); - replace(vnodes, new_vnodes) + if self.is_singleton() { + if &new_vnodes != Self::singleton_vnode_bitmap_ref() { + warn!(?new_vnodes, "update vnode on singleton distribution"); } + assert!( + new_vnodes.is_set(SINGLETON_VNODE.to_index()), + "singleton distribution get vnode bitmap without SINGLETON_VNODE: {:?}", + new_vnodes + ); } + assert_eq!(self.vnodes.len(), new_vnodes.len()); + replace(&mut self.vnodes, new_vnodes) } pub fn vnodes(&self) -> &Arc { - match self { - TableDistribution::Singleton => TableDistribution::singleton_vnode_bitmap_ref(), - TableDistribution::DistKeyIndices { vnodes, .. } - | TableDistribution::VnodeColumnIndex { vnodes, .. } => vnodes, - } + &self.vnodes } /// Get vnode value with given primary key. pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { - match &self { - TableDistribution::Singleton => SINGLETON_VNODE, - TableDistribution::DistKeyIndices { + match &self.compute_vnode { + ComputeVnode::Singleton => SINGLETON_VNODE, + ComputeVnode::DistKeyIndices { dist_key_in_pk_indices, - vnodes, - } => compute_vnode(pk, dist_key_in_pk_indices, vnodes), - TableDistribution::VnodeColumnIndex { + } => compute_vnode(pk, dist_key_in_pk_indices, &self.vnodes), + ComputeVnode::VnodeColumnIndex { vnode_col_idx_in_pk, - vnodes, - } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, vnodes), + } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, &self.vnodes), } } pub fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option { - match self { - TableDistribution::Singleton => Some(SINGLETON_VNODE), - TableDistribution::DistKeyIndices { + match &self.compute_vnode { + ComputeVnode::Singleton => Some(SINGLETON_VNODE), + ComputeVnode::DistKeyIndices { dist_key_in_pk_indices, - vnodes, } => dist_key_in_pk_indices .iter() .all(|&d| d < pk_prefix.len()) - .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, vnodes)), - TableDistribution::VnodeColumnIndex { + .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, &self.vnodes)), + ComputeVnode::VnodeColumnIndex { vnode_col_idx_in_pk, - vnodes, } => { if *vnode_col_idx_in_pk >= pk_prefix.len() { None } else { - Some(get_vnode_from_row(pk_prefix, *vnode_col_idx_in_pk, vnodes)) + Some(get_vnode_from_row( + pk_prefix, + *vnode_col_idx_in_pk, + &self.vnodes, + )) } } } @@ -302,13 +289,12 @@ impl TableDistribution { /// /// Vnode of invisible rows will be included. Only the vnode of visible row check if it's accessible pub fn compute_chunk_vnode(&self, chunk: &DataChunk, pk_indices: &[usize]) -> Vec { - match self { - TableDistribution::Singleton => { + match &self.compute_vnode { + ComputeVnode::Singleton => { vec![SINGLETON_VNODE; chunk.capacity()] } - TableDistribution::DistKeyIndices { + ComputeVnode::DistKeyIndices { dist_key_in_pk_indices, - vnodes, } => { let dist_key_indices = dist_key_in_pk_indices .iter() @@ -321,15 +307,14 @@ impl TableDistribution { .map(|(vnode, vis)| { // Ignore the invisible rows. if vis { - check_vnode_is_set(vnode, vnodes); + check_vnode_is_set(vnode, &self.vnodes); } vnode }) .collect() } - TableDistribution::VnodeColumnIndex { + ComputeVnode::VnodeColumnIndex { vnode_col_idx_in_pk, - vnodes, } => { let array: &PrimitiveArray = chunk.columns()[pk_indices[*vnode_col_idx_in_pk]].as_int16(); @@ -341,7 +326,7 @@ impl TableDistribution { let vnode = VirtualNode::from_scalar(vnode); if vis { assert!(exist); - check_vnode_is_set(vnode, vnodes); + check_vnode_is_set(vnode, &self.vnodes); } vnode }) diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 2da7754d8b780..8aa75a4552da5 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -106,14 +106,14 @@ pub(crate) struct LogStoreRowSerde { /// Indices of distribution key for computing vnode. /// Note that the index is based on the all columns of the table, instead of the output ones. // FIXME: revisit constructions and usages. - dist_key_indices: Vec, + dist_key_indices: Option>, /// Virtual nodes that the table is partitioned into. /// /// Only the rows whose vnode of the primary key is in this set will be visible to the /// executor. The table will also check whether the written rows /// conform to this partition. - vnodes: Option>, + vnodes: Arc, /// The schema of payload payload_schema: Vec, @@ -153,6 +153,12 @@ impl LogStoreRowSerde { let row_serde = BasicSerde::new(input_value_indices.into(), table_columns.into()); + let vnodes = match vnodes { + Some(vnodes) => vnodes, + + None => TableDistribution::singleton_vnode_bitmap(), + }; + // epoch and seq_id. The seq_id of barrier is set null, and therefore the second order type // is nulls last let pk_serde = OrderedRowSerde::new( @@ -163,6 +169,18 @@ impl LogStoreRowSerde { let epoch_serde = OrderedRowSerde::new(vec![EPOCH_COLUMN_TYPE], vec![OrderType::ascending()]); + let dist_key_indices = if dist_key_indices.is_empty() { + if &vnodes != TableDistribution::singleton_vnode_bitmap_ref() { + warn!( + ?vnodes, + "singleton log store gets non-singleton vnode bitmap" + ); + } + None + } else { + Some(dist_key_indices) + }; + Self { pk_serde, row_serde, @@ -173,23 +191,12 @@ impl LogStoreRowSerde { } } - pub(crate) fn update_vnode_bitmap(&mut self, new_vnodes: Arc) { - match &mut self.vnodes { - Some(vnodes) => { - *vnodes = new_vnodes; - } - None => { - if new_vnodes != TableDistribution::singleton_vnode_bitmap() { - warn!(?new_vnodes, "call update vnode on singletone log store"); - } - } - } + pub(crate) fn update_vnode_bitmap(&mut self, vnodes: Arc) { + self.vnodes = vnodes; } pub(crate) fn vnodes(&self) -> &Arc { - self.vnodes - .as_ref() - .unwrap_or_else(|| TableDistribution::singleton_vnode_bitmap_ref()) + &self.vnodes } pub(crate) fn encode_epoch(epoch: u64) -> i64 { @@ -203,15 +210,10 @@ impl LogStoreRowSerde { impl LogStoreRowSerde { fn compute_vnode(&self, row: impl Row) -> VirtualNode { - match &self.vnodes { - None => SINGLETON_VNODE, - Some(vnodes) => { - if self.dist_key_indices.is_empty() { - SINGLETON_VNODE - } else { - compute_vnode(row, &self.dist_key_indices, vnodes) - } - } + if let Some(dist_key_indices) = &self.dist_key_indices { + compute_vnode(row, dist_key_indices, &self.vnodes) + } else { + SINGLETON_VNODE } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 6657f27188179..269e1dd0490fc 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -627,6 +627,11 @@ where .expect("For streaming, the given prefix must be enough to calculate the vnode") } + /// Get the vnode value of the given primary key + pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { + self.distribution.compute_vnode_by_pk(pk) + } + /// NOTE(kwannoel): This is used by backfill. /// We want to check pk indices of upstream table. pub fn pk_indices(&self) -> &[usize] { @@ -717,11 +722,8 @@ where debug_assert_eq!(self.prefix_hint_len, pk.len()); } - let serialized_pk = serialize_pk_with_vnode( - &pk, - &self.pk_serde, - self.distribution.compute_vnode_by_pk(&pk), - ); + let serialized_pk = + serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk)); let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() { Some(serialized_pk.slice(VirtualNode::SIZE..)) @@ -868,11 +870,7 @@ where self.watermark_cache.insert(&pk); } - let key_bytes = serialize_pk_with_vnode( - pk, - &self.pk_serde, - self.distribution.compute_vnode_by_pk(pk), - ); + let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk)); let value_bytes = self.serialize_value(value); self.insert_inner(key_bytes, value_bytes); } @@ -886,11 +884,7 @@ where self.watermark_cache.delete(&pk); } - let key_bytes = serialize_pk_with_vnode( - pk, - &self.pk_serde, - self.distribution.compute_vnode_by_pk(pk), - ); + let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk)); let value_bytes = self.serialize_value(old_value); self.delete_inner(key_bytes, value_bytes); } @@ -904,11 +898,8 @@ where "pk should not change: {old_pk:?} vs {new_pk:?}", ); - let new_key_bytes = serialize_pk_with_vnode( - new_pk, - &self.pk_serde, - self.distribution.compute_vnode_by_pk(new_pk), - ); + let new_key_bytes = + serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk)); let old_value_bytes = self.serialize_value(old_value); let new_value_bytes = self.serialize_value(new_value); @@ -920,11 +911,8 @@ where /// `op_consistency_level` should be set to `Inconsistent`. pub fn update_without_old_value(&mut self, new_value: impl Row) { let new_pk = (&new_value).project(self.pk_indices()); - let new_key_bytes = serialize_pk_with_vnode( - new_pk, - &self.pk_serde, - self.distribution.compute_vnode_by_pk(new_pk), - ); + let new_key_bytes = + serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk)); let new_value_bytes = self.serialize_value(new_value); self.update_inner(new_key_bytes, None, new_value_bytes); From d346fc9563ce4cfc0b445c20e26096e921de2c7b Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 15 Jan 2024 12:04:44 +0800 Subject: [PATCH 6/8] use RW_TYPE --- src/common/src/hash/consistent_hash/vnode.rs | 4 ---- src/frontend/src/expr/type_inference/func.rs | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 584c6cdd39198..19b8975f775af 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -87,10 +87,6 @@ impl VirtualNode { Self(scalar as _) } - pub const fn vnode_data_type() -> DataType { - DataType::Int16 - } - pub fn from_datum(datum: DatumRef<'_>) -> Self { Self::from_scalar(datum.expect("should not be none").into_int16()) } diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 5619661699233..1ccbaa28e9da1 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -579,7 +579,7 @@ fn infer_type_for_special( } ExprType::Vnode => { ensure_arity!("vnode", 1 <= | inputs |); - Ok(Some(VirtualNode::vnode_data_type())) + Ok(Some(VirtualNode::RW_TYPE)) } ExprType::Greatest | ExprType::Least => { ensure_arity!("greatest/least", 1 <= | inputs |); From 75471c631245c416b270e40089cd96e85d508637 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 15 Jan 2024 15:33:44 +0800 Subject: [PATCH 7/8] warn instead of panic --- src/storage/src/table/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index aee8fcbbd725e..2212fccdf7f9f 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -82,7 +82,12 @@ impl TableDistribution { let vnodes = vnodes.unwrap_or_else(Self::singleton_vnode_bitmap); if let ComputeVnode::Singleton = &compute_vnode { - assert!(vnodes.is_set(SINGLETON_VNODE.to_index())); + if &vnodes != Self::singleton_vnode_bitmap_ref() { + warn!( + ?vnodes, + "singleton distribution get non-singleton vnode bitmap" + ); + } } Self { From 38c77d154c0329c7d370f90007c271ab416f5877 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 15 Jan 2024 15:37:07 +0800 Subject: [PATCH 8/8] remove assert --- src/storage/src/table/mod.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 2212fccdf7f9f..53c16ceea9885 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -141,15 +141,8 @@ impl TableDistribution { } pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { - if self.is_singleton() { - if &new_vnodes != Self::singleton_vnode_bitmap_ref() { - warn!(?new_vnodes, "update vnode on singleton distribution"); - } - assert!( - new_vnodes.is_set(SINGLETON_VNODE.to_index()), - "singleton distribution get vnode bitmap without SINGLETON_VNODE: {:?}", - new_vnodes - ); + if self.is_singleton() && &new_vnodes != Self::singleton_vnode_bitmap_ref() { + warn!(?new_vnodes, "update vnode on singleton distribution"); } assert_eq!(self.vnodes.len(), new_vnodes.len()); replace(&mut self.vnodes, new_vnodes)