diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index df1b30fd41eeb..680f23fb0dc14 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; use crate::catalog::get_dist_key_in_pk_indices; -use crate::hash::VnodeCountCompat; +use crate::hash::{VnodeCount, VnodeCountCompat}; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. @@ -117,7 +117,7 @@ impl TableDesc { versioned: self.versioned, stream_key: self.stream_key.iter().map(|&x| x as u32).collect(), vnode_col_idx_in_pk, - maybe_vnode_count: Some(self.vnode_count as u32), + maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(), }) } diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 6d1a828952783..12f65631f3e88 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -30,11 +30,12 @@ pub enum VnodeCount { impl VnodeCount { /// Creates a `VnodeCount` set to the given value. - pub fn set(v: impl TryInto) -> Self { - VnodeCount::Set( - v.try_into() - .unwrap_or_else(|_| panic!("vnode count must be non-zero")), - ) + pub fn set(v: impl TryInto + Copy + std::fmt::Debug) -> Self { + let v = (v.try_into().ok()) + .filter(|v| (1..=VirtualNode::MAX_COUNT).contains(v)) + .unwrap_or_else(|| panic!("invalid vnode count {v:?}")); + + VnodeCount::Set(NonZeroUsize::new(v).unwrap()) } /// Converts to protobuf representation for `maybe_vnode_count`. @@ -49,9 +50,9 @@ impl VnodeCount { /// Converts from protobuf representation of `maybe_vnode_count`. pub fn from_protobuf(v: Option) -> Self { match v { - None => VnodeCount::Compat, Some(0) => VnodeCount::Placeholder, Some(v) => VnodeCount::set(v as usize), + None => VnodeCount::Compat, } } @@ -66,7 +67,8 @@ impl VnodeCount { /// Returns the value of the vnode count. Panics if it's a placeholder. pub fn value(self) -> usize { - self.value_opt().expect("vnode count not set") + self.value_opt() + .expect("vnode count is a placeholder that must be filled by the meta service first") } } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 7fb847bdd3bc3..aa687a05c7e2b 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -723,7 +723,7 @@ mod tests { initialized_at_cluster_version: None, version_column_index: None, cdc_table_id: None, - maybe_vnode_count: Some(233), + maybe_vnode_count: VnodeCount::set(233).to_protobuf(), } .into(); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index e118ad3c9683c..dfdf91372ff0b 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,7 +19,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::{VnodeCountCompat, WorkerSlotId}; +use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::util::worker_util::WorkerNodeId; use risingwave_meta_model::actor::ActorStatus; @@ -496,7 +496,7 @@ impl CatalogController { actors: pb_actors, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, - maybe_vnode_count: Some(vnode_count as _), + maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), }; Ok((pb_fragment, pb_actor_status, pb_actor_splits)) @@ -1497,7 +1497,7 @@ mod tests { use std::collections::{BTreeMap, HashMap}; use itertools::Itertools; - use risingwave_common::hash::{ActorMapping, VirtualNode}; + use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model::actor::ActorStatus; @@ -1627,7 +1627,7 @@ mod tests { .values() .flat_map(|m| m.keys().map(|x| *x as _)) .collect(), - maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as _), + maybe_vnode_count: VnodeCount::set(VirtualNode::COUNT_FOR_TEST).to_protobuf(), }; let pb_actor_status = (0..actor_count) diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index cd0bb8530acbb..c7cf45daad9e7 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use anyhow::anyhow; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model::{ connection, database, function, index, object, schema, secret, sink, source, subscription, @@ -164,7 +165,7 @@ impl From> for PbTable { created_at_cluster_version: value.1.created_at_cluster_version, retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, - maybe_vnode_count: Some(value.0.vnode_count as _), + maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(), } } } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 211f609a13fe9..86a2197a9d5bc 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -24,6 +24,7 @@ use risingwave_common::bail; use risingwave_common::catalog::{ generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM, }; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; @@ -1126,7 +1127,7 @@ impl CompleteStreamFragmentGraph { actors, state_table_ids, upstream_fragment_ids, - maybe_vnode_count: Some(vnode_count as _), + maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), } }