Skip to content

Commit

Permalink
remove any singleton
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 6, 2024
1 parent 5eacfcf commit 3adb6d4
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,22 @@ type HashMappingId = usize;
/// The internal structure for processing scheduling requirements in the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Req {
/// The fragment must be singleton and is scheduled to the given worker slot.
Singleton(WorkerSlotId),
/// The fragment must be hash-distributed and is scheduled by the given hash mapping.
Hash(HashMappingId),
AnySingleton,
/// The fragment must have the given vnode count, but can be scheduled anywhere.
/// When the vnode count is 1, it means the fragment must be singleton.
AnyVnodeCount(usize),
}

impl Req {
/// Equivalent to `Req::AnyVnodeCount(1)`.
fn any_singleton() -> Self {
Self::AnyVnodeCount(1)
}
}

/// Facts as the input of the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Fact {
Expand Down Expand Up @@ -80,7 +90,7 @@ crepe::crepe! {
Requirement(id, req) <- Input(f), let Fact::Req { id, req } = f;

// The downstream fragment of a `Simple` edge must be singleton.
Requirement(y, Req::AnySingleton) <- Edge(_, y, Simple);
Requirement(y, Req::any_singleton()) <- Edge(_, y, Simple);
// Requirements propagate through `NoShuffle` edges.
Requirement(x, d) <- Edge(x, y, NoShuffle), Requirement(y, d);
Requirement(y, d) <- Edge(x, y, NoShuffle), Requirement(x, d);
Expand Down Expand Up @@ -257,7 +267,7 @@ impl Scheduler {
if fragment.requires_singleton {
facts.push(Fact::Req {
id,
req: Req::AnySingleton,
req: Req::any_singleton(),
});
}
}
Expand Down Expand Up @@ -325,9 +335,9 @@ impl Scheduler {
Some(reqs) => {
let req = (reqs.iter().copied())
.try_reduce(|a, b| {
// Note that a and b are never the same as they originate from a set.
// Note that a and b are always different, as they come from a set.
let merge = |a, b| match (a, b) {
(Req::AnySingleton, Req::Singleton(id)) => Some(Req::Singleton(id)),
(Req::AnyVnodeCount(1), Req::Singleton(id)) => Some(Req::Singleton(id)),
(Req::AnyVnodeCount(count), Req::Hash(mapping))
if all_hash_mappings[mapping].len() == count =>
{
Expand All @@ -349,7 +359,7 @@ impl Scheduler {
Req::Hash(mapping) => {
Distribution::Hash(all_hash_mappings[mapping].clone())
}
Req::AnySingleton => {
Req::AnyVnodeCount(1) => {
Distribution::Singleton(self.default_singleton_worker_slot)
}
Req::AnyVnodeCount(vnode_count) => {
Expand Down

0 comments on commit 3adb6d4

Please sign in to comment.