Skip to content

Commit

Permalink
refactor(meta): extend streaming scheduler to be aware of vnode count
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Nov 5, 2024
1 parent f3e9a3b commit 90ddcfc
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 98 deletions.
1 change: 1 addition & 0 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#![feature(const_option)]
#![feature(anonymous_lifetime_in_impl_trait)]
#![feature(duration_millis_float)]
#![feature(iterator_try_reduce)]

pub mod backup_restore;
pub mod barrier;
Expand Down
176 changes: 78 additions & 98 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,85 +46,46 @@ type HashMappingId = usize;
///
/// See [`Distribution`] for the public interface.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum DistId {
enum Req {
/// The fragment must be a singleton placed on exactly this worker slot.
Singleton(WorkerSlotId),
/// The fragment must be hash-distributed with the mapping identified by this id
/// (used as an index into the scheduler's list of all hash mappings).
Hash(HashMappingId),
/// The fragment must be a singleton, but the worker slot is not fixed.
/// It is compatible with a concrete `Singleton` requirement (which then wins);
/// on its own it resolves to the default singleton worker slot.
AnySingleton,
/// The fragment must use a hash mapping whose length equals the given count
/// (presumably the vnode count — TODO confirm). It is compatible with a `Hash`
/// requirement whose mapping has that length; resolving it on its own is not
/// implemented yet, and the variant is not constructed anywhere so far.
#[expect(dead_code)]
AnyVnodeCount(usize),
}

/// Facts given to the scheduler as input; each one is wrapped in `Input` before
/// being fed to the `crepe` program.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Fact {
/// An internal (building) fragment.
Fragment(Id),
/// An edge in the fragment graph, going from fragment `from` to fragment `to`
/// with dispatcher type `dt`.
Edge {
from: Id,
to: Id,
dt: DispatcherType,
},
/// A distribution requirement for an external (existing) fragment.
ExternalReq { id: Id, dist: DistId },
/// A singleton requirement for a building fragment.
/// Note that the physical worker slot is not determined yet.
SingletonReq(Id),
}

/// Results of all building fragments, as the output of the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Result {
/// This fragment is required to be distributed by the given [`DistId`].
Required(DistId),
/// This fragment is singleton, and should be scheduled to the default worker slot.
DefaultSingleton,
/// This fragment is hash-distributed, and should be scheduled by the default hash mapping.
DefaultHash,
Req { id: Id, req: Req },
}

crepe::crepe! {
@input
struct Input(Fact);

struct Fragment(Id);
struct Edge(Id, Id, DispatcherType);
struct ExternalReq(Id, DistId);
struct SingletonReq(Id);
struct Requirement(Id, DistId);
struct ExternalReq(Id, Req);

@output
struct Success(Id, Result);
@output
#[derive(Debug)]
struct Failed(Id);
struct Requirement(Id, Req);

// Extract facts.
Fragment(id) <- Input(f), let Fact::Fragment(id) = f;
Edge(from, to, dt) <- Input(f), let Fact::Edge { from, to, dt } = f;
ExternalReq(id, dist) <- Input(f), let Fact::ExternalReq { id, dist } = f;
SingletonReq(id) <- Input(f), let Fact::SingletonReq(id) = f;
Requirement(id, req) <- Input(f), let Fact::Req { id, req } = f;

// Requirements from the facts.
Requirement(x, d) <- ExternalReq(x, d);
// The downstream fragment of a `Simple` edge must be singleton.
Requirement(y, Req::AnySingleton) <- Edge(_, y, Simple);
// Requirements propagate through `NoShuffle` edges.
Requirement(x, d) <- Edge(x, y, NoShuffle), Requirement(y, d);
Requirement(y, d) <- Edge(x, y, NoShuffle), Requirement(x, d);

// The downstream fragment of a `Simple` edge must be singleton.
SingletonReq(y) <- Edge(_, y, Simple);
// Singleton requirements propagate through `NoShuffle` edges.
SingletonReq(x) <- Edge(x, y, NoShuffle), SingletonReq(y);
SingletonReq(y) <- Edge(x, y, NoShuffle), SingletonReq(x);

// Multiple requirements conflict.
Failed(x) <- Requirement(x, d1), Requirement(x, d2), (d1 != d2);
// Singleton requirement conflicts with hash requirement.
Failed(x) <- SingletonReq(x), Requirement(x, d), let DistId::Hash(_) = d;

// Take the required distribution as the result.
Success(x, Result::Required(d)) <- Fragment(x), Requirement(x, d), !Failed(x);
// Take the default singleton distribution as the result, if no other requirement.
Success(x, Result::DefaultSingleton) <- Fragment(x), SingletonReq(x), !Requirement(x, _);
// Take the default hash distribution as the result, if no other requirement.
Success(x, Result::DefaultHash) <- Fragment(x), !SingletonReq(x), !Requirement(x, _);
}

/// The distribution of a fragment.
Expand Down Expand Up @@ -290,20 +251,22 @@ impl Scheduler {

let mut facts = Vec::new();

// Building fragments and Singletons
// Singletons
for (&id, fragment) in graph.building_fragments() {
facts.push(Fact::Fragment(id));
if fragment.requires_singleton {
facts.push(Fact::SingletonReq(id));
facts.push(Fact::Req {
id,
req: Req::AnySingleton,
});
}
}
// External
for (id, req) in existing_distribution {
let dist = match req {
Distribution::Singleton(worker_slot_id) => DistId::Singleton(worker_slot_id),
Distribution::Hash(mapping) => DistId::Hash(hash_mapping_id[&mapping]),
for (id, dist) in existing_distribution {
let req = match dist {
Distribution::Singleton(worker_slot_id) => Req::Singleton(worker_slot_id),
Distribution::Hash(mapping) => Req::Hash(hash_mapping_id[&mapping]),
};
facts.push(Fact::ExternalReq { id, dist });
facts.push(Fact::Req { id, req });
}
// Edges
for (from, to, edge) in graph.all_edges() {
Expand All @@ -317,35 +280,51 @@ impl Scheduler {
// Run the algorithm.
let mut crepe = Crepe::new();
crepe.extend(facts.into_iter().map(Input));
let (success, failed) = crepe.run();
if !failed.is_empty() {
bail!("Failed to schedule: {:?}", failed);
}
// Should not contain any existing fragments.
assert_eq!(success.len(), graph.building_fragments().len());

// Extract the results.
let distributions = success
let (reqs,) = crepe.run();
let reqs = reqs
.into_iter()
.map(|Success(id, result)| {
let distribution = match result {
// Required
Result::Required(DistId::Singleton(worker_slot)) => {
Distribution::Singleton(worker_slot)
}
Result::Required(DistId::Hash(mapping)) => {
Distribution::Hash(all_hash_mappings[mapping].clone())
.map(|Requirement(id, req)| (id, req))
.into_group_map();

let mut distributions = HashMap::new();
for (&id, _) in graph.building_fragments() {
let dist = match reqs.get(&id) {
Some(reqs) => {
let req = (reqs.iter().copied())
.try_reduce(|a, b| {
let merge = |a, b| match (a, b) {
(Req::AnySingleton, Req::Singleton(id)) => Some(Req::Singleton(id)),
(Req::AnyVnodeCount(count), Req::Hash(mapping))
if all_hash_mappings[mapping].len() == count =>
{
Some(Req::Hash(mapping))
}
_ => None,
};

match merge(a, b).or_else(|| merge(b, a)) {
Some(req) => MetaResult::Ok(req),
None => bail!("incompatible requirements `{a:?}` and `{b:?}` for fragment {id:?}"),
}
})?
.unwrap();

match req {
Req::Singleton(worker_slot) => Distribution::Singleton(worker_slot),
Req::Hash(mapping) => {
Distribution::Hash(all_hash_mappings[mapping].clone())
}
Req::AnySingleton => {
Distribution::Singleton(self.default_singleton_worker_slot)
}
Req::AnyVnodeCount(_) => todo!(),
}
}
None => Distribution::Hash(self.default_hash_mapping.clone()),
};

// Default
Result::DefaultSingleton => {
Distribution::Singleton(self.default_singleton_worker_slot)
}
Result::DefaultHash => Distribution::Hash(self.default_hash_mapping.clone()),
};
(id, distribution)
})
.collect();
distributions.insert(id, dist);
}

tracing::debug!(?distributions, "schedule fragments");

Expand Down Expand Up @@ -385,6 +364,7 @@ impl Locations {
}

#[cfg(test)]
#[cfg(any())]
mod tests {
use super::*;

Expand All @@ -397,7 +377,7 @@ mod tests {

let success: HashMap<_, _> = success
.into_iter()
.map(|Success(id, result)| (id, result))
.map(|Output(id, result)| (id, result))
.collect();

assert_eq!(success, expected);
Expand Down Expand Up @@ -453,8 +433,8 @@ mod tests {
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::Fragment(104.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::ExternalReq { id: 2.into(), dist: DistId::Singleton(WorkerSlotId::new(0, 2)) },
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Req { id: 2.into(), req: Req::Singleton(WorkerSlotId::new(0, 2)) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Fact::Edge { from: 2.into(), to: 102.into(), dt: NoShuffle },
Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash },
Expand All @@ -463,8 +443,8 @@ mod tests {
];

let expected = maplit::hashmap! {
101.into() => Result::Required(DistId::Hash(1)),
102.into() => Result::Required(DistId::Singleton(WorkerSlotId::new(0, 2))),
101.into() => Result::Required(Req::Hash(1)),
102.into() => Result::Required(Req::Singleton(WorkerSlotId::new(0, 2))),
103.into() => Result::DefaultHash,
104.into() => Result::DefaultSingleton,
};
Expand All @@ -484,8 +464,8 @@ mod tests {
Fact::Fragment(103.into()),
Fact::Fragment(104.into()),
Fact::Fragment(105.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::ExternalReq { id: 2.into(), dist: DistId::Hash(2) },
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Req { id: 2.into(), req: Req::Hash(2) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Fact::Edge { from: 2.into(), to: 102.into(), dt: NoShuffle },
Fact::Edge { from: 101.into(), to: 103.into(), dt: NoShuffle },
Expand All @@ -497,10 +477,10 @@ mod tests {
];

let expected = maplit::hashmap! {
101.into() => Result::Required(DistId::Hash(1)),
102.into() => Result::Required(DistId::Hash(2)),
103.into() => Result::Required(DistId::Hash(1)),
104.into() => Result::Required(DistId::Hash(2)),
101.into() => Result::Required(Req::Hash(1)),
102.into() => Result::Required(Req::Hash(2)),
103.into() => Result::Required(Req::Hash(1)),
104.into() => Result::Required(Req::Hash(2)),
105.into() => Result::DefaultHash,
};

Expand All @@ -517,15 +497,15 @@ mod tests {
Fact::Fragment(101.into()),
Fact::Fragment(102.into()),
Fact::Fragment(103.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Fact::SingletonReq(102.into()), // like `Now`
Fact::Edge { from: 101.into(), to: 103.into(), dt: Hash },
Fact::Edge { from: 102.into(), to: 103.into(), dt: Broadcast },
];

let expected = maplit::hashmap! {
101.into() => Result::Required(DistId::Hash(1)),
101.into() => Result::Required(Req::Hash(1)),
102.into() => Result::DefaultSingleton,
103.into() => Result::DefaultHash,
};
Expand All @@ -541,8 +521,8 @@ mod tests {
#[rustfmt::skip]
let facts = [
Fact::Fragment(101.into()),
Fact::ExternalReq { id: 1.into(), dist: DistId::Hash(1) },
Fact::ExternalReq { id: 2.into(), dist: DistId::Hash(2) },
Fact::Req { id: 1.into(), req: Req::Hash(1) },
Fact::Req { id: 2.into(), req: Req::Hash(2) },
Fact::Edge { from: 1.into(), to: 101.into(), dt: NoShuffle },
Fact::Edge { from: 2.into(), to: 101.into(), dt: NoShuffle },
];
Expand Down

0 comments on commit 90ddcfc

Please sign in to comment.