From 5ba1874c3cb9c6785f2ac772ddfb6f1b3cd2ba87 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Mon, 20 May 2024 17:43:12 +0800 Subject: [PATCH] refactor(frontend): derive `create_type` when generating the table catalog (#16827) --- src/frontend/src/handler/create_mv.rs | 31 +++---------------- .../optimizer/plan_node/stream_materialize.rs | 14 ++++++++- .../src/optimizer/plan_node/stream_sink.rs | 5 ++- src/frontend/src/optimizer/plan_node/utils.rs | 17 ++++++++++ src/meta/src/rpc/ddl_controller.rs | 9 ++++-- 5 files changed, 45 insertions(+), 31 deletions(-) diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index ceaa92e06b71a..8e6882e6bb4a5 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -16,9 +16,8 @@ use either::Either; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::acl::AclMode; -use risingwave_pb::catalog::{CreateType, PbTable}; +use risingwave_pb::catalog::PbTable; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; -use risingwave_pb::stream_plan::StreamScanType; use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query}; use super::privilege::resolve_relation_privileges; @@ -167,7 +166,7 @@ pub async fn handle_create_mv( return Ok(resp); } - let (mut table, graph, can_run_in_background) = { + let (table, graph) = { let context = OptimizerContext::from_handler_args(handler_args); if !context.with_options().is_empty() { // get other useful fields by `remove`, the logic here is to reject unknown options. @@ -186,21 +185,7 @@ It only indicates the physical clustering of the data, which may improve the per let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?; - // All leaf nodes must be stream table scan, no other scan operators support recovery. - fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool { - if plan.inputs().is_empty() { - if let Some(scan) = plan.as_stream_table_scan() { - scan.stream_scan_type() == StreamScanType::Backfill - || scan.stream_scan_type() == StreamScanType::ArrangementBackfill - } else { - false - } - } else { - assert!(!plan.inputs().is_empty()); - plan.inputs().iter().all(plan_has_backfill_leaf_nodes) - } - } - let can_run_in_background = plan_has_backfill_leaf_nodes(&plan); + let context = plan.plan_base().ctx().clone(); let mut graph = build_graph(plan)?; graph.parallelism = @@ -214,7 +199,7 @@ It only indicates the physical clustering of the data, which may improve the per let ctx = graph.ctx.as_mut().unwrap(); ctx.timezone = context.get_session_timezone(); - (table, graph, can_run_in_background) + (table, graph) }; // Ensure writes to `StreamJobTracker` are atomic. @@ -229,14 +214,6 @@ It only indicates the physical clustering of the data, which may improve the per table.name.clone(), )); - let run_in_background = session.config().background_ddl(); - let create_type = if run_in_background && can_run_in_background { - CreateType::Background - } else { - CreateType::Foreground - }; - table.create_type = create_type.into(); - let session = session.clone(); let catalog_writer = session.catalog_writer()?; catalog_writer diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 53b8983bfd0ed..e5a2496916adc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -33,6 +33,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion}; use crate::error::Result; use crate::optimizer::plan_node::derive::derive_pk; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes; use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta}; use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -84,6 +85,14 @@ impl StreamMaterialize { let input = reorganize_elements_id(input); let columns = derive_columns(input.schema(), out_names, &user_cols)?; + let create_type = if matches!(table_type, TableType::MaterializedView) + && input.ctx().session_ctx().config().background_ddl() + && plan_has_backfill_leaf_nodes(&input) + { + CreateType::Background + } else { + CreateType::Foreground + }; let table = Self::derive_table_catalog( input.clone(), name, @@ -98,6 +107,7 @@ impl StreamMaterialize { None, cardinality, retention_seconds, + create_type, )?; Ok(Self::new(input, table)) @@ -139,6 +149,7 @@ impl StreamMaterialize { version, Cardinality::unknown(), // unknown cardinality for tables retention_seconds, + CreateType::Foreground, )?; Ok(Self::new(input, table)) @@ -210,6 +221,7 @@ impl StreamMaterialize { version: Option, cardinality: Cardinality, retention_seconds: Option, + create_type: CreateType, ) -> Result { let input = rewritten_input; @@ -259,7 +271,7 @@ impl StreamMaterialize { created_at_epoch: None, initialized_at_epoch: None, cleaned_by_watermark: false, - create_type: CreateType::Foreground, // Will be updated in the handler itself. + create_type, stream_job_status: StreamJobStatus::Creating, description: None, incoming_sinks: vec![], diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 94dbf36591444..689bf8cba7e63 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -45,6 +45,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec use crate::error::{ErrorCode, Result}; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -373,7 +374,9 @@ impl StreamSink { }; let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?; let distribution_key = input.distribution().dist_column_indices().to_vec(); - let create_type = if input.ctx().session_ctx().config().background_ddl() { + let create_type = if input.ctx().session_ctx().config().background_ddl() + && plan_has_backfill_leaf_nodes(&input) + { CreateType::Background } else { CreateType::Foreground diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 3fe97555a3282..b0b50dd8f78b1 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -31,6 +31,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use crate::catalog::table_catalog::TableType; use crate::catalog::{ColumnId, TableCatalog, TableId}; use crate::optimizer::property::{Cardinality, Order, RequiredDist}; +use crate::optimizer::StreamScanType; use crate::utils::{Condition, IndexSet}; #[derive(Default)] @@ -370,3 +371,19 @@ pub fn infer_kv_log_store_table_catalog_inner( table_catalog_builder.build(dist_key, read_prefix_len_hint) } + +/// Check that all leaf nodes must be stream table scan, +/// since that plan node maps to `backfill` executor, which supports recovery. +pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool { + if plan.inputs().is_empty() { + if let Some(scan) = plan.as_stream_table_scan() { + scan.stream_scan_type() == StreamScanType::Backfill + || scan.stream_scan_type() == StreamScanType::ArrangementBackfill + } else { + false + } + } else { + assert!(!plan.inputs().is_empty()); + plan.inputs().iter().all(plan_has_backfill_leaf_nodes) + } +} diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c8080498e1761..21ca5f25bd700 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -69,7 +69,8 @@ use crate::manager::{ CatalogManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId, IdCategory, IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, MetadataManagerV1, NotificationVersion, RelationIdEnum, SchemaId, SinkId, SourceId, StreamingClusterInfo, - StreamingJob, SubscriptionId, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION, + StreamingJob, StreamingJobDiscriminants, SubscriptionId, TableId, UserId, ViewId, + IGNORED_NOTIFICATION_VERSION, }; use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; use crate::rpc::cloud_provider::AwsEc2Client; @@ -909,7 +910,7 @@ impl DdlController { ) .await } - (CreateType::Background, _) => { + (CreateType::Background, &StreamingJob::MaterializedView(_)) => { let ctrl = self.clone(); let mgr = mgr.clone(); let stream_job_id = stream_job.id(); @@ -935,6 +936,10 @@ impl DdlController { tokio::spawn(fut); Ok(IGNORED_NOTIFICATION_VERSION) } + (CreateType::Background, _) => { + let d: StreamingJobDiscriminants = stream_job.into(); + bail!("background_ddl not supported for: {:?}", d) + } } }