diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index f4052c404cdc5..54ecb41408ea5 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -665,16 +665,23 @@ impl GlobalBarrierManagerContext {
 
         let mut compared_table_parallelisms = table_parallelisms.clone();
 
-        let (reschedule_fragment, _) = self
-            .scale_controller
-            .prepare_reschedule_command(
-                plan,
-                RescheduleOptions {
-                    resolve_no_shuffle_upstream: true,
-                },
-                Some(&mut compared_table_parallelisms),
-            )
-            .await?;
+        // skip reschedule if no reschedule is generated.
+        let reschedule_fragment = if plan.is_empty() {
+            HashMap::new()
+        } else {
+            let (reschedule_fragment, _) = self
+                .scale_controller
+                .prepare_reschedule_command(
+                    plan,
+                    RescheduleOptions {
+                        resolve_no_shuffle_upstream: true,
+                    },
+                    Some(&mut compared_table_parallelisms),
+                )
+                .await?;
+
+            reschedule_fragment
+        };
 
         // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
         debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
@@ -794,16 +801,20 @@ impl GlobalBarrierManagerContext {
 
         let mut compared_table_parallelisms = table_parallelisms.clone();
 
-        let (reschedule_fragment, applied_reschedules) = self
-            .scale_controller
-            .prepare_reschedule_command(
-                plan,
-                RescheduleOptions {
-                    resolve_no_shuffle_upstream: true,
-                },
-                Some(&mut compared_table_parallelisms),
-            )
-            .await?;
+        // skip reschedule if no reschedule is generated.
+        let (reschedule_fragment, applied_reschedules) = if plan.is_empty() {
+            (HashMap::new(), HashMap::new())
+        } else {
+            self.scale_controller
+                .prepare_reschedule_command(
+                    plan,
+                    RescheduleOptions {
+                        resolve_no_shuffle_upstream: true,
+                    },
+                    Some(&mut compared_table_parallelisms),
+                )
+                .await?
+        };
 
         // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
         debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 57f64b7560bc1..0ee1ba6eb4a37 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -1695,60 +1695,139 @@ impl ScaleController {
             })
             .collect::<HashMap<_, _>>();
 
-        let all_table_fragments = self.list_all_table_fragments().await?;
+        // index for no shuffle relation
+        let mut no_shuffle_source_fragment_ids = HashSet::new();
+        let mut no_shuffle_target_fragment_ids = HashSet::new();
 
-        // FIXME: only need actor id and dispatcher info, avoid clone it.
-        let mut actor_map = HashMap::new();
+        // index for fragment_id -> distribution_type
+        let mut fragment_distribution_map = HashMap::new();
 
+        // index for actor -> parallel_unit
         let mut actor_status = HashMap::new();
 
-        // FIXME: only need fragment distribution info, should avoid clone it.
+        // index for table_id -> [fragment_id]
+        let mut table_fragment_id_map = HashMap::new();
+        // index for fragment_id -> [actor_id]
+        let mut fragment_actor_id_map = HashMap::new();
+
+        // internal helper func for building index
+        fn build_index(
+            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
+            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
+            fragment_distribution_map: &mut HashMap<FragmentId, FragmentDistributionType>,
+            actor_status: &mut HashMap<ActorId, ActorStatus>,
+            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
+            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<ActorId>>,
+            table_fragments: &BTreeMap<TableId, TableFragments>,
+        ) {
+            // This is only for assertion purposes and will be removed once the dispatcher_id is guaranteed to always correspond to the downstream fragment_id,
+            // such as through the foreign key constraints in the SQL backend.
+            let mut actor_fragment_id_map_for_check = HashMap::new();
+            for table_fragments in table_fragments.values() {
+                for (fragment_id, fragment) in &table_fragments.fragments {
+                    for actor in &fragment.actors {
+                        debug_assert!(actor_fragment_id_map_for_check
+                            .insert(actor.actor_id, *fragment_id)
+                            .is_none());
+                    }
+                }
+            }
 
-        let mut table_fragment_map = HashMap::new();
-        for table_fragments in all_table_fragments {
-            let table_id = table_fragments.table_id().table_id;
-            for (fragment_id, fragment) in table_fragments.fragments {
-                fragment
-                    .actors
-                    .iter()
-                    .map(|actor| (actor.actor_id, actor))
-                    .for_each(|(id, actor)| {
-                        actor_map.insert(id as ActorId, actor.clone());
-                    });
+            for (table_id, table_fragments) in table_fragments {
+                for (fragment_id, fragment) in &table_fragments.fragments {
+                    for actor in &fragment.actors {
+                        fragment_actor_id_map
+                            .entry(*fragment_id)
+                            .or_default()
+                            .insert(actor.actor_id);
+
+                        for dispatcher in &actor.dispatcher {
+                            if dispatcher.r#type() == DispatcherType::NoShuffle {
+                                no_shuffle_source_fragment_ids
+                                    .insert(actor.fragment_id as FragmentId);
+
+                                let downstream_actor_id =
+                                    dispatcher.downstream_actor_id.iter().exactly_one().expect(
+                                        "no shuffle should have exactly one downstream actor id",
+                                    );
+
+                                let downstream_fragment_id = actor_fragment_id_map_for_check
+                                    .get(downstream_actor_id)
+                                    .unwrap();
+
+                                // dispatcher_id of dispatcher should be exactly same as downstream fragment id
+                                // but we need to check it to make sure
+                                debug_assert_eq!(
+                                    *downstream_fragment_id,
+                                    dispatcher.dispatcher_id as FragmentId
+                                );
 
-                table_fragment_map
-                    .entry(table_id)
-                    .or_insert(HashMap::new())
-                    .insert(fragment_id, fragment);
-            }
+                                no_shuffle_target_fragment_ids
+                                    .insert(dispatcher.dispatcher_id as FragmentId);
+                            }
+                        }
+                    }
 
-            actor_status.extend(table_fragments.actor_status);
-        }
+                    fragment_distribution_map.insert(*fragment_id, fragment.distribution_type());
 
-        let mut no_shuffle_source_fragment_ids = HashSet::new();
-        let mut no_shuffle_target_fragment_ids = HashSet::new();
+                    table_fragment_id_map
+                        .entry(table_id.table_id())
+                        .or_default()
+                        .insert(*fragment_id);
+                }
 
-        Self::build_no_shuffle_relation_index(
-            &actor_map,
-            &mut no_shuffle_source_fragment_ids,
-            &mut no_shuffle_target_fragment_ids,
-        );
+                actor_status.extend(table_fragments.actor_status.clone());
+            }
+        }
+
+        match &self.metadata_manager {
+            MetadataManager::V1(mgr) => {
+                let guard = mgr.fragment_manager.get_fragment_read_guard().await;
+                build_index(
+                    &mut no_shuffle_source_fragment_ids,
+                    &mut no_shuffle_target_fragment_ids,
+                    &mut fragment_distribution_map,
+                    &mut actor_status,
+                    &mut table_fragment_id_map,
+                    &mut fragment_actor_id_map,
+                    guard.table_fragments(),
+                );
+            }
+            MetadataManager::V2(_) => {
+                let all_table_fragments = self.list_all_table_fragments().await?;
+                let all_table_fragments = all_table_fragments
+                    .into_iter()
+                    .map(|table_fragments| (table_fragments.table_id(), table_fragments))
+                    .collect::<BTreeMap<_, _>>();
+
+                build_index(
+                    &mut no_shuffle_source_fragment_ids,
+                    &mut no_shuffle_target_fragment_ids,
+                    &mut fragment_distribution_map,
+                    &mut actor_status,
+                    &mut table_fragment_id_map,
+                    &mut fragment_actor_id_map,
+                    &all_table_fragments,
+                );
+            }
+        }
 
         let mut target_plan = HashMap::new();
 
         for (table_id, parallelism) in table_parallelisms {
-            let fragment_map = table_fragment_map.remove(&table_id).unwrap();
+            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();
 
-            for (fragment_id, fragment) in fragment_map {
+            for fragment_id in fragment_map {
                 // Currently, all of our NO_SHUFFLE relation propagations are only transmitted from upstream to downstream.
                 if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                     continue;
                 }
 
-                let fragment_parallel_unit_ids: BTreeSet<_> = fragment
-                    .actors
+                let fragment_parallel_unit_ids: BTreeSet<ParallelUnitId> = fragment_actor_id_map
+                    .get(&fragment_id)
+                    .unwrap()
                     .iter()
-                    .map(|actor| {
+                    .map(|actor_id| {
                         actor_status
-                            .get(&actor.actor_id)
+                            .get(actor_id)
                             .and_then(|status| status.parallel_unit.clone())
                             .unwrap()
                             .id as ParallelUnitId
@@ -1765,7 +1844,7 @@ impl ScaleController {
                     );
                 }
 
-                match fragment.get_distribution_type().unwrap() {
+                match fragment_distribution_map.get(&fragment_id).unwrap() {
                     FragmentDistributionType::Unspecified => unreachable!(),
                     FragmentDistributionType::Single => {
                         let single_parallel_unit_id =
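
For readers skimming the diff: the core of the scale.rs change is that generate_table_resize_plan no longer clones all table fragments up front (note the removed FIXME comments), but instead walks the fragment metadata once through a local build_index helper that fills several lookup maps (no-shuffle source/target fragments, table -> fragments, fragment -> actors) for the planning loop to consume. Below is a minimal, self-contained sketch of that single-pass indexing pattern; TableFragments, Fragment, Actor, and Dispatcher are simplified stand-ins rather than the real RisingWave meta-model structs, and the tuple return replaces the &mut out-parameters of the actual helper.

// Sketch only: simplified stand-in types, not the RisingWave meta model.
use std::collections::{BTreeMap, HashMap, HashSet};

type ActorId = u32;
type FragmentId = u32;
type TableId = u32;

#[derive(PartialEq)]
enum DispatcherType {
    NoShuffle,
    Hash,
}

struct Dispatcher {
    r#type: DispatcherType,
    // Assumed invariant mirrored from the diff: for NO_SHUFFLE dispatchers,
    // dispatcher_id equals the downstream fragment id.
    dispatcher_id: u64,
}

struct Actor {
    actor_id: ActorId,
    fragment_id: FragmentId,
    dispatcher: Vec<Dispatcher>,
}

struct Fragment {
    actors: Vec<Actor>,
}

struct TableFragments {
    fragments: BTreeMap<FragmentId, Fragment>,
}

/// Walk every table's fragments once and build the lookup indexes used later:
/// no-shuffle source/target fragments, table -> fragments, fragment -> actors.
fn build_index(
    table_fragments: &BTreeMap<TableId, TableFragments>,
) -> (
    HashSet<FragmentId>,                   // no-shuffle source fragments
    HashSet<FragmentId>,                   // no-shuffle target fragments
    HashMap<TableId, HashSet<FragmentId>>, // table -> fragments
    HashMap<FragmentId, HashSet<ActorId>>, // fragment -> actors
) {
    let mut no_shuffle_sources = HashSet::new();
    let mut no_shuffle_targets = HashSet::new();
    let mut table_fragment_ids: HashMap<TableId, HashSet<FragmentId>> = HashMap::new();
    let mut fragment_actor_ids: HashMap<FragmentId, HashSet<ActorId>> = HashMap::new();

    for (table_id, tf) in table_fragments {
        for (fragment_id, fragment) in &tf.fragments {
            table_fragment_ids
                .entry(*table_id)
                .or_default()
                .insert(*fragment_id);
            for actor in &fragment.actors {
                fragment_actor_ids
                    .entry(*fragment_id)
                    .or_default()
                    .insert(actor.actor_id);
                for dispatcher in &actor.dispatcher {
                    if dispatcher.r#type == DispatcherType::NoShuffle {
                        no_shuffle_sources.insert(actor.fragment_id);
                        no_shuffle_targets.insert(dispatcher.dispatcher_id as FragmentId);
                    }
                }
            }
        }
    }

    (
        no_shuffle_sources,
        no_shuffle_targets,
        table_fragment_ids,
        fragment_actor_ids,
    )
}

fn main() {
    // Usage example with an empty catalog; real callers would pass the indexed metadata.
    let tables: BTreeMap<TableId, TableFragments> = BTreeMap::new();
    let (sources, targets, table_fragments, fragment_actors) = build_index(&tables);
    println!(
        "{} no-shuffle sources, {} targets, {} tables, {} fragments",
        sources.len(),
        targets.len(),
        table_fragments.len(),
        fragment_actors.len()
    );
}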