Skip to content

Commit

Permalink
feat(meta): align splits with source actor with an upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 1, 2024
1 parent 9161189 commit cf160ee
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ impl CommandContext {
// Find the actors of the upstream fragment.
let upstream_actor_ids = self
.fragment_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.get_running_actor_ids_of_fragment(upstream_fragment_id)
.await?;

// Record updates for all actors.
Expand Down Expand Up @@ -495,7 +495,7 @@ impl CommandContext {
// Find the actors of the downstream fragment.
let downstream_actor_ids = self
.fragment_manager
.get_running_actors_of_fragment(downstream_fragment_id)
.get_running_actor_ids_of_fragment(downstream_fragment_id)
.await?;

// Downstream removed actors should be skipped
Expand Down
25 changes: 24 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ impl FragmentManager {
}

/// Get the actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_of_fragment(
pub async fn get_running_actor_ids_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<HashSet<ActorId>> {
Expand All @@ -1055,6 +1055,29 @@ impl FragmentManager {
bail!("fragment not found: {}", fragment_id)
}

pub async fn get_running_actors_and_upstream_fragment_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<(Vec<StreamActor>, Vec<FragmentId>)> {
let map = &self.core.read().await.table_fragments;

for table_fragment in map.values() {
if let Some(fragment) = table_fragment.fragments.get(&fragment_id) {
let running_actors = fragment
.actors
.iter()
.filter(|a| {
table_fragment.actor_status[&a.actor_id].state == ActorState::Running as i32
})
.cloned()
.collect();
return Ok((running_actors, fragment.upstream_fragment_ids.clone()));
}
}

bail!("fragment not found: {}", fragment_id)
}

/// Add the newly added Actor to the `FragmentManager`
pub async fn pre_apply_reschedules(
&self,
Expand Down
80 changes: 71 additions & 9 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,28 +273,63 @@ impl SourceManagerCore {
};

let Some(discovered_splits) = handle.discovered_splits().await else {
return Ok(split_assignment);
continue;
};
if discovered_splits.is_empty() {
tracing::warn!("No splits discovered for source {}", source_id);
}

let mut source_fragments = vec![];
let mut backfill_fragments = vec![];

for fragment_id in fragment_ids {
let actor_ids = match self
let (actors, upstream_fragment_ids) = match self
.fragment_manager
.get_running_actors_of_fragment(*fragment_id)
.get_running_actors_and_upstream_fragment_of_fragment(*fragment_id)
.await
{
Ok(actor_ids) => actor_ids,
Ok((actors, upstream_fragment_ids)) => {
if actors.is_empty() {
tracing::warn!("No actors found for fragment {}", fragment_id);
continue;
}
(actors, upstream_fragment_ids)
}
Err(err) => {
tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string());
continue;
}
};

let prev_actor_splits: HashMap<_, _> = actor_ids
if !upstream_fragment_ids.is_empty() {
debug_assert!(
upstream_fragment_ids.len() == 1,
"source backfill fragment should have exactly one upstream fragment, fragment_id: {fragment_id}, upstream_fragment_ids: {upstream_fragment_ids:?}"
);
for actor in &actors {
debug_assert!(
actor.upstream_actor_id.len() == 1,
"source backfill actor should have exactly one upstream actor, fragment_id: {fragment_id}, actor: {actor:?}"
);
}
backfill_fragments.push((*fragment_id, upstream_fragment_ids[0], actors));
} else {
for actor in &actors {
debug_assert!(
actor.upstream_actor_id.is_empty(),
"source actor should not have upstream actors, fragment_id: {fragment_id}, actor: {actor:?}"
);
}
source_fragments.push((*fragment_id, actors));
}
}

// assign splits for source fragments first
for (fragment_id, actors) in source_fragments {
let prev_actor_splits: HashMap<_, _> = actors
.into_iter()
.map(|actor_id| {
.map(|actor| {
let actor_id = actor.actor_id;
(
actor_id,
self.actor_splits
Expand All @@ -306,16 +341,40 @@ impl SourceManagerCore {
.collect();

if let Some(new_assignment) = reassign_splits(
*fragment_id,
fragment_id,
prev_actor_splits,
&discovered_splits,
SplitDiffOptions {
enable_scale_in: handle.enable_scale_in,
},
) {
split_assignment.insert(*fragment_id, new_assignment);
split_assignment.insert(fragment_id, new_assignment);
}
}

// align splits for backfill fragments with its upstream source fragment
for (fragment_id, upstream_fragment_id, actors) in backfill_fragments {
let upstream_assignment = split_assignment
.get(&upstream_fragment_id)
.unwrap_or_else(||panic!(
"source backfill fragment's upstream fragment should have assignment, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, split_assignment: {split_assignment:?}"));
split_assignment.insert(
fragment_id,
actors
.into_iter()
.map(|a| {
let actor_id = a.actor_id;
(
actor_id,
upstream_assignment
.get(&actor_id)
.cloned()
.unwrap_or_else(||panic!("source backfill actor should have upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor: {a:?}, upstream_assignment: {upstream_assignment:?}")),
)
})
.collect(),
);
}
}

Ok(split_assignment)
Expand Down Expand Up @@ -430,9 +489,12 @@ impl Default for SplitDiffOptions {
}
}

/// Reassigns splits if there are new splits or dropped splits,
/// Reassigns splits (for a fragment's actors) if there are new splits or dropped splits,
/// i.e., `actor_splits` and `discovered_splits` differ.
///
/// If an actor has an upstream actor, it should be a backfill executor,
/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
///
/// - `fragment_id`: just for logging
fn reassign_splits<T>(
fragment_id: FragmentId,
Expand Down

0 comments on commit cf160ee

Please sign in to comment.