diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs
index 1068ffd7f334..74d7843013e4 100644
--- a/src/batch/src/executor/join/distributed_lookup_join.rs
+++ b/src/batch/src/executor/join/distributed_lookup_join.rs
@@ -17,8 +17,9 @@ use std::mem::swap;
 
 use futures::pin_mut;
 use itertools::Itertools;
+use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
-use risingwave_common::hash::{HashKey, HashKeyDispatcher};
+use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode};
 use risingwave_common::memory::MemoryContext;
 use risingwave_common::row::OwnedRow;
 use risingwave_common::types::{DataType, Datum};
@@ -30,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::common::BatchQueryEpoch;
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
-use risingwave_storage::table::{TableDistribution, TableIter};
+use risingwave_storage::table::TableIter;
 use risingwave_storage::{dispatch_state_store, StateStore};
 
 use crate::error::Result;
@@ -194,7 +195,8 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
             .collect();
 
         // Lookup Join always contains distribution key, so we don't need vnode bitmap
-        let vnodes = Some(TableDistribution::all_vnodes());
+        // TODO(var-vnode): use vnode count from table desc
+        let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
         dispatch_state_store!(source.context().state_store(), state_store, {
             let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
             let inner_side_builder = InnerSideExecutorBuilder::new(
diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs
index a3be00fc39a2..7c7a08af5d87 100644
--- a/src/batch/src/executor/join/local_lookup_join.rs
+++ b/src/batch/src/executor/join/local_lookup_join.rs
@@ -17,7 +17,7 @@ use std::marker::PhantomData;
 
 use anyhow::Context;
 use itertools::Itertools;
-use risingwave_common::bitmap::BitmapBuilder;
+use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::{ColumnDesc, Field, Schema};
 use risingwave_common::hash::table_distribution::TableDistribution;
 use risingwave_common::hash::{
@@ -408,12 +408,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
             })
             .collect();
 
+        // TODO(var-vnode): use vnode count from table desc
+        let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
         let inner_side_builder = InnerSideExecutorBuilder {
             table_desc: table_desc.clone(),
-            table_distribution: TableDistribution::new_from_storage_table_desc(
-                Some(TableDistribution::all_vnodes()),
-                table_desc,
-            ),
+            table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
             vnode_mapping,
             outer_side_key_types,
             inner_side_schema,
diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs
index 7106eaec1b76..be2a11b75694 100644
--- a/src/batch/src/executor/log_row_seq_scan.rs
+++ b/src/batch/src/executor/log_row_seq_scan.rs
@@ -22,13 +22,14 @@ use prometheus::Histogram;
 use risingwave_common::array::{DataChunk, Op};
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnId, Field, Schema};
+use risingwave_common::hash::VirtualNode;
 use risingwave_common::row::{Row, RowExt};
 use risingwave_common::types::ScalarImpl;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
 use risingwave_pb::plan_common::StorageTableDesc;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
-use risingwave_storage::table::{collect_data_chunk, TableDistribution};
+use risingwave_storage::table::collect_data_chunk;
 use risingwave_storage::{dispatch_state_store, StateStore};
 
 use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
@@ -106,7 +107,8 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
             Some(vnodes) => Some(Bitmap::from(vnodes).into()),
             // This is possible for dml. vnode_bitmap is not filled by scheduler.
             // Or it's single distribution, e.g., distinct agg. We scan in a single executor.
-            None => Some(TableDistribution::all_vnodes()),
+            // TODO(var-vnode): use vnode count from table desc
+            None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
         };
 
         let chunk_size = source.context.get_config().developer.chunk_size as u32;
diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs
index b897dbd81378..7c7244d95476 100644
--- a/src/batch/src/executor/row_seq_scan.rs
+++ b/src/batch/src/executor/row_seq_scan.rs
@@ -21,6 +21,7 @@ use prometheus::Histogram;
 use risingwave_common::array::DataChunk;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnId, Schema};
+use risingwave_common::hash::VirtualNode;
 use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::{DataType, Datum};
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
@@ -32,7 +33,6 @@ use risingwave_pb::plan_common::as_of::AsOfType;
 use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc};
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
-use risingwave_storage::table::TableDistribution;
 use risingwave_storage::{dispatch_state_store, StateStore};
 
 use crate::error::{BatchError, Result};
@@ -210,7 +210,8 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
             Some(vnodes) => Some(Bitmap::from(vnodes).into()),
             // This is possible for dml. vnode_bitmap is not filled by scheduler.
             // Or it's single distribution, e.g., distinct agg. We scan in a single executor.
-            None => Some(TableDistribution::all_vnodes()),
+            // TODO(var-vnode): use vnode count from table desc
+            None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
         };
 
         let scan_ranges = {
diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs
index 773231ba36a8..eee6a64a2b42 100644
--- a/src/common/src/hash/consistent_hash/bitmap.rs
+++ b/src/common/src/hash/consistent_hash/bitmap.rs
@@ -15,6 +15,7 @@
 use std::ops::RangeInclusive;
 
 use crate::bitmap::Bitmap;
+use crate::hash::table_distribution::SINGLETON_VNODE;
 use crate::hash::VirtualNode;
 
 /// An extension trait for `Bitmap` to support virtual node operations.
@@ -36,4 +37,17 @@ impl Bitmap {
         self.high_ranges()
             .map(|r| (VirtualNode::from_index(*r.start())..=VirtualNode::from_index(*r.end())))
     }
+
+    /// Returns whether only the [`SINGLETON_VNODE`] is set in the bitmap.
+    ///
+    /// Note that this method returning `true` does not imply that the bitmap was created by
+    /// [`VnodeBitmapExt::singleton`], or that the bitmap has length 1.
+    pub fn is_singleton(&self) -> bool {
+        self.count_ones() == 1 && self.iter_vnodes().next().unwrap() == SINGLETON_VNODE
+    }
+
+    /// Creates a bitmap with length 1 and the single bit set.
+    pub fn singleton() -> Self {
+        Self::ones(1)
+    }
 }
diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs
index f528544689f3..dd4095535fdf 100644
--- a/src/common/src/hash/consistent_hash/vnode.rs
+++ b/src/common/src/hash/consistent_hash/vnode.rs
@@ -114,6 +114,11 @@ impl VirtualNode {
     }
 }
 
+impl VirtualNode {
+    pub const COUNT_FOR_TEST: usize = Self::COUNT;
+    pub const MAX_FOR_TEST: VirtualNode = Self::MAX;
+}
+
 impl VirtualNode {
     // `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
     // chunk. When only one column is provided and its type is `Serial`, we consider the column to
diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs
index 9be9cd2abafb..480483bc96a5 100644
--- a/src/common/src/hash/table_distribution.rs
+++ b/src/common/src/hash/table_distribution.rs
@@ -13,30 +13,34 @@
 // limitations under the License.
 
 use std::mem::replace;
-use std::ops::Deref;
 use std::sync::{Arc, LazyLock};
 
 use itertools::Itertools;
 use risingwave_pb::plan_common::StorageTableDesc;
-use tracing::warn;
 
 use crate::array::{Array, DataChunk, PrimitiveArray};
-use crate::bitmap::{Bitmap, BitmapBuilder};
+use crate::bitmap::Bitmap;
 use crate::hash::VirtualNode;
 use crate::row::Row;
 use crate::util::iter_util::ZipEqFast;
 
-/// For tables without distribution (singleton), the `DEFAULT_VNODE` is encoded.
-pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO;
+/// For tables without distribution (singleton), the `SINGLETON_VNODE` is encoded.
+pub const SINGLETON_VNODE: VirtualNode = VirtualNode::ZERO;
+
+use super::VnodeBitmapExt;
 
 #[derive(Debug, Clone)]
 enum ComputeVnode {
     Singleton,
     DistKeyIndices {
+        /// Virtual nodes that the table is partitioned into.
+        vnodes: Arc<Bitmap>,
         /// Indices of distribution key for computing vnode, based on the pk columns of the table.
         dist_key_in_pk_indices: Vec<usize>,
     },
     VnodeColumnIndex {
+        /// Virtual nodes that the table is partitioned into.
+        vnodes: Arc<Bitmap>,
         /// Index of vnode column.
         vnode_col_idx_in_pk: usize,
     },
@@ -47,13 +51,8 @@ enum ComputeVnode {
 pub struct TableDistribution {
     /// The way to compute vnode provided primary key
     compute_vnode: ComputeVnode,
-
-    /// Virtual nodes that the table is partitioned into.
-    vnodes: Arc<Bitmap>,
 }
 
-pub const SINGLETON_VNODE: VirtualNode = DEFAULT_VNODE;
-
 impl TableDistribution {
     pub fn new_from_storage_table_desc(
         vnodes: Option<Arc<Bitmap>>,
@@ -75,69 +74,32 @@ impl TableDistribution {
     ) -> Self {
         let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk {
             ComputeVnode::VnodeColumnIndex {
+                vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton().into()),
                 vnode_col_idx_in_pk,
             }
         } else if !dist_key_in_pk_indices.is_empty() {
             ComputeVnode::DistKeyIndices {
+                vnodes: vnodes.expect("vnodes must be `Some` as dist key indices are set"),
                 dist_key_in_pk_indices,
             }
         } else {
             ComputeVnode::Singleton
         };
 
-        let vnodes = vnodes.unwrap_or_else(Self::singleton_vnode_bitmap);
-        if let ComputeVnode::Singleton = &compute_vnode {
-            if &vnodes != Self::singleton_vnode_bitmap_ref() && &vnodes != Self::all_vnodes_ref() {
-                warn!(
-                    ?vnodes,
-                    "singleton distribution get non-singleton vnode bitmap"
-                );
-            }
-        }
-
-        Self {
-            compute_vnode,
-            vnodes,
-        }
+        Self { compute_vnode }
     }
 
     pub fn is_singleton(&self) -> bool {
         matches!(&self.compute_vnode, ComputeVnode::Singleton)
     }
 
-    pub fn singleton_vnode_bitmap_ref() -> &'static Arc<Bitmap> {
-        /// A bitmap that only the default vnode is set.
-        static SINGLETON_VNODES: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
-            let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT);
-            vnodes.set(SINGLETON_VNODE.to_index(), true);
-            vnodes.finish().into()
-        });
-
-        SINGLETON_VNODES.deref()
-    }
-
-    pub fn singleton_vnode_bitmap() -> Arc<Bitmap> {
-        Self::singleton_vnode_bitmap_ref().clone()
-    }
-
-    pub fn all_vnodes_ref() -> &'static Arc<Bitmap> {
-        /// A bitmap that all vnodes are set.
-        static ALL_VNODES: LazyLock<Arc<Bitmap>> =
-            LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
-        &ALL_VNODES
-    }
-
-    pub fn all_vnodes() -> Arc<Bitmap> {
-        Self::all_vnodes_ref().clone()
-    }
-
     /// Distribution that accesses all vnodes, mainly used for tests.
-    pub fn all(dist_key_in_pk_indices: Vec<usize>) -> Self {
+    pub fn all(dist_key_in_pk_indices: Vec<usize>, vnode_count: usize) -> Self {
         Self {
             compute_vnode: ComputeVnode::DistKeyIndices {
+                vnodes: Bitmap::ones(vnode_count).into(),
                 dist_key_in_pk_indices,
             },
-            vnodes: Self::all_vnodes(),
         }
     }
 
@@ -145,20 +107,39 @@ impl TableDistribution {
     pub fn singleton() -> Self {
         Self {
             compute_vnode: ComputeVnode::Singleton,
-            vnodes: Self::singleton_vnode_bitmap(),
         }
     }
 
     pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
-        if self.is_singleton() && &new_vnodes != Self::singleton_vnode_bitmap_ref() {
-            warn!(?new_vnodes, "update vnode on singleton distribution");
+        match &mut self.compute_vnode {
+            ComputeVnode::Singleton => {
+                if !new_vnodes.is_singleton() {
+                    panic!(
+                        "update vnode bitmap on singleton distribution to non-singleton: {:?}",
+                        new_vnodes
+                    );
+                }
+                self.vnodes().clone() // not updated
+            }
+
+            ComputeVnode::DistKeyIndices { vnodes, .. }
+            | ComputeVnode::VnodeColumnIndex { vnodes, .. } => {
+                assert_eq!(vnodes.len(), new_vnodes.len());
+                replace(vnodes, new_vnodes)
+            }
         }
-        assert_eq!(self.vnodes.len(), new_vnodes.len());
-        replace(&mut self.vnodes, new_vnodes)
     }
 
+    /// Get vnode bitmap if distributed, or a dummy [`Bitmap::singleton()`] if singleton.
     pub fn vnodes(&self) -> &Arc<Bitmap> {
-        &self.vnodes
+        static SINGLETON_VNODES: LazyLock<Arc<Bitmap>> =
+            LazyLock::new(|| Bitmap::singleton().into());
+
+        match &self.compute_vnode {
+            ComputeVnode::DistKeyIndices { vnodes, .. } => vnodes,
+            ComputeVnode::VnodeColumnIndex { vnodes, .. } => vnodes,
+            ComputeVnode::Singleton => &SINGLETON_VNODES,
+        }
     }
 
     /// Get vnode value with given primary key.
@@ -166,11 +147,13 @@ impl TableDistribution {
         match &self.compute_vnode {
             ComputeVnode::Singleton => SINGLETON_VNODE,
             ComputeVnode::DistKeyIndices {
+                vnodes,
                 dist_key_in_pk_indices,
-            } => compute_vnode(pk, dist_key_in_pk_indices, &self.vnodes),
+            } => compute_vnode(pk, dist_key_in_pk_indices, vnodes),
             ComputeVnode::VnodeColumnIndex {
+                vnodes,
                 vnode_col_idx_in_pk,
-            } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, &self.vnodes),
+            } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, vnodes),
         }
     }
 
@@ -178,22 +161,20 @@ impl TableDistribution {
         match &self.compute_vnode {
             ComputeVnode::Singleton => Some(SINGLETON_VNODE),
             ComputeVnode::DistKeyIndices {
+                vnodes,
                 dist_key_in_pk_indices,
             } => dist_key_in_pk_indices
                 .iter()
                 .all(|&d| d < pk_prefix.len())
-                .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, &self.vnodes)),
+                .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, vnodes)),
             ComputeVnode::VnodeColumnIndex {
+                vnodes,
                 vnode_col_idx_in_pk,
             } => {
                 if *vnode_col_idx_in_pk >= pk_prefix.len() {
                     None
                 } else {
-                    Some(get_vnode_from_row(
-                        pk_prefix,
-                        *vnode_col_idx_in_pk,
-                        &self.vnodes,
-                    ))
+                    Some(get_vnode_from_row(pk_prefix, *vnode_col_idx_in_pk, vnodes))
                 }
             }
         }
@@ -230,6 +211,7 @@ impl TableDistribution {
                 vec![SINGLETON_VNODE; chunk.capacity()]
             }
             ComputeVnode::DistKeyIndices {
+                vnodes,
                 dist_key_in_pk_indices,
             } => {
                 let dist_key_indices = dist_key_in_pk_indices
@@ -243,13 +225,14 @@
                     .map(|(vnode, vis)| {
                         // Ignore the invisible rows.
                         if vis {
-                            check_vnode_is_set(vnode, &self.vnodes);
+                            check_vnode_is_set(vnode, vnodes);
                        }
                         vnode
                     })
                     .collect()
             }
             ComputeVnode::VnodeColumnIndex {
+                vnodes,
                 vnode_col_idx_in_pk,
             } => {
                 let array: &PrimitiveArray<i16> =
@@ -262,7 +245,7 @@ impl TableDistribution {
                         let vnode = VirtualNode::from_scalar(vnode);
                         if vis {
                             assert!(exist);
-                            check_vnode_is_set(vnode, &self.vnodes);
+                            check_vnode_is_set(vnode, vnodes);
                         }
                         vnode
                     })
diff --git a/src/common/src/util/scan_range.rs b/src/common/src/util/scan_range.rs
index fd056f179044..5d5e84ed3208 100644
--- a/src/common/src/util/scan_range.rs
+++ b/src/common/src/util/scan_range.rs
@@ -159,7 +159,7 @@ mod tests {
         let pk = vec![1, 3, 2];
         let dist_key_idx_in_pk =
             crate::catalog::get_dist_key_in_pk_indices(&dist_key, &pk).unwrap();
-        let dist = TableDistribution::all(dist_key_idx_in_pk);
+        let dist = TableDistribution::all(dist_key_idx_in_pk, VirtualNode::COUNT_FOR_TEST);
 
         let mut scan_range = ScanRange::full_table_scan();
         assert!(scan_range.try_compute_vnode(&dist).is_none());
@@ -185,7 +185,7 @@ mod tests {
         let pk = vec![1, 3, 2];
         let dist_key_idx_in_pk =
             crate::catalog::get_dist_key_in_pk_indices(&dist_key, &pk).unwrap();
-        let dist = TableDistribution::all(dist_key_idx_in_pk);
+        let dist = TableDistribution::all(dist_key_idx_in_pk, VirtualNode::COUNT_FOR_TEST);
 
         let mut scan_range = ScanRange::full_table_scan();
         assert!(scan_range.try_compute_vnode(&dist).is_none());
diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs
index e5bba170bf97..f5cee710a40f 100644
--- a/src/ctl/src/cmd_impl/table/scan.rs
+++ b/src/ctl/src/cmd_impl/table/scan.rs
@@ -14,6 +14,8 @@
 
 use anyhow::{anyhow, Result};
 use futures::{pin_mut, StreamExt};
+use risingwave_common::bitmap::Bitmap;
+use risingwave_common::hash::VirtualNode;
 use risingwave_frontend::TableCatalog;
 use risingwave_hummock_sdk::HummockReadEpoch;
 use risingwave_rpc_client::MetaClient;
@@ -63,7 +65,8 @@ pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
             .collect(),
         table.pk().iter().map(|x| x.order_type).collect(),
         table.pk().iter().map(|x| x.column_index).collect(),
-        TableDistribution::all(table.distribution_key().to_vec()), // scan all vnodes
+        // TODO(var-vnode): use vnode count from table desc
+        TableDistribution::all(table.distribution_key().to_vec(), VirtualNode::COUNT), // scan all vnodes
        Some(table.value_indices.clone()),
     )
     .await
@@ -81,7 +84,8 @@ pub fn make_storage_table<S: StateStore>(
     Ok(StorageTable::new_partial(
         hummock,
         output_columns_ids,
-        Some(TableDistribution::all_vnodes()),
+        // TODO(var-vnode): use vnode count from table desc
+        Some(Bitmap::ones(VirtualNode::COUNT).into()),
         &table.table_desc().try_to_protobuf()?,
     ))
 }
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 09e4cbc0bfa0..2ecae1d7f764 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -1250,7 +1250,8 @@ fn derive_partitions(
     }
 
     let table_distribution = TableDistribution::new_from_storage_table_desc(
-        Some(TableDistribution::all_vnodes()),
+        // TODO(var-vnode): use vnode count from table desc
+        Some(Bitmap::ones(VirtualNode::COUNT).into()),
         &table_desc.try_to_protobuf()?,
     );
 
diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs
index 35f3d08a9ed8..67da2150735a 100644
--- a/src/storage/hummock_test/src/state_store_tests.rs
+++ b/src/storage/hummock_test/src/state_store_tests.rs
@@ -24,7 +24,6 @@ use futures::{pin_mut, StreamExt};
 use itertools::Itertools;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{TableId, TableOption};
-use risingwave_common::hash::table_distribution::TableDistribution;
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::util::epoch::{test_epoch, EpochExt, MAX_EPOCH};
 use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
@@ -1565,7 +1564,7 @@ async fn test_iter_log() {
             },
             table_option: Default::default(),
             is_replicated: false,
-            vnodes: TableDistribution::all_vnodes(),
+            vnodes: Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into(),
         })
         .await;
 
@@ -1580,7 +1579,7 @@ async fn test_iter_log() {
             },
             table_option: Default::default(),
             is_replicated: false,
-            vnodes: TableDistribution::all_vnodes(),
+            vnodes: Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into(),
         })
         .await;
     // flush for about 10 times per epoch
diff --git a/src/storage/src/hummock/iterator/change_log.rs b/src/storage/src/hummock/iterator/change_log.rs
index 6fc99f29a80f..ae8061c37b07 100644
--- a/src/storage/src/hummock/iterator/change_log.rs
+++ b/src/storage/src/hummock/iterator/change_log.rs
@@ -527,8 +527,9 @@ mod tests {
 
     use bytes::Bytes;
     use itertools::Itertools;
+    use risingwave_common::bitmap::Bitmap;
     use risingwave_common::catalog::TableId;
-    use risingwave_common::hash::table_distribution::TableDistribution;
+    use risingwave_common::hash::VirtualNode;
     use risingwave_common::util::epoch::test_epoch;
     use risingwave_hummock_sdk::key::{TableKey, UserKey};
     use risingwave_hummock_sdk::EpochWithGap;
@@ -699,7 +700,7 @@ mod tests {
             },
             table_option: Default::default(),
             is_replicated: false,
-            vnodes: TableDistribution::all_vnodes(),
+            vnodes: Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into(),
         })
         .await;
         let logs = gen_test_data(epoch_count, 10000, 0.05, 0.2);
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs
index 92a3caf4cd2e..17ab103d758b 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs
@@ -25,7 +25,7 @@ use itertools::Itertools;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::ColumnDesc;
-use risingwave_common::hash::VirtualNode;
+use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
 use risingwave_common::row::{OwnedRow, Row, RowExt};
 use risingwave_common::types::{DataType, ScalarImpl};
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
@@ -42,7 +42,7 @@ use risingwave_storage::error::StorageResult;
 use risingwave_storage::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
 use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
 use risingwave_storage::store::{StateStoreIterExt, StateStoreReadIter};
-use risingwave_storage::table::{compute_vnode, TableDistribution, SINGLETON_VNODE};
+use risingwave_storage::table::{compute_vnode, SINGLETON_VNODE};
 use rw_futures_util::select_all;
 
 use crate::common::log_store_impl::kv_log_store::{
@@ -201,8 +201,7 @@ impl LogStoreRowSerde {
 
         let vnodes = match vnodes {
             Some(vnodes) => vnodes,
-
-            None => TableDistribution::singleton_vnode_bitmap(),
+            None => Bitmap::singleton().into(),
         };
 
         // epoch and seq_id. The seq_id of barrier is set null, and therefore the second order type
@@ -216,7 +215,7 @@ impl LogStoreRowSerde {
         );
 
         let dist_key_indices = if dist_key_indices.is_empty() {
-            if &vnodes != TableDistribution::singleton_vnode_bitmap_ref() {
+            if !vnodes.is_singleton() {
                 warn!(
                     ?vnodes,
                     "singleton log store gets non-singleton vnode bitmap"
@@ -946,7 +945,7 @@ mod tests {
     use risingwave_storage::store::{
         FromStreamStateStoreIter, StateStoreIterItem, StateStoreReadIter,
     };
-    use risingwave_storage::table::DEFAULT_VNODE;
+    use risingwave_storage::table::SINGLETON_VNODE;
     use tokio::sync::oneshot;
     use tokio::sync::oneshot::Sender;
 
@@ -1024,7 +1023,7 @@ mod tests {
             seq_id += 1;
         }
 
-        let (key, encoded_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, false);
+        let (key, encoded_barrier) = serde.serialize_barrier(epoch, SINGLETON_VNODE, false);
         let key = remove_vnode_prefix(&key.0);
         match serde.deserialize(&encoded_barrier).unwrap() {
             (decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => {
@@ -1062,7 +1061,8 @@ mod tests {
             seq_id += 1;
         }
 
-        let (key, encoded_checkpoint_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, true);
+        let (key, encoded_checkpoint_barrier) =
+            serde.serialize_barrier(epoch, SINGLETON_VNODE, true);
         let key = remove_vnode_prefix(&key.0);
         match serde.deserialize(&encoded_checkpoint_barrier).unwrap() {
             (decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => {
@@ -1200,7 +1200,7 @@
     ) {
         let (ops, rows) = gen_test_data(base);
         let first_barrier = {
-            let (key, value) = serde.serialize_barrier(EPOCH0, DEFAULT_VNODE, true);
+            let (key, value) = serde.serialize_barrier(EPOCH0, SINGLETON_VNODE, true);
             Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH0), value))
         };
         let stream = stream::once(async move { first_barrier });
@@ -1210,7 +1210,7 @@
         let stream = stream.chain(stream::once({
             let serde = serde.clone();
             async move {
-                let (key, value) = serde.serialize_barrier(EPOCH1, DEFAULT_VNODE, false);
+                let (key, value) = serde.serialize_barrier(EPOCH1, SINGLETON_VNODE, false);
                 Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH1), value))
             }
         }));
@@ -1218,7 +1218,7 @@
             gen_row_stream(serde.clone(), ops.clone(), rows.clone(), EPOCH2, seq_id);
         let stream = stream.chain(row_stream).chain(stream::once({
             async move {
-                let (key, value) = serde.serialize_barrier(EPOCH2, DEFAULT_VNODE, true);
+                let (key, value) = serde.serialize_barrier(EPOCH2, SINGLETON_VNODE, true);
                 Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH2), value))
             }
         }));
diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs
index 098548c21ac9..dde0d8a58140 100644
--- a/src/stream/src/common/table/test_state_table.rs
+++ b/src/stream/src/common/table/test_state_table.rs
@@ -27,7 +27,7 @@ use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
 use risingwave_storage::hummock::HummockStorage;
 use risingwave_storage::store::PrefetchOptions;
-use risingwave_storage::table::DEFAULT_VNODE;
+use risingwave_storage::table::SINGLETON_VNODE;
 
 use crate::common::table::state_table::{
     ReplicatedStateTable, StateTable, WatermarkCacheStateTable,
@@ -445,7 +445,7 @@ async fn test_state_table_iter_with_pk_range() {
         std::ops::Bound::Included(OwnedRow::new(vec![Some(4_i32.into())])),
     );
     let iter = state_table
-        .iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
+        .iter_with_vnode(SINGLETON_VNODE, &pk_range, Default::default())
         .await
         .unwrap();
     pin_mut!(iter);
@@ -470,7 +470,7 @@ async fn test_state_table_iter_with_pk_range() {
         std::ops::Bound::<row::OwnedRow>::Unbounded,
     );
     let iter = state_table
-        .iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
+        .iter_with_vnode(SINGLETON_VNODE, &pk_range, Default::default())
         .await
         .unwrap();
     pin_mut!(iter);
@@ -1976,11 +1976,11 @@ async fn test_replicated_state_table_replication() {
         std::ops::Bound::Included(OwnedRow::new(vec![Some(2_i32.into())])),
     );
     let iter = state_table
-        .iter_with_vnode(DEFAULT_VNODE, &range_bounds, Default::default())
+        .iter_with_vnode(SINGLETON_VNODE, &range_bounds, Default::default())
         .await
         .unwrap();
     let replicated_iter = replicated_state_table
-        .iter_with_vnode_and_output_indices(DEFAULT_VNODE, &range_bounds, Default::default())
+        .iter_with_vnode_and_output_indices(SINGLETON_VNODE, &range_bounds, Default::default())
         .await
         .unwrap();
     pin_mut!(iter);
@@ -2039,7 +2039,7 @@ async fn test_replicated_state_table_replication() {
     );
 
     let iter = state_table
-        .iter_with_vnode(DEFAULT_VNODE, &range_bounds, Default::default())
+        .iter_with_vnode(SINGLETON_VNODE, &range_bounds, Default::default())
         .await
         .unwrap();
 
@@ -2048,7 +2048,7 @@ async fn test_replicated_state_table_replication() {
         std::ops::Bound::Unbounded,
     );
     let replicated_iter = replicated_state_table
-        .iter_with_vnode_and_output_indices(DEFAULT_VNODE, &range_bounds, Default::default())
+        .iter_with_vnode_and_output_indices(SINGLETON_VNODE, &range_bounds, Default::default())
         .await
         .unwrap();
     pin_mut!(iter);
@@ -2079,7 +2079,7 @@ async fn test_replicated_state_table_replication() {
     let range_bounds: (Bound<OwnedRow>, Bound<OwnedRow>) =
         (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded);
     let replicated_iter = replicated_state_table
-        .iter_with_vnode_and_output_indices(DEFAULT_VNODE, &range_bounds, Default::default())
+        .iter_with_vnode_and_output_indices(SINGLETON_VNODE, &range_bounds, Default::default())
         .await
         .unwrap();
     pin_mut!(replicated_iter);
diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs
index 8f8b166626d2..01497c37fdab 100644
--- a/src/stream/src/executor/watermark_filter.rs
+++ b/src/stream/src/executor/watermark_filter.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::cmp;
-use std::ops::Deref;
 
 use futures::future::{try_join, try_join_all};
 use risingwave_common::hash::VnodeBitmapExt;
@@ -27,7 +26,6 @@ use risingwave_expr::Result as ExprResult;
 use risingwave_hummock_sdk::HummockReadEpoch;
 use risingwave_pb::expr::expr_node::Type;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
-use risingwave_storage::table::TableDistribution;
 
 use super::filter::FilterExecutor;
 use crate::executor::prelude::*;
@@ -219,10 +217,7 @@ impl WatermarkFilterExecutor {
                     let mut need_update_global_max_watermark = false;
                     // Update the vnode bitmap for state tables of all agg calls if asked.
                     if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
-                        let other_vnodes_bitmap = Arc::new(
-                            (!(*vnode_bitmap).clone())
-                                & TableDistribution::all_vnodes_ref().deref(),
-                        );
+                        let other_vnodes_bitmap = Arc::new(!(*vnode_bitmap).clone());
                         let _ = global_watermark_table.update_vnode_bitmap(other_vnodes_bitmap);
                         let (previous_vnode_bitmap, _cache_may_stale) =
                             table.update_vnode_bitmap(vnode_bitmap.clone());
@@ -373,7 +368,9 @@ impl WatermarkFilterExecutor {
 #[cfg(test)]
 mod tests {
     use itertools::Itertools;
+    use risingwave_common::bitmap::Bitmap;
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableDesc};
+    use risingwave_common::hash::VirtualNode;
     use risingwave_common::test_prelude::StreamChunkTestExt;
     use risingwave_common::types::Date;
     use risingwave_common::util::epoch::test_epoch;
@@ -431,7 +428,7 @@ mod tests {
         let state_table = StateTable::from_table_catalog_inconsistent_op(
             &table,
             mem_state.clone(),
-            Some(TableDistribution::all_vnodes()),
+            Some(Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into()),
         )
         .await;
 
@@ -440,7 +437,7 @@ mod tests {
         let storage_table = StorageTable::new_partial(
             mem_state,
             val_indices.iter().map(|i| ColumnId::new(*i as _)).collect(),
-            Some(TableDistribution::all_vnodes()),
+            Some(Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into()),
             &desc,
         );
         (storage_table, state_table)
diff --git a/src/stream/src/from_proto/mview.rs b/src/stream/src/from_proto/mview.rs
index 41fc47609fba..43fc929edf45 100644
--- a/src/stream/src/from_proto/mview.rs
+++ b/src/stream/src/from_proto/mview.rs
@@ -100,7 +100,7 @@ impl ExecutorBuilder for ArrangeExecutorBuilder {
         let table = node.get_table()?;
 
         // FIXME: Lookup is now implemented without cell-based table API and relies on all vnodes
-        // being `DEFAULT_VNODE`, so we need to make the Arrange a singleton.
+        // being `SINGLETON_VNODE`, so we need to make the Arrange a singleton.
         let vnodes = params.vnode_bitmap.map(Arc::new);
         let conflict_behavior =
             ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs
index 0081f00cc39e..4e3147d10853 100644
--- a/src/stream/src/from_proto/watermark_filter.rs
+++ b/src/stream/src/from_proto/watermark_filter.rs
@@ -12,14 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::ops::Deref;
 use std::sync::Arc;
 
 use risingwave_common::catalog::{ColumnId, TableDesc};
 use risingwave_expr::expr::build_non_strict_from_prost;
 use risingwave_pb::stream_plan::WatermarkFilterNode;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
-use risingwave_storage::table::TableDistribution;
 
 use super::*;
 use crate::common::table::state_table::StateTable;
@@ -57,8 +55,7 @@ impl ExecutorBuilder for WatermarkFilterBuilder {
             .iter()
             .map(|i| ColumnId::new(*i as _))
             .collect_vec();
-        let other_vnodes =
-            Arc::new((!(*vnodes).clone()) & TableDistribution::all_vnodes_ref().deref());
+        let other_vnodes = Arc::new(!(*vnodes).clone());
         let global_watermark_table =
             StorageTable::new_partial(store.clone(), column_ids, Some(other_vnodes), &desc);
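
A minimal sketch (not part of the patch) of the bitmap semantics this change introduces, for review purposes. It uses only APIs visible in the diff above (`Bitmap::ones`, `Bitmap::singleton`, `Bitmap::is_singleton`, `count_ones`, `VirtualNode::COUNT`); the demo function itself is hypothetical.

// Assumes the `risingwave_common` crate as patched by this diff.
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::VirtualNode;

fn vnode_bitmap_semantics() {
    // Distributed tables: an explicit all-ones bitmap sized to the (still
    // compile-time) vnode count, replacing `TableDistribution::all_vnodes()`.
    let all = Bitmap::ones(VirtualNode::COUNT);
    assert_eq!(all.count_ones(), VirtualNode::COUNT);
    assert!(!all.is_singleton()); // more than one vnode is set

    // Singleton tables: a dummy length-1 bitmap with the sole bit set,
    // replacing a full-length bitmap with only `SINGLETON_VNODE` (vnode 0) set.
    let singleton = Bitmap::singleton();
    assert_eq!(singleton.count_ones(), 1);
    assert!(singleton.is_singleton());
}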