diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index c78f07f1270e8..88878667b8411 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -33,7 +33,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion}; use crate::error::Result; use crate::optimizer::plan_node::derive::derive_pk; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes; +use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl; use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta}; use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -88,7 +88,7 @@ impl StreamMaterialize { let create_type = if matches!(table_type, TableType::MaterializedView) && input.ctx().session_ctx().config().background_ddl() - && plan_has_backfill_leaf_nodes(&input) + && plan_can_use_backgronud_ddl(&input) { CreateType::Background } else { diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 3e34475c8d4bb..69e2086b9dd3b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -46,7 +46,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec use crate::error::{ErrorCode, Result}; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes; +use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -380,7 +380,7 @@ impl StreamSink { let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?; let distribution_key = input.distribution().dist_column_indices().to_vec(); let create_type = if input.ctx().session_ctx().config().background_ddl() - && plan_has_backfill_leaf_nodes(&input) + && plan_can_use_backgronud_ddl(&input) { CreateType::Background } else { diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index d25aee9d20c8b..4aefa1e54c882 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -377,9 +377,13 @@ pub fn infer_kv_log_store_table_catalog_inner( /// Check that all leaf nodes must be stream table scan, /// since that plan node maps to `backfill` executor, which supports recovery. -pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool { +/// Some other leaf nodes like `StreamValues` do not support recovery, and they +/// cannot use background ddl. +pub(crate) fn plan_can_use_backgronud_ddl(plan: &PlanRef) -> bool { if plan.inputs().is_empty() { - if let Some(scan) = plan.as_stream_table_scan() { + if plan.as_stream_source_scan().is_some() { + true + } else if let Some(scan) = plan.as_stream_table_scan() { scan.stream_scan_type() == StreamScanType::Backfill || scan.stream_scan_type() == StreamScanType::ArrangementBackfill } else { @@ -387,7 +391,7 @@ pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool { } } else { assert!(!plan.inputs().is_empty()); - plan.inputs().iter().all(plan_has_backfill_leaf_nodes) + plan.inputs().iter().all(plan_can_use_backgronud_ddl) } } diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 43abd22fba8dd..75f8ee6e6e7e4 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -76,6 +76,7 @@ impl DdlController { tracing::debug!( id = job_id, definition = streaming_job.definition(), + create_type = streaming_job.create_type().as_str_name(), "starting streaming job", ); let _permit = self