Skip to content

Commit

Permalink
fix special cases for vnode count = 1
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 6, 2024
1 parent 9c7a418 commit c6edb30
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
15 changes: 8 additions & 7 deletions src/meta/src/stream/stream_graph/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,7 @@ impl ActorGraphBuildStateInner {
.collect();
let actor_mapping = downstream
.distribution
.as_hash()
.unwrap()
.mapping()
.to_actor(&downstream_locations);

Self::new_hash_dispatcher(
Expand Down Expand Up @@ -877,16 +876,18 @@ impl ActorGraphBuilder {
// For building fragments, we need to generate the actor builders.
EitherFragment::Building(current_fragment) => {
let node = Arc::new(current_fragment.node.clone().unwrap());
let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
let vnode_count = distribution.vnode_count();
let bitmaps = distribution.mapping().to_bitmaps();

distribution
.worker_slots()
.map(|worker_slot| {
let actor_id = state.next_actor_id();
let vnode_bitmap = bitmaps
.as_ref()
.map(|m: &HashMap<WorkerSlotId, Bitmap>| &m[&worker_slot])
.cloned();
let vnode_bitmap = if vnode_count == 1 {
None
} else {
Some(bitmaps[&worker_slot].clone())
};

state.inner.add_actor(
actor_id,
Expand Down
14 changes: 10 additions & 4 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
reason = "generated by crepe"
)]

use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::num::NonZeroUsize;

use anyhow::Context;
use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat, WorkerSlotId, WorkerSlotMapping};
Expand Down Expand Up @@ -116,7 +116,7 @@ crepe::crepe! {
}

/// The distribution (scheduling result) of a fragment.
#[derive(Debug, Clone, EnumAsInner)]
#[derive(Debug, Clone)]
pub(super) enum Distribution {
/// The fragment is singleton and is scheduled to the given worker slot.
Singleton(WorkerSlotId),
Expand All @@ -139,6 +139,13 @@ impl Distribution {
}
}

pub fn mapping(&self) -> Cow<'_, WorkerSlotMapping> {
match self {
Distribution::Singleton(p) => Cow::Owned(WorkerSlotMapping::new_single(*p)),
Distribution::Hash(mapping) => Cow::Borrowed(mapping),
}
}

/// Get the vnode count of the distribution.
pub fn vnode_count(&self) -> usize {
match self {
Expand Down Expand Up @@ -269,8 +276,7 @@ impl Scheduler {
// Build an index map for all hash mappings.
let all_hash_mappings = existing_distribution
.values()
.flat_map(|dist| dist.as_hash())
.cloned()
.map(|dist| dist.mapping().into_owned())
.unique()
.collect_vec();
let hash_mapping_id: HashMap<_, _> = all_hash_mappings
Expand Down

0 comments on commit c6edb30

Please sign in to comment.