diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 5e17b72017b7a..ec809b58d5503 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -46,12 +46,22 @@ type HashMappingId = usize; /// The internal structure for processing scheduling requirements in the scheduler. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] enum Req { + /// The fragment must be singleton and is scheduled to the given worker slot. Singleton(WorkerSlotId), + /// The fragment must be hash-distributed and is scheduled by the given hash mapping. Hash(HashMappingId), - AnySingleton, + /// The fragment must have the given vnode count, but can be scheduled anywhere. + /// When the vnode count is 1, it means the fragment must be singleton. AnyVnodeCount(usize), } +impl Req { + /// Equivalent to `Req::AnyVnodeCount(1)`. + fn any_singleton() -> Self { + Self::AnyVnodeCount(1) + } +} + /// Facts as the input of the scheduler. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] enum Fact { @@ -80,7 +90,7 @@ crepe::crepe! { Requirement(id, req) <- Input(f), let Fact::Req { id, req } = f; // The downstream fragment of a `Simple` edge must be singleton. - Requirement(y, Req::AnySingleton) <- Edge(_, y, Simple); + Requirement(y, Req::any_singleton()) <- Edge(_, y, Simple); // Requirements propagate through `NoShuffle` edges. Requirement(x, d) <- Edge(x, y, NoShuffle), Requirement(y, d); Requirement(y, d) <- Edge(x, y, NoShuffle), Requirement(x, d); @@ -257,7 +267,7 @@ impl Scheduler { if fragment.requires_singleton { facts.push(Fact::Req { id, - req: Req::AnySingleton, + req: Req::any_singleton(), }); } } @@ -325,9 +335,9 @@ impl Scheduler { Some(reqs) => { let req = (reqs.iter().copied()) .try_reduce(|a, b| { - // Note that a and b are never the same as they originate from a set. + // Note that a and b are always different, as they come from a set. let merge = |a, b| match (a, b) { - (Req::AnySingleton, Req::Singleton(id)) => Some(Req::Singleton(id)), + (Req::AnyVnodeCount(1), Req::Singleton(id)) => Some(Req::Singleton(id)), (Req::AnyVnodeCount(count), Req::Hash(mapping)) if all_hash_mappings[mapping].len() == count => { @@ -349,7 +359,7 @@ impl Scheduler { Req::Hash(mapping) => { Distribution::Hash(all_hash_mappings[mapping].clone()) } - Req::AnySingleton => { + Req::AnyVnodeCount(1) => { Distribution::Singleton(self.default_singleton_worker_slot) } Req::AnyVnodeCount(vnode_count) => {