diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 7b80eddb347d5..aa870fa98acce 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -30,6 +30,7 @@ #![feature(const_option)] #![feature(anonymous_lifetime_in_impl_trait)] #![feature(duration_millis_float)] +#![feature(iterator_try_reduce)] pub mod backup_restore; pub mod barrier; diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 97b5c3032117e..a503d666c8ced 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -46,16 +46,17 @@ type HashMappingId = usize; /// /// See [`Distribution`] for the public interface. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -enum DistId { +enum Req { Singleton(WorkerSlotId), Hash(HashMappingId), + AnySingleton, + #[expect(dead_code)] + AnyVnodeCount(usize), } /// Facts as the input of the scheduler. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] enum Fact { - /// An internal(building) fragment. - Fragment(Id), /// An edge in the fragment graph. Edge { from: Id, @@ -63,68 +64,28 @@ enum Fact { dt: DispatcherType, }, /// A distribution requirement for an external(existing) fragment. - ExternalReq { id: Id, dist: DistId }, - /// A singleton requirement for a building fragment. - /// Note that the physical worker slot is not determined yet. - SingletonReq(Id), -} - -/// Results of all building fragments, as the output of the scheduler. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -enum Result { - /// This fragment is required to be distributed by the given [`DistId`]. - Required(DistId), - /// This fragment is singleton, and should be scheduled to the default worker slot. - DefaultSingleton, - /// This fragment is hash-distributed, and should be scheduled by the default hash mapping. - DefaultHash, + Req { id: Id, req: Req }, } crepe::crepe! { @input struct Input(Fact); - struct Fragment(Id); struct Edge(Id, Id, DispatcherType); - struct ExternalReq(Id, DistId); - struct SingletonReq(Id); - struct Requirement(Id, DistId); + struct ExternalReq(Id, Req); @output - struct Success(Id, Result); - @output - #[derive(Debug)] - struct Failed(Id); + struct Requirement(Id, Req); // Extract facts. - Fragment(id) <- Input(f), let Fact::Fragment(id) = f; Edge(from, to, dt) <- Input(f), let Fact::Edge { from, to, dt } = f; - ExternalReq(id, dist) <- Input(f), let Fact::ExternalReq { id, dist } = f; - SingletonReq(id) <- Input(f), let Fact::SingletonReq(id) = f; + Requirement(id, req) <- Input(f), let Fact::Req { id, req } = f; - // Requirements from the facts. - Requirement(x, d) <- ExternalReq(x, d); + // The downstream fragment of a `Simple` edge must be singleton. + Requirement(y, Req::AnySingleton) <- Edge(_, y, Simple); // Requirements propagate through `NoShuffle` edges. Requirement(x, d) <- Edge(x, y, NoShuffle), Requirement(y, d); Requirement(y, d) <- Edge(x, y, NoShuffle), Requirement(x, d); - - // The downstream fragment of a `Simple` edge must be singleton. - SingletonReq(y) <- Edge(_, y, Simple); - // Singleton requirements propagate through `NoShuffle` edges. - SingletonReq(x) <- Edge(x, y, NoShuffle), SingletonReq(y); - SingletonReq(y) <- Edge(x, y, NoShuffle), SingletonReq(x); - - // Multiple requirements conflict. - Failed(x) <- Requirement(x, d1), Requirement(x, d2), (d1 != d2); - // Singleton requirement conflicts with hash requirement. - Failed(x) <- SingletonReq(x), Requirement(x, d), let DistId::Hash(_) = d; - - // Take the required distribution as the result. - Success(x, Result::Required(d)) <- Fragment(x), Requirement(x, d), !Failed(x); - // Take the default singleton distribution as the result, if no other requirement. - Success(x, Result::DefaultSingleton) <- Fragment(x), SingletonReq(x), !Requirement(x, _); - // Take the default hash distribution as the result, if no other requirement. - Success(x, Result::DefaultHash) <- Fragment(x), !SingletonReq(x), !Requirement(x, _); } /// The distribution of a fragment. @@ -290,20 +251,22 @@ impl Scheduler { let mut facts = Vec::new(); - // Building fragments and Singletons + // Singletons for (&id, fragment) in graph.building_fragments() { - facts.push(Fact::Fragment(id)); if fragment.requires_singleton { - facts.push(Fact::SingletonReq(id)); + facts.push(Fact::Req { + id, + req: Req::AnySingleton, + }); } } // External - for (id, req) in existing_distribution { - let dist = match req { - Distribution::Singleton(worker_slot_id) => DistId::Singleton(worker_slot_id), - Distribution::Hash(mapping) => DistId::Hash(hash_mapping_id[&mapping]), + for (id, dist) in existing_distribution { + let req = match dist { + Distribution::Singleton(worker_slot_id) => Req::Singleton(worker_slot_id), + Distribution::Hash(mapping) => Req::Hash(hash_mapping_id[&mapping]), }; - facts.push(Fact::ExternalReq { id, dist }); + facts.push(Fact::Req { id, req }); } // Edges for (from, to, edge) in graph.all_edges() { @@ -317,35 +280,51 @@ impl Scheduler { // Run the algorithm. let mut crepe = Crepe::new(); crepe.extend(facts.into_iter().map(Input)); - let (success, failed) = crepe.run(); - if !failed.is_empty() { - bail!("Failed to schedule: {:?}", failed); - } - // Should not contain any existing fragments. - assert_eq!(success.len(), graph.building_fragments().len()); - - // Extract the results. - let distributions = success + let (reqs,) = crepe.run(); + let reqs = reqs .into_iter() - .map(|Success(id, result)| { - let distribution = match result { - // Required - Result::Required(DistId::Singleton(worker_slot)) => { - Distribution::Singleton(worker_slot) - } - Result::Required(DistId::Hash(mapping)) => { - Distribution::Hash(all_hash_mappings[mapping].clone()) + .map(|Requirement(id, req)| (id, req)) + .into_group_map(); + + let mut distributions = HashMap::new(); + for (&id, _) in graph.building_fragments() { + let dist = match reqs.get(&id) { + Some(reqs) => { + let req = (reqs.iter().copied()) + .try_reduce(|a, b| { + let merge = |a, b| match (a, b) { + (Req::AnySingleton, Req::Singleton(id)) => Some(Req::Singleton(id)), + (Req::AnyVnodeCount(count), Req::Hash(mapping)) + if all_hash_mappings[mapping].len() == count => + { + Some(Req::Hash(mapping)) + } + _ => None, + }; + + match merge(a, b).or_else(|| merge(b, a)) { + Some(req) => MetaResult::Ok(req), + None => bail!("incompatible requirements `{a:?}` and `{b:?}` for fragment {id:?}"), + } + })? + .unwrap(); + + match req { + Req::Singleton(worker_slot) => Distribution::Singleton(worker_slot), + Req::Hash(mapping) => { + Distribution::Hash(all_hash_mappings[mapping].clone()) + } + Req::AnySingleton => { + Distribution::Singleton(self.default_singleton_worker_slot) + } + Req::AnyVnodeCount(_) => todo!(), } + } + None => Distribution::Hash(self.default_hash_mapping.clone()), + }; - // Default - Result::DefaultSingleton => { - Distribution::Singleton(self.default_singleton_worker_slot) - } - Result::DefaultHash => Distribution::Hash(self.default_hash_mapping.clone()), - }; - (id, distribution) - }) - .collect(); + distributions.insert(id, dist); + } tracing::debug!(?distributions, "schedule fragments"); @@ -385,6 +364,7 @@ impl Locations { } #[cfg(test)] +#[cfg(any())] mod tests { use super::*; @@ -397,7 +377,7 @@ mod tests { let success: HashMap<_, _> = success .into_iter() - .map(|Success(id, result)| (id, result)) + .map(|Output(id, result)| (id, result)) .collect(); assert_eq!(success, expected); @@ -453,8 +433,8 @@ mod tests { Fact::Fragment(102.into()), Fact::Fragment(103.into()), Fact::Fragment(104.into()), - Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) }, - Fact::ExternalReq { id: 2.into(), dist: DistId::Singleton(WorkerSlotId::new(0, 2)) }, + Fact::Req { id: 1.into(), req: Req::Hash(1) }, + Fact::Req { id: 2.into(), req: Req::Singleton(WorkerSlotId::new(0, 2)) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, Fact::Edge { from: 2.into(), to: 102.into(), dt: NoShuffle }, Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash }, @@ -463,8 +443,8 @@ mod tests { ]; let expected = maplit::hashmap! { - 101.into() => Result::Required(DistId::Hash(1)), - 102.into() => Result::Required(DistId::Singleton(WorkerSlotId::new(0, 2))), + 101.into() => Result::Required(Req::Hash(1)), + 102.into() => Result::Required(Req::Singleton(WorkerSlotId::new(0, 2))), 103.into() => Result::DefaultHash, 104.into() => Result::DefaultSingleton, }; @@ -484,8 +464,8 @@ mod tests { Fact::Fragment(103.into()), Fact::Fragment(104.into()), Fact::Fragment(105.into()), - Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) }, - Fact::ExternalReq { id: 2.into(), dist: DistId::Hash(2) }, + Fact::Req { id: 1.into(), req: Req::Hash(1) }, + Fact::Req { id: 2.into(), req: Req::Hash(2) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, Fact::Edge { from: 2.into(), to: 102.into(), dt: NoShuffle }, Fact::Edge { from: 101.into(), to: 103.into(), dt: NoShuffle }, @@ -497,10 +477,10 @@ mod tests { ]; let expected = maplit::hashmap! { - 101.into() => Result::Required(DistId::Hash(1)), - 102.into() => Result::Required(DistId::Hash(2)), - 103.into() => Result::Required(DistId::Hash(1)), - 104.into() => Result::Required(DistId::Hash(2)), + 101.into() => Result::Required(Req::Hash(1)), + 102.into() => Result::Required(Req::Hash(2)), + 103.into() => Result::Required(Req::Hash(1)), + 104.into() => Result::Required(Req::Hash(2)), 105.into() => Result::DefaultHash, }; @@ -517,7 +497,7 @@ mod tests { Fact::Fragment(101.into()), Fact::Fragment(102.into()), Fact::Fragment(103.into()), - Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) }, + Fact::Req { id: 1.into(), req: Req::Hash(1) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, Fact::SingletonReq(102.into()), // like `Now` Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash }, @@ -525,7 +505,7 @@ mod tests { ]; let expected = maplit::hashmap! { - 101.into() => Result::Required(DistId::Hash(1)), + 101.into() => Result::Required(Req::Hash(1)), 102.into() => Result::DefaultSingleton, 103.into() => Result::DefaultHash, }; @@ -541,8 +521,8 @@ mod tests { #[rustfmt::skip] let facts = [ Fact::Fragment(101.into()), - Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) }, - Fact::ExternalReq { id: 2.into(), dist: DistId::Hash(2) }, + Fact::Req { id: 1.into(), req: Req::Hash(1) }, + Fact::Req { id: 2.into(), req: Req::Hash(2) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, Fact::Edge { from: 2.into(), to: 101.into(), dt: NoShuffle }, ];