diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index ec809b58d5503..5da9ede5dd1ee 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -21,6 +21,7 @@ use std::collections::{BTreeMap, HashMap}; use std::num::NonZeroUsize; +use anyhow::Context; use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; @@ -57,9 +58,25 @@ enum Req { impl Req { /// Equivalent to `Req::AnyVnodeCount(1)`. - fn any_singleton() -> Self { + const fn any_singleton() -> Self { Self::AnyVnodeCount(1) } + + fn merge(a: Self, b: Self, mapping_len: impl Fn(usize) -> usize) -> MetaResult { + // Note that a and b are always different, as they come from a set. + let merge = |a, b| match (a, b) { + (Self::AnyVnodeCount(1), Self::Singleton(id)) => Some(Self::Singleton(id)), + (Self::AnyVnodeCount(count), Self::Hash(id)) if mapping_len(id) == count => { + Some(Self::Hash(id)) + } + _ => None, + }; + + match merge(a, b).or_else(|| merge(b, a)) { + Some(req) => Ok(req), + None => bail!("incompatible requirements `{a:?}` and `{b:?}`"), + } + } } /// Facts as the input of the scheduler. @@ -334,22 +351,9 @@ impl Scheduler { // Merge all requirements. Some(reqs) => { let req = (reqs.iter().copied()) - .try_reduce(|a, b| { - // Note that a and b are always different, as they come from a set. - let merge = |a, b| match (a, b) { - (Req::AnyVnodeCount(1), Req::Singleton(id)) => Some(Req::Singleton(id)), - (Req::AnyVnodeCount(count), Req::Hash(mapping)) - if all_hash_mappings[mapping].len() == count => - { - Some(Req::Hash(mapping)) - } - _ => None, - }; - - match merge(a, b).or_else(|| merge(b, a)) { - Some(req) => MetaResult::Ok(req), - None => bail!("incompatible requirements `{a:?}` and `{b:?}` for fragment {id:?}"), - } + .try_reduce(|a, b| Req::merge(a, b, |id| all_hash_mappings[id].len())) + .with_context(|| { + format!("cannot fulfill scheduling requirements for fragment {id:?}") })? .unwrap(); @@ -417,40 +421,66 @@ impl Locations { } #[cfg(test)] -#[cfg(any())] mod tests { use super::*; - fn test_success(facts: impl IntoIterator, expected: HashMap) { + #[derive(Debug)] + enum Result { + DefaultHash, + Required(Req), + } + + impl Result { + fn default_singleton() -> Self { + Result::Required(Req::any_singleton()) + } + } + + fn run_and_merge(facts: impl IntoIterator) -> MetaResult> { let mut crepe = Crepe::new(); crepe.extend(facts.into_iter().map(Input)); - let (success, failed) = crepe.run(); - - assert!(failed.is_empty()); + let (reqs,) = crepe.run(); - let success: HashMap<_, _> = success + let reqs = reqs .into_iter() - .map(|Output(id, result)| (id, result)) - .collect(); + .map(|Requirement(id, req)| (id, req)) + .into_group_map(); + + let mut merged = HashMap::new(); + for (id, reqs) in reqs { + let req = (reqs.iter().copied()) + .try_reduce(|a, b| Req::merge(a, b, |_| todo!())) + .with_context(|| { + format!("cannot fulfill scheduling requirements for fragment {id:?}") + })? + .unwrap(); + merged.insert(id, req); + } - assert_eq!(success, expected); + Ok(merged) } - fn test_failed(facts: impl IntoIterator) { - let mut crepe = Crepe::new(); - crepe.extend(facts.into_iter().map(Input)); - let (_success, failed) = crepe.run(); + fn test_success(facts: impl IntoIterator, expected: HashMap) { + let reqs = run_and_merge(facts).unwrap(); - assert!(!failed.is_empty()); + for (id, expected) in expected { + match (reqs.get(&id), expected) { + (None, Result::DefaultHash) => {} + (Some(actual), Result::Required(expected)) if *actual == expected => {} + (actual, expected) => panic!("unexpected result for fragment {id:?}\nactual: {actual:?}\nexpected: {expected:?}"), + } + } + } + + fn test_failed(facts: impl IntoIterator) { + run_and_merge(facts).unwrap_err(); } // 101 #[test] fn test_single_fragment_hash() { #[rustfmt::skip] - let facts = [ - Fact::Fragment(101.into()), - ]; + let facts = []; let expected = maplit::hashmap! { 101.into() => Result::DefaultHash, @@ -464,12 +494,11 @@ mod tests { fn test_single_fragment_singleton() { #[rustfmt::skip] let facts = [ - Fact::Fragment(101.into()), - Fact::SingletonReq(101.into()), + Fact::Req { id: 101.into(), req: Req::any_singleton() }, ]; let expected = maplit::hashmap! { - 101.into() => Result::DefaultSingleton, + 101.into() => Result::default_singleton(), }; test_success(facts, expected); @@ -482,10 +511,6 @@ mod tests { fn test_scheduling_mv_on_mv() { #[rustfmt::skip] let facts = [ - Fact::Fragment(101.into()), - Fact::Fragment(102.into()), - Fact::Fragment(103.into()), - Fact::Fragment(104.into()), Fact::Req { id: 1.into(), req: Req::Hash(1) }, Fact::Req { id: 2.into(), req: Req::Singleton(WorkerSlotId::new(0, 2)) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, @@ -499,7 +524,7 @@ mod tests { 101.into() => Result::Required(Req::Hash(1)), 102.into() => Result::Required(Req::Singleton(WorkerSlotId::new(0, 2))), 103.into() => Result::DefaultHash, - 104.into() => Result::DefaultSingleton, + 104.into() => Result::default_singleton(), }; test_success(facts, expected); @@ -512,11 +537,6 @@ mod tests { fn test_delta_join() { #[rustfmt::skip] let facts = [ - Fact::Fragment(101.into()), - Fact::Fragment(102.into()), - Fact::Fragment(103.into()), - Fact::Fragment(104.into()), - Fact::Fragment(105.into()), Fact::Req { id: 1.into(), req: Req::Hash(1) }, Fact::Req { id: 2.into(), req: Req::Hash(2) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, @@ -547,19 +567,16 @@ mod tests { fn test_singleton_leaf() { #[rustfmt::skip] let facts = [ - Fact::Fragment(101.into()), - Fact::Fragment(102.into()), - Fact::Fragment(103.into()), Fact::Req { id: 1.into(), req: Req::Hash(1) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle }, - Fact::SingletonReq(102.into()), // like `Now` + Fact::Req { id: 102.into(), req: Req::any_singleton() }, // like `Now` Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash }, Fact::Edge { from: 102.into(), to: 103.into(), dt: Broadcast }, ]; let expected = maplit::hashmap! { 101.into() => Result::Required(Req::Hash(1)), - 102.into() => Result::DefaultSingleton, + 102.into() => Result::default_singleton(), 103.into() => Result::DefaultHash, }; @@ -573,7 +590,6 @@ mod tests { fn test_upstream_hash_shard_failed() { #[rustfmt::skip] let facts = [ - Fact::Fragment(101.into()), Fact::Req { id: 1.into(), req: Req::Hash(1) }, Fact::Req { id: 2.into(), req: Req::Hash(2) }, Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },