diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 395e462d4ad31..0040a79f780fa 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -92,6 +92,17 @@ impl VnodeCount { } } +/// A trait for checking whether a table/fragment is a singleton. +pub trait IsSingleton { + /// Returns `true` if the table/fragment is a singleton. + /// + /// By singleton, we mean that all data read from or written to the storage belongs to + /// the only `SINGLETON_VNODE`. This must be consistent with the behavior of + /// [`TableDistribution`](crate::hash::table_distribution::TableDistribution::new). + /// As a result, the `vnode_count` of such table/fragment can be `1`. + fn is_singleton(&self) -> bool; +} + /// A trait for accessing the vnode count field with backward compatibility. /// /// # `maybe_`? @@ -117,28 +128,37 @@ pub trait VnodeCountCompat { } } +impl IsSingleton for risingwave_pb::catalog::Table { + fn is_singleton(&self) -> bool { + self.distribution_key.is_empty() + && self.dist_key_in_pk.is_empty() + && self.vnode_col_index.is_none() + } +} impl VnodeCountCompat for risingwave_pb::catalog::Table { fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count, || { - self.distribution_key.is_empty() - && self.dist_key_in_pk.is_empty() - && self.vnode_col_index.is_none() - }) + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) } } +impl IsSingleton for risingwave_pb::plan_common::StorageTableDesc { + fn is_singleton(&self) -> bool { + self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() + } +} impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count, || { - self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() - }) + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) } } +impl IsSingleton for risingwave_pb::meta::table_fragments::Fragment { + fn is_singleton(&self) -> bool { + matches!(self.distribution_type(), FragmentDistributionType::Single) + } +} impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment { fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count, || { - matches!(self.distribution_type(), FragmentDistributionType::Single) - }) + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) } } diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 822db591c1577..a91f48f6589af 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use crate::array::{Array, DataChunk, PrimitiveArray}; use crate::bitmap::Bitmap; -use crate::hash::VirtualNode; +use crate::hash::{IsSingleton, VirtualNode}; use crate::row::Row; use crate::util::iter_util::ZipEqFast; @@ -64,7 +64,10 @@ impl TableDistribution { .map(|&k| k as usize) .collect_vec(); let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize); - Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk) + + let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + assert_eq!(this.is_singleton(), table_desc.is_singleton()); + this } pub fn new( diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 3ada0afd71625..55957af9e6e5b 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -29,7 +29,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption, }; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::{IsSingleton, VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl}; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -363,6 +363,7 @@ where let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + assert_eq!(distribution.is_singleton(), table_catalog.is_singleton()); let pk_data_types = pk_indices .iter()