Skip to content

Commit

Permalink
assert vnode count when scan table
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 5, 2024
1 parent f5e511e commit 97da74d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
14 changes: 12 additions & 2 deletions src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use crate::array::{Array, DataChunk, PrimitiveArray};
use crate::bitmap::Bitmap;
use crate::hash::{IsSingleton, VirtualNode};
use crate::hash::{VirtualNode, VnodeCountCompat};
use crate::row::Row;
use crate::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -66,7 +66,12 @@ impl TableDistribution {
let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);

let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
assert_eq!(this.is_singleton(), table_desc.is_singleton());
assert_eq!(
this.vnode_count(),
table_desc.vnode_count(),
"vnode count mismatch, scanning table {} under wrong distribution?",
table_desc.table_id
);
this
}

Expand Down Expand Up @@ -142,6 +147,11 @@ impl TableDistribution {
}
}

/// Get vnode count (1 if singleton). Equivalent to `self.vnodes().len()`.
pub fn vnode_count(&self) -> usize {
self.vnodes().len()
}

/// Get vnode value with given primary key.
pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
match &self.compute_vnode {
Expand Down
9 changes: 7 additions & 2 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption,
};
use risingwave_common::hash::{IsSingleton, VirtualNode, VnodeBitmapExt};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
Expand Down Expand Up @@ -363,7 +363,12 @@ where

let distribution =
TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
assert_eq!(distribution.is_singleton(), table_catalog.is_singleton());
assert_eq!(
distribution.vnode_count(),
table_catalog.vnode_count(),
"vnode count mismatch, scanning table {} under wrong distribution?",
table_catalog.name,
);

let pk_data_types = pk_indices
.iter()
Expand Down

0 comments on commit 97da74d

Please sign in to comment.