From f3ca82d2ac708ccc0227acb24384f214b2dea033 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 17 Oct 2024 16:29:40 +0800 Subject: [PATCH 1/8] vnode count unset Signed-off-by: Bugen Zhao --- src/meta/model/src/table.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index 0a47208ff7351..fc0d23c399a44 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -209,7 +209,15 @@ impl From for ActiveModel { fn from(pb_table: PbTable) -> Self { let table_type = pb_table.table_type(); let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); - let vnode_count = pb_table.vnode_count(); + + // `PbTable` here should be sourced from the wire, not from persistence. + // An unset `maybe_vnode_count` field should be treated as `NotSet`, instead of calling + // the compatibility code. + let vnode_count = if let Some(vnode_count) = pb_table.maybe_vnode_count { + Set(vnode_count as _) + } else { + NotSet + }; let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER { NotSet @@ -258,7 +266,7 @@ impl From for ActiveModel { retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), incoming_sinks: Set(pb_table.incoming_sinks.into()), cdc_table_id: Set(pb_table.cdc_table_id), - vnode_count: Set(vnode_count as _), + vnode_count, } } } From c69d920241e0baaed09592de29990463b65e2da4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 17 Oct 2024 17:17:20 +0800 Subject: [PATCH 2/8] introduce vnode count enum Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/compat.rs | 62 +++++++++++++++++-- src/frontend/src/catalog/table_catalog.rs | 15 +++-- .../optimizer/plan_node/stream_materialize.rs | 3 +- src/frontend/src/optimizer/plan_node/utils.rs | 3 +- .../src/scheduler/distributed/query.rs | 4 +- src/meta/model/src/table.rs | 10 +-- 6 files changed, 76 insertions(+), 21 deletions(-) diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 0c86fbb12bcd4..ab061a584ad05 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -12,15 +12,70 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZeroUsize; + use super::vnode::VirtualNode; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum VnodeCount { + #[default] + Placeholder, + Set(NonZeroUsize), + Compat, +} + +impl VnodeCount { + pub fn set(v: impl TryInto) -> Self { + VnodeCount::Set( + v.try_into() + .unwrap_or_else(|_| panic!("vnode count must be non-zero")), + ) + } + + pub fn for_test() -> Self { + Self::set(VirtualNode::COUNT_FOR_TEST) + } + + pub fn from_protobuf(v: Option) -> Self { + match v { + None => VnodeCount::Compat, + Some(0) => VnodeCount::Placeholder, + Some(v) => VnodeCount::set(v as usize), + } + } + + pub fn to_protobuf(self) -> Option { + match self { + VnodeCount::Placeholder => Some(0), + VnodeCount::Set(v) => Some(v.get() as _), + VnodeCount::Compat => None, + } + } + + pub fn value(self) -> usize { + self.value_opt().expect("vnode count not set") + } + + pub fn value_opt(self) -> Option { + match self { + VnodeCount::Placeholder => None, + VnodeCount::Set(v) => Some(v.get()), + VnodeCount::Compat => Some(VirtualNode::COUNT_FOR_COMPAT), + } + } +} + /// A trait for accessing the vnode count field with backward compatibility. pub trait VnodeCountCompat { + fn vnode_count_inner(&self) -> VnodeCount; + /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, /// typically for backward compatibility. /// /// See the documentation on the field of the implementing type for more details. - fn vnode_count(&self) -> usize; + fn vnode_count(&self) -> usize { + self.vnode_count_inner().value() + } } /// Implement the trait for given types by delegating to the `maybe_vnode_count` field. @@ -36,9 +91,8 @@ macro_rules! impl_maybe_vnode_count_compat { ($($ty:ty),* $(,)?) => { $( impl VnodeCountCompat for $ty { - fn vnode_count(&self) -> usize { - self.maybe_vnode_count - .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _) + fn vnode_count_inner(&self) -> VnodeCount { + VnodeCount::from_protobuf(self.maybe_vnode_count) } } )* diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 8c33e55a0b164..252d906b96295 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -20,7 +20,7 @@ use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId, }; -use risingwave_common::hash::VnodeCountCompat; +use risingwave_common::hash::{VnodeCount, VnodeCountCompat}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; @@ -186,7 +186,7 @@ pub struct TableCatalog { /// /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build - pub vnode_count: Option, + pub vnode_count: VnodeCount, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -408,8 +408,7 @@ impl TableCatalog { /// /// Panics if it's called on an incomplete (and not yet persisted) table catalog. pub fn vnode_count(&self) -> usize { - self.vnode_count - .expect("vnode count unset, called on an incomplete table catalog?") + self.vnode_count.value() } pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbTable { @@ -457,7 +456,7 @@ impl TableCatalog { initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), retention_seconds: self.retention_seconds, cdc_table_id: self.cdc_table_id.clone(), - maybe_vnode_count: self.vnode_count.map(|v| v as _), + maybe_vnode_count: self.vnode_count.to_protobuf(), } } @@ -563,7 +562,7 @@ impl From for TableCatalog { OptionalAssociatedSourceId::AssociatedSourceId(id) => id, }); let name = tb.name.clone(); - let vnode_count = tb.vnode_count(); + let vnode_count = tb.vnode_count_inner(); let mut col_names = HashSet::new(); let mut col_index: HashMap = HashMap::new(); @@ -634,7 +633,7 @@ impl From for TableCatalog { .map(TableId::from) .collect_vec(), cdc_table_id: tb.cdc_table_id, - vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */ + vnode_count, } } } @@ -789,7 +788,7 @@ mod tests { dependent_relations: vec![], version_column_index: None, cdc_table_id: None, - vnode_count: Some(233), + vnode_count: VnodeCount::set(233), } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 88878667b8411..fa5f60c2a99ef 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -21,6 +21,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER, }; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -283,7 +284,7 @@ impl StreamMaterialize { created_at_cluster_version: None, retention_seconds: retention_seconds.map(|i| i.into()), cdc_table_id: None, - vnode_count: None, // will be filled in by the meta service later + vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 4aefa1e54c882..a96e1284a9f7e 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{ use risingwave_common::constants::log_store::v2::{ KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX, }; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use crate::catalog::table_catalog::TableType; @@ -179,7 +180,7 @@ impl TableCatalogBuilder { created_at_cluster_version: None, retention_seconds: None, cdc_table_id: None, - vnode_count: None, // will be filled in by the meta service later + vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index d169874d5b3f2..2d40328bab400 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -473,7 +473,7 @@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; + use risingwave_common::hash::{VirtualNode, VnodeCount, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; @@ -589,7 +589,7 @@ pub(crate) mod tests { initialized_at_cluster_version: None, created_at_cluster_version: None, cdc_table_id: None, - vnode_count: Some(vnode_count), + vnode_count: VnodeCount::set(vnode_count), }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index fc0d23c399a44..202ad7f6c5bd0 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -213,11 +213,11 @@ impl From for ActiveModel { // `PbTable` here should be sourced from the wire, not from persistence. // An unset `maybe_vnode_count` field should be treated as `NotSet`, instead of calling // the compatibility code. - let vnode_count = if let Some(vnode_count) = pb_table.maybe_vnode_count { - Set(vnode_count as _) - } else { - NotSet - }; + let vnode_count = pb_table + .vnode_count_inner() + .value_opt() + .map(|v| v as _) + .map_or(NotSet, Set); let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER { NotSet From b648e13fb4e6426e0f07f35d961b4f880aef637b Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 17 Oct 2024 17:39:52 +0800 Subject: [PATCH 3/8] assert `From for TableCatalog` is not placeholder Signed-off-by: Bugen Zhao --- src/frontend/src/catalog/table_catalog.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 252d906b96295..be5c5ec86cad8 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -562,7 +562,7 @@ impl From for TableCatalog { OptionalAssociatedSourceId::AssociatedSourceId(id) => id, }); let name = tb.name.clone(); - let vnode_count = tb.vnode_count_inner(); + let vnode_count = tb.vnode_count(); let mut col_names = HashSet::new(); let mut col_index: HashMap = HashMap::new(); @@ -633,7 +633,7 @@ impl From for TableCatalog { .map(TableId::from) .collect_vec(), cdc_table_id: tb.cdc_table_id, - vnode_count, + vnode_count: VnodeCount::set(vnode_count), /* from existing (persisted) tables, vnode_count must be set */ } } } From fde3ed7556830cef28aa0fac632e3e00d12df225 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 17 Oct 2024 18:09:01 +0800 Subject: [PATCH 4/8] add some docs Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/compat.rs | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index ab061a584ad05..6d1a828952783 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -16,15 +16,20 @@ use std::num::NonZeroUsize; use super::vnode::VirtualNode; +/// The different cases of `maybe_vnode_count` field in the protobuf message. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] pub enum VnodeCount { + /// The field is a placeholder and has to be filled first before using it. #[default] Placeholder, + /// The field is set to a specific value. Set(NonZeroUsize), + /// The field is unset because it's persisted in an older version. Compat, } impl VnodeCount { + /// Creates a `VnodeCount` set to the given value. pub fn set(v: impl TryInto) -> Self { VnodeCount::Set( v.try_into() @@ -32,18 +37,7 @@ impl VnodeCount { ) } - pub fn for_test() -> Self { - Self::set(VirtualNode::COUNT_FOR_TEST) - } - - pub fn from_protobuf(v: Option) -> Self { - match v { - None => VnodeCount::Compat, - Some(0) => VnodeCount::Placeholder, - Some(v) => VnodeCount::set(v as usize), - } - } - + /// Converts to protobuf representation for `maybe_vnode_count`. pub fn to_protobuf(self) -> Option { match self { VnodeCount::Placeholder => Some(0), @@ -52,10 +46,16 @@ impl VnodeCount { } } - pub fn value(self) -> usize { - self.value_opt().expect("vnode count not set") + /// Converts from protobuf representation of `maybe_vnode_count`. + pub fn from_protobuf(v: Option) -> Self { + match v { + None => VnodeCount::Compat, + Some(0) => VnodeCount::Placeholder, + Some(v) => VnodeCount::set(v as usize), + } } + /// Returns the value of the vnode count, or `None` if it's a placeholder. pub fn value_opt(self) -> Option { match self { VnodeCount::Placeholder => None, @@ -63,14 +63,22 @@ impl VnodeCount { VnodeCount::Compat => Some(VirtualNode::COUNT_FOR_COMPAT), } } + + /// Returns the value of the vnode count. Panics if it's a placeholder. + pub fn value(self) -> usize { + self.value_opt().expect("vnode count not set") + } } /// A trait for accessing the vnode count field with backward compatibility. pub trait VnodeCountCompat { + /// Get the `maybe_vnode_count` field. fn vnode_count_inner(&self) -> VnodeCount; /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, - /// typically for backward compatibility. + /// typically for backward compatibility. Panics if the field is a placeholder. + /// + /// Equivalent to `self.vnode_count_inner().value()`. /// /// See the documentation on the field of the implementing type for more details. fn vnode_count(&self) -> usize { From 1f0ba69683c2479556248ff46254d7e2a7b7c13e Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 17 Oct 2024 18:12:33 +0800 Subject: [PATCH 5/8] add more docs Signed-off-by: Bugen Zhao --- src/frontend/src/catalog/table_catalog.rs | 11 +++++------ src/meta/model/src/table.rs | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index be5c5ec86cad8..7fb847bdd3bc3 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -174,15 +174,14 @@ pub struct TableCatalog { /// Total vnode count of the table. /// - /// Can be unset if the catalog is generated by the frontend and not yet persisted. This is + /// Can be placeholder if the catalog is generated by the frontend and not yet persisted. This is /// because the vnode count of each fragment (then internal tables) is determined by the - /// meta service. See also [`StreamMaterialize::derive_table_catalog`] and - /// [`TableCatalogBuilder::build`]. + /// meta service. In this case, calling [`TableCatalog::vnode_count()`] will panic. + /// See also [`StreamMaterialize::derive_table_catalog`] and [`TableCatalogBuilder::build`]. /// - /// On the contrary, if this comes from a [`PbTable`], the field must be `Some` no matter + /// On the contrary, if this comes from a [`PbTable`], the field must have value no matter /// whether the table is created before or after the version we introduced variable vnode - /// count support. This is because we've already handled backward compatibility during - /// conversion. + /// count support. We will handle backward compatibility when obtaining the value. /// /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index 202ad7f6c5bd0..1d950f277396e 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -211,7 +211,7 @@ impl From for ActiveModel { let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); // `PbTable` here should be sourced from the wire, not from persistence. - // An unset `maybe_vnode_count` field should be treated as `NotSet`, instead of calling + // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling // the compatibility code. let vnode_count = pb_table .vnode_count_inner() From e81e9949a6faeb03eb4257f6e795e360d0769ed5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 21 Oct 2024 15:33:42 +0800 Subject: [PATCH 6/8] use more vnode count struct Signed-off-by: Bugen Zhao --- src/common/src/catalog/physical_table.rs | 4 ++-- src/common/src/hash/consistent_hash/compat.rs | 16 +++++++++------- src/frontend/src/catalog/table_catalog.rs | 2 +- src/meta/src/controller/fragment.rs | 8 ++++---- src/meta/src/controller/mod.rs | 3 ++- src/meta/src/stream/stream_graph/fragment.rs | 3 ++- 6 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index df1b30fd41eeb..680f23fb0dc14 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; use crate::catalog::get_dist_key_in_pk_indices; -use crate::hash::VnodeCountCompat; +use crate::hash::{VnodeCount, VnodeCountCompat}; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. @@ -117,7 +117,7 @@ impl TableDesc { versioned: self.versioned, stream_key: self.stream_key.iter().map(|&x| x as u32).collect(), vnode_col_idx_in_pk, - maybe_vnode_count: Some(self.vnode_count as u32), + maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(), }) } diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 6d1a828952783..12f65631f3e88 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -30,11 +30,12 @@ pub enum VnodeCount { impl VnodeCount { /// Creates a `VnodeCount` set to the given value. - pub fn set(v: impl TryInto) -> Self { - VnodeCount::Set( - v.try_into() - .unwrap_or_else(|_| panic!("vnode count must be non-zero")), - ) + pub fn set(v: impl TryInto + Copy + std::fmt::Debug) -> Self { + let v = (v.try_into().ok()) + .filter(|v| (1..=VirtualNode::MAX_COUNT).contains(v)) + .unwrap_or_else(|| panic!("invalid vnode count {v:?}")); + + VnodeCount::Set(NonZeroUsize::new(v).unwrap()) } /// Converts to protobuf representation for `maybe_vnode_count`. @@ -49,9 +50,9 @@ impl VnodeCount { /// Converts from protobuf representation of `maybe_vnode_count`. pub fn from_protobuf(v: Option) -> Self { match v { - None => VnodeCount::Compat, Some(0) => VnodeCount::Placeholder, Some(v) => VnodeCount::set(v as usize), + None => VnodeCount::Compat, } } @@ -66,7 +67,8 @@ impl VnodeCount { /// Returns the value of the vnode count. Panics if it's a placeholder. pub fn value(self) -> usize { - self.value_opt().expect("vnode count not set") + self.value_opt() + .expect("vnode count is a placeholder that must be filled by the meta service first") } } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 7fb847bdd3bc3..aa687a05c7e2b 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -723,7 +723,7 @@ mod tests { initialized_at_cluster_version: None, version_column_index: None, cdc_table_id: None, - maybe_vnode_count: Some(233), + maybe_vnode_count: VnodeCount::set(233).to_protobuf(), } .into(); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index e118ad3c9683c..dfdf91372ff0b 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,7 +19,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::{VnodeCountCompat, WorkerSlotId}; +use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::util::worker_util::WorkerNodeId; use risingwave_meta_model::actor::ActorStatus; @@ -496,7 +496,7 @@ impl CatalogController { actors: pb_actors, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, - maybe_vnode_count: Some(vnode_count as _), + maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), }; Ok((pb_fragment, pb_actor_status, pb_actor_splits)) @@ -1497,7 +1497,7 @@ mod tests { use std::collections::{BTreeMap, HashMap}; use itertools::Itertools; - use risingwave_common::hash::{ActorMapping, VirtualNode}; + use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model::actor::ActorStatus; @@ -1627,7 +1627,7 @@ mod tests { .values() .flat_map(|m| m.keys().map(|x| *x as _)) .collect(), - maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as _), + maybe_vnode_count: VnodeCount::set(VirtualNode::COUNT_FOR_TEST).to_protobuf(), }; let pb_actor_status = (0..actor_count) diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index cd0bb8530acbb..c7cf45daad9e7 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use anyhow::anyhow; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model::{ connection, database, function, index, object, schema, secret, sink, source, subscription, @@ -164,7 +165,7 @@ impl From> for PbTable { created_at_cluster_version: value.1.created_at_cluster_version, retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, - maybe_vnode_count: Some(value.0.vnode_count as _), + maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(), } } } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 211f609a13fe9..86a2197a9d5bc 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -24,6 +24,7 @@ use risingwave_common::bail; use risingwave_common::catalog::{ generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM, }; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; @@ -1126,7 +1127,7 @@ impl CompleteStreamFragmentGraph { actors, state_table_ids, upstream_fragment_ids, - maybe_vnode_count: Some(vnode_count as _), + maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), } } From c6ca978e82adc3bb65660ee56f505550f9aa9e5e Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 21 Oct 2024 15:43:25 +0800 Subject: [PATCH 7/8] update docs and tolerate placeholder for incomplete catalogs Signed-off-by: Bugen Zhao --- proto/catalog.proto | 12 ++++++------ src/frontend/src/catalog/table_catalog.rs | 22 ++++++++++++---------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index f792eccc0cab6..8eeb758432446 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -419,13 +419,13 @@ message Table { // Total vnode count of the table. // - // Can be unset if... - // - The catalog is generated by the frontend and not yet persisted, this is - // because the vnode count of each fragment (then internal tables) is determined - // by the meta service. - // - The table is created in older versions where variable vnode count is not + // Use `VnodeCountCompat::vnode_count` to access it. + // + // - Can be unset if the table is created in older versions where variable vnode count is not // supported, in which case a default value of 256 should be used. - // Use `VnodeCountCompat::vnode_count` to access it. + // - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the + // corresponding job is still in `Creating` status, in which case calling `vnode_count` + // will panic. // // Please note that this field is not intended to describe the expected vnode count // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`. diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index aa687a05c7e2b..b533e6d956685 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::collections::{HashMap, HashSet}; use fixedbitset::FixedBitSet; @@ -174,14 +175,9 @@ pub struct TableCatalog { /// Total vnode count of the table. /// - /// Can be placeholder if the catalog is generated by the frontend and not yet persisted. This is - /// because the vnode count of each fragment (then internal tables) is determined by the - /// meta service. In this case, calling [`TableCatalog::vnode_count()`] will panic. - /// See also [`StreamMaterialize::derive_table_catalog`] and [`TableCatalogBuilder::build`]. - /// - /// On the contrary, if this comes from a [`PbTable`], the field must have value no matter - /// whether the table is created before or after the version we introduced variable vnode - /// count support. We will handle backward compatibility when obtaining the value. + /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the + /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`] + /// will panic. /// /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build @@ -561,7 +557,13 @@ impl From for TableCatalog { OptionalAssociatedSourceId::AssociatedSourceId(id) => id, }); let name = tb.name.clone(); - let vnode_count = tb.vnode_count(); + + let vnode_count = tb.vnode_count_inner(); + if let VnodeCount::Placeholder = vnode_count { + // Only allow placeholder vnode count for creating tables. + // After the table is created, an `Update` notification will be used to update the vnode count field. + assert_matches!(stream_job_status, PbStreamJobStatus::Creating); + } let mut col_names = HashSet::new(); let mut col_index: HashMap = HashMap::new(); @@ -632,7 +634,7 @@ impl From for TableCatalog { .map(TableId::from) .collect_vec(), cdc_table_id: tb.cdc_table_id, - vnode_count: VnodeCount::set(vnode_count), /* from existing (persisted) tables, vnode_count must be set */ + vnode_count, } } } From 932a1ca4ccecc1e1c0453dd76cca94509f4601e2 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 21 Oct 2024 16:53:46 +0800 Subject: [PATCH 8/8] fix unit tests Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/compat.rs | 7 +++++++ src/frontend/src/handler/alter_table_column.rs | 3 ++- src/frontend/src/test_utils.rs | 4 ++++ src/meta/src/controller/fragment.rs | 2 +- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 12f65631f3e88..c659da2fc6059 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -38,6 +38,13 @@ impl VnodeCount { VnodeCount::Set(NonZeroUsize::new(v).unwrap()) } + /// Creates a `VnodeCount` set to the value for testing. + /// + /// Equivalent to `VnodeCount::set(VirtualNode::COUNT_FOR_TEST)`. + pub fn for_test() -> Self { + Self::set(VirtualNode::COUNT_FOR_TEST) + } + /// Converts to protobuf representation for `maybe_vnode_count`. pub fn to_protobuf(self) -> Option { match self { diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index e9ecb5713cb10..88e886ad667bf 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -19,6 +19,7 @@ use anyhow::{anyhow, Context}; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::ColumnCatalog; +use risingwave_common::hash::VnodeCount; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::{bail, bail_not_implemented}; @@ -244,7 +245,7 @@ pub async fn get_replace_table_plan( // Set some fields ourselves so that the meta service does not need to maintain them. let mut table = table; table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); - table.maybe_vnode_count = Some(original_catalog.vnode_count() as _); + table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf(); Ok((source, table, graph, col_index_mapping, job_type)) } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ac8784fde4dd3..14befbaeb7357 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -28,6 +28,7 @@ use risingwave_common::catalog::{ FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, }; +use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat}; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::util::cluster_limit::ClusterLimit; @@ -281,6 +282,7 @@ impl CatalogWriter for MockCatalogWriter { ) -> Result<()> { table.id = self.gen_id(); table.stream_job_status = PbStreamJobStatus::Created as _; + table.maybe_vnode_count = VnodeCount::for_test().to_protobuf(); self.catalog.write().create_table(&table); self.add_table_or_source_id(table.id, table.schema_id, table.database_id); self.hummock_snapshot_manager @@ -320,6 +322,7 @@ impl CatalogWriter for MockCatalogWriter { _job_type: TableJobType, ) -> Result<()> { table.stream_job_status = PbStreamJobStatus::Created as _; + assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST); self.catalog.write().update_table(&table); Ok(()) } @@ -353,6 +356,7 @@ impl CatalogWriter for MockCatalogWriter { ) -> Result<()> { index_table.id = self.gen_id(); index_table.stream_job_status = PbStreamJobStatus::Created as _; + index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf(); self.catalog.write().create_table(&index_table); self.add_table_or_index_id( index_table.id, diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index dfdf91372ff0b..a79b890cade20 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1627,7 +1627,7 @@ mod tests { .values() .flat_map(|m| m.keys().map(|x| *x as _)) .collect(), - maybe_vnode_count: VnodeCount::set(VirtualNode::COUNT_FOR_TEST).to_protobuf(), + maybe_vnode_count: VnodeCount::for_test().to_protobuf(), }; let pb_actor_status = (0..actor_count)