From 75c65afe0a1b02ea37edfceaad70a31df0ff179c Mon Sep 17 00:00:00 2001
From: xxchan
Date: Wed, 27 Mar 2024 04:47:17 +0800
Subject: [PATCH] support scaling source backfill

- We have to support this: otherwise scaling the source breaks, because the
  backfill actors can't be migrated.
- NOT TESTED

Signed-off-by: xxchan
---
 src/meta/src/barrier/command.rs       |  3 +-
 src/meta/src/stream/scale.rs          | 51 +++++++++++++++++++++++----
 src/meta/src/stream/source_manager.rs | 48 ++++++++++++++++++++-----
 3 files changed, 86 insertions(+), 16 deletions(-)

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 865835c49edeb..c60fa79fa3c12 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -73,7 +73,8 @@ pub struct Reschedule {
     /// The downstream fragments of this fragment.
     pub downstream_fragment_ids: Vec<FragmentId>,
 
-    /// Reassigned splits for source actors
+    /// Reassigned splits for source actors.
+    /// It becomes the `actor_splits` in `UpdateMutation`.
     pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
 
     /// Whether this fragment is injectable. The injectable means whether the fragment contains
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 5fc455f48c643..c08ee91bdf15b 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -199,8 +199,10 @@ pub struct RescheduleContext {
     worker_nodes: HashMap<WorkerId, WorkerNode>,
     /// Index of all `Actor` upstreams, specific to `Dispatcher`
     upstream_dispatchers: HashMap>,
-    /// Fragments with stream source
+    /// Fragments with `StreamSource`
     stream_source_fragment_ids: HashSet<FragmentId>,
+    /// Fragments with `StreamSourceBackfill`
+    stream_source_backfill_fragment_ids: HashSet<FragmentId>,
     /// Target fragments in `NoShuffle` relation
     no_shuffle_target_fragment_ids: HashSet<FragmentId>,
     /// Source fragments in `NoShuffle` relation
     no_shuffle_source_fragment_ids: HashSet<FragmentId>,
@@ -668,6 +670,7 @@ impl ScaleController {
         }
 
         let mut stream_source_fragment_ids = HashSet::new();
+        let mut stream_source_backfill_fragment_ids = HashSet::new();
         let mut no_shuffle_reschedule = HashMap::new();
         for (
             fragment_id,
@@ -741,6 +744,12 @@ impl ScaleController {
                     stream_source_fragment_ids.insert(*fragment_id);
                 }
             }
+            if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 {
+                let stream_node = fragment.actor_template.nodes.as_ref().unwrap();
+                if stream_node.find_source_backfill().is_some() {
+                    stream_source_backfill_fragment_ids.insert(*fragment_id);
+                }
+            }
 
             // Check if the reschedule plan is valid.
             let current_parallel_units = fragment
@@ -816,6 +825,7 @@ impl ScaleController {
             worker_nodes,
             upstream_dispatchers,
             stream_source_fragment_ids,
+            stream_source_backfill_fragment_ids,
             no_shuffle_target_fragment_ids,
             no_shuffle_source_fragment_ids,
             fragment_dispatcher_map,
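
The hunks above collect source-backfill fragments the same way plain source fragments are already collected: test the fragment type mask, then confirm the corresponding node actually exists in the actor template. Below is a minimal, self-contained sketch of that bit-mask classification; the flag values, the `Fragment` struct, and the `classify` helper are simplified stand-ins for illustration, not the real generated proto types or meta-service code.

use std::collections::HashSet;

// Hypothetical bit values, for illustration only; the real flags live in the
// generated `FragmentTypeFlag` proto enum.
enum FragmentTypeFlag {
    Source = 1 << 0,
    SourceScan = 1 << 1,
}

struct Fragment {
    fragment_id: u32,
    fragment_type_mask: u32,
}

// Mirrors the classification step in the patch: one set for fragments that own a
// `Source`, one for fragments that contain a `SourceBackfill` (SourceScan).
fn classify(fragments: &[Fragment]) -> (HashSet<u32>, HashSet<u32>) {
    let mut source_ids = HashSet::new();
    let mut source_backfill_ids = HashSet::new();
    for f in fragments {
        if (f.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0 {
            source_ids.insert(f.fragment_id);
        }
        if (f.fragment_type_mask & FragmentTypeFlag::SourceScan as u32) != 0 {
            source_backfill_ids.insert(f.fragment_id);
        }
    }
    (source_ids, source_backfill_ids)
}

fn main() {
    let fragments = [
        Fragment { fragment_id: 1, fragment_type_mask: FragmentTypeFlag::Source as u32 },
        Fragment { fragment_id: 2, fragment_type_mask: FragmentTypeFlag::SourceScan as u32 },
    ];
    let (source_ids, source_backfill_ids) = classify(&fragments);
    assert!(source_ids.contains(&1));
    assert!(source_backfill_ids.contains(&2));
}

Keeping the two sets separate is what later lets the reschedule path handle source fragments and backfill fragments in different passes.
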
@@ -1233,9 +1243,9 @@ impl ScaleController {
                 .await?;
         }
 
-        // For stream source fragments, we need to reallocate the splits.
+        // For stream source & source backfill fragments, we need to reallocate the splits.
         // Because we are in the Pause state, so it's no problem to reallocate
-        let mut fragment_stream_source_actor_splits = HashMap::new();
+        let mut fragment_actor_splits = HashMap::new();
         for fragment_id in reschedules.keys() {
             let actors_after_reschedule =
                 fragment_actors_after_reschedule.get(fragment_id).unwrap();
@@ -1253,10 +1263,39 @@ impl ScaleController {
 
                 let actor_splits = self
                     .source_manager
-                    .migrate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids)
+                    .migrate_splits_for_source_actors(
+                        *fragment_id,
+                        &prev_actor_ids,
+                        &curr_actor_ids,
+                    )
                     .await?;
 
-                fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits);
+                fragment_actor_splits.insert(*fragment_id, actor_splits);
+            }
+        }
+        // We use 2 iterations to make sure source actors are migrated first, and then align backfill actors
+        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
+            for fragment_id in reschedules.keys() {
+                let actors_after_reschedule =
+                    fragment_actors_after_reschedule.get(fragment_id).unwrap();
+
+                if ctx
+                    .stream_source_backfill_fragment_ids
+                    .contains(fragment_id)
+                {
+                    let fragment = ctx.fragment_map.get(fragment_id).unwrap();
+
+                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
+
+                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
+                        *fragment_id,
+                        &fragment.upstream_fragment_ids,
+                        &curr_actor_ids,
+                        &fragment_actor_splits,
+                        &no_shuffle_upstream_actor_map,
+                    )?;
+                    fragment_actor_splits.insert(*fragment_id, actor_splits);
+                }
             }
         }
         // TODO: support migrate splits for SourceBackfill
@@ -1411,7 +1450,7 @@ impl ScaleController {
             let upstream_fragment_dispatcher_ids =
                 upstream_fragment_dispatcher_set.into_iter().collect_vec();
 
-            let actor_splits = fragment_stream_source_actor_splits
+            let actor_splits = fragment_actor_splits
                 .get(&fragment_id)
                 .cloned()
                 .unwrap_or_default();
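
The reschedule path above now reallocates splits in two passes: the first loop migrates splits for the source fragments themselves, and only then does the second loop ask `migrate_splits_for_backfill_actors` to align each backfill fragment with the entries just written into `fragment_actor_splits`. The sketch below illustrates that ordering under simplified assumptions (splits are plain strings, and each backfill actor has exactly one no-shuffle upstream actor); `migrate_source_splits` and `align_backfill` are illustrative helpers, not the actual `SourceManager` API.

use std::collections::HashMap;

type ActorId = u32;
type Split = String;

// Pass 1 (simplified): existing assignments stay put; splits owned by removed actors
// are re-homed round-robin onto the remaining/new actors.
fn migrate_source_splits(
    prev: &HashMap<ActorId, Vec<Split>>,
    curr_actors: &[ActorId],
) -> HashMap<ActorId, Vec<Split>> {
    assert!(!curr_actors.is_empty());
    let mut out: HashMap<ActorId, Vec<Split>> =
        curr_actors.iter().map(|a| (*a, Vec::new())).collect();
    let mut orphaned: Vec<Split> = Vec::new();
    for (actor, splits) in prev {
        match out.get_mut(actor) {
            Some(slot) => slot.extend(splits.iter().cloned()), // actor survives: keep its splits
            None => orphaned.extend(splits.iter().cloned()),   // actor removed: re-home its splits
        }
    }
    for (i, split) in orphaned.into_iter().enumerate() {
        let actor = curr_actors[i % curr_actors.len()];
        out.get_mut(&actor).unwrap().push(split);
    }
    out
}

// Pass 2: each backfill actor copies the assignment of its no-shuffle upstream source actor.
fn align_backfill(
    backfill_to_upstream: &HashMap<ActorId, ActorId>,
    upstream_assignment: &HashMap<ActorId, Vec<Split>>,
) -> HashMap<ActorId, Vec<Split>> {
    backfill_to_upstream
        .iter()
        .map(|(backfill, upstream)| {
            (
                *backfill,
                upstream_assignment.get(upstream).cloned().unwrap_or_default(),
            )
        })
        .collect()
}

fn main() {
    // Source actor 2 is removed by the reschedule; its split has to be re-homed first,
    // and only then can backfill actor 10 (no-shuffle downstream of source actor 1)
    // copy the new source assignment.
    let prev = HashMap::from([(1, vec!["s0".to_string()]), (2, vec!["s1".to_string()])]);
    let source_assignment = migrate_source_splits(&prev, &[1]);
    let backfill_assignment = align_backfill(&HashMap::from([(10, 1)]), &source_assignment);
    assert_eq!(backfill_assignment[&10].len(), 2);
}

The ordering matters only because the second pass reads the map the first pass writes, which is exactly why the patch fills `fragment_actor_splits` for source fragments before touching the backfill fragments.
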
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 0fe9d4a961427..ca769084a65e8 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -464,13 +464,13 @@ impl Default for SplitDiffOptions {
 }
 
 /// Reassigns splits if there are new splits or dropped splits,
-/// i.e., `actor_splits` and `discovered_splits` differ.
+/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
 ///
 /// The existing splits will remain unmoved in their currently assigned actor.
 ///
 /// If an actor has an upstream actor, it should be a backfill executor,
-/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
-/// Use `align_backfill_splits` instead.
+/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
+/// Use `align_backfill_splits` instead.**
 ///
 /// - `fragment_id`: just for logging
 ///
@@ -760,11 +760,10 @@ impl SourceManager {
     /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
     ///
-    /// Very occasionally split removal may happen
-    /// during scaling, in which case we need to use the old splits for reallocation instead of the
-    /// latest splits (which may be missing), so that we can resolve the split removal in the next
-    /// command.
-    pub async fn migrate_splits(
+    /// Very occasionally split removal may happen during scaling, in which case we need to
+    /// use the old splits for reallocation instead of the latest splits (which may be missing),
+    /// so that we can resolve the split removal in the next command.
+    pub async fn migrate_splits_for_source_actors(
         &self,
         fragment_id: FragmentId,
         prev_actor_ids: &[ActorId],
@@ -787,7 +786,7 @@ impl SourceManager {
             fragment_id,
             empty_actor_splits,
             &prev_splits,
-            // pre-allocate splits is the first time getting splits and it does not have scale in scene
+            // pre-allocate splits is the first time getting splits and it does not have scale-in scene
            SplitDiffOptions::default(),
         )
         .unwrap_or_default();
@@ -795,6 +794,37 @@ impl SourceManager {
         Ok(diff)
     }
 
+    /// Migrates splits for the backfill actors of a rescheduled fragment by aligning them with the upstream source fragment's new split assignment.
+    pub fn migrate_splits_for_backfill_actors(
+        &self,
+        fragment_id: FragmentId,
+        upstream_fragment_ids: &Vec<FragmentId>,
+        curr_actor_ids: &[ActorId],
+        fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
+        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
+    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
+        // Align splits for the backfill fragment with its upstream source fragment.
+        debug_assert!(upstream_fragment_ids.len() == 1);
+        let upstream_fragment_id = upstream_fragment_ids[0];
+        let actors = no_shuffle_upstream_actor_map
+            .iter()
+            .filter(|(id, _)| curr_actor_ids.contains(id))
+            .map(|(id, upstream_fragment_actors)| {
+                debug_assert!(upstream_fragment_actors.len() == 1);
+                (
+                    *id,
+                    vec![*upstream_fragment_actors.get(&upstream_fragment_id).unwrap()],
+                )
+            });
+        let upstream_assignment = fragment_actor_splits.get(&upstream_fragment_id).unwrap();
+        Ok(align_backfill_splits(
+            actors,
+            upstream_assignment,
+            fragment_id,
+            upstream_fragment_id,
+        )?)
+    }
+
     /// Allocates splits to actors for a newly created source executor.
     pub async fn allocate_splits(&self, table_id: &TableId) -> MetaResult<SplitAssignment> {
         let core = self.core.lock().await;
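
Since the commit is marked NOT TESTED, the invariant behind `migrate_splits_for_backfill_actors` is worth pinning down in a small check: after a reschedule, every backfill actor should hold exactly the splits of its single no-shuffle upstream source actor. The sketch below exercises that invariant with stand-in types (integer ids, string splits); `align_with_upstream` is a hypothetical simplification, not the real `align_backfill_splits` in `source_manager.rs`.

use std::collections::HashMap;

type ActorId = u32;
type FragmentId = u32;
type Split = &'static str;

// Hypothetical simplification of the alignment step: each backfill actor looks up its
// single no-shuffle upstream source actor and copies that actor's splits verbatim.
fn align_with_upstream(
    curr_backfill_actors: &[ActorId],
    // backfill actor -> (upstream fragment -> upstream actor), mirroring the shape of
    // `no_shuffle_upstream_actor_map` in the patch
    no_shuffle_upstream: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
    upstream_fragment_id: FragmentId,
    upstream_assignment: &HashMap<ActorId, Vec<Split>>,
) -> HashMap<ActorId, Vec<Split>> {
    curr_backfill_actors
        .iter()
        .map(|backfill| {
            let upstream = no_shuffle_upstream[backfill][&upstream_fragment_id];
            (*backfill, upstream_assignment[&upstream].clone())
        })
        .collect()
}

fn main() {
    // Source fragment 1 (actors 10 and 11) feeds backfill fragment 2 (actors 20 and 21)
    // through a 1:1 no-shuffle edge.
    let upstream_assignment = HashMap::from([(10, vec!["split-a"]), (11, vec!["split-b"])]);
    let no_shuffle_upstream = HashMap::from([
        (20, HashMap::from([(1, 10)])),
        (21, HashMap::from([(1, 11)])),
    ]);

    let aligned = align_with_upstream(&[20, 21], &no_shuffle_upstream, 1, &upstream_assignment);

    // The invariant: each backfill actor ends up with exactly its upstream actor's splits.
    assert_eq!(aligned[&20], vec!["split-a"]);
    assert_eq!(aligned[&21], vec!["split-b"]);
}

A test along these lines, plus one where the upstream assignment is missing (to exercise the `unwrap` paths), would back up the doc comment on `reassign_splits` that backfill actors must be aligned with their upstream rather than reassigned independently.
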