diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 9f58823284df7..89f30e770bd78 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use crate::array::{Array, DataChunk, PrimitiveArray}; use crate::bitmap::Bitmap; -use crate::hash::{IsSingleton, VirtualNode}; +use crate::hash::{VirtualNode, VnodeCountCompat}; use crate::row::Row; use crate::util::iter_util::ZipEqFast; @@ -66,7 +66,12 @@ impl TableDistribution { let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize); let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); - assert_eq!(this.is_singleton(), table_desc.is_singleton()); + assert_eq!( + this.vnode_count(), + table_desc.vnode_count(), + "vnode count mismatch, scanning table {} under wrong distribution?", + table_desc.table_id + ); this } @@ -142,6 +147,11 @@ impl TableDistribution { } } + /// Get vnode count (1 if singleton). Equivalent to `self.vnodes().len()`. + pub fn vnode_count(&self) -> usize { + self.vnodes().len() + } + /// Get vnode value with given primary key. pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { match &self.compute_vnode { diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 0ecb0eac90718..8bf2e2750f6da 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -29,7 +29,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption, }; -use risingwave_common::hash::{IsSingleton, VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat}; use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl}; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -363,7 +363,12 @@ where let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); - assert_eq!(distribution.is_singleton(), table_catalog.is_singleton()); + assert_eq!( + distribution.vnode_count(), + table_catalog.vnode_count(), + "vnode count mismatch, scanning table {} under wrong distribution?", + table_catalog.name, + ); let pk_data_types = pk_indices .iter()