From e445dc462ff5071488e64c0055c42bf44f85d5d9 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 30 Jan 2024 20:10:30 +0800 Subject: [PATCH] fix: fix rename of target table for sink into table --- src/meta/src/controller/catalog.rs | 16 ++++++++++++++++ src/meta/src/manager/catalog/mod.rs | 4 +++- src/meta/src/manager/catalog/utils.rs | 19 +++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index d4f67c6ad1ee9..75ef08c92278a 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1803,6 +1803,22 @@ impl CatalogController { }}; } let objs = get_referring_objects(object_id, &txn).await?; + // TODO: For sink into table. when sink into table is ready. + // if object_type == ObjectType::Table { + // let incoming_sinks: Vec<_> = Table::find_by_id(object_id) + // .select_only() + // .column(table::Column::IncomingSinks) + // .into_tuple() + // .one(&txn) + // .await? + // .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; + // objs.extend(incoming_sinks.into_iter().map(|id| PartialObject { + // oid: id as _, + // obj_type: ObjectType::Sink, + // ..Default::default() + // })); + // } + for obj in objs { match obj.obj_type { ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid), diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 89294fdcd43df..4d0476f5d4887 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1749,7 +1749,9 @@ impl CatalogManager { } for sink in database_mgr.sinks.values() { - if sink.dependent_relations.contains(&relation_id) { + if sink.dependent_relations.contains(&relation_id) + || sink.target_table == Some(relation_id) + { let mut sink = sink.clone(); sink.definition = alter_relation_rename_refs(&sink.definition, from, to); to_update_sinks.push(sink); diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index b9272b533daac..69592180aa60a 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -107,6 +107,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str stmt: CreateSinkStatement { sink_from: CreateSink::AsQuery(query), + into_table_name: None, .. }, } => { @@ -117,9 +118,27 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str stmt: CreateSinkStatement { sink_from: CreateSink::From(table_name), + into_table_name: None, .. }, } => replace_table_name(table_name, to), + Statement::CreateSink { + stmt: CreateSinkStatement { + sink_from, + into_table_name: Some(table_name), + .. + } + } => { + let idx = table_name.0.len() - 1; + if table_name.0[idx].real_value() == from { + table_name.0[idx] = Ident::new_unchecked(to); + } else { + match sink_from { + CreateSink::From(table_name) => replace_table_name(table_name, to), + CreateSink::AsQuery(query) => QueryRewriter::rewrite_query(query, from, to), + } + } + } _ => unreachable!(), }; stmt.to_string()