diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs
index fbb77a0ca0bb5..69a3e8cf9ec55 100644
--- a/src/frontend/src/catalog/table_catalog.rs
+++ b/src/frontend/src/catalog/table_catalog.rs
@@ -74,6 +74,8 @@ pub struct TableCatalog {
 
     pub name: String,
 
+    pub dependent_relations: Vec<TableId>,
+
     /// All columns in this table.
     pub columns: Vec<ColumnCatalog>,
 
@@ -564,6 +566,11 @@ impl From<PbTable> for TableCatalog {
             created_at_cluster_version: tb.created_at_cluster_version.clone(),
             initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
             retention_seconds: tb.retention_seconds,
+            dependent_relations: tb
+                .dependent_relations
+                .into_iter()
+                .map(TableId::from)
+                .collect_vec(),
         }
     }
 }
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index de8e93e04a784..09e8856934bca 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -533,6 +533,14 @@ fn check_cycle_for_sink(
         }
     }
 
+    for table_id in &table.dependent_relations {
+        if let Ok(table) = reader.get_table_by_id(table_id) {
+            visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
+        } else {
+            bail!("table not found: {:?}", table_id);
+        }
+    }
+
     Ok(())
 }
diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index 3abc7ace0e494..f2acbcf9d258c 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -226,6 +226,7 @@ impl StreamMaterialize {
             id: TableId::placeholder(),
             associated_source_id: None,
             name,
+            dependent_relations: vec![],
             columns,
             pk: table_pk,
             stream_key,
diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs
index 39d9ff5e7018d..c8cd1bb05fa83 100644
--- a/src/frontend/src/optimizer/plan_node/utils.rs
+++ b/src/frontend/src/optimizer/plan_node/utils.rs
@@ -141,6 +141,7 @@ impl TableCatalogBuilder {
             id: TableId::placeholder(),
            associated_source_id: None,
             name: String::new(),
+            dependent_relations: vec![],
             columns: self.columns.clone(),
             pk: self.pk,
             stream_key: vec![],
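
The frontend hunk above extends `check_cycle_for_sink` so that, before a sink into a table is created, the planner also follows the target table's `dependent_relations` and fails if the walk ever revisits a table. A minimal, self-contained sketch of that reachability test, using plain maps and a hypothetical `reaches_cycle` helper rather than the catalog reader API used in the patch:

```rust
use std::collections::{HashMap, HashSet};

type TableId = u32;

/// `deps` is a hypothetical map from a table to the tables it depends on
/// (its `dependent_relations`); `start` is the sink's target table.
fn reaches_cycle(
    deps: &HashMap<TableId, Vec<TableId>>,
    start: TableId,
    visited: &mut HashSet<TableId>,
) -> bool {
    if !visited.insert(start) {
        // Revisiting a table means the new sink edge would close a cycle;
        // the patch treats any repeated visit the same way.
        return true;
    }
    if let Some(nexts) = deps.get(&start) {
        for next in nexts {
            if reaches_cycle(deps, *next, visited) {
                return true;
            }
        }
    }
    false
}

fn main() {
    // Table 1 depends on table 2; a sink that reads table 2 and writes into
    // table 1 would add the back edge 2 -> 1, i.e. a cycle.
    let deps: HashMap<TableId, Vec<TableId>> = HashMap::from([(1, vec![2]), (2, vec![1])]);
    assert!(reaches_cycle(&deps, 1, &mut HashSet::new()));
}
```

In the patch itself the neighbor lookup goes through `reader.get_table_by_id` and the recursive `visit_table` call instead of a prebuilt map.
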
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index d9ea770338887..8b50aa36ea24a 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -25,7 +25,13 @@ use risingwave_meta_model_v2::fragment::StreamNode;
 use risingwave_meta_model_v2::object::ObjectType;
 use risingwave_meta_model_v2::prelude::*;
 use risingwave_meta_model_v2::table::TableType;
-use risingwave_meta_model_v2::{actor, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId, SinkId};
+use risingwave_meta_model_v2::{
+    actor, connection, database, fragment, function, index, object, object_dependency, schema,
+    sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors,
+    ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array,
+    IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, SourceId, StreamSourceInfo,
+    StreamingParallelism, TableId, UserId,
+};
 use risingwave_pb::catalog::table::PbTableType;
 use risingwave_pb::catalog::{
     PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable,
@@ -2281,35 +2287,6 @@ impl CatalogController {
             .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
             .collect())
     }
-
-    pub async fn get_dependent_by_ids(&self, table_ids: Vec<TableId>) -> MetaResult<Vec<PbTable>> {
-        let inner = self.inner.read().await;
-        ObjectDependency::find().filter()
-        // let table_objs = Table::find()
-        //     .find_also_related(Object)
-        //     .filter(table::Column::TableId.is_in(table_ids))
-        //     .all(&inner.db)
-        //     .await?;
-        // Ok(table_objs
-        //     .into_iter()
-        //     .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
-        //     .collect())
-    }
-
-
-    // pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
-    //     let inner = self.inner.read().await;
-    //     let sink_objs = Sink::find()
-    //         .find_also_related(Object)
-    //         .filter(sink::Column::SinkId.is_in(sink_ids))
-    //         .all(&inner.db)
-    //         .await?;
-    //     Ok(sink_objs
-    //         .into_iter()
-    //         .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
-    //         .collect())
-    // }
-
     pub async fn find_creating_streaming_job_ids(
         &self,
         infos: Vec<PbCreatingJobInfo>,
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 9bb8af6172469..57153b23715a9 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
 use std::num::NonZeroUsize;
 
 use itertools::Itertools;
+use risingwave_common::bail;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -34,6 +35,7 @@ use risingwave_meta_model_v2::{
     FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism,
     TableId, TableVersion, UserId,
 };
+use risingwave_pb::catalog;
 use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
 use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
 use risingwave_pb::catalog::{PbCreateType, PbTable};
@@ -97,6 +99,61 @@ impl CatalogController {
         Ok(obj.oid)
     }
 
+    async fn check_cycle_for_table_sink(
+        table: u32,
+        sink: &catalog::Sink,
+        txn: &DatabaseTransaction,
+    ) -> MetaResult<bool> {
+        let mut queue: VecDeque<(ObjectId, ObjectType)> = VecDeque::new();
+
+        let mut visited_objects = HashSet::new();
+
+        visited_objects.insert(table as ObjectId);
+
+        for table_id in &sink.dependent_relations {
+            queue.push_front((*table_id as ObjectId, ObjectType::Table));
+        }
+
+        while let Some((object_id, object_type)) = queue.pop_front() {
+            if visited_objects.contains(&object_id) {
+                return Ok(true);
+            }
+
+            visited_objects.insert(object_id);
+
+            if object_type == ObjectType::Table {
+                let table = Table::find_by_id(object_id)
+                    .one(txn)
+                    .await?
+                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
+
+                for sink_id in table.incoming_sinks.inner_ref() {
+                    println!("insert table {} incoming {} ", object_id, sink_id);
+                    queue.push_front((*sink_id, ObjectType::Sink));
+                }
+            }
+
+            let dependent_relations: Vec<(ObjectId, ObjectType)> = ObjectDependency::find()
+                .select_only()
+                .column(object_dependency::Column::Oid)
+                .column(object::Column::ObjType)
+                .join(
+                    JoinType::InnerJoin,
+                    object_dependency::Relation::Object2.def(),
+                )
+                .filter(object_dependency::Column::UsedBy.eq(object_id))
+                .into_tuple()
+                .all(txn)
+                .await?;
+
+            println!("deps {:?}", dependent_relations);
+
+            queue.extend(dependent_relations.into_iter());
+        }
+
+        Ok(false)
+    }
+
     pub async fn create_job_catalog(
         &self,
         streaming_job: &mut StreamingJob,
@@ -140,7 +197,13 @@ impl CatalogController {
                 let table: table::ActiveModel = table.clone().into();
                 Table::insert(table).exec(&txn).await?;
             }
-            StreamingJob::Sink(sink, _) => {
+            StreamingJob::Sink(sink, s) => {
+                if let Some(target_table_id) = sink.target_table {
+                    if Self::check_cycle_for_table_sink(target_table_id, sink, &txn).await? {
+                        bail!("Creating such a sink will result in circular dependency.");
+                    }
+                }
+
                 let job_id = Self::create_streaming_job_obj(
                     &txn,
                     ObjectType::Sink,
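
The meta-side `check_cycle_for_table_sink` added above performs the same reachability test as a breadth-first walk over the catalog: it seeds a queue with the sink's `dependent_relations`, expands tables through their `incoming_sinks`, expands every object through `object_dependency`, and reports a cycle as soon as any object (the target table is pre-marked as visited) is seen twice. A rough sketch with plain collections, using a hypothetical `would_form_cycle` helper instead of the SeaORM queries in the hunk above:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

type ObjectId = i32;

/// `edges` is a hypothetical adjacency map: object -> objects it is built on,
/// standing in for `incoming_sinks` plus the `object_dependency` rows.
fn would_form_cycle(
    edges: &HashMap<ObjectId, Vec<ObjectId>>,
    target_table: ObjectId,
    sink_dependencies: &[ObjectId],
) -> bool {
    // The target table is visited up front, like `visited_objects.insert(table)`.
    let mut visited: HashSet<ObjectId> = HashSet::from([target_table]);
    let mut queue: VecDeque<ObjectId> = sink_dependencies.iter().copied().collect();

    while let Some(oid) = queue.pop_front() {
        if !visited.insert(oid) {
            // Reaching an already-visited object (in particular the target
            // table) means the new sink would close a cycle.
            return true;
        }
        if let Some(nexts) = edges.get(&oid) {
            queue.extend(nexts.iter().copied());
        }
    }
    false
}

fn main() {
    // The sink writes into table 1 and reads table 2, which already depends
    // on table 1, so the check reports a cycle.
    let edges = HashMap::from([(2, vec![1])]);
    assert!(would_form_cycle(&edges, 1, &[2]));
}
```

The patch runs this check inside `create_job_catalog` and rejects the DDL with `bail!` when it returns true.
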
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 7cff9dc4a9b7a..7ff3fd39cdcb3 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
 use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 use std::time::Duration;
@@ -38,6 +38,7 @@ use risingwave_connector::source::{
 };
 use risingwave_meta_model_v2::object::ObjectType;
 use risingwave_meta_model_v2::ObjectId;
+use risingwave_pb::catalog;
 use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
 use risingwave_pb::catalog::connection::PrivateLinkService;
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;