From 235ef353fb89f18c4aa4013531fc6967350aca98 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 2 Feb 2024 17:21:08 +0800 Subject: [PATCH] Refactor ScaleCtrl loop & clarify ID check --- src/meta/src/stream/scale.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 5b1999ae7385b..eca6a78f35e94 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1701,16 +1701,20 @@ impl ScaleController { fragment_actor_id_map: &mut HashMap>, table_fragments: &BTreeMap, ) { - for (table_id, table_fragments) in table_fragments { - let mut actor_fragment_id_map_for_check = HashMap::new(); + // This is only for assertion purposes and will be removed once the dispatcher_id is guaranteed to always correspond to the downstream fragment_id, + // such as through the foreign key constraints in the SQL backend. + let mut actor_fragment_id_map_for_check = HashMap::new(); + for table_fragments in table_fragments.values() { for (fragment_id, fragment) in &table_fragments.fragments { for actor in &fragment.actors { - actor_fragment_id_map_for_check + debug_assert!(actor_fragment_id_map_for_check .insert(actor.actor_id, *fragment_id) - .expect("actor id should be unique"); + .is_none()); } } + } + for (table_id, table_fragments) in table_fragments { for (fragment_id, fragment) in &table_fragments.fragments { for actor in &fragment.actors { fragment_actor_id_map @@ -1724,7 +1728,7 @@ impl ScaleController { .insert(actor.fragment_id as FragmentId); let downstream_actor_id = - dispatcher.downstream_actor_id.first().expect( + dispatcher.downstream_actor_id.iter().exactly_one().expect( "no shuffle should have exactly one downstream actor id", );