Skip to content

Commit

Permalink
fix: hotfix cycle check for sink from source (#15639)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Mar 14, 2024
1 parent a82a02f commit 0e460ca
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 132 deletions.
63 changes: 0 additions & 63 deletions e2e_test/sink/sink_into_table/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -327,69 +327,6 @@ drop table t_primary_key;
statement ok
drop table t_s3;

# cycle check

statement ok
create table t_a(v int primary key);

statement ok
create table t_b(v int primary key);

statement ok
create table t_c(v int primary key);

statement ok
create sink s_a into t_b as select v from t_a;

statement ok
create sink s_b into t_c as select v from t_b;

statement error Creating such a sink will result in circular dependency
create sink s_c into t_a as select v from t_c;

statement ok
drop sink s_a;

statement ok
drop sink s_b;

statement ok
drop table t_a;

statement ok
drop table t_b;

statement ok
drop table t_c;

# cycle check (with materialize view)

statement ok
create table t_a(v int primary key);

statement ok
create materialized view m_a as select v from t_a;

statement ok
create table t_b(v int primary key);

statement ok
create sink s_a into t_b as select v from m_a;

statement error Creating such a sink will result in circular dependency
create sink s_b into t_a as select v from t_b;

statement ok
drop sink s_a;

statement ok
drop table t_b;

statement ok
drop materialized view m_a;

statement ok
drop table t_a;

# multi sinks

Expand Down
110 changes: 110 additions & 0 deletions e2e_test/sink/sink_into_table/cycle.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# cycle check

statement ok
create table t_a(v int primary key);

statement ok
create table t_b(v int primary key);

statement ok
create table t_c(v int primary key);

statement ok
create sink s_a into t_b as select v from t_a;

statement ok
create sink s_b into t_c as select v from t_b;

statement error Creating such a sink will result in circular dependency
create sink s_c into t_a as select v from t_c;

statement ok
drop sink s_a;

statement ok
drop sink s_b;

statement ok
drop table t_a;

statement ok
drop table t_b;

statement ok
drop table t_c;

# cycle check (with materialize view)

statement ok
create table t_a(v int primary key);

statement ok
create materialized view m_a as select v from t_a;

statement ok
create table t_b(v int primary key);

statement ok
create sink s_a into t_b as select v from m_a;

statement error Creating such a sink will result in circular dependency
create sink s_b into t_a as select v from t_b;

statement ok
drop sink s_a;

statement ok
drop table t_b;

statement ok
drop materialized view m_a;

statement ok
drop table t_a;

# cycle check (with source)

statement ok
create source ss_a (v1 int, v2 float) with (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v1.end = '10',
fields.v2.kind = 'sequence',
fields.v2.start = '11',
fields.v2.end = '20',
datagen.rows.per.second = '15',
datagen.split.num = '1'
);

statement ok
create table t_a(v int primary key);

statement ok
create sink s_a into t_a as select v1 from ss_a;

statement ok
drop sink s_a;

statement ok
drop table t_a;

statement ok
drop source ss_a;

# cycle check (with index)

statement ok
create table t_a(v int primary key);

statement ok
create index i_a on t_a(v);

statement error Creating such a sink will result in circular dependency
create sink s_a into t_a as select v from i_a;

statement ok
drop index i_a;

statement ok
drop table t_a;
137 changes: 68 additions & 69 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, TableId, UserId};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_common::{bail, catalog};
Expand All @@ -48,6 +48,7 @@ use super::create_source::UPSTREAM_SOURCE_KEY;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, Literal};
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
Expand Down Expand Up @@ -487,95 +488,93 @@ fn check_cycle_for_sink(
let reader = session.env().catalog_reader().read_guard();

let mut sinks = HashMap::new();
let mut sources = HashMap::new();
let db_name = session.database();
for schema in reader.iter_schemas(db_name)? {
for sink in schema.iter_sink() {
sinks.insert(sink.id.sink_id, sink.as_ref());
}
}
fn visit_sink(
session: &SessionImpl,
reader: &CatalogReadGuard,
sink_index: &HashMap<u32, &SinkCatalog>,
sink: &SinkCatalog,
target_table_id: catalog::TableId,
path: &mut Vec<String>,
) -> Result<()> {
for table_id in &sink.dependent_relations {
if let Ok(table) = reader.get_table_by_id(table_id) {
path.push(table.name.clone());
visit_table(
session,
reader,
sink_index,
table.as_ref(),
target_table_id,
path,
)?;
path.pop();
} else {
bail!("streaming job not found: {:?}", table_id);
}

for source in schema.iter_source() {
sources.insert(source.id, source.as_ref());
}
}

Ok(())
struct Context<'a> {
reader: &'a CatalogReadGuard,
sink_index: &'a HashMap<u32, &'a SinkCatalog>,
source_index: &'a HashMap<u32, &'a SourceCatalog>,
}

fn visit_table(
session: &SessionImpl,
reader: &CatalogReadGuard,
sink_index: &HashMap<u32, &SinkCatalog>,
table: &TableCatalog,
target_table_id: catalog::TableId,
path: &mut Vec<String>,
) -> Result<()> {
if table.id == target_table_id {
path.reverse();
path.push(table.name.clone());
return Err(RwError::from(ErrorCode::BindError(
format!(
"Creating such a sink will result in circular dependency, path = [{}]",
path.join(", ")
)
.to_string(),
)));
}
impl Context<'_> {
fn visit_table(
&self,
table: &TableCatalog,
target_table_id: catalog::TableId,
path: &mut Vec<String>,
) -> Result<()> {
if table.id == target_table_id {
path.reverse();
path.push(table.name.clone());
return Err(RwError::from(ErrorCode::BindError(
format!(
"Creating such a sink will result in circular dependency, path = [{}]",
path.join(", ")
)
.to_string(),
)));
}

for sink_id in &table.incoming_sinks {
if let Some(sink) = sink_index.get(sink_id) {
path.push(sink.name.clone());
visit_sink(session, reader, sink_index, sink, target_table_id, path)?;
path.pop();
} else {
bail!("sink not found: {:?}", sink_id);
for sink_id in &table.incoming_sinks {
if let Some(sink) = self.sink_index.get(sink_id) {
path.push(sink.name.clone());
self.visit_dependent_jobs(&sink.dependent_relations, target_table_id, path)?;
path.pop();
} else {
bail!("sink not found: {:?}", sink_id);
}
}

self.visit_dependent_jobs(&table.dependent_relations, target_table_id, path)?;

Ok(())
}

for table_id in &table.dependent_relations {
if let Ok(table) = reader.get_table_by_id(table_id) {
path.push(table.name.clone());
visit_table(
session,
reader,
sink_index,
table.as_ref(),
target_table_id,
path,
)?;
path.pop();
} else {
bail!("streaming job not found: {:?}", table_id);
fn visit_dependent_jobs(
&self,
dependent_jobs: &[TableId],
target_table_id: TableId,
path: &mut Vec<String>,
) -> Result<()> {
for table_id in dependent_jobs {
if let Ok(table) = self.reader.get_table_by_id(table_id) {
path.push(table.name.clone());
self.visit_table(table.as_ref(), target_table_id, path)?;
path.pop();
} else if self.source_index.get(&table_id.table_id).is_some() {
continue;
} else {
bail!("streaming job not found: {:?}", table_id);
}
}
}

Ok(())
Ok(())
}
}

let mut path = vec![];

path.push(sink_catalog.name.clone());

visit_sink(session, &reader, &sinks, &sink_catalog, table_id, &mut path)
let ctx = Context {
reader: &reader,
sink_index: &sinks,
source_index: &sources,
};

ctx.visit_dependent_jobs(&sink_catalog.dependent_relations, table_id, &mut path)?;

Ok(())
}

pub(crate) async fn reparse_table_for_sink(
Expand Down

0 comments on commit 0e460ca

Please sign in to comment.