Skip to content

Commit

Permalink
extract is_singleton
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 22, 2024
1 parent 37d7b96 commit 72a8c8c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
42 changes: 31 additions & 11 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ impl VnodeCount {
}
}

/// A trait for checking whether a table/fragment is a singleton.
pub trait IsSingleton {
/// Returns `true` if the table/fragment is a singleton.
///
/// By singleton, we mean that all data read from or written to the storage belongs to
/// the only `SINGLETON_VNODE`. This must be consistent with the behavior of
/// [`TableDistribution`](crate::hash::table_distribution::TableDistribution::new).
/// As a result, the `vnode_count` of such table/fragment can be `1`.
fn is_singleton(&self) -> bool;
}

/// A trait for accessing the vnode count field with backward compatibility.
///
/// # `maybe_`?
Expand All @@ -117,28 +128,37 @@ pub trait VnodeCountCompat {
}
}

impl IsSingleton for risingwave_pb::catalog::Table {
fn is_singleton(&self) -> bool {
self.distribution_key.is_empty()
&& self.dist_key_in_pk.is_empty()
&& self.vnode_col_index.is_none()
}
}
impl VnodeCountCompat for risingwave_pb::catalog::Table {
fn vnode_count_inner(&self) -> VnodeCount {
VnodeCount::from_protobuf(self.maybe_vnode_count, || {
self.distribution_key.is_empty()
&& self.dist_key_in_pk.is_empty()
&& self.vnode_col_index.is_none()
})
VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
}
}

impl IsSingleton for risingwave_pb::plan_common::StorageTableDesc {
fn is_singleton(&self) -> bool {
self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none()
}
}
impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc {
fn vnode_count_inner(&self) -> VnodeCount {
VnodeCount::from_protobuf(self.maybe_vnode_count, || {
self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none()
})
VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
}
}

impl IsSingleton for risingwave_pb::meta::table_fragments::Fragment {
fn is_singleton(&self) -> bool {
matches!(self.distribution_type(), FragmentDistributionType::Single)
}
}
impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment {
fn vnode_count_inner(&self) -> VnodeCount {
VnodeCount::from_protobuf(self.maybe_vnode_count, || {
matches!(self.distribution_type(), FragmentDistributionType::Single)
})
VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
}
}
7 changes: 5 additions & 2 deletions src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use crate::array::{Array, DataChunk, PrimitiveArray};
use crate::bitmap::Bitmap;
use crate::hash::VirtualNode;
use crate::hash::{IsSingleton, VirtualNode};
use crate::row::Row;
use crate::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -64,7 +64,10 @@ impl TableDistribution {
.map(|&k| k as usize)
.collect_vec();
let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk)

let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
assert_eq!(this.is_singleton(), table_desc.is_singleton());
this
}

pub fn new(
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::hash::{IsSingleton, VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
Expand Down Expand Up @@ -363,6 +363,7 @@ where

let distribution =
TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
assert_eq!(distribution.is_singleton(), table_catalog.is_singleton());

let pk_data_types = pk_indices
.iter()
Expand Down

0 comments on commit 72a8c8c

Please sign in to comment.