diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index e667a82bbb79f..0a20c4f8b4e4f 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::rc::Rc; use std::sync::{Arc, LazyLock}; @@ -498,11 +498,21 @@ fn check_cycle_for_sink( reader: &CatalogReadGuard, sink_index: &HashMap, sink: &SinkCatalog, - visited_tables: &mut HashSet, + target_table_id: catalog::TableId, + path: &mut Vec, ) -> Result<()> { for table_id in &sink.dependent_relations { if let Ok(table) = reader.get_table_by_id(table_id) { - visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? + path.push(table.name.clone()); + visit_table( + session, + reader, + sink_index, + table.as_ref(), + target_table_id, + path, + )?; + path.pop(); } else { bail!("streaming job not found: {:?}", table_id); } @@ -516,18 +526,26 @@ fn check_cycle_for_sink( reader: &CatalogReadGuard, sink_index: &HashMap, table: &TableCatalog, - visited_tables: &mut HashSet, + target_table_id: catalog::TableId, + path: &mut Vec, ) -> Result<()> { - if visited_tables.contains(&table.id.table_id) { + if table.id == target_table_id { + path.reverse(); + path.push(table.name.clone()); return Err(RwError::from(ErrorCode::BindError( - "Creating such a sink will result in circular dependency.".to_string(), + format!( + "Creating such a sink will result in circular dependency, path = [{}]", + path.join(", ") + ) + .to_string(), ))); } - let _ = visited_tables.insert(table.id.table_id); for sink_id in &table.incoming_sinks { if let Some(sink) = sink_index.get(sink_id) { - visit_sink(session, reader, sink_index, sink, visited_tables)? + path.push(sink.name.clone()); + visit_sink(session, reader, sink_index, sink, target_table_id, path)?; + path.pop(); } else { bail!("sink not found: {:?}", sink_id); } @@ -535,7 +553,16 @@ fn check_cycle_for_sink( for table_id in &table.dependent_relations { if let Ok(table) = reader.get_table_by_id(table_id) { - visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? + path.push(table.name.clone()); + visit_table( + session, + reader, + sink_index, + table.as_ref(), + target_table_id, + path, + )?; + path.pop(); } else { bail!("streaming job not found: {:?}", table_id); } @@ -544,10 +571,11 @@ fn check_cycle_for_sink( Ok(()) } - let mut visited_tables = HashSet::new(); - visited_tables.insert(table_id.table_id); + let mut path = vec![]; + + path.push(sink_catalog.name.clone()); - visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables) + visit_sink(session, &reader, &sinks, &sink_catalog, table_id, &mut path) } pub(crate) async fn reparse_table_for_sink(