From 17962093e4dc88347625da2d97e038b240108761 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 20 Feb 2024 20:28:42 +0800 Subject: [PATCH] Refactor catalog.rs to handle table sinks with PartialObject --- src/meta/src/controller/catalog.rs | 41 ++++++++++++++---------- src/meta/src/controller/rename.rs | 19 +++++++++++ src/meta/src/controller/streaming_job.rs | 2 +- src/meta/src/rpc/ddl_controller.rs | 3 +- 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 8b50aa36ea24a..e26e1af0f0cff 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -29,7 +29,7 @@ use risingwave_meta_model_v2::{ actor, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array, - IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, SourceId, StreamSourceInfo, + IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId, }; use risingwave_pb::catalog::table::PbTableType; @@ -2084,22 +2084,28 @@ impl CatalogController { }); }}; } - let objs = get_referring_objects(object_id, &txn).await?; - // TODO: For sink into table. when sink into table is ready. - // if object_type == ObjectType::Table { - // let incoming_sinks: Vec<_> = Table::find_by_id(object_id) - // .select_only() - // .column(table::Column::IncomingSinks) - // .into_tuple() - // .one(&txn) - // .await? - // .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; - // objs.extend(incoming_sinks.into_iter().map(|id| PartialObject { - // oid: id as _, - // obj_type: ObjectType::Sink, - // ..Default::default() - // })); - // } + let mut objs = get_referring_objects(object_id, &txn).await?; + if object_type == ObjectType::Table { + let incoming_sinks: I32Array = Table::find_by_id(object_id) + .select_only() + .column(table::Column::IncomingSinks) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; + + objs.extend( + incoming_sinks + .into_inner() + .into_iter() + .map(|id| PartialObject { + oid: id, + obj_type: ObjectType::Sink, + schema_id: None, + database_id: None, + }), + ); + } for obj in objs { match obj.obj_type { @@ -2287,6 +2293,7 @@ impl CatalogController { .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()) .collect()) } + pub async fn find_creating_streaming_job_ids( &self, infos: Vec, diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs index bde954a587fdf..15be4d7ef83b8 100644 --- a/src/meta/src/controller/rename.rs +++ b/src/meta/src/controller/rename.rs @@ -79,6 +79,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str stmt: CreateSinkStatement { sink_from: CreateSink::AsQuery(query), + into_table_name: None, .. }, } => { @@ -89,9 +90,27 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str stmt: CreateSinkStatement { sink_from: CreateSink::From(table_name), + into_table_name: None, .. }, } => replace_table_name(table_name, to), + Statement::CreateSink { + stmt: CreateSinkStatement { + sink_from, + into_table_name: Some(table_name), + .. + } + } => { + let idx = table_name.0.len() - 1; + if table_name.0[idx].real_value() == from { + table_name.0[idx] = Ident::new_unchecked(to); + } else { + match sink_from { + CreateSink::From(table_name) => replace_table_name(table_name, to), + CreateSink::AsQuery(query) => QueryRewriter::rewrite_query(query, from, to), + } + } + } _ => unreachable!(), }; stmt.to_string() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index c598a220c835f..5ba6de0ba1fad 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -194,7 +194,7 @@ impl CatalogController { let table: table::ActiveModel = table.clone().into(); Table::insert(table).exec(&txn).await?; } - StreamingJob::Sink(sink, s) => { + StreamingJob::Sink(sink, _) => { if let Some(target_table_id) = sink.target_table { if Self::check_cycle_for_table_sink(target_table_id, sink, &txn).await? { bail!("Creating such a sink will result in circular dependency."); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 7ff3fd39cdcb3..7cff9dc4a9b7a 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::cmp::Ordering; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; @@ -38,7 +38,6 @@ use risingwave_connector::source::{ }; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::ObjectId; -use risingwave_pb::catalog; use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; use risingwave_pb::catalog::connection::PrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId;