Skip to content

Commit

Permalink
Modifies impl PlanRoot and adds with_upstreams method.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Nov 24, 2023
1 parent 239c0d7 commit 2991a46
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,7 @@ impl PlanRoot {
.enforce_if_not_satisfies(external_source_node, &Order::any())?
}

PrimaryKeyKind::RowIdAsPrimaryKey => {
StreamExchange::new_no_shuffle(external_source_node).into()
}
PrimaryKeyKind::AppendOnly => {
PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => {
StreamExchange::new_no_shuffle(external_source_node).into()
}
};
Expand Down Expand Up @@ -583,6 +580,7 @@ impl PlanRoot {
kind,
column_descs,
)?;

vec![dml_node]
};

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ impl CompleteStreamFragmentGraph {
}
}

/// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or Table on CDC Source, with the upstream existing
/// `Materialize` or `Source` fragments.
pub fn with_upstreams(
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,10 @@ impl GlobalStreamManager {
table_fragments.internal_table_ids().len() + mv_table_id.map_or(0, |_| 1)
);
revert_funcs.push(Box::pin(async move {
if let Err(e) = hummock_manager_ref.unregister_table_ids(&registered_table_ids).await {
tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e);
if create_type == CreateType::Foreground {
if let Err(e) = hummock_manager_ref.unregister_table_ids(&registered_table_ids).await {
tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e);
}
}
}));

Expand Down

0 comments on commit 2991a46

Please sign in to comment.