From 2991a461eaa7af094610c154bacdc119b9ea6eed Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 24 Nov 2023 18:52:48 +0800 Subject: [PATCH] Modifies `impl PlanRoot` and adds `with_upstreams` method. --- src/frontend/src/optimizer/mod.rs | 6 ++---- src/meta/src/stream/stream_graph/fragment.rs | 2 ++ src/meta/src/stream/stream_manager.rs | 6 ++++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 98149fc146984..f6de31e4b6990 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -546,10 +546,7 @@ impl PlanRoot { .enforce_if_not_satisfies(external_source_node, &Order::any())? } - PrimaryKeyKind::RowIdAsPrimaryKey => { - StreamExchange::new_no_shuffle(external_source_node).into() - } - PrimaryKeyKind::AppendOnly => { + PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => { StreamExchange::new_no_shuffle(external_source_node).into() } }; @@ -583,6 +580,7 @@ impl PlanRoot { kind, column_descs, )?; + vec![dml_node] }; diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 0b292dd0a8eb1..a47380e21950f 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -516,6 +516,8 @@ impl CompleteStreamFragmentGraph { } } + /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or Table on CDC Source, with the upstream existing + /// `Materialize` or `Source` fragments. pub fn with_upstreams( graph: StreamFragmentGraph, upstream_root_fragments: HashMap, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 28fc367576cb9..964bf774bc042 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -476,8 +476,10 @@ impl GlobalStreamManager { table_fragments.internal_table_ids().len() + mv_table_id.map_or(0, |_| 1) ); revert_funcs.push(Box::pin(async move { - if let Err(e) = hummock_manager_ref.unregister_table_ids(®istered_table_ids).await { - tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e); + if create_type == CreateType::Foreground { + if let Err(e) = hummock_manager_ref.unregister_table_ids(®istered_table_ids).await { + tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e); + } } }));