Skip to content

Commit

Permalink
feat: Refactor the cycle check for sink into table. (#15390)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored and shanicky committed Mar 4, 2024
1 parent 5e6e2ca commit b9d28ea
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::{Arc, LazyLock};

Expand Down Expand Up @@ -498,11 +498,21 @@ fn check_cycle_for_sink(
reader: &CatalogReadGuard,
sink_index: &HashMap<u32, &SinkCatalog>,
sink: &SinkCatalog,
visited_tables: &mut HashSet<u32>,
target_table_id: catalog::TableId,
path: &mut Vec<String>,
) -> Result<()> {
for table_id in &sink.dependent_relations {
if let Ok(table) = reader.get_table_by_id(table_id) {
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
path.push(table.name.clone());
visit_table(
session,
reader,
sink_index,
table.as_ref(),
target_table_id,
path,
)?;
path.pop();
} else {
bail!("streaming job not found: {:?}", table_id);
}
Expand All @@ -516,26 +526,43 @@ fn check_cycle_for_sink(
reader: &CatalogReadGuard,
sink_index: &HashMap<u32, &SinkCatalog>,
table: &TableCatalog,
visited_tables: &mut HashSet<u32>,
target_table_id: catalog::TableId,
path: &mut Vec<String>,
) -> Result<()> {
if visited_tables.contains(&table.id.table_id) {
if table.id == target_table_id {
path.reverse();
path.push(table.name.clone());
return Err(RwError::from(ErrorCode::BindError(
"Creating such a sink will result in circular dependency.".to_string(),
format!(
"Creating such a sink will result in circular dependency, path = [{}]",
path.join(", ")
)
.to_string(),
)));
}

let _ = visited_tables.insert(table.id.table_id);
for sink_id in &table.incoming_sinks {
if let Some(sink) = sink_index.get(sink_id) {
visit_sink(session, reader, sink_index, sink, visited_tables)?
path.push(sink.name.clone());
visit_sink(session, reader, sink_index, sink, target_table_id, path)?;
path.pop();
} else {
bail!("sink not found: {:?}", sink_id);
}
}

for table_id in &table.dependent_relations {
if let Ok(table) = reader.get_table_by_id(table_id) {
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
path.push(table.name.clone());
visit_table(
session,
reader,
sink_index,
table.as_ref(),
target_table_id,
path,
)?;
path.pop();
} else {
bail!("streaming job not found: {:?}", table_id);
}
Expand All @@ -544,10 +571,11 @@ fn check_cycle_for_sink(
Ok(())
}

let mut visited_tables = HashSet::new();
visited_tables.insert(table_id.table_id);
let mut path = vec![];

path.push(sink_catalog.name.clone());

visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables)
visit_sink(session, &reader, &sinks, &sink_catalog, table_id, &mut path)
}

pub(crate) async fn reparse_table_for_sink(
Expand Down

0 comments on commit b9d28ea

Please sign in to comment.