Skip to content

Commit

Permalink
feat(meta): align splits with source actor with an upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 13, 2024
1 parent d6e45e0 commit 8b7b439
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 13 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ impl CommandContext {
// Find the actors of the upstream fragment.
let upstream_actor_ids = mgr
.fragment_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.get_running_actor_ids_of_fragment(upstream_fragment_id)
.await?;

// Record updates for all actors.
Expand Down Expand Up @@ -507,7 +507,7 @@ impl CommandContext {
// Find the actors of the downstream fragment.
let downstream_actor_ids = mgr
.fragment_manager
.get_running_actors_of_fragment(downstream_fragment_id)
.get_running_actor_ids_of_fragment(downstream_fragment_id)
.await?;

// Downstream removed actors should be skipped
Expand Down
25 changes: 24 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ impl FragmentManager {
}

/// Get the actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_of_fragment(
pub async fn get_running_actor_ids_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<HashSet<ActorId>> {
Expand All @@ -1010,6 +1010,29 @@ impl FragmentManager {
bail!("fragment not found: {}", fragment_id)
}

pub async fn get_running_actors_and_upstream_fragment_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<(Vec<StreamActor>, Vec<FragmentId>)> {
let map = &self.core.read().await.table_fragments;

for table_fragment in map.values() {
if let Some(fragment) = table_fragment.fragments.get(&fragment_id) {
let running_actors = fragment
.actors
.iter()
.filter(|a| {
table_fragment.actor_status[&a.actor_id].state == ActorState::Running as i32
})
.cloned()
.collect();
return Ok((running_actors, fragment.upstream_fragment_ids.clone()));
}
}

bail!("fragment not found: {}", fragment_id)
}

/// Add the newly added Actor to the `FragmentManager`
pub async fn pre_apply_reschedules(
&self,
Expand Down
18 changes: 17 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl MetadataManager {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_running_actors_of_fragment(id)
.get_running_actor_ids_of_fragment(id)
.await
}
MetadataManager::V2(mgr) => {
Expand All @@ -318,6 +318,22 @@ impl MetadataManager {
}
}

pub async fn get_running_actors_and_upstream_fragment_of_fragment(
&self,
id: FragmentId,
) -> MetaResult<(Vec<PbStreamActor>, Vec<FragmentId>)> {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_running_actors_and_upstream_fragment_of_fragment(id)
.await
}
MetadataManager::V2(_mgr) => {
todo!()
}
}
}

pub async fn get_job_fragments_by_ids(
&self,
ids: &[TableId],
Expand Down
80 changes: 71 additions & 9 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,28 +275,63 @@ impl SourceManagerCore {
};

let Some(discovered_splits) = handle.discovered_splits().await else {
return Ok(split_assignment);
continue;
};
if discovered_splits.is_empty() {
tracing::warn!("No splits discovered for source {}", source_id);
}

let mut source_fragments = vec![];
let mut backfill_fragments = vec![];

for fragment_id in fragment_ids {
let actor_ids = match self
let (actors, upstream_fragment_ids) = match self
.metadata_manager
.get_running_actors_of_fragment(*fragment_id)
.get_running_actors_and_upstream_fragment_of_fragment(*fragment_id)
.await
{
Ok(actor_ids) => actor_ids,
Ok((actors, upstream_fragment_ids)) => {
if actors.is_empty() {
tracing::warn!("No actors found for fragment {}", fragment_id);
continue;
}
(actors, upstream_fragment_ids)
}
Err(err) => {
tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string());
continue;
}
};

let prev_actor_splits: HashMap<_, _> = actor_ids
if !upstream_fragment_ids.is_empty() {
debug_assert!(
upstream_fragment_ids.len() == 1,
"source backfill fragment should have exactly one upstream fragment, fragment_id: {fragment_id}, upstream_fragment_ids: {upstream_fragment_ids:?}"
);
for actor in &actors {
debug_assert!(
actor.upstream_actor_id.len() == 1,
"source backfill actor should have exactly one upstream actor, fragment_id: {fragment_id}, actor: {actor:?}"
);
}
backfill_fragments.push((*fragment_id, upstream_fragment_ids[0], actors));
} else {
for actor in &actors {
debug_assert!(
actor.upstream_actor_id.is_empty(),
"source actor should not have upstream actors, fragment_id: {fragment_id}, actor: {actor:?}"
);
}
source_fragments.push((*fragment_id, actors));
}
}

// assign splits for source fragments first
for (fragment_id, actors) in source_fragments {
let prev_actor_splits: HashMap<_, _> = actors
.into_iter()
.map(|actor_id| {
.map(|actor| {
let actor_id = actor.actor_id;
(
actor_id,
self.actor_splits
Expand All @@ -308,16 +343,40 @@ impl SourceManagerCore {
.collect();

if let Some(new_assignment) = reassign_splits(
*fragment_id,
fragment_id,
prev_actor_splits,
&discovered_splits,
SplitDiffOptions {
enable_scale_in: handle.enable_scale_in,
},
) {
split_assignment.insert(*fragment_id, new_assignment);
split_assignment.insert(fragment_id, new_assignment);
}
}

// align splits for backfill fragments with its upstream source fragment
for (fragment_id, upstream_fragment_id, actors) in backfill_fragments {
let upstream_assignment = split_assignment
.get(&upstream_fragment_id)
.unwrap_or_else(||panic!(
"source backfill fragment's upstream fragment should have assignment, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, split_assignment: {split_assignment:?}"));
split_assignment.insert(
fragment_id,
actors
.into_iter()
.map(|a| {
let actor_id = a.actor_id;
(
actor_id,
upstream_assignment
.get(&actor_id)
.cloned()
.unwrap_or_else(||panic!("source backfill actor should have upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor: {a:?}, upstream_assignment: {upstream_assignment:?}")),
)
})
.collect(),
);
}
}

Ok(split_assignment)
Expand Down Expand Up @@ -427,9 +486,12 @@ impl Default for SplitDiffOptions {
}
}

/// Reassigns splits if there are new splits or dropped splits,
/// Reassigns splits (for a fragment's actors) if there are new splits or dropped splits,
/// i.e., `actor_splits` and `discovered_splits` differ.
///
/// If an actor has an upstream actor, it should be a backfill executor,
/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
///
/// - `fragment_id`: just for logging
fn reassign_splits<T>(
fragment_id: FragmentId,
Expand Down

0 comments on commit 8b7b439

Please sign in to comment.