From cf160eee720ef8a93725e7d7b7a65a3ef84d1f8c Mon Sep 17 00:00:00 2001
From: xxchan
Date: Sat, 23 Dec 2023 16:32:24 +0800
Subject: [PATCH] feat(meta): align splits of source backfill actors with
 their upstream source actors

---
 src/meta/src/barrier/command.rs          |  4 +-
 src/meta/src/manager/catalog/fragment.rs | 25 +++++++-
 src/meta/src/stream/source_manager.rs    | 80 +++++++++++++++++++++---
 3 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 42d16a436b3b..1bda0d193a5a 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -461,7 +461,7 @@ impl CommandContext {
             // Find the actors of the upstream fragment.
             let upstream_actor_ids = self
                 .fragment_manager
-                .get_running_actors_of_fragment(upstream_fragment_id)
+                .get_running_actor_ids_of_fragment(upstream_fragment_id)
                 .await?;
 
             // Record updates for all actors.
@@ -495,7 +495,7 @@ impl CommandContext {
             // Find the actors of the downstream fragment.
             let downstream_actor_ids = self
                 .fragment_manager
-                .get_running_actors_of_fragment(downstream_fragment_id)
+                .get_running_actor_ids_of_fragment(downstream_fragment_id)
                 .await?;
 
             // Downstream removed actors should be skipped
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index f093bbc9081d..7479512a5e50 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -1034,7 +1034,7 @@ impl FragmentManager {
     }
 
     /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
-    pub async fn get_running_actors_of_fragment(
+    pub async fn get_running_actor_ids_of_fragment(
         &self,
         fragment_id: FragmentId,
     ) -> MetaResult<Vec<ActorId>> {
@@ -1055,6 +1055,29 @@ impl FragmentManager {
         bail!("fragment not found: {}", fragment_id)
     }
 
+    pub async fn get_running_actors_and_upstream_fragment_of_fragment(
+        &self,
+        fragment_id: FragmentId,
+    ) -> MetaResult<(Vec<StreamActor>, Vec<FragmentId>)> {
+        let map = &self.core.read().await.table_fragments;
+
+        for table_fragment in map.values() {
+            if let Some(fragment) = table_fragment.fragments.get(&fragment_id) {
+                let running_actors = fragment
+                    .actors
+                    .iter()
+                    .filter(|a| {
+                        table_fragment.actor_status[&a.actor_id].state == ActorState::Running as i32
+                    })
+                    .cloned()
+                    .collect();
+                return Ok((running_actors, fragment.upstream_fragment_ids.clone()));
+            }
+        }
+
+        bail!("fragment not found: {}", fragment_id)
+    }
+
     /// Add the newly added Actor to the `FragmentManager`
     pub async fn pre_apply_reschedules(
         &self,
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 4d225e53bba3..03c1b1f9dfb1 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -273,28 +273,63 @@ impl SourceManagerCore {
             };
 
             let Some(discovered_splits) = handle.discovered_splits().await else {
-                return Ok(split_assignment);
+                continue;
             };
             if discovered_splits.is_empty() {
                 tracing::warn!("No splits discovered for source {}", source_id);
             }
 
+            let mut source_fragments = vec![];
+            let mut backfill_fragments = vec![];
+
             for fragment_id in fragment_ids {
-                let actor_ids = match self
+                let (actors, upstream_fragment_ids) = match self
                     .fragment_manager
-                    .get_running_actors_of_fragment(*fragment_id)
+                    .get_running_actors_and_upstream_fragment_of_fragment(*fragment_id)
                     .await
                 {
-                    Ok(actor_ids) => actor_ids,
+                    Ok((actors, upstream_fragment_ids)) => {
+                        if actors.is_empty() {
+                            tracing::warn!("No actors found for fragment {}", fragment_id);
+                            continue;
+                        }
+                        (actors, upstream_fragment_ids)
+                    }
                     Err(err) => {
                         tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string());
                         continue;
                     }
                 };
 
-                let prev_actor_splits: HashMap<_, _> = actor_ids
+                if !upstream_fragment_ids.is_empty() {
+                    debug_assert!(
+                        upstream_fragment_ids.len() == 1,
+                        "source backfill fragment should have exactly one upstream fragment, fragment_id: {fragment_id}, upstream_fragment_ids: {upstream_fragment_ids:?}"
+                    );
+                    for actor in &actors {
+                        debug_assert!(
+                            actor.upstream_actor_id.len() == 1,
+                            "source backfill actor should have exactly one upstream actor, fragment_id: {fragment_id}, actor: {actor:?}"
+                        );
+                    }
+                    backfill_fragments.push((*fragment_id, upstream_fragment_ids[0], actors));
+                } else {
+                    for actor in &actors {
+                        debug_assert!(
+                            actor.upstream_actor_id.is_empty(),
+                            "source actor should not have upstream actors, fragment_id: {fragment_id}, actor: {actor:?}"
+                        );
+                    }
+                    source_fragments.push((*fragment_id, actors));
+                }
+            }
+
+            // assign splits for source fragments first
+            for (fragment_id, actors) in source_fragments {
+                let prev_actor_splits: HashMap<_, _> = actors
                     .into_iter()
-                    .map(|actor_id| {
+                    .map(|actor| {
+                        let actor_id = actor.actor_id;
                         (
                             actor_id,
                             self.actor_splits
                                 .get(&actor_id)
                                 .cloned()
                                 .unwrap_or_default(),
                         )
                     })
                     .collect();
 
                 if let Some(new_assignment) = reassign_splits(
-                    *fragment_id,
+                    fragment_id,
                     prev_actor_splits,
                     &discovered_splits,
                     SplitDiffOptions {
                         enable_scale_in: handle.enable_scale_in,
                     },
                 ) {
-                    split_assignment.insert(*fragment_id, new_assignment);
+                    split_assignment.insert(fragment_id, new_assignment);
                 }
             }
+
+            // align splits for backfill fragments with their upstream source fragments: each backfill actor mirrors the splits of its single upstream source actor
+            for (fragment_id, upstream_fragment_id, actors) in backfill_fragments {
+                let upstream_assignment = split_assignment
+                    .get(&upstream_fragment_id)
+                    .unwrap_or_else(|| panic!(
+                        "source backfill fragment's upstream fragment should have assignment, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, split_assignment: {split_assignment:?}"));
+                split_assignment.insert(
+                    fragment_id,
+                    actors
+                        .into_iter()
+                        .map(|a| {
+                            let actor_id = a.actor_id;
+                            (
+                                actor_id,
+                                // look up by the upstream source actor's id, not the backfill actor's own id
+                                upstream_assignment
+                                    .get(&a.upstream_actor_id[0])
+                                    .cloned()
+                                    .unwrap_or_else(|| panic!("source backfill actor's upstream actor should have assigned splits, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor: {a:?}, upstream_assignment: {upstream_assignment:?}")),
+                            )
+                        })
+                        .collect(),
+                );
+            }
         }
 
         Ok(split_assignment)
@@ -430,9 +489,12 @@ impl Default for SplitDiffOptions {
     }
 }
 
-/// Reassigns splits if there are new splits or dropped splits,
+/// Reassigns splits (for a fragment's actors) if there are new splits or dropped splits,
 /// i.e., `actor_splits` and `discovered_splits` differ.
 ///
+/// If an actor has an upstream actor, it should be a backfill executor,
+/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
+///
 /// - `fragment_id`: just for logging
 fn reassign_splits(
     fragment_id: FragmentId,
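
A standalone sketch of the alignment step introduced in the last source_manager.rs hunk above, kept separate from the patch itself. It uses simplified stand-ins: `ActorId` aliased to `u32` and splits represented as `String`s rather than the meta service's split type, and the helper name `align_backfill_splits` is illustrative, not a function added by this patch. Each backfill actor simply copies the split assignment of the one upstream source actor it chains from.

use std::collections::HashMap;

// Simplified stand-in for the meta service's actor id type.
type ActorId = u32;

/// For each backfill actor, copy the splits assigned to its single upstream
/// source actor. Returns `None` if some upstream actor has no assignment yet.
fn align_backfill_splits(
    backfill_actors: &[(ActorId, ActorId)], // (backfill actor, its upstream source actor)
    upstream_assignment: &HashMap<ActorId, Vec<String>>, // upstream actor -> splits
) -> Option<HashMap<ActorId, Vec<String>>> {
    backfill_actors
        .iter()
        .map(|(actor_id, upstream_actor_id)| {
            upstream_assignment
                .get(upstream_actor_id)
                .map(|splits| (*actor_id, splits.clone()))
        })
        .collect()
}

fn main() {
    // Upstream source fragment: actor 1 reads split "a", actor 2 reads split "b".
    let upstream: HashMap<ActorId, Vec<String>> =
        HashMap::from([(1, vec!["a".into()]), (2, vec!["b".into()])]);
    // Backfill fragment: actor 10 chains from actor 1, actor 11 from actor 2.
    let aligned = align_backfill_splits(&[(10, 1), (11, 2)], &upstream).unwrap();
    assert_eq!(aligned[&10], vec!["a".to_string()]);
    assert_eq!(aligned[&11], vec!["b".to_string()]);
}

Keying the lookup by the upstream actor's id (rather than the backfill actor's own id) is what makes the two fragments read exactly the same splits; the backfill actor's id is only used as the key of the new assignment entry, which matches the note added to `reassign_splits` that such actors should not be reassigned independently.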