Skip to content

Commit

Permalink
fix: clean downstream dropped actors for cdc streaming graph
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Jan 18, 2024
1 parent c902aad commit 5bd2818
Showing 1 changed file with 2 additions and 3 deletions.
5 changes: 2 additions & 3 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ impl FragmentManager {
let mut table_fragments = BTreeMapTransaction::new(map);
for table_fragment in &to_delete_table_fragments {
table_fragments.remove(table_fragment.table_id());
let backfill_actor_ids = table_fragment.backfill_actor_ids();
let to_remove_actor_ids: HashSet<_> = table_fragment.actor_ids().into_iter().collect();
let dependent_table_ids = table_fragment.dependent_table_ids();
for (dependent_table_id, _) in dependent_table_ids {
if table_ids.contains(&dependent_table_id) {
Expand All @@ -581,12 +581,11 @@ impl FragmentManager {
dependent_table
.fragments
.values_mut()
.filter(|f| (f.get_fragment_type_mask() & FragmentTypeFlag::Mview as u32) != 0)
.flat_map(|f| &mut f.actors)
.for_each(|a| {
a.dispatcher.retain_mut(|d| {
d.downstream_actor_id
.retain(|x| !backfill_actor_ids.contains(x));
.retain(|x| !to_remove_actor_ids.contains(x));
!d.downstream_actor_id.is_empty()
})
});
Expand Down

0 comments on commit 5bd2818

Please sign in to comment.