diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 42e80d1c13a64..a523bbfeb3e7e 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -704,6 +704,35 @@ impl FragmentManager { let mut dirty_downstream_table_ids = HashMap::new(); + fn union_input_is_clean( + all_fragment_ids: &HashSet, + input: &StreamNode, + ) -> bool { + match &input.node_body { + // for old version sink into table + Some(NodeBody::Merge(merge_node)) + if !all_fragment_ids.contains(&merge_node.upstream_fragment_id) => + { + false + } + // for new version sink into table with project + Some(NodeBody::Project(_)) => { + let merge_stream_node = input.input.iter().exactly_one().expect( + "project of the sink input for the target table should have only one input", + ); + + if let Some(NodeBody::Merge(merge_node)) = &merge_stream_node.node_body + && !all_fragment_ids.contains(&merge_node.upstream_fragment_id) + { + false + } else { + true + } + } + _ => true, + } + } + for (table_id, table_fragment) in table_fragments.tree_ref() { if to_delete_table_ids.contains(table_id) { continue; @@ -722,9 +751,7 @@ impl FragmentManager { visit_stream_node_cont(actor.nodes.as_ref().unwrap(), |node| { if let Some(NodeBody::Union(_)) = node.node_body { for input in &node.input { - if let Some(NodeBody::Merge(merge_node)) = &input.node_body - && !all_fragment_ids.contains(&merge_node.upstream_fragment_id) - { + if !union_input_is_clean(&all_fragment_ids, input) { dirty_downstream_table_ids .insert(*table_id, fragment.fragment_id); return false; @@ -754,15 +781,8 @@ impl FragmentManager { for actor in &mut fragment.actors { visit_stream_node_cont_mut(actor.nodes.as_mut().unwrap(), |node| { if let Some(NodeBody::Union(_)) = node.node_body { - node.input.retain_mut(|input| { - if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body - && !all_fragment_ids.contains(&merge_node.upstream_fragment_id) - { - false - } else { - true - } - }); + node.input + .retain_mut(|input| union_input_is_clean(&all_fragment_ids, input)); } true })