From 5cb3e8f4cfddf084408f6939fe92b11f5b5b3da6 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Sun, 29 Sep 2024 14:18:31 +0800 Subject: [PATCH] refactor: use 1 for vnode count of singletons for backward compatibility --- proto/catalog.proto | 2 +- proto/meta.proto | 2 +- proto/plan_common.proto | 2 +- src/common/src/hash/consistent_hash/bitmap.rs | 6 +- src/common/src/hash/consistent_hash/compat.rs | 58 ++++++++++++++++--- .../src/hash/consistent_hash/mapping.rs | 7 ++- src/frontend/src/optimizer/plan_node/utils.rs | 15 +++++ .../m20240911_083152_variable_vnode_count.rs | 28 +++++++++ src/meta/src/stream/stream_graph/schedule.rs | 6 +- src/stream/src/executor/actor.rs | 6 +- 10 files changed, 107 insertions(+), 25 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 8eeb75843244..0c67a92f23cd 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -422,7 +422,7 @@ message Table { // Use `VnodeCountCompat::vnode_count` to access it. // // - Can be unset if the table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the // corresponding job is still in `Creating` status, in which case calling `vnode_count` // will panic. diff --git a/proto/meta.proto b/proto/meta.proto index aa006a3400b1..bbd1265c4426 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -92,7 +92,7 @@ message TableFragments { // Duplicated from the length of the vnode bitmap in any actor of the fragment. // // Can be unset if the fragment is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. 
optional uint32 maybe_vnode_count = 8; } diff --git a/proto/plan_common.proto b/proto/plan_common.proto index f561ee427ea4..487ab54e2a66 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -100,7 +100,7 @@ message StorageTableDesc { // Total vnode count of the table. // // Can be unset if the table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 12; } diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index a40946273a0a..0ba54d8e0637 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -45,16 +45,16 @@ impl Bitmap { } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. pub fn singleton() -> &'static Self { Self::singleton_arc() } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. 
pub fn singleton_arc() -> &'static Arc<Self> { static SINGLETON: LazyLock<Arc<Bitmap>> = LazyLock::new(|| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT); + let mut builder = BitmapBuilder::zeroed(1); builder.set(SINGLETON_VNODE.to_index(), true); builder.finish().into() }); diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index c659da2fc605..ce2c75c47e92 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -80,14 +80,23 @@ impl VnodeCount { } /// A trait for accessing the vnode count field with backward compatibility. +/// +/// # `maybe_`? +/// +/// The reason why there's a `maybe_` prefix on the protobuf field is that, a getter +/// method with the same name as the field will be generated for `prost` structs. +/// Directly naming it `vnode_count` will lead to the method `vnode_count()` returning +/// `0` when the field is unset, which can be misleading sometimes. +/// +/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count` +/// through this trait, ensuring that backward compatibility is handled properly. pub trait VnodeCountCompat { /// Get the `maybe_vnode_count` field. fn vnode_count_inner(&self) -> VnodeCount; - /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, - /// typically for backward compatibility. Panics if the field is a placeholder. - /// - /// Equivalent to `self.vnode_count_inner().value()`. + /// Returns the vnode count if it's set. Otherwise, returns [`VirtualNode::COUNT_FOR_COMPAT`] + /// for distributed tables/fragments, and `1` for singleton tables/fragments, for backward + /// compatibility. Panics if the field is a placeholder. /// /// See the documentation on the field of the implementing type for more details. fn vnode_count(&self) -> usize { @@ -116,8 +125,39 @@ macro_rules! 
impl_maybe_vnode_count_compat { }; } -impl_maybe_vnode_count_compat!( - risingwave_pb::plan_common::StorageTableDesc, - risingwave_pb::catalog::Table, - risingwave_pb::meta::table_fragments::Fragment, -); +// TODO!!!!!!!!!!!!!!!!! + +// impl VnodeCountCompat for risingwave_pb::catalog::Table { +// fn vnode_count(&self) -> usize { +// if let Some(vnode_count) = self.maybe_vnode_count { +// return vnode_count as _; +// } + +// // Compatibility: derive vnode count from distribution. +// if self.distribution_key.is_empty() +// && self.dist_key_in_pk.is_empty() +// && self.vnode_col_index.is_none() +// { +// // Singleton table. +// 1 +// } else { +// VirtualNode::COUNT_FOR_COMPAT +// } +// } +// } + +// impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { +// fn vnode_count(&self) -> usize { +// if let Some(vnode_count) = self.maybe_vnode_count { +// return vnode_count as _; +// } + +// // Compatibility: derive vnode count from distribution. +// if self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() { +// // Singleton table. +// 1 +// } else { +// VirtualNode::COUNT_FOR_COMPAT +// } +// } +// } diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 1e7bca125fc5..2deed907103f 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -139,11 +139,12 @@ impl VnodeMapping { } } - /// Create a vnode mapping with the single item. Should only be used for singletons. + /// Create a vnode mapping with the single item and length of 1. /// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. + /// Should only be used for singletons. If you want a different vnode count, call + /// [`VnodeMapping::new_uniform`] with `std::iter::once(item)` and desired length. 
pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) + Self::new_uniform(std::iter::once(item), 1) } /// The length (or count) of the vnode in this mapping. diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index a96e1284a9f7..6ca8f6edb51e 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -138,6 +138,21 @@ impl TableCatalogBuilder { Some(w) => w, None => FixedBitSet::with_capacity(self.columns.len()), }; + + // If `dist_key_in_pk` is set, check if it matches with `distribution_key`. + // Note that we cannot derive in the opposite direction, because there can be a column + // appearing multiple times in the PK. + if let Some(dist_key_in_pk) = &self.dist_key_in_pk { + let derived_dist_key = dist_key_in_pk + .iter() + .map(|idx| self.pk[*idx].column_index) + .collect_vec(); + assert_eq!( + derived_dist_key, distribution_key, + "dist_key mismatch with dist_key_in_pk" + ); + } + TableCatalog { id: TableId::placeholder(), associated_source_id: None, diff --git a/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs b/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs index 5b0aad3ea558..9f7498830abd 100644 --- a/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs +++ b/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs @@ -21,6 +21,19 @@ impl MigrationTrait for Migration { ) .await?; + // Fill vnode count with 1 for singleton tables. 
+ manager + .exec_stmt( + UpdateStatement::new() + .table(Table::Table) + .values([(Table::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Table::DistributionKey).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::DistKeyInPk).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::VnodeColIndex).is_null()) + .to_owned(), + ) + .await?; + manager .alter_table( MigrationTable::alter() @@ -30,6 +43,17 @@ impl MigrationTrait for Migration { ) .await?; + // Fill vnode count with 1 for singleton fragments. + manager + .exec_stmt( + UpdateStatement::new() + .table(Fragment::Table) + .values([(Fragment::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Fragment::DistributionType).eq(Expr::value("SINGLE"))) + .to_owned(), + ) + .await?; + manager .alter_table( MigrationTable::alter() @@ -74,12 +98,16 @@ impl MigrationTrait for Migration { enum Fragment { Table, VnodeCount, + DistributionType, } #[derive(DeriveIden)] enum Table { Table, VnodeCount, + DistributionKey, + DistKeyInPk, + VnodeColIndex, } #[derive(DeriveIden)] diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 5d465b19d195..97b5c3032117 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -25,7 +25,7 @@ use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::{bail, hash}; use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -152,11 +152,9 @@ impl Distribution { } /// Get the vnode count of the distribution. - /// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. 
pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, + Distribution::Singleton(_) => 1, // only `SINGLETON_VNODE` Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 0aa5ebca9007..65f7e55cdd4e 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -103,9 +103,9 @@ impl ActorContext { fragment_id: stream_actor.fragment_id, mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) - // An unset `vnode_bitmap` means the actor is a singleton. - // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. - .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), + // An unset `vnode_bitmap` means the actor is a singleton, + // where only `SINGLETON_VNODE` is set. + .map_or(1, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val,