From 74d0c2965bfe81e0a18d63b72d8a11080da166df Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 30 Jul 2024 18:23:21 +0800 Subject: [PATCH] Implement unique identities for sinks across `SinkCatalog`, handlers, `DdlController`, and `catalog::Sink`. --- src/connector/src/sink/catalog/mod.rs | 5 ++++ .../src/handler/alter_table_column.rs | 17 +++++++------- src/frontend/src/handler/create_sink.rs | 23 ++++++++++++------- src/frontend/src/handler/drop_sink.rs | 7 +++++- src/meta/src/manager/catalog/mod.rs | 1 + src/meta/src/rpc/ddl_controller.rs | 18 +++++++-------- src/meta/src/rpc/ddl_controller_v2.rs | 4 +--- src/prost/src/lib.rs | 8 +++++++ 8 files changed, 52 insertions(+), 31 deletions(-) diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index a13ad203d1fa6..7a9dc5d564ca7 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -450,6 +450,11 @@ impl SinkCatalog { pub fn downstream_pk_indices(&self) -> Vec { self.downstream_pk.clone() } + + pub fn unique_identity(&self) -> String { + // We need to align with meta here, so we've utilized the proto method. + self.to_proto().unique_identity() + } } impl From for SinkCatalog { diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 75293d3f4b8cd..ba3ae95399783 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -107,7 +107,12 @@ pub async fn replace_table_with_definition( .collect_vec(); for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? 
{ - hijack_merger_for_target_table(&mut graph, &target_columns, &sink)?; + hijack_merger_for_target_table( + &mut graph, + &target_columns, + &sink, + Some(&sink.unique_identity()), + )?; } table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); @@ -124,6 +129,7 @@ pub(crate) fn hijack_merger_for_target_table( graph: &mut StreamFragmentGraph, target_columns: &[ColumnCatalog], sink: &SinkCatalog, + uniq_identify: Option<&str>, ) -> Result<()> { let mut sink_columns = sink.original_target_columns.clone(); if sink_columns.is_empty() { @@ -164,14 +170,7 @@ pub(crate) fn hijack_merger_for_target_table( for fragment in graph.fragments.values_mut() { if let Some(node) = &mut fragment.node { - insert_merger_to_union_with_project( - node, - &pb_project, - &format!( - "{}.{}.{}", - sink.database_id.database_id, sink.schema_id.schema_id, sink.name - ), - ); + insert_merger_to_union_with_project(node, &pb_project, uniq_identify); } } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 64ac847433952..9ce59cddece54 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -454,6 +454,8 @@ pub async fn handle_create_sink( let mut target_table_replace_plan = None; if let Some(table_catalog) = target_table_catalog { + use crate::handler::alter_table_column::hijack_merger_for_target_table; + check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())?; let (mut graph, mut table, source) = @@ -470,16 +472,20 @@ pub async fn handle_create_sink( .clone_from(&table_catalog.incoming_sinks); let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect(); - let mut incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?; - incoming_sinks.push(Arc::new(sink.clone())); - for sink in incoming_sinks { - crate::handler::alter_table_column::hijack_merger_for_target_table( + let incoming_sinks = fetch_incoming_sinks(&session, 
&incoming_sink_ids)?; + + for existing_sink in incoming_sinks { + hijack_merger_for_target_table( &mut graph, table_catalog.columns(), - &sink, + &existing_sink, + Some(&existing_sink.unique_identity()), )?; } + // for new creating sink, we don't have a unique identity because the sink id is not generated yet. + hijack_merger_for_target_table(&mut graph, table_catalog.columns(), &sink, None)?; + target_table_replace_plan = Some(ReplaceTablePlan { source, table: Some(table), @@ -696,9 +702,10 @@ pub(crate) async fn reparse_table_for_sink( pub(crate) fn insert_merger_to_union_with_project( node: &mut StreamNode, project_node: &PbNodeBody, - uniq_name: &str, + uniq_identity: Option<&str>, ) { if let Some(NodeBody::Union(_union_node)) = &mut node.node_body { + // TODO: MergeNode is used as a placeholder, see issue #17658 node.input.push(StreamNode { input: vec![StreamNode { node_body: Some(NodeBody::Merge(MergeNode { @@ -706,7 +713,7 @@ pub(crate) fn insert_merger_to_union_with_project( })), ..Default::default() }], - identity: uniq_name.to_string(), + identity: uniq_identity.unwrap_or("").to_string(), fields: node.fields.clone(), node_body: Some(project_node.clone()), ..Default::default() @@ -716,7 +723,7 @@ pub(crate) fn insert_merger_to_union_with_project( } for input in &mut node.input { - insert_merger_to_union_with_project(input, project_node, uniq_name); + insert_merger_to_union_with_project(input, project_node, uniq_identity); } } diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index 85c6a7e792620..967fe700dfcde 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -85,7 +85,12 @@ pub async fn handle_drop_sink( assert!(incoming_sink_ids.remove(&sink_id.sink_id)); for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? 
{ - hijack_merger_for_target_table(&mut graph, table_catalog.columns(), &sink)?; + hijack_merger_for_target_table( + &mut graph, + table_catalog.columns(), + &sink, + Some(&sink.unique_identity()), + )?; } affected_table_change = Some(ReplaceTablePlan { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index fc03445a3dcfb..62ecb766a2d74 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -322,6 +322,7 @@ impl CatalogManager { Ok(()) } + // Fill in the original_target_columns that wasn't written in the previous version for the table sink. async fn table_sink_catalog_update(&self) -> MetaResult<()> { let core = &mut *self.core.lock().await; let mut sinks = BTreeMapTransaction::new(&mut core.database.sinks); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 240b61aa2a428..b27c9824e2e40 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1092,7 +1092,6 @@ impl DdlController { if let Some(creating_sink_table_fragments) = creating_sink_table_fragments { let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap(); let sink = sink.expect("sink not found"); - let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name); Self::inject_replace_table_plan_for_sink( Some(sink.id), &sink_fragment, @@ -1100,7 +1099,7 @@ impl DdlController { &mut replace_table_ctx, &mut table_fragments, target_fragment_id, - uniq_name, + None, ); } @@ -1126,8 +1125,6 @@ impl DdlController { continue; }; - let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name); - let sink_table_fragments = mgr .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id)) .await?; @@ -1141,7 +1138,7 @@ impl DdlController { &mut replace_table_ctx, &mut table_fragments, target_fragment_id, - uniq_name, + Some(&sink.unique_identity()), ); } } @@ -1169,7 +1166,7 @@ impl DdlController { 
replace_table_ctx: &mut ReplaceTableContext, table_fragments: &mut TableFragments, target_fragment_id: FragmentId, - uniq_name: &str, + unique_identity: Option<&str>, ) { let sink_actor_ids = sink_fragment .actors @@ -1254,7 +1251,10 @@ impl DdlController { let merge_stream_node = input_project_node.input.iter_mut().exactly_one().unwrap(); - if input_project_node.identity.as_str() != uniq_name { + // we need to align nodes here + if input_project_node.identity.as_str() + != unique_identity.unwrap_or("") + { continue; } @@ -1897,8 +1897,6 @@ impl DdlController { for sink in catalogs { let sink_id = &sink.id; - let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name); - let sink_table_fragments = self .metadata_manager .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) @@ -1913,7 +1911,7 @@ impl DdlController { &mut ctx, &mut table_fragments, target_fragment_id, - uniq_name, + Some(&sink.unique_identity()), ); if sink.original_target_columns.is_empty() { diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 3cebc53f08cfb..c097fa5acb5c6 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -487,8 +487,6 @@ impl DdlController { for sink in catalogs { let sink_id = &sink.id; - let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name); - let sink_table_fragments = self .metadata_manager .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) @@ -503,7 +501,7 @@ impl DdlController { &mut ctx, &mut table_fragments, target_fragment_id, - uniq_name, + Some(&sink.unique_identity()), ); if sink.original_target_columns.is_empty() { diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index f26f8c9f38d91..8839a043ad764 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -296,6 +296,14 @@ impl catalog::StreamSourceInfo { } } +impl catalog::Sink { + pub fn unique_identity(&self) -> 
String {
+        // TODO: use a more unique name
+        // We don't use sink.id because it's generated by the meta node.
+        format!("{}.{}.{}", self.database_id, self.schema_id, self.name)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::data::{data_type, DataType};