Skip to content

Commit

Permalink
fix(sink): Fix sink into table in the recovery of the new version of …
Browse files Browse the repository at this point in the history
…table graph. (#17960)

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Aug 7, 2024
1 parent e68a7bb commit ac99459
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,35 @@ impl FragmentManager {

let mut dirty_downstream_table_ids = HashMap::new();

fn union_input_is_clean(
all_fragment_ids: &HashSet<FragmentId>,
input: &StreamNode,
) -> bool {
match &input.node_body {
// for old version sink into table
Some(NodeBody::Merge(merge_node))
if !all_fragment_ids.contains(&merge_node.upstream_fragment_id) =>
{
false
}
// for new version sink into table with project
Some(NodeBody::Project(_)) => {
let merge_stream_node = input.input.iter().exactly_one().expect(
"project of the sink input for the target table should have only one input",
);

if let Some(NodeBody::Merge(merge_node)) = &merge_stream_node.node_body
&& !all_fragment_ids.contains(&merge_node.upstream_fragment_id)
{
false
} else {
true
}
}
_ => true,
}
}

for (table_id, table_fragment) in table_fragments.tree_ref() {
if to_delete_table_ids.contains(table_id) {
continue;
Expand All @@ -722,9 +751,7 @@ impl FragmentManager {
visit_stream_node_cont(actor.nodes.as_ref().unwrap(), |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
for input in &node.input {
if let Some(NodeBody::Merge(merge_node)) = &input.node_body
&& !all_fragment_ids.contains(&merge_node.upstream_fragment_id)
{
if !union_input_is_clean(&all_fragment_ids, input) {
dirty_downstream_table_ids
.insert(*table_id, fragment.fragment_id);
return false;
Expand Down Expand Up @@ -754,15 +781,8 @@ impl FragmentManager {
for actor in &mut fragment.actors {
visit_stream_node_cont_mut(actor.nodes.as_mut().unwrap(), |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
node.input.retain_mut(|input| {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
&& !all_fragment_ids.contains(&merge_node.upstream_fragment_id)
{
false
} else {
true
}
});
node.input
.retain_mut(|input| union_input_is_clean(&all_fragment_ids, input));
}
true
})
Expand Down

0 comments on commit ac99459

Please sign in to comment.