Skip to content

Commit

Permalink
feat: Improve fragment management and actor retrieval
Browse files Browse the repository at this point in the history
- Add the ability to update rate limits of Source and Chain nodes in fragments
- Update vnode mapping for actors in fragments
- Notify frontend nodes about fragment mapping updates
- Update dispatcher of upstream fragments when applying reschedules
- Update merge executor of downstream fragments when applying reschedules
- Apply reschedules to fragments, including adding/removing actors and updating vnode mappings
- Migrate actors and update fragments according to migration plan
- Drop table fragments and remove downstream actor info in dependent tables
- Get lists of all worker parallel units, node actors, actor IDs in specific tables/fragments, running actors in specific fragments, upstream root fragments of specified relations, downstream Chain fragments of a specified table, Materialize fragment of a specified table, and parallelism of running fragments

Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Nov 1, 2023
1 parent 6165df7 commit 981bb8e
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ impl FragmentManager {
}
}
}
if table_id_to_apply.is_empty() {
return Err(MetaError::from(anyhow!(
"source_id {:?} not found in all fragments",
source_id
)));
}

let mut table_fragments = BTreeMapTransaction::new(map);
let mut to_apply_fragment = HashMap::new();
Expand All @@ -822,6 +828,12 @@ impl FragmentManager {
}

commit_meta!(self, table_fragments)?;
tracing::info!(
"update source actor rate limit to: {:?}, actors {:?}",
rate_limit,
to_apply_fragment
);

Ok(to_apply_fragment)
}

Expand Down Expand Up @@ -853,6 +865,11 @@ impl FragmentManager {
}

commit_meta!(self, table_fragments)?;
tracing::info!(
"update mv actor rate limit to: {:?}, actors {:?}",
rate_limit,
fragment_to_apply
);

Ok(fragment_to_apply)
}
Expand Down

0 comments on commit 981bb8e

Please sign in to comment.