Skip to content

Commit

Permalink
extract vnode count requirement from table lookup
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 6, 2024
1 parent 289d0cb commit 2fd465b
Showing 1 changed file with 46 additions and 7 deletions.
53 changes: 46 additions & 7 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{ActorMapping, VnodeCountCompat, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::util::stream_graph_visitor::visit_fragment;
use risingwave_common::{bail, hash};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{ActorInfo, WorkerNode};
Expand All @@ -50,7 +51,6 @@ enum Req {
Singleton(WorkerSlotId),
Hash(HashMappingId),
AnySingleton,
#[expect(dead_code)]
AnyVnodeCount(usize),
}

Expand Down Expand Up @@ -168,6 +168,8 @@ impl Distribution {

/// [`Scheduler`] schedules the distribution of fragments in a stream graph.
pub(super) struct Scheduler {
scheduled_worker_slots: Vec<WorkerSlotId>,

/// The default hash mapping for hash-distributed fragments, if there's no requirement derived.
default_hash_mapping: WorkerSlotMapping,

Expand Down Expand Up @@ -219,10 +221,10 @@ impl Scheduler {

let single_scheduled = schedule_units_for_slots(&slots, 1, streaming_job_id)?;
let default_single_worker_id = single_scheduled.keys().exactly_one().cloned().unwrap();

let default_singleton_worker_slot = WorkerSlotId::new(default_single_worker_id as _, 0);

Ok(Self {
scheduled_worker_slots,
default_hash_mapping,
default_singleton_worker_slot,
})
Expand Down Expand Up @@ -251,7 +253,7 @@ impl Scheduler {

let mut facts = Vec::new();

// Singletons
// Singletons.
for (&id, fragment) in graph.building_fragments() {
if fragment.requires_singleton {
facts.push(Fact::Req {
Expand All @@ -260,15 +262,45 @@ impl Scheduler {
});
}
}
// External
// Vnode count requirements if looking up existing tables.
for (&id, fragment) in graph.building_fragments() {
visit_fragment(&mut (*fragment).clone(), |node| {
use risingwave_pb::stream_plan::stream_node::NodeBody;
let vnode_count = match node {
NodeBody::StreamScan(node) => {
if let Some(table) = &node.arrangement_table {
table.vnode_count()
} else if let Some(table) = &node.table_desc {
table.vnode_count()
} else {
return;
}
}
NodeBody::TemporalJoin(node) => node.get_table_desc().unwrap().vnode_count(),
NodeBody::BatchPlan(node) => node.get_table_desc().unwrap().vnode_count(),
NodeBody::Lookup(node) => node
.get_arrangement_table_info()
.unwrap()
.get_table_desc()
.unwrap()
.vnode_count(),
_ => return,
};
facts.push(Fact::Req {
id,
req: Req::AnyVnodeCount(vnode_count),
});
});
}
// Distributions of existing fragments.
for (id, dist) in existing_distribution {
let req = match dist {
Distribution::Singleton(worker_slot_id) => Req::Singleton(worker_slot_id),
Distribution::Hash(mapping) => Req::Hash(hash_mapping_id[&mapping]),
};
facts.push(Fact::Req { id, req });
}
// Edges
// Edges.
for (from, to, edge) in graph.all_edges() {
facts.push(Fact::Edge {
from,
Expand Down Expand Up @@ -317,7 +349,14 @@ impl Scheduler {
Req::AnySingleton => {
Distribution::Singleton(self.default_singleton_worker_slot)
}
Req::AnyVnodeCount(_) => todo!(),
Req::AnyVnodeCount(vnode_count) => {
let len = self.scheduled_worker_slots.len().min(vnode_count);
let mapping = WorkerSlotMapping::build_from_ids(
&self.scheduled_worker_slots[..len],
vnode_count,
);
Distribution::Hash(mapping)
}
}
}
None => Distribution::Hash(self.default_hash_mapping.clone()),
Expand Down

0 comments on commit 2fd465b

Please sign in to comment.