From c4de0ca8959a0f3980ab1a9f8fb754ca18d08725 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 8 Nov 2024 16:07:45 +0800 Subject: [PATCH] fix: Single fragment scaling is too conservative (#19259) Signed-off-by: Shanicky Chen --- src/meta/src/stream/scale.rs | 74 +++++++++++++++--------------------- 1 file changed, 31 insertions(+), 43 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 2dbd6364d4e2c..0aff0102ecb17 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -968,9 +968,9 @@ impl ScaleController { return; } - let fragment = ctx.fragment_map.get(fragment_id).unwrap(); + let fragment = &ctx.fragment_map[fragment_id]; - let upstream_fragment = ctx.fragment_map.get(upstream_fragment_id).unwrap(); + let upstream_fragment = &ctx.fragment_map[upstream_fragment_id]; // build actor group map for upstream_actor in &upstream_fragment.actors { @@ -993,8 +993,7 @@ impl ScaleController { (upstream_fragment.fragment_id, upstream_actor.actor_id), ); } else { - let root_actor_id = - *actor_group_map.get(&upstream_actor.actor_id).unwrap(); + let root_actor_id = actor_group_map[&upstream_actor.actor_id]; actor_group_map.insert(downstream_actor_id, root_actor_id); } @@ -1182,7 +1181,7 @@ impl ScaleController { .cloned() .unwrap_or_default(); - let fragment = ctx.fragment_map.get(fragment_id).unwrap(); + let fragment = &ctx.fragment_map[fragment_id]; assert!(!fragment.actors.is_empty()); @@ -1223,11 +1222,10 @@ impl ScaleController { // Because we are in the Pause state, so it's no problem to reallocate let mut fragment_actor_splits = HashMap::new(); for fragment_id in reschedules.keys() { - let actors_after_reschedule = - fragment_actors_after_reschedule.get(fragment_id).unwrap(); + let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id]; if ctx.stream_source_fragment_ids.contains(fragment_id) { - let fragment = ctx.fragment_map.get(fragment_id).unwrap(); + let fragment = &ctx.fragment_map[fragment_id]; let prev_actor_ids = fragment .actors @@ -1257,14 +1255,13 @@ impl ScaleController { // We use 2 iterations to make sure source actors are migrated first, and then align backfill actors if !ctx.stream_source_backfill_fragment_ids.is_empty() { for fragment_id in reschedules.keys() { - let actors_after_reschedule = - fragment_actors_after_reschedule.get(fragment_id).unwrap(); + let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id]; if ctx .stream_source_backfill_fragment_ids .contains(fragment_id) { - let fragment = ctx.fragment_map.get(fragment_id).unwrap(); + let fragment = &ctx.fragment_map[fragment_id]; let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec(); @@ -1314,12 +1311,11 @@ impl ScaleController { .into_keys() .collect(); - let actors_after_reschedule = - fragment_actors_after_reschedule.get(&fragment_id).unwrap(); + let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id]; assert!(!actors_after_reschedule.is_empty()); - let fragment = ctx.fragment_map.get(&fragment_id).unwrap(); + let fragment = &ctx.fragment_map[&fragment_id]; let in_degree_types: HashSet<_> = fragment .upstream_fragment_ids @@ -1575,7 +1571,7 @@ impl ScaleController { no_shuffle_downstream_actors_map: &HashMap>, new_actor: &mut PbStreamActor, ) -> MetaResult<()> { - let fragment = &ctx.fragment_map.get(&new_actor.fragment_id).unwrap(); + let fragment = &ctx.fragment_map[&new_actor.fragment_id]; let mut applied_upstream_fragment_actor_ids = HashMap::new(); for upstream_fragment_id in &fragment.upstream_fragment_ids { @@ -1588,7 +1584,7 @@ impl ScaleController { match upstream_dispatch_type { DispatcherType::Unspecified => unreachable!(), DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => { - let upstream_fragment = &ctx.fragment_map.get(upstream_fragment_id).unwrap(); + let upstream_fragment = &ctx.fragment_map[upstream_fragment_id]; let mut upstream_actor_ids = upstream_fragment .actors .iter() @@ -1946,10 +1942,9 @@ impl ScaleController { let mut fragment_slots: BTreeMap = BTreeMap::new(); - for actor_id in fragment_actor_id_map.get(&fragment_id).unwrap() { - let worker_id = actor_location.get(actor_id).unwrap(); - - *fragment_slots.entry(*worker_id).or_default() += 1; + for actor_id in &fragment_actor_id_map[&fragment_id] { + let worker_id = actor_location[actor_id]; + *fragment_slots.entry(worker_id).or_default() += 1; } let all_available_slots: usize = schedulable_worker_slots.values().cloned().sum(); @@ -1961,22 +1956,19 @@ impl ScaleController { ); } - let &(dist, vnode_count) = fragment_distribution_map.get(&fragment_id).unwrap(); + let (dist, vnode_count) = fragment_distribution_map[&fragment_id]; let max_parallelism = vnode_count; match dist { FragmentDistributionType::Unspecified => unreachable!(), FragmentDistributionType::Single => { - let (single_worker_id, should_be_one) = - fragment_slots.iter().exactly_one().unwrap(); + let (single_worker_id, should_be_one) = fragment_slots + .iter() + .exactly_one() + .expect("single fragment should have only one worker slot"); assert_eq!(*should_be_one, 1); - if schedulable_worker_slots.contains_key(single_worker_id) { - // NOTE: shall we continue? - continue; - } - let units = schedule_units_for_slots(&schedulable_worker_slots, 1, table_id)?; @@ -1988,7 +1980,11 @@ impl ScaleController { })?; assert_eq!(*should_be_one, 1); - assert_ne!(*chosen_target_worker_id, *single_worker_id); + + if *chosen_target_worker_id == *single_worker_id { + tracing::debug!("single fragment {fragment_id} already on target worker {chosen_target_worker_id}"); + continue; + } target_plan.insert( fragment_id, @@ -2188,17 +2184,13 @@ impl ScaleController { } // for upstream - for upstream_fragment_id in &fragment_map - .get(&fragment_id) - .unwrap() - .upstream_fragment_ids - { + for upstream_fragment_id in &fragment_map[&fragment_id].upstream_fragment_ids { if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) { continue; } - let table_id = fragment_to_table.get(&fragment_id).unwrap(); - let upstream_table_id = fragment_to_table.get(upstream_fragment_id).unwrap(); + let table_id = &fragment_to_table[&fragment_id]; + let upstream_table_id = &fragment_to_table[upstream_fragment_id]; // Only custom parallelism will be propagated to the no shuffle upstream. if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) { @@ -2255,16 +2247,12 @@ impl ScaleController { } // for upstream - for upstream_fragment_id in &fragment_map - .get(&fragment_id) - .unwrap() - .upstream_fragment_ids - { + for upstream_fragment_id in &fragment_map[&fragment_id].upstream_fragment_ids { if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) { continue; } - let reschedule_plan = reschedule.get(&fragment_id).unwrap(); + let reschedule_plan = &reschedule[&fragment_id]; if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) { if upstream_reschedule_plan != reschedule_plan { @@ -2728,7 +2716,7 @@ impl ConsistentHashRing { let ring_range = self.ring.range(task_hash..).chain(self.ring.iter()); for (_, &worker_id) in ring_range { - let task_limit = *soft_limits.get(&worker_id).unwrap(); + let task_limit = soft_limits[&worker_id]; let worker_task_count = task_distribution.entry(worker_id).or_insert(0);