From 7e4cf713a0fc7cb717b7c3f7f89199ca28905156 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 20 Mar 2024 23:20:29 +0800 Subject: [PATCH] feat: throw an error if unable to find downstream actor index during the scaling process. (#15720) Co-authored-by: lmatz --- src/meta/src/stream/scale.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index b3f62fe609990..8aae5c95c5552 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1828,16 +1828,17 @@ impl ScaleController { table_fragment_id_map: &mut HashMap>, fragment_actor_id_map: &mut HashMap>, table_fragments: &BTreeMap, - ) { + ) -> MetaResult<()> { // This is only for assertion purposes and will be removed once the dispatcher_id is guaranteed to always correspond to the downstream fragment_id, // such as through the foreign key constraints in the SQL backend. let mut actor_fragment_id_map_for_check = HashMap::new(); for table_fragments in table_fragments.values() { for (fragment_id, fragment) in &table_fragments.fragments { for actor in &fragment.actors { - debug_assert!(actor_fragment_id_map_for_check - .insert(actor.actor_id, *fragment_id) - .is_none()); + let prev = + actor_fragment_id_map_for_check.insert(actor.actor_id, *fragment_id); + + debug_assert!(prev.is_none()); } } } @@ -1870,9 +1871,10 @@ impl ScaleController { dispatcher.dispatcher_id as FragmentId ); } else { - tracing::warn!( - "downstream actor id {} not found in fragment_actor_id_map", - downstream_actor_id + bail!( + "downstream actor id {} from actor {} not found in fragment_actor_id_map", + downstream_actor_id, + actor.actor_id, ); } @@ -1892,6 +1894,8 @@ impl ScaleController { actor_status.extend(table_fragments.actor_status.clone()); } + + Ok(()) } match &self.metadata_manager { @@ -1905,7 +1909,7 @@ impl ScaleController { &mut table_fragment_id_map, &mut fragment_actor_id_map, guard.table_fragments(), - ); + )?; } MetadataManager::V2(_) => { let all_table_fragments = self.list_all_table_fragments().await?; @@ -1922,7 +1926,7 @@ impl ScaleController { &mut table_fragment_id_map, &mut fragment_actor_id_map, &all_table_fragments, - ); + )?; } }