Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): align splits with source actor with an upstream #14171

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ impl CommandContext {
// Find the actors of the upstream fragment.
let upstream_actor_ids = mgr
.fragment_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.get_running_actor_ids_of_fragment(upstream_fragment_id)
.await?;

// Record updates for all actors.
Expand Down Expand Up @@ -530,7 +530,7 @@ impl CommandContext {
// Find the actors of the downstream fragment.
let downstream_actor_ids = mgr
.fragment_manager
.get_running_actors_of_fragment(downstream_fragment_id)
.get_running_actor_ids_of_fragment(downstream_fragment_id)
.await?;

// Downstream removed actors should be skipped
Expand Down
25 changes: 24 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ impl FragmentManager {
}

/// Get the actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_of_fragment(
pub async fn get_running_actor_ids_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<HashSet<ActorId>> {
Expand All @@ -1006,6 +1006,29 @@ impl FragmentManager {
bail!("fragment not found: {}", fragment_id)
}

/// Returns the `Running` actors of the fragment identified by `fragment_id`, paired with
/// a copy of that fragment's `upstream_fragment_ids`.
///
/// The table-fragments entries are scanned in iteration order and the first entry that
/// contains `fragment_id` is used.
///
/// # Errors
///
/// Fails with "fragment not found" when no entry contains `fragment_id`.
///
/// NOTE(review): `actor_status` is indexed directly by actor id, which presumably panics
/// if an actor of the fragment has no status entry — confirm that invariant holds.
pub async fn get_running_actors_and_upstream_fragment_of_fragment(
    &self,
    fragment_id: FragmentId,
) -> MetaResult<(Vec<StreamActor>, Vec<FragmentId>)> {
    // `map` borrows from the value returned by `self.core.read().await`.
    let map = &self.core.read().await.table_fragments;

    // Locate the owning entry together with the fragment itself, since the actor
    // status lookup below needs both.
    let located = map.values().find_map(|table_fragment| {
        table_fragment
            .fragments
            .get(&fragment_id)
            .map(|fragment| (table_fragment, fragment))
    });

    match located {
        Some((table_fragment, fragment)) => {
            let mut running_actors = Vec::new();
            for actor in &fragment.actors {
                let status = &table_fragment.actor_status[&actor.actor_id];
                // Only clone the actors that pass the `Running` check.
                if status.state == ActorState::Running as i32 {
                    running_actors.push(actor.clone());
                }
            }
            Ok((running_actors, fragment.upstream_fragment_ids.clone()))
        }
        None => bail!("fragment not found: {}", fragment_id),
    }
}

/// Add the newly added Actor to the `FragmentManager`
pub async fn pre_apply_reschedules(
&self,
Expand Down
18 changes: 17 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ impl MetadataManager {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_running_actors_of_fragment(id)
.get_running_actor_ids_of_fragment(id)
.await
}
MetadataManager::V2(mgr) => {
Expand All @@ -372,6 +372,22 @@ impl MetadataManager {
}
}

/// Returns the running actors and the upstream fragment ids of the fragment `id`.
///
/// For the `V1` manager this forwards to the fragment manager's method of the same
/// name and returns its result unchanged.
///
/// # Panics
///
/// The `V2` manager is not supported yet: that arm is a `todo!()` and panics when hit.
pub async fn get_running_actors_and_upstream_fragment_of_fragment(
    &self,
    id: FragmentId,
) -> MetaResult<(Vec<PbStreamActor>, Vec<FragmentId>)> {
    // Resolve the backing manager first; only `V1` has an implementation so far.
    let mgr = match self {
        MetadataManager::V2(_mgr) => todo!(),
        MetadataManager::V1(mgr) => mgr,
    };

    mgr.fragment_manager
        .get_running_actors_and_upstream_fragment_of_fragment(id)
        .await
}

pub async fn get_job_fragments_by_ids(
&self,
ids: &[TableId],
Expand Down
80 changes: 71 additions & 9 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,28 +276,63 @@ impl SourceManagerCore {
};

let Some(discovered_splits) = handle.discovered_splits().await else {
return Ok(split_assignment);
continue;
};
if discovered_splits.is_empty() {
tracing::warn!("No splits discovered for source {}", source_id);
}

let mut source_fragments = vec![];
let mut backfill_fragments = vec![];

for fragment_id in fragment_ids {
let actor_ids = match self
let (actors, upstream_fragment_ids) = match self
.metadata_manager
.get_running_actors_of_fragment(*fragment_id)
.get_running_actors_and_upstream_fragment_of_fragment(*fragment_id)
.await
{
Ok(actor_ids) => actor_ids,
Ok((actors, upstream_fragment_ids)) => {
if actors.is_empty() {
tracing::warn!("No actors found for fragment {}", fragment_id);
continue;
}
(actors, upstream_fragment_ids)
}
Err(err) => {
tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
continue;
}
};

let prev_actor_splits: HashMap<_, _> = actor_ids
if !upstream_fragment_ids.is_empty() {
debug_assert!(
upstream_fragment_ids.len() == 1,
"source backfill fragment should have exactly one upstream fragment, fragment_id: {fragment_id}, upstream_fragment_ids: {upstream_fragment_ids:?}"
);
for actor in &actors {
debug_assert!(
actor.upstream_actor_id.len() == 1,
"source backfill actor should have exactly one upstream actor, fragment_id: {fragment_id}, actor: {actor:?}"
);
}
backfill_fragments.push((*fragment_id, upstream_fragment_ids[0], actors));
} else {
for actor in &actors {
debug_assert!(
actor.upstream_actor_id.is_empty(),
"source actor should not have upstream actors, fragment_id: {fragment_id}, actor: {actor:?}"
);
}
source_fragments.push((*fragment_id, actors));
}
}

// assign splits for source fragments first
for (fragment_id, actors) in source_fragments {
let prev_actor_splits: HashMap<_, _> = actors
.into_iter()
.map(|actor_id| {
.map(|actor| {
let actor_id = actor.actor_id;
(
actor_id,
self.actor_splits
Expand All @@ -309,16 +344,40 @@ impl SourceManagerCore {
.collect();

if let Some(new_assignment) = reassign_splits(
*fragment_id,
fragment_id,
prev_actor_splits,
&discovered_splits,
SplitDiffOptions {
enable_scale_in: handle.enable_scale_in,
},
) {
split_assignment.insert(*fragment_id, new_assignment);
split_assignment.insert(fragment_id, new_assignment);
}
}

// align splits for backfill fragments with their upstream source fragments
for (fragment_id, upstream_fragment_id, actors) in backfill_fragments {
let upstream_assignment = split_assignment
.get(&upstream_fragment_id)
.unwrap_or_else(||panic!(
"source backfill fragment's upstream fragment should have assignment, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, split_assignment: {split_assignment:?}"));
split_assignment.insert(
fragment_id,
actors
.into_iter()
.map(|a| {
let actor_id = a.actor_id;
(
actor_id,
upstream_assignment
.get(&actor_id)
.cloned()
.unwrap_or_else(||panic!("source backfill actor should have upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor: {a:?}, upstream_assignment: {upstream_assignment:?}")),
)
})
.collect(),
);
}
}

Ok(split_assignment)
Expand Down Expand Up @@ -428,9 +487,12 @@ impl Default for SplitDiffOptions {
}
}

/// Reassigns splits if there are new splits or dropped splits,
/// Reassigns splits (for a fragment's actors) if there are new splits or dropped splits,
/// i.e., `actor_splits` and `discovered_splits` differ.
///
/// If an actor has an upstream actor, it should be a backfill executor,
/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
///
/// - `fragment_id`: just for logging
fn reassign_splits<T>(
fragment_id: FragmentId,
Expand Down