From a01bbbf7682f534b751c75ce234c2e726932850f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 11 Sep 2024 17:58:51 +0800 Subject: [PATCH] make new_single return 256 as a workaround Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/mapping.rs | 13 +++---------- src/frontend/src/scheduler/distributed/query.rs | 3 ++- src/meta/src/stream/stream_graph/actor.rs | 4 +--- src/meta/src/stream/stream_graph/fragment.rs | 4 +--- src/meta/src/stream/stream_graph/schedule.rs | 10 +++++----- 5 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 4ea934414bb82..584e3d6276fd5 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -139,17 +139,10 @@ impl VnodeMapping { } } - /// Create a vnode mapping with vnode count 1 and the single item. - /// - /// Should only be used for singleton distribution. This is to be consistent with - /// [`VnodeBitmapExt::singleton`]. + /// Create a vnode mapping with the single item. Should only be used for singletons. + // TODO(var-vnode): make vnode count 1. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), 1) - } - - /// Create a vnode mapping with the same item for all vnodes. Mainly used for testing. - pub fn new_all_same(item: T::Item, vnode_count: usize) -> Self { - Self::new_uniform(std::iter::once(item), vnode_count) + Self::new_uniform(std::iter::once(item), VirtualNode::COUNT) } /// The length (or count) of the vnode in this mapping. diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 3a5ac38db92f7..4d054b2d752e7 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -728,7 +728,8 @@ pub(crate) mod tests { let workers = vec![worker1, worker2, worker3]; let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers)); let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false); - let mapping = WorkerSlotMapping::new_all_same(WorkerSlotId::new(0, 0), vnode_count); + let mapping = + WorkerSlotMapping::new_uniform(std::iter::once(WorkerSlotId::new(0, 0)), vnode_count); worker_node_manager.insert_streaming_fragment_mapping(0, mapping.clone()); worker_node_manager.set_serving_fragment_mapping(vec![(0, mapping)].into_iter().collect()); let catalog = Arc::new(parking_lot::RwLock::new(Catalog::default())); diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 936019625402c..6799526b71e4b 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -681,9 +681,7 @@ impl ActorGraphBuilder { // Fill the vnode count for each internal table, based on schedule result. let mut fragment_graph = fragment_graph; for (id, fragment) in fragment_graph.building_fragments_mut() { - let vnode_count = distributions[id] - .hash_vnode_count() - .unwrap_or(expected_vnode_count); + let vnode_count = distributions[id].vnode_count(); visit_tables(fragment, |table, _| { table.maybe_vnode_count = Some(vnode_count as _); }) diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 7d902eb3e1889..b997b54a3bbe4 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -1093,9 +1093,7 @@ impl CompleteStreamFragmentGraph { } = building_fragment; let distribution_type = distribution.to_distribution_type() as i32; - let vnode_count = distribution - .hash_vnode_count() - .unwrap_or(self.expected_vnode_count()); + let vnode_count = distribution.vnode_count(); let materialized_fragment_id = if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 { diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index ad68e463d0d63..f67d8547e28a8 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -25,7 +25,7 @@ use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::{bail, hash}; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::table_fragments::fragment::{ @@ -151,13 +151,13 @@ impl Distribution { } } - /// Get the vnode count of the distribution, if it's hash-distributed. + /// Get the vnode count of the distribution. // TODO(var-vnode): after `ServingVnodeMapping::upsert` is made vnode-count-aware, // we may return 1 for singleton. - pub fn hash_vnode_count(&self) -> Option { + pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => None, - Distribution::Hash(mapping) => Some(mapping.len()), + Distribution::Singleton(_) => VirtualNode::COUNT, + Distribution::Hash(mapping) => mapping.len(), } }