Skip to content

Commit

Permalink
Refactored return types and statements.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Dec 7, 2023
1 parent 2dfdd78 commit a9501cf
Showing 1 changed file with 20 additions and 24 deletions.
44 changes: 20 additions & 24 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,7 @@ pub async fn handle_create_sink(
bail_not_implemented!(issue = 13818, "create sink into table with incoming sinks");
}

if check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())? {
return Err(RwError::from(ErrorCode::BindError(
"Creating such a sink will result in circular dependency.".to_string(),
)));
}
check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())?;

let (mut graph, mut table, source) =
reparse_table_for_sink(&session, &table_catalog).await?;
Expand Down Expand Up @@ -357,7 +353,7 @@ fn check_cycle_for_sink(
session: &SessionImpl,
sink_catalog: SinkCatalog,
table_id: catalog::TableId,
) -> Result<bool> {
) -> Result<()> {
let reader = session.env().catalog_reader().read_guard();

let mut sinks = HashMap::new();
Expand All @@ -373,17 +369,16 @@ fn check_cycle_for_sink(
sink_index: &HashMap<u32, &SinkCatalog>,
sink: &SinkCatalog,
visited_tables: &mut HashSet<u32>,
) -> Result<bool> {
) -> Result<()> {
for table_id in &sink.dependent_relations {
if let Ok(table) = reader.get_table_by_id(table_id) {
if visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? {
return Ok(true);
}
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
} else {
bail!("table not found: {:?}", table_id);
}
}
Ok(false)

Ok(())
}

fn visit_table(
Expand All @@ -392,22 +387,23 @@ fn check_cycle_for_sink(
sink_index: &HashMap<u32, &SinkCatalog>,
table: &TableCatalog,
visited_tables: &mut HashSet<u32>,
) -> Result<bool> {
) -> Result<()> {
if visited_tables.contains(&table.id.table_id) {
Ok(true)
} else {
let _ = visited_tables.insert(table.id.table_id);
for sink_id in &table.incoming_sinks {
if let Some(sink) = sink_index.get(sink_id) {
if visit_sink(session, reader, sink_index, sink, visited_tables)? {
return Ok(true);
}
} else {
bail!("sink not found: {:?}", sink_id);
}
return Err(RwError::from(ErrorCode::BindError(
"Creating such a sink will result in circular dependency.".to_string(),
)));
}

let _ = visited_tables.insert(table.id.table_id);
for sink_id in &table.incoming_sinks {
if let Some(sink) = sink_index.get(sink_id) {
visit_sink(session, reader, sink_index, sink, visited_tables)?
} else {
bail!("sink not found: {:?}", sink_id);
}
Ok(false)
}

Ok(())
}

let mut visited_tables = HashSet::new();
Expand Down

0 comments on commit a9501cf

Please sign in to comment.