Skip to content

Commit

Permalink
fix: refine cycle check for sink into table (#15170)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Feb 23, 2024
1 parent 07bd890 commit 304709b
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 1 deletion.
29 changes: 29 additions & 0 deletions e2e_test/sink/sink_into_table/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,35 @@ drop table t_b;
statement ok
drop table t_c;

# cycle check (with materialize view)

statement ok
create table t_a(v int primary key);

statement ok
create materialized view m_a as select v from t_a;

statement ok
create table t_b(v int primary key);

statement ok
create sink s_a into t_b as select v from m_a;

statement error Creating such a sink will result in circular dependency
create sink s_b into t_a as select v from t_b;

statement ok
drop sink s_a;

statement ok
drop table t_b;

statement ok
drop materialized view m_a;

statement ok
drop table t_a;

# multi sinks

statement ok
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub struct TableCatalog {

pub name: String,

pub dependent_relations: Vec<TableId>,

/// All columns in this table.
pub columns: Vec<ColumnCatalog>,

Expand Down Expand Up @@ -573,6 +575,11 @@ impl From<PbTable> for TableCatalog {
created_at_cluster_version: tb.created_at_cluster_version.clone(),
initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
retention_seconds: tb.retention_seconds,
dependent_relations: tb
.dependent_relations
.into_iter()
.map(TableId::from)
.collect_vec(),
}
}
}
Expand Down Expand Up @@ -724,6 +731,7 @@ mod tests {
incoming_sinks: vec![],
created_at_cluster_version: None,
initialized_at_cluster_version: None,
dependent_relations: vec![],
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ fn check_cycle_for_sink(
if let Ok(table) = reader.get_table_by_id(table_id) {
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
} else {
bail!("table not found: {:?}", table_id);
bail!("streaming job not found: {:?}", table_id);
}
}

Expand Down Expand Up @@ -533,6 +533,14 @@ fn check_cycle_for_sink(
}
}

for table_id in &table.dependent_relations {
if let Ok(table) = reader.get_table_by_id(table_id) {
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
} else {
bail!("streaming job not found: {:?}", table_id);
}
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl StreamMaterialize {
id: TableId::placeholder(),
associated_source_id: None,
name,
dependent_relations: vec![],
columns,
pk: table_pk,
stream_key,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl TableCatalogBuilder {
id: TableId::placeholder(),
associated_source_id: None,
name: String::new(),
dependent_relations: vec![],
columns: self.columns.clone(),
pk: self.pk,
stream_key: vec![],
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ pub(crate) mod tests {
id: table_id,
associated_source_id: None,
name: "test".to_string(),
dependent_relations: vec![],
columns: vec![
ColumnCatalog {
column_desc: ColumnDesc::new_atomic(DataType::Int32, "a", 0),
Expand Down

0 comments on commit 304709b

Please sign in to comment.