From 2fd465b60cf08c9b8abe40ab98e4a73d031831d7 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 6 Nov 2024 14:43:48 +0800 Subject: [PATCH] extract vnode count requirement from table lookup Signed-off-by: Bugen Zhao --- src/meta/src/stream/stream_graph/schedule.rs | 53 +++++++++++++++++--- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 0301170f29316..a9a4cb8e8ab36 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -25,7 +25,8 @@ use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, VnodeCountCompat, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::util::stream_graph_visitor::visit_fragment; use risingwave_common::{bail, hash}; use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -50,7 +51,6 @@ enum Req { Singleton(WorkerSlotId), Hash(HashMappingId), AnySingleton, - #[expect(dead_code)] AnyVnodeCount(usize), } @@ -168,6 +168,8 @@ impl Distribution { /// [`Scheduler`] schedules the distribution of fragments in a stream graph. pub(super) struct Scheduler { + scheduled_worker_slots: Vec, + /// The default hash mapping for hash-distributed fragments, if there's no requirement derived. default_hash_mapping: WorkerSlotMapping, @@ -219,10 +221,10 @@ impl Scheduler { let single_scheduled = schedule_units_for_slots(&slots, 1, streaming_job_id)?; let default_single_worker_id = single_scheduled.keys().exactly_one().cloned().unwrap(); - let default_singleton_worker_slot = WorkerSlotId::new(default_single_worker_id as _, 0); Ok(Self { + scheduled_worker_slots, default_hash_mapping, default_singleton_worker_slot, }) @@ -251,7 +253,7 @@ impl Scheduler { let mut facts = Vec::new(); - // Singletons + // Singletons. for (&id, fragment) in graph.building_fragments() { if fragment.requires_singleton { facts.push(Fact::Req { @@ -260,7 +262,37 @@ impl Scheduler { }); } } - // External + // Vnode count requirements if looking up existing tables. + for (&id, fragment) in graph.building_fragments() { + visit_fragment(&mut (*fragment).clone(), |node| { + use risingwave_pb::stream_plan::stream_node::NodeBody; + let vnode_count = match node { + NodeBody::StreamScan(node) => { + if let Some(table) = &node.arrangement_table { + table.vnode_count() + } else if let Some(table) = &node.table_desc { + table.vnode_count() + } else { + return; + } + } + NodeBody::TemporalJoin(node) => node.get_table_desc().unwrap().vnode_count(), + NodeBody::BatchPlan(node) => node.get_table_desc().unwrap().vnode_count(), + NodeBody::Lookup(node) => node + .get_arrangement_table_info() + .unwrap() + .get_table_desc() + .unwrap() + .vnode_count(), + _ => return, + }; + facts.push(Fact::Req { + id, + req: Req::AnyVnodeCount(vnode_count), + }); + }); + } + // Distributions of existing fragments. for (id, dist) in existing_distribution { let req = match dist { Distribution::Singleton(worker_slot_id) => Req::Singleton(worker_slot_id), @@ -268,7 +300,7 @@ impl Scheduler { }; facts.push(Fact::Req { id, req }); } - // Edges + // Edges. for (from, to, edge) in graph.all_edges() { facts.push(Fact::Edge { from, @@ -317,7 +349,14 @@ impl Scheduler { Req::AnySingleton => { Distribution::Singleton(self.default_singleton_worker_slot) } - Req::AnyVnodeCount(_) => todo!(), + Req::AnyVnodeCount(vnode_count) => { + let len = self.scheduled_worker_slots.len().min(vnode_count); + let mapping = WorkerSlotMapping::build_from_ids( + &self.scheduled_worker_slots[..len], + vnode_count, + ); + Distribution::Hash(mapping) + } } } None => Distribution::Hash(self.default_hash_mapping.clone()),