Skip to content

Commit

Permalink
make new_single return 256 as a workaround
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 20, 2024
1 parent 510d2ea commit a01bbbf
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 22 deletions.
13 changes: 3 additions & 10 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,10 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
}
}

/// Create a vnode mapping with vnode count 1 and the single item.
///
/// Should only be used for singleton distribution. This is to be consistent with
/// [`VnodeBitmapExt::singleton`].
/// Create a vnode mapping with the single item. Should only be used for singletons.
// TODO(var-vnode): make vnode count 1.
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item), 1)
}

/// Create a vnode mapping with the same item for all vnodes. Mainly used for testing.
pub fn new_all_same(item: T::Item, vnode_count: usize) -> Self {
Self::new_uniform(std::iter::once(item), vnode_count)
Self::new_uniform(std::iter::once(item), VirtualNode::COUNT)
}

/// The length (or count) of the vnode in this mapping.
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,8 @@ pub(crate) mod tests {
let workers = vec![worker1, worker2, worker3];
let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers));
let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false);
let mapping = WorkerSlotMapping::new_all_same(WorkerSlotId::new(0, 0), vnode_count);
let mapping =
WorkerSlotMapping::new_uniform(std::iter::once(WorkerSlotId::new(0, 0)), vnode_count);
worker_node_manager.insert_streaming_fragment_mapping(0, mapping.clone());
worker_node_manager.set_serving_fragment_mapping(vec![(0, mapping)].into_iter().collect());
let catalog = Arc::new(parking_lot::RwLock::new(Catalog::default()));
Expand Down
4 changes: 1 addition & 3 deletions src/meta/src/stream/stream_graph/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,9 +681,7 @@ impl ActorGraphBuilder {
// Fill the vnode count for each internal table, based on schedule result.
let mut fragment_graph = fragment_graph;
for (id, fragment) in fragment_graph.building_fragments_mut() {
let vnode_count = distributions[id]
.hash_vnode_count()
.unwrap_or(expected_vnode_count);
let vnode_count = distributions[id].vnode_count();
visit_tables(fragment, |table, _| {
table.maybe_vnode_count = Some(vnode_count as _);
})
Expand Down
4 changes: 1 addition & 3 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,9 +1093,7 @@ impl CompleteStreamFragmentGraph {
} = building_fragment;

let distribution_type = distribution.to_distribution_type() as i32;
let vnode_count = distribution
.hash_vnode_count()
.unwrap_or(self.expected_vnode_count());
let vnode_count = distribution.vnode_count();

let materialized_fragment_id =
if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 {
Expand Down
10 changes: 5 additions & 5 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::{bail, hash};
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::meta::table_fragments::fragment::{
Expand Down Expand Up @@ -151,13 +151,13 @@ impl Distribution {
}
}

/// Get the vnode count of the distribution, if it's hash-distributed.
/// Get the vnode count of the distribution.
// TODO(var-vnode): after `ServingVnodeMapping::upsert` is made vnode-count-aware,
// we may return 1 for singleton.
pub fn hash_vnode_count(&self) -> Option<usize> {
pub fn vnode_count(&self) -> usize {
match self {
Distribution::Singleton(_) => None,
Distribution::Hash(mapping) => Some(mapping.len()),
Distribution::Singleton(_) => VirtualNode::COUNT,
Distribution::Hash(mapping) => mapping.len(),
}
}

Expand Down

0 comments on commit a01bbbf

Please sign in to comment.