From 987f452d9f7565ad2d6f118e105d3a5b2a8c5266 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 4 Mar 2024 14:04:35 +0800 Subject: [PATCH] feat: Refactor the cycle check for sink into table. (#15390) --- src/frontend/src/handler/create_sink.rs | 52 +++++++++++++++++++------ 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index e667a82bbb79..0a20c4f8b4e4 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::rc::Rc; use std::sync::{Arc, LazyLock}; @@ -498,11 +498,21 @@ fn check_cycle_for_sink( reader: &CatalogReadGuard, sink_index: &HashMap, sink: &SinkCatalog, - visited_tables: &mut HashSet, + target_table_id: catalog::TableId, + path: &mut Vec, ) -> Result<()> { for table_id in &sink.dependent_relations { if let Ok(table) = reader.get_table_by_id(table_id) { - visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? + path.push(table.name.clone()); + visit_table( + session, + reader, + sink_index, + table.as_ref(), + target_table_id, + path, + )?; + path.pop(); } else { bail!("streaming job not found: {:?}", table_id); } @@ -516,18 +526,26 @@ fn check_cycle_for_sink( reader: &CatalogReadGuard, sink_index: &HashMap, table: &TableCatalog, - visited_tables: &mut HashSet, + target_table_id: catalog::TableId, + path: &mut Vec, ) -> Result<()> { - if visited_tables.contains(&table.id.table_id) { + if table.id == target_table_id { + path.reverse(); + path.push(table.name.clone()); return Err(RwError::from(ErrorCode::BindError( - "Creating such a sink will result in circular dependency.".to_string(), + format!( + "Creating such a sink will result in circular dependency, path = [{}]", + path.join(", ") + ) + .to_string(), ))); } - let _ = visited_tables.insert(table.id.table_id); for sink_id in &table.incoming_sinks { if let Some(sink) = sink_index.get(sink_id) { - visit_sink(session, reader, sink_index, sink, visited_tables)? + path.push(sink.name.clone()); + visit_sink(session, reader, sink_index, sink, target_table_id, path)?; + path.pop(); } else { bail!("sink not found: {:?}", sink_id); } @@ -535,7 +553,16 @@ fn check_cycle_for_sink( for table_id in &table.dependent_relations { if let Ok(table) = reader.get_table_by_id(table_id) { - visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? + path.push(table.name.clone()); + visit_table( + session, + reader, + sink_index, + table.as_ref(), + target_table_id, + path, + )?; + path.pop(); } else { bail!("streaming job not found: {:?}", table_id); } @@ -544,10 +571,11 @@ fn check_cycle_for_sink( Ok(()) } - let mut visited_tables = HashSet::new(); - visited_tables.insert(table_id.table_id); + let mut path = vec![]; + + path.push(sink_catalog.name.clone()); - visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables) + visit_sink(session, &reader, &sinks, &sink_catalog, table_id, &mut path) } pub(crate) async fn reparse_table_for_sink(