diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt index 179f659e4c113..59e43773560f0 100644 --- a/e2e_test/sink/sink_into_table/basic.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -327,7 +327,6 @@ drop table t_primary_key; statement ok drop table t_s3; - # multi sinks statement ok @@ -423,3 +422,42 @@ drop table t_c; statement ok drop table t_m; + +# from view + +statement ok +create table t_a(v int); + +statement ok +insert into t_a values (1), (2), (3); + +statement ok +create view v_a as select v from t_a; + +statement ok +create table t_m(v int primary key); + +statement ok +create sink s_a into t_m as select v from v_a; + +statement ok +flush; + +query I +select * from t_m order by v; +---- +1 +2 +3 + +statement ok +drop sink s_a; + +statement ok +drop view v_a; + +statement ok +drop table t_m; + +statement ok +drop table t_a; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b81a40f6d5759..014742ed679f6 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -48,6 +48,7 @@ use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; use crate::catalog::source_catalog::SourceCatalog; +use crate::catalog::view_catalog::ViewCatalog; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ExprImpl, InputRef}; use crate::handler::alter_table_column::fetch_table_catalog_for_alter; @@ -494,6 +495,7 @@ fn check_cycle_for_sink( let mut sinks = HashMap::new(); let mut sources = HashMap::new(); + let mut views = HashMap::new(); let db_name = session.database(); for schema in reader.iter_schemas(db_name)? { for sink in schema.iter_sink() { @@ -503,12 +505,17 @@ fn check_cycle_for_sink( for source in schema.iter_source() { sources.insert(source.id, source.as_ref()); } + + for view in schema.iter_view() { + views.insert(view.id, view.as_ref()); + } } struct Context<'a> { reader: &'a CatalogReadGuard, sink_index: &'a HashMap, source_index: &'a HashMap, + view_index: &'a HashMap, } impl Context<'_> { @@ -558,6 +565,8 @@ fn check_cycle_for_sink( path.pop(); } else if self.source_index.contains_key(&table_id.table_id) { continue; + } else if self.view_index.contains_key(&table_id.table_id) { + continue; } else { bail!("streaming job not found: {:?}", table_id); } @@ -575,6 +584,7 @@ fn check_cycle_for_sink( reader: &reader, sink_index: &sinks, source_index: &sources, + view_index: &views, }; ctx.visit_dependent_jobs(&sink_catalog.dependent_relations, table_id, &mut path)?;