diff --git a/e2e_test/background_ddl/basic.slt b/e2e_test/background_ddl/basic.slt
index 3c98b6943610e..c398a95cf1f5d 100644
--- a/e2e_test/background_ddl/basic.slt
+++ b/e2e_test/background_ddl/basic.slt
@@ -56,6 +56,48 @@ DROP MATERIALIZED VIEW m2;
 statement ok
 DROP TABLE t;
 
+################### Test stream now
+
+statement ok
+create table t (v1 timestamp with time zone, v2 timestamp with time zone);
+
+statement ok
+insert into t select to_timestamp(x) as z, to_timestamp(x) as y from generate_series(1, 1000) t(x);
+
+statement ok
+set backfill_rate_limit=1;
+
+statement ok
+create materialized view m1 as select * from t where v1 >= now() and v2 >= now();
+
+sleep 2s
+
+query I
+select count(*) from rw_catalog.rw_ddl_progress;
+----
+1
+
+statement ok
+recover;
+
+sleep 5s
+
+query I
+select count(*) from rw_catalog.rw_ddl_progress;
+----
+1
+
+statement ok
+drop materialized view m1;
+
+query I
+select count(*) from rw_catalog.rw_ddl_progress;
+----
+0
+
+statement ok
+drop table t;
+
 statement ok
 SET BACKGROUND_DDL=false;
 
diff --git a/e2e_test/source_inline/kafka/background_ddl/basic.slt.serial b/e2e_test/source_inline/kafka/background_ddl/basic.slt.serial
new file mode 100644
index 0000000000000..75a7601661a89
--- /dev/null
+++ b/e2e_test/source_inline/kafka/background_ddl/basic.slt.serial
@@ -0,0 +1,130 @@
+control substitution on
+
+statement ok
+SET streaming_use_shared_source TO false;
+
+############## Create kafka seed data
+
+statement ok
+create table kafka_seed_data (v1 int);
+
+statement ok
+insert into kafka_seed_data select * from generate_series(1, 1000);
+
+statement ok
+create sink kafka_sink
+from
+  kafka_seed_data with (
+    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+    topic = 'test_rate_limit',
+    type = 'append-only',
+    force_append_only='true'
+);
+
+# Wait for the topic to be created
+skipif in-memory
+sleep 5s
+
+############## Create kafka source
+
+statement ok
+create source kafka_source (v1 int) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'test_rate_limit',
+  scan.startup.mode = 'earliest',
+  source_rate_limit = 1,
+) FORMAT PLAIN ENCODE JSON
+
+statement ok
+flush;
+
+############## Create background mv with only source upstream
+
+statement ok
+set background_ddl = true;
+
+statement ok
+create materialized view rl_mv1 as select count(*) from kafka_source;
+
+statement ok
+wait
+
+statement ok
+select * from rl_mv1;
+
+statement ok
+drop materialized view rl_mv1;
+
+############## Create background mv joined with a table
+
+statement ok
+set streaming_parallelism=1;
+
+statement ok
+create table t1(v1 int);
+
+statement ok
+insert into t1 select * from generate_series(1, 10);
+
+statement ok
+flush;
+
+statement ok
+set backfill_rate_limit = 1;
+
+statement ok
+set background_ddl=true;
+
+statement ok
+create materialized view rl_mv2 as select kafka_source.v1 from kafka_source join t1 on kafka_source.v1 = t1.v1;
+
+query I
+select count(*) from rw_catalog.rw_ddl_progress;
+----
+1
+
+sleep 1s
+
+statement ok
+recover
+
+sleep 5s
+
+query I
+select count(*) from rw_catalog.rw_ddl_progress;
+----
+1
+
+statement ok
+wait
+
+query I
+select count(*) from rw_catalog.rw_ddl_progress;
+----
+0
+
+statement ok
+drop materialized view rl_mv2;
+
+statement ok
+drop table t1;
+
+############## Cleanup
+
+statement ok
+drop source kafka_source;
+
+statement ok
+drop sink kafka_sink;
+
+statement ok
+drop table kafka_seed_data;
+
+statement ok
+set background_ddl=false;
+
+statement ok
+SET streaming_use_shared_source TO true;
+
+statement ok
+set streaming_parallelism=default;
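The `wait` directive used in the tests above blocks until no background DDL job remains in `rw_catalog.rw_ddl_progress`. Below is a minimal sketch (not part of this patch) of that polling loop in Rust; the connection string, the default frontend port 4566, and the use of the `tokio-postgres` crate are assumptions for illustration:

```rust
// Sketch: poll rw_ddl_progress until all background DDL jobs have finished,
// roughly what the slt `wait` directive does. Connection details are assumed.
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let (client, connection) =
        tokio_postgres::connect("host=localhost port=4566 user=root dbname=dev", NoTls).await?;
    // The connection object drives the protocol; run it on its own task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    loop {
        let row = client
            .query_one("select count(*) from rw_catalog.rw_ddl_progress", &[])
            .await?;
        let pending: i64 = row.get(0);
        if pending == 0 {
            break; // no background DDL jobs left
        }
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
    Ok(())
}
```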
diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index 57d4454ac254a..b567619e5c768 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -33,7 +33,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
 use crate::error::Result;
 use crate::optimizer::plan_node::derive::derive_pk;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
-use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
+use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
 use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
 use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
 use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -88,7 +88,7 @@ impl StreamMaterialize {
         let create_type = if matches!(table_type, TableType::MaterializedView)
             && input.ctx().session_ctx().config().background_ddl()
-            && plan_has_backfill_leaf_nodes(&input)
+            && plan_can_use_background_ddl(&input)
         {
             CreateType::Background
         } else {
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index 3e34475c8d4bb..3f5f07aa05982 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -46,7 +46,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec
 use crate::error::{ErrorCode, Result};
 use crate::expr::{ExprImpl, FunctionCall, InputRef};
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
-use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
+use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
 use crate::optimizer::plan_node::PlanTreeNodeUnary;
 use crate::optimizer::property::{Distribution, Order, RequiredDist};
 use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -380,7 +380,7 @@ impl StreamSink {
         let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
         let distribution_key = input.distribution().dist_column_indices().to_vec();
         let create_type = if input.ctx().session_ctx().config().background_ddl()
-            && plan_has_backfill_leaf_nodes(&input)
+            && plan_can_use_background_ddl(&input)
         {
             CreateType::Background
         } else {
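Both call sites above make the same decision: a materialized view or sink is created as a background job only when the session has opted in and the renamed check passes. Condensed, the gating looks like the sketch below; `decide_create_type` and its parameters are illustrative stand-ins, not names from the codebase:

```rust
// Condensed sketch of the shared gating logic at both call sites.
enum CreateType {
    Foreground,
    Background,
}

fn decide_create_type(session_background_ddl_enabled: bool, plan_supports_recovery: bool) -> CreateType {
    // Background DDL is chosen only when the session opted in *and* every
    // leaf of the stream plan can recover after a restart.
    if session_background_ddl_enabled && plan_supports_recovery {
        CreateType::Background
    } else {
        CreateType::Foreground
    }
}

fn main() {
    assert!(matches!(decide_create_type(true, true), CreateType::Background));
    assert!(matches!(decide_create_type(true, false), CreateType::Foreground));
}
```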
diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs
index bc3c223c615e6..f2471bafb84e0 100644
--- a/src/frontend/src/optimizer/plan_node/utils.rs
+++ b/src/frontend/src/optimizer/plan_node/utils.rs
@@ -374,11 +374,15 @@ pub fn infer_kv_log_store_table_catalog_inner(
     table_catalog_builder.build(dist_key, read_prefix_len_hint)
 }
 
-/// Check that all leaf nodes must be stream table scan,
-/// since that plan node maps to `backfill` executor, which supports recovery.
-pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
+/// A plan can use background ddl only if all of its leaf nodes support recovery:
+/// backfill scans, `StreamNow` and `StreamSource` do, while e.g. `StreamValues` does not.
+pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool {
     if plan.inputs().is_empty() {
-        if let Some(scan) = plan.as_stream_table_scan() {
+        if plan.as_stream_now().is_some()
+            || plan.as_stream_source().is_some()
+        {
+            true
+        } else if let Some(scan) = plan.as_stream_table_scan() {
             scan.stream_scan_type() == StreamScanType::Backfill
                 || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
         } else {
@@ -386,7 +390,7 @@ pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
         }
     } else {
         assert!(!plan.inputs().is_empty());
-        plan.inputs().iter().all(plan_has_backfill_leaf_nodes)
+        plan.inputs().iter().all(plan_can_use_background_ddl)
    }
 }
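The renamed check is a structural fold over the plan tree: a leaf passes if it maps to an executor that can recover (backfill scans, `StreamNow`, `StreamSource`), and an internal node passes iff all of its inputs do. A self-contained sketch of the same recursion over a toy tree; the `Node` enum is an illustration, not the real `PlanRef`:

```rust
// Toy model of plan_can_use_background_ddl's recursion.
enum Node {
    // Leaves that support recovery after a restart:
    Backfill,
    Now,
    Source,
    // A leaf that does not (e.g. `StreamValues`):
    Values,
    // Any internal node simply forwards the check to its inputs:
    Inner(Vec<Node>),
}

fn can_use_background_ddl(node: &Node) -> bool {
    match node {
        Node::Backfill | Node::Now | Node::Source => true,
        Node::Values => false,
        Node::Inner(inputs) => inputs.iter().all(can_use_background_ddl),
    }
}

fn main() {
    // A join of a backfill scan with a source: every leaf recovers,
    // so background ddl is allowed (the rl_mv2 case in the slt test).
    let ok = Node::Inner(vec![Node::Backfill, Node::Source]);
    // Any plan containing a `Values` leaf cannot use background ddl.
    let not_ok = Node::Inner(vec![Node::Backfill, Node::Values]);
    assert!(can_use_background_ddl(&ok));
    assert!(!can_use_background_ddl(&not_ok));
}
```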