diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt index 179f659e4c113..59e43773560f0 100644 --- a/e2e_test/sink/sink_into_table/basic.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -327,7 +327,6 @@ drop table t_primary_key; statement ok drop table t_s3; - # multi sinks statement ok @@ -423,3 +422,42 @@ drop table t_c; statement ok drop table t_m; + +# from view + +statement ok +create table t_a(v int); + +statement ok +insert into t_a values (1), (2), (3); + +statement ok +create view v_a as select v from t_a; + +statement ok +create table t_m(v int primary key); + +statement ok +create sink s_a into t_m as select v from v_a; + +statement ok +flush; + +query I +select * from t_m order by v; +---- +1 +2 +3 + +statement ok +drop sink s_a; + +statement ok +drop view v_a; + +statement ok +drop table t_m; + +statement ok +drop table t_a; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index cf0bd0968d079..7bc3a05310d78 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -47,6 +47,7 @@ use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; use crate::catalog::source_catalog::SourceCatalog; +use crate::catalog::view_catalog::ViewCatalog; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ExprImpl, InputRef}; use crate::handler::alter_table_column::fetch_table_catalog_for_alter; @@ -493,6 +494,7 @@ fn check_cycle_for_sink( let mut sinks = HashMap::new(); let mut sources = HashMap::new(); + let mut views = HashMap::new(); let db_name = session.database(); for schema in reader.iter_schemas(db_name)? { for sink in schema.iter_sink() { @@ -502,12 +504,17 @@ fn check_cycle_for_sink( for source in schema.iter_source() { sources.insert(source.id, source.as_ref()); } + + for view in schema.iter_view() { + views.insert(view.id, view.as_ref()); + } } struct Context<'a> { reader: &'a CatalogReadGuard, sink_index: &'a HashMap<u32, &'a SinkCatalog>, source_index: &'a HashMap<u32, &'a SourceCatalog>, + view_index: &'a HashMap<u32, &'a ViewCatalog>, } impl Context<'_> { @@ -555,7 +562,9 @@ fn check_cycle_for_sink( path.push(table.name.clone()); self.visit_table(table.as_ref(), target_table_id, path)?; path.pop(); - } else if self.source_index.get(&table_id.table_id).is_some() { + } else if self.source_index.contains_key(&table_id.table_id) + || self.view_index.contains_key(&table_id.table_id) + { continue; } else { bail!("streaming job not found: {:?}", table_id); @@ -574,6 +583,7 @@ fn check_cycle_for_sink( reader: &reader, sink_index: &sinks, source_index: &sources, + view_index: &views, }; ctx.visit_dependent_jobs(&sink_catalog.dependent_relations, table_id, &mut path)?;