From b1163d2117210171fe66e69d4593500e623abbb4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 11 Dec 2024 16:48:08 +0800 Subject: [PATCH 1/2] fix(streaming): fix compatibility upgrading from v2.1.0 for singleton vnode count Signed-off-by: Bugen Zhao --- .../src/hash/consistent_hash/vnode_count.rs | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/common/src/hash/consistent_hash/vnode_count.rs b/src/common/src/hash/consistent_hash/vnode_count.rs index 0040a79f780fa..30c896ca9a8df 100644 --- a/src/common/src/hash/consistent_hash/vnode_count.rs +++ b/src/common/src/hash/consistent_hash/vnode_count.rs @@ -24,14 +24,13 @@ pub enum VnodeCount { /// The field is a placeholder and has to be filled first before using it. #[default] Placeholder, + /// The table/fragment is a singleton, thus the value should always be interpreted as `1`. + Singleton, /// The field is set to a specific value. Set(NonZeroUsize), /// The field is unset because the table/fragment is persisted as hash-distributed /// in an older version. CompatHash, - /// The field is unset because the table/fragment is persisted as singleton - /// in an older version. - CompatSingleton, } impl VnodeCount { @@ -51,18 +50,26 @@ impl VnodeCount { Self::set(VirtualNode::COUNT_FOR_TEST) } - /// Converts from protobuf representation of `maybe_vnode_count`. If the value is not set, - /// call `compat_is_singleton` to determine whether it should be treated as a singleton - /// when it comes to backward compatibility. - fn from_protobuf(v: Option, compat_is_singleton: impl FnOnce() -> bool) -> Self { + /// Converts from protobuf representation of `maybe_vnode_count`. + /// + /// The value will be ignored if `is_singleton` returns `true`. + fn from_protobuf(v: Option, is_singleton: impl FnOnce() -> bool) -> Self { match v { Some(0) => VnodeCount::Placeholder, - Some(v) => VnodeCount::set(v as usize), - None => { - if compat_is_singleton() { - VnodeCount::CompatSingleton + _ => { + if is_singleton() { + if let Some(v) = v + && v != 1 + { + tracing::debug!( + vnode_count = v, + "singleton has vnode count set to non-1, \ + ignoring as it could be due to backward compatibility" + ); + } + VnodeCount::Singleton } else { - VnodeCount::CompatHash + v.map_or(VnodeCount::CompatHash, VnodeCount::set) } } } @@ -79,9 +86,9 @@ impl VnodeCount { pub fn value_opt(self) -> Option { match self { VnodeCount::Placeholder => None, + VnodeCount::Singleton => Some(1), VnodeCount::Set(v) => Some(v.get()), VnodeCount::CompatHash => Some(VirtualNode::COUNT_FOR_COMPAT), - VnodeCount::CompatSingleton => Some(1), } } From 8958f648bcb2a51b858a9bcec0b49351fbb71e4f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 11 Dec 2024 17:40:31 +0800 Subject: [PATCH 2/2] fix unit test Signed-off-by: Bugen Zhao --- src/frontend/src/catalog/table_catalog.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index ba09d34b1dd53..5e680aed618b6 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -759,7 +759,7 @@ mod tests { pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()], stream_key: vec![0], dependent_relations: vec![], - distribution_key: vec![], + distribution_key: vec![0], optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233) .into(), append_only: false, @@ -779,7 +779,7 @@ mod tests { }), watermark_indices: vec![], handle_pk_conflict_behavior: 3, - dist_key_in_pk: vec![], + dist_key_in_pk: vec![0], cardinality: None, created_at_epoch: None, cleaned_by_watermark: false, @@ -833,7 +833,7 @@ mod tests { ], stream_key: vec![0], pk: vec![ColumnOrder::new(0, OrderType::ascending())], - distribution_key: vec![], + distribution_key: vec![0], append_only: false, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, retention_seconds: Some(300), @@ -847,7 +847,7 @@ mod tests { read_prefix_len_hint: 0, version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))), watermark_columns: FixedBitSet::with_capacity(3), - dist_key_in_pk: vec![], + dist_key_in_pk: vec![0], cardinality: Cardinality::unknown(), created_at_epoch: None, initialized_at_epoch: None,