Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 6, 2024
1 parent 3adb6d4 commit c26f3ab
Showing 1 changed file with 69 additions and 53 deletions.
122 changes: 69 additions & 53 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use std::collections::{BTreeMap, HashMap};
use std::num::NonZeroUsize;

use anyhow::Context;
use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
Expand Down Expand Up @@ -57,9 +58,25 @@ enum Req {

impl Req {
/// Equivalent to `Req::AnyVnodeCount(1)`.
fn any_singleton() -> Self {
const fn any_singleton() -> Self {
Self::AnyVnodeCount(1)
}

fn merge(a: Self, b: Self, mapping_len: impl Fn(usize) -> usize) -> MetaResult<Self> {
// Note that a and b are always different, as they come from a set.
let merge = |a, b| match (a, b) {
(Self::AnyVnodeCount(1), Self::Singleton(id)) => Some(Self::Singleton(id)),
(Self::AnyVnodeCount(count), Self::Hash(id)) if mapping_len(id) == count => {
Some(Self::Hash(id))
}
_ => None,
};

match merge(a, b).or_else(|| merge(b, a)) {
Some(req) => Ok(req),
None => bail!("incompatible requirements `{a:?}` and `{b:?}`"),
}
}
}

/// Facts as the input of the scheduler.
Expand Down Expand Up @@ -334,22 +351,9 @@ impl Scheduler {
// Merge all requirements.
Some(reqs) => {
let req = (reqs.iter().copied())
.try_reduce(|a, b| {
// Note that a and b are always different, as they come from a set.
let merge = |a, b| match (a, b) {
(Req::AnyVnodeCount(1), Req::Singleton(id)) => Some(Req::Singleton(id)),
(Req::AnyVnodeCount(count), Req::Hash(mapping))
if all_hash_mappings[mapping].len() == count =>
{
Some(Req::Hash(mapping))
}
_ => None,
};

match merge(a, b).or_else(|| merge(b, a)) {
Some(req) => MetaResult::Ok(req),
None => bail!("incompatible requirements `{a:?}` and `{b:?}` for fragment {id:?}"),
}
.try_reduce(|a, b| Req::merge(a, b, |id| all_hash_mappings[id].len()))
.with_context(|| {
format!("cannot fulfill scheduling requirements for fragment {id:?}")
})?
.unwrap();

Expand Down Expand Up @@ -417,40 +421,66 @@ impl Locations {
}

#[cfg(test)]
#[cfg(any())]
mod tests {
use super::*;

fn test_success(facts: impl IntoIterator<Item = Fact>, expected: HashMap<Id, Result>) {
#[derive(Debug)]
enum Result {
DefaultHash,
Required(Req),
}

impl Result {
fn default_singleton() -> Self {
Result::Required(Req::any_singleton())
}
}

fn run_and_merge(facts: impl IntoIterator<Item = Fact>) -> MetaResult<HashMap<Id, Req>> {
let mut crepe = Crepe::new();
crepe.extend(facts.into_iter().map(Input));
let (success, failed) = crepe.run();

assert!(failed.is_empty());
let (reqs,) = crepe.run();

let success: HashMap<_, _> = success
let reqs = reqs
.into_iter()
.map(|Output(id, result)| (id, result))
.collect();
.map(|Requirement(id, req)| (id, req))
.into_group_map();

let mut merged = HashMap::new();
for (id, reqs) in reqs {
let req = (reqs.iter().copied())
.try_reduce(|a, b| Req::merge(a, b, |_| todo!()))
.with_context(|| {
format!("cannot fulfill scheduling requirements for fragment {id:?}")
})?
.unwrap();
merged.insert(id, req);
}

assert_eq!(success, expected);
Ok(merged)
}

fn test_failed(facts: impl IntoIterator<Item = Fact>) {
let mut crepe = Crepe::new();
crepe.extend(facts.into_iter().map(Input));
let (_success, failed) = crepe.run();
fn test_success(facts: impl IntoIterator<Item = Fact>, expected: HashMap<Id, Result>) {
let reqs = run_and_merge(facts).unwrap();

assert!(!failed.is_empty());
for (id, expected) in expected {
match (reqs.get(&id), expected) {
(None, Result::DefaultHash) => {}
(Some(actual), Result::Required(expected)) if *actual == expected => {}
(actual, expected) => panic!("unexpected result for fragment {id:?}\nactual: {actual:?}\nexpected: {expected:?}"),
}
}
}

fn test_failed(facts: impl IntoIterator<Item = Fact>) {
run_and_merge(facts).unwrap_err();
}

// 101
#[test]
fn test_single_fragment_hash() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
];
let facts = [];

let expected = maplit::hashmap! {
101.into() => Result::DefaultHash,
Expand All @@ -464,12 +494,11 @@ mod tests {
fn test_single_fragment_singleton() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::SingletonReq(101.into()),
Fact::Req { id: 101.into(), req: Req::any_singleton() },
];

let expected = maplit::hashmap! {
101.into() => Result::DefaultSingleton,
101.into() => Result::default_singleton(),
};

test_success(facts, expected);
Expand All @@ -482,10 +511,6 @@ mod tests {
fn test_scheduling_mv_on_mv() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::Fragment(104.into()),
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Req { id: 2.into(), req: Req::Singleton(WorkerSlotId::new(0, 2)) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Expand All @@ -499,7 +524,7 @@ mod tests {
101.into() => Result::Required(Req::Hash(1)),
102.into() => Result::Required(Req::Singleton(WorkerSlotId::new(0, 2))),
103.into() => Result::DefaultHash,
104.into() => Result::DefaultSingleton,
104.into() => Result::default_singleton(),
};

test_success(facts, expected);
Expand All @@ -512,11 +537,6 @@ mod tests {
fn test_delta_join() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::Fragment(104.into()),
Fact::Fragment(105.into()),
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Req { id: 2.into(), req: Req::Hash(2) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Expand Down Expand Up @@ -547,19 +567,16 @@ mod tests {
fn test_singleton_leaf() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Fact::SingletonReq(102.into()), // like `Now`
Fact::Req { id: 102.into(), req: Req::any_singleton() }, // like `Now`
Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash },
Fact::Edge { from: 102.into(), to: 103.into(), dt: Broadcast },
];

let expected = maplit::hashmap! {
101.into() => Result::Required(Req::Hash(1)),
102.into() => Result::DefaultSingleton,
102.into() => Result::default_singleton(),
103.into() => Result::DefaultHash,
};

Expand All @@ -573,7 +590,6 @@ mod tests {
fn test_upstream_hash_shard_failed() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Req { id: 2.into(), req: Req::Hash(2) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Expand Down

0 comments on commit c26f3ab

Please sign in to comment.