Skip to content

Commit

Permalink
fix(meta): support scheduling stream graph without edge (#8149)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
3 people authored Feb 25, 2023
1 parent e6aae70 commit ae0835f
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ enum DistId {
/// Facts as the input of the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Fact {
/// An edge in the stream graph.
/// An internal(building) fragment.
Fragment(Id),
/// An edge in the fragment graph.
Edge {
from: Id,
to: Id,
Expand Down Expand Up @@ -80,10 +82,10 @@ crepe::crepe! {
@input
struct Input(Fact);

struct Fragment(Id);
struct Edge(Id, Id, DispatcherType);
struct ExternalReq(Id, DistId);
struct SingletonReq(Id);
struct Fragment(Id);
struct Requirement(Id, DistId);

@output
Expand All @@ -93,14 +95,11 @@ crepe::crepe! {
struct Failed(Id);

// Extract facts.
Fragment(id) <- Input(f), let Fact::Fragment(id) = f;
Edge(from, to, dt) <- Input(f), let Fact::Edge { from, to, dt } = f;
ExternalReq(id, dist) <- Input(f), let Fact::ExternalReq { id, dist } = f;
SingletonReq(id) <- Input(f), let Fact::SingletonReq(id) = f;

// Internal fragments.
Fragment(x) <- Edge(x, _, _), !ExternalReq(x, _);
Fragment(y) <- Edge(_, y, _), !ExternalReq(y, _);

// Requirements from the facts.
Requirement(x, d) <- ExternalReq(x, d);
// Requirements of `NoShuffle` edges.
Expand Down Expand Up @@ -270,8 +269,9 @@ impl Scheduler {

let mut facts = Vec::new();

// Singletons
// Building fragments and Singletons
for (&id, fragment) in graph.building_fragments() {
facts.push(Fact::Fragment(id));
if fragment.is_singleton {
facts.push(Fact::SingletonReq(id));
}
Expand Down Expand Up @@ -388,13 +388,48 @@ mod tests {
assert!(!failed.is_empty());
}

// 101
#[test]
fn test_single_fragment_hash() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
];

let expected = maplit::hashmap! {
101.into() => Result::DefaultHash,
};

test_success(facts, expected);
}

// 101
#[test]
fn test_single_fragment_singleton() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::SingletonReq(101.into()),
];

let expected = maplit::hashmap! {
101.into() => Result::DefaultSingleton,
};

test_success(facts, expected);
}

// 1 -|-> 101 -->
// 103 --> 104
// 2 -|-> 102 -->
#[test]
fn test_scheduling_mv_on_mv() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::Fragment(104.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::ExternalReq { id: 2.into(), dist: DistId::Singleton(2) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Expand All @@ -421,6 +456,11 @@ mod tests {
fn test_delta_join() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::Fragment(104.into()),
Fact::Fragment(105.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::ExternalReq { id: 2.into(), dist: DistId::Hash(2) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Expand Down Expand Up @@ -451,6 +491,9 @@ mod tests {
fn test_singleton_leaf() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Fact::SingletonReq(102.into()), // like `Now`
Expand All @@ -474,6 +517,7 @@ mod tests {
fn test_upstream_hash_shard_failed() {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::ExternalReq { id: 2.into(), dist: DistId::Hash(2) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Expand Down

0 comments on commit ae0835f

Please sign in to comment.