diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index de88c1ae17608..fa8b3fa7e6d71 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -491,7 +491,7 @@ impl CommandContext { // Find the actors of the upstream fragment. let upstream_actor_ids = mgr .fragment_manager - .get_running_actors_of_fragment(upstream_fragment_id) + .get_running_actor_ids_of_fragment(upstream_fragment_id) .await?; // Record updates for all actors. @@ -530,7 +530,7 @@ impl CommandContext { // Find the actors of the downstream fragment. let downstream_actor_ids = mgr .fragment_manager - .get_running_actors_of_fragment(downstream_fragment_id) + .get_running_actor_ids_of_fragment(downstream_fragment_id) .await?; // Downstream removed actors should be skipped diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index f71ea1567725c..72f903d3b389d 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -985,7 +985,7 @@ impl FragmentManager { } /// Get the actor ids of the fragment with `fragment_id` with `Running` status. - pub async fn get_running_actors_of_fragment( + pub async fn get_running_actor_ids_of_fragment( &self, fragment_id: FragmentId, ) -> MetaResult> { @@ -1006,6 +1006,29 @@ impl FragmentManager { bail!("fragment not found: {}", fragment_id) } + pub async fn get_running_actors_and_upstream_fragment_of_fragment( + &self, + fragment_id: FragmentId, + ) -> MetaResult<(Vec, Vec)> { + let map = &self.core.read().await.table_fragments; + + for table_fragment in map.values() { + if let Some(fragment) = table_fragment.fragments.get(&fragment_id) { + let running_actors = fragment + .actors + .iter() + .filter(|a| { + table_fragment.actor_status[&a.actor_id].state == ActorState::Running as i32 + }) + .cloned() + .collect(); + return Ok((running_actors, fragment.upstream_fragment_ids.clone())); + } + } + + bail!("fragment not found: {}", fragment_id) + } + /// Add the newly added Actor to the `FragmentManager` pub async fn pre_apply_reschedules( &self, diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index d4ed2e0e15577..30726984a1d99 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -359,7 +359,7 @@ impl MetadataManager { match self { MetadataManager::V1(mgr) => { mgr.fragment_manager - .get_running_actors_of_fragment(id) + .get_running_actor_ids_of_fragment(id) .await } MetadataManager::V2(mgr) => { @@ -372,6 +372,22 @@ impl MetadataManager { } } + pub async fn get_running_actors_and_upstream_fragment_of_fragment( + &self, + id: FragmentId, + ) -> MetaResult<(Vec, Vec)> { + match self { + MetadataManager::V1(mgr) => { + mgr.fragment_manager + .get_running_actors_and_upstream_fragment_of_fragment(id) + .await + } + MetadataManager::V2(_mgr) => { + todo!() + } + } + } + pub async fn get_job_fragments_by_ids( &self, ids: &[TableId], diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index f3ce3816522b9..95affb4a6c606 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -276,28 +276,63 @@ impl SourceManagerCore { }; let Some(discovered_splits) = handle.discovered_splits().await else { - return Ok(split_assignment); + continue; }; if discovered_splits.is_empty() { tracing::warn!("No splits discovered for source {}", source_id); } + let mut source_fragments = vec![]; + let mut backfill_fragments = vec![]; + for fragment_id in fragment_ids { - let actor_ids = match self + let (actors, upstream_fragment_ids) = match self .metadata_manager - .get_running_actors_of_fragment(*fragment_id) + .get_running_actors_and_upstream_fragment_of_fragment(*fragment_id) .await { - Ok(actor_ids) => actor_ids, + Ok((actors, upstream_fragment_ids)) => { + if actors.is_empty() { + tracing::warn!("No actors found for fragment {}", fragment_id); + continue; + } + (actors, upstream_fragment_ids) + } Err(err) => { tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore"); continue; } }; - let prev_actor_splits: HashMap<_, _> = actor_ids + if !upstream_fragment_ids.is_empty() { + debug_assert!( + upstream_fragment_ids.len() == 1, + "source backfill fragment should have exactly one upstream fragment, fragment_id: {fragment_id}, upstream_fragment_ids: {upstream_fragment_ids:?}" + ); + for actor in &actors { + debug_assert!( + actor.upstream_actor_id.len() == 1, + "source backfill actor should have exactly one upstream actor, fragment_id: {fragment_id}, actor: {actor:?}" + ); + } + backfill_fragments.push((*fragment_id, upstream_fragment_ids[0], actors)); + } else { + for actor in &actors { + debug_assert!( + actor.upstream_actor_id.is_empty(), + "source actor should not have upstream actors, fragment_id: {fragment_id}, actor: {actor:?}" + ); + } + source_fragments.push((*fragment_id, actors)); + } + } + + // assign splits for source fragments first + for (fragment_id, actors) in source_fragments { + let prev_actor_splits: HashMap<_, _> = actors .into_iter() - .map(|actor_id| { + .map(|actor| { + let actor_id = actor.actor_id; ( actor_id, self.actor_splits @@ -309,16 +344,40 @@ impl SourceManagerCore { .collect(); if let Some(new_assignment) = reassign_splits( - *fragment_id, + fragment_id, prev_actor_splits, &discovered_splits, SplitDiffOptions { enable_scale_in: handle.enable_scale_in, }, ) { - split_assignment.insert(*fragment_id, new_assignment); + split_assignment.insert(fragment_id, new_assignment); } } + + // align splits for backfill fragments with its upstream source fragment + for (fragment_id, upstream_fragment_id, actors) in backfill_fragments { + let upstream_assignment = split_assignment + .get(&upstream_fragment_id) + .unwrap_or_else(||panic!( + "source backfill fragment's upstream fragment should have assignment, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, split_assignment: {split_assignment:?}")); + split_assignment.insert( + fragment_id, + actors + .into_iter() + .map(|a| { + let actor_id = a.actor_id; + ( + actor_id, + upstream_assignment + .get(&actor_id) + .cloned() + .unwrap_or_else(||panic!("source backfill actor should have upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor: {a:?}, upstream_assignment: {upstream_assignment:?}")), + ) + }) + .collect(), + ); + } } Ok(split_assignment) @@ -428,9 +487,12 @@ impl Default for SplitDiffOptions { } } -/// Reassigns splits if there are new splits or dropped splits, +/// Reassigns splits (for a fragment's actors) if there are new splits or dropped splits, /// i.e., `actor_splits` and `discovered_splits` differ. /// +/// If an actor has an upstream actor, it should be a backfill executor, +/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case. +/// /// - `fragment_id`: just for logging fn reassign_splits( fragment_id: FragmentId,