diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt index 1bc5a47907077..890087e207fd0 100644 --- a/e2e_test/sink/sink_into_table/basic.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -362,6 +362,35 @@ drop table t_b; statement ok drop table t_c; +# cycle check (with materialize view) + +statement ok +create table t_a(v int primary key); + +statement ok +create materialized view m_a as select v from t_a; + +statement ok +create table t_b(v int primary key); + +statement ok +create sink s_a into t_b as select v from m_a; + +statement error Creating such a sink will result in circular dependency +create sink s_b into t_a as select v from t_b; + +statement ok +drop sink s_a; + +statement ok +drop table t_b; + +statement ok +drop materialized view m_a; + +statement ok +drop table t_a; + # multi sinks statement ok diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 2954cb37384dc..edb458997e33f 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -74,6 +74,8 @@ pub struct TableCatalog { pub name: String, + pub dependent_relations: Vec, + /// All columns in this table. pub columns: Vec, @@ -573,6 +575,11 @@ impl From for TableCatalog { created_at_cluster_version: tb.created_at_cluster_version.clone(), initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(), retention_seconds: tb.retention_seconds, + dependent_relations: tb + .dependent_relations + .into_iter() + .map(TableId::from) + .collect_vec(), } } } @@ -724,6 +731,7 @@ mod tests { incoming_sinks: vec![], created_at_cluster_version: None, initialized_at_cluster_version: None, + dependent_relations: vec![], } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index de8e93e04a784..830253675c1bd 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -504,7 +504,7 @@ fn check_cycle_for_sink( if let Ok(table) = reader.get_table_by_id(table_id) { visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? } else { - bail!("table not found: {:?}", table_id); + bail!("streaming job not found: {:?}", table_id); } } @@ -533,6 +533,14 @@ fn check_cycle_for_sink( } } + for table_id in &table.dependent_relations { + if let Ok(table) = reader.get_table_by_id(table_id) { + visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? + } else { + bail!("streaming job not found: {:?}", table_id); + } + } + Ok(()) } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 3abc7ace0e494..f2acbcf9d258c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -226,6 +226,7 @@ impl StreamMaterialize { id: TableId::placeholder(), associated_source_id: None, name, + dependent_relations: vec![], columns, pk: table_pk, stream_key, diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 39d9ff5e7018d..c8cd1bb05fa83 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -141,6 +141,7 @@ impl TableCatalogBuilder { id: TableId::placeholder(), associated_source_id: None, name: String::new(), + dependent_relations: vec![], columns: self.columns.clone(), pk: self.pk, stream_key: vec![], diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 6295d8036b566..515a83d0923ef 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -543,6 +543,7 @@ pub(crate) mod tests { id: table_id, associated_source_id: None, name: "test".to_string(), + dependent_relations: vec![], columns: vec![ ColumnCatalog { column_desc: ColumnDesc::new_atomic(DataType::Int32, "a", 0),