From e36c80ab2ef87e13f6a5111c305bb14b778ed456 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 19 Jan 2024 23:56:33 +0800 Subject: [PATCH] fix: cdc source should not add row id gen --- src/frontend/src/handler/create_source.rs | 3 +- src/frontend/src/handler/create_table.rs | 3 +- src/frontend/src/optimizer/mod.rs | 13 ++- .../src/optimizer/plan_node/generic/source.rs | 19 ++++- .../src/optimizer/plan_node/logical_source.rs | 79 ++++++++++--------- src/frontend/src/planner/relation.rs | 8 +- 6 files changed, 77 insertions(+), 48 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 07f72145be46..d677d123812d 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -71,6 +71,7 @@ use crate::handler::util::{ get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt, }; use crate::handler::HandlerArgs; +use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; use crate::utils::resolve_privatelink_in_with_option; @@ -1264,7 +1265,7 @@ pub async fn handle_create_source( // cdc source is an append-only source in plain json format let source_node = LogicalSource::with_catalog( Rc::new(SourceCatalog::from(&source)), - false, + SourceNodeKind::CreateSourceWithStreamjob, context.into(), )?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index ac88fc01f4eb..d0aaa4c5c21a 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -59,6 +59,7 @@ use crate::handler::create_source::{ check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; +use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; use 
crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot}; @@ -682,7 +683,7 @@ fn gen_table_plan_inner( source_catalog.clone(), columns.clone(), row_id_index, - true, + SourceNodeKind::CreateTable, context.clone(), )? .into(); diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 66360b8fde5d..4e8caae1cc40 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -70,7 +70,7 @@ use self::property::{Cardinality, RequiredDist}; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; use crate::expr::TimestamptzExprFinder; -use crate::optimizer::plan_node::generic::Union; +use crate::optimizer::plan_node::generic::{SourceNodeKind, Union}; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion, ToStream, VisitExprsRecursive, @@ -618,9 +618,14 @@ impl PlanRoot { } }; - let dummy_source_node = - LogicalSource::new(None, columns.clone(), row_id_index, true, context.clone()) - .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?; + let dummy_source_node = LogicalSource::new( + None, + columns.clone(), + row_id_index, + SourceNodeKind::CreateTable, + context.clone(), + ) + .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?; let dml_node = inject_dml_node( &columns, diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 0903b8b3a54d..09d13224f0c6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -29,6 +29,22 @@ use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::FunctionalDependencySet; use crate::{TableCatalog, WithOptions}; +/// In which scenario the source node is created +#[derive(Debug, Clone, PartialEq, Eq, Hash)] 
+#[expect(clippy::enum_variant_names)] +pub enum SourceNodeKind { + /// `CREATE TABLE` with a connector. + CreateTable, + /// `CREATE SOURCE` with a streaming job (backfill-able source). + CreateSourceWithStreamjob, + /// `CREATE MATERIALIZED VIEW` which selects from a non backfill-able source. + /// + /// Note: + /// - For non backfill-able source, `CREATE SOURCE` will not create a source node. + /// - For backfill-able source, `CREATE MATERIALIZED VIEW` will not create a source node. + CreateMView, +} + /// [`Source`] returns contents of a table or other equivalent object #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] @@ -42,8 +58,7 @@ pub struct Source { pub column_catalog: Vec<ColumnCatalog>, pub row_id_index: Option<usize>, - /// True if it is a source created when creating table with a source. - pub for_table: bool, + pub kind: SourceNodeKind, #[educe(PartialEq(ignore))] #[educe(Hash(ignore))] diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index bf8c9d3c697a..413ae3dd4867 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -28,7 +28,7 @@ use risingwave_connector::source::DataType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; -use super::generic::GenericPlanRef; +use super::generic::{GenericPlanRef, SourceNodeKind}; use super::stream_watermark_filter::StreamWatermarkFilter; use super::utils::{childless_record, Distill}; use super::{ @@ -69,7 +69,7 @@ impl LogicalSource { source_catalog: Option<Rc<SourceCatalog>>, column_catalog: Vec<ColumnCatalog>, row_id_index: Option<usize>, - for_table: bool, + kind: SourceNodeKind, ctx: OptimizerContextRef, ) -> Result<Self> { let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded); @@ -77,7 +77,7 @@ impl LogicalSource { catalog: source_catalog, column_catalog, row_id_index, - for_table, + kind, ctx, kafka_timestamp_range, }; @@ -97,7 
+97,7 @@ impl LogicalSource { pub fn with_catalog( source_catalog: Rc, - for_table: bool, + kind: SourceNodeKind, ctx: OptimizerContextRef, ) -> Result { let column_catalogs = source_catalog.columns.clone(); @@ -110,7 +110,7 @@ impl LogicalSource { Some(source_catalog), column_catalogs, row_id_index, - for_table, + kind, ctx, ) } @@ -506,45 +506,46 @@ impl ToBatch for LogicalSource { impl ToStream for LogicalSource { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { let mut plan: PlanRef; - - if self.core.for_table { - // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan - if self.core.is_new_fs_connector() { - plan = Self::create_fs_list_plan(self.core.clone())?; - plan = StreamFsFetch::new(plan, self.core.clone()).into(); - } else { - plan = StreamSource::new(self.core.clone()).into() - } - } else { - // Create MV on source. - if self.core.is_new_fs_connector() { - plan = Self::create_fs_list_plan(self.core.clone())?; - plan = StreamFsFetch::new(plan, self.core.clone()).into(); - } else { - plan = StreamSource::new(self.core.clone()).into() + match self.core.kind { + SourceNodeKind::CreateTable | SourceNodeKind::CreateSourceWithStreamjob => { + // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan + if self.core.is_new_fs_connector() { + plan = Self::create_fs_list_plan(self.core.clone())?; + plan = StreamFsFetch::new(plan, self.core.clone()).into(); + } else { + plan = StreamSource::new(self.core.clone()).into() + } } + SourceNodeKind::CreateMView => { + // Create MV on source. 
+ if self.core.is_new_fs_connector() { + plan = Self::create_fs_list_plan(self.core.clone())?; + plan = StreamFsFetch::new(plan, self.core.clone()).into(); + } else { + plan = StreamSource::new(self.core.clone()).into() + } - if let Some(exprs) = &self.output_exprs { - let logical_project = generic::Project::new(exprs.to_vec(), plan); - plan = StreamProject::new(logical_project).into(); - } + if let Some(exprs) = &self.output_exprs { + let logical_project = generic::Project::new(exprs.to_vec(), plan); + plan = StreamProject::new(logical_project).into(); + } - if let Some(catalog) = self.source_catalog() - && !catalog.watermark_descs.is_empty() - { - plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); - } + if let Some(catalog) = self.source_catalog() + && !catalog.watermark_descs.is_empty() + { + plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); + } - if let Some(row_id_index) = self.output_row_id_index { - plan = StreamRowIdGen::new_with_dist( - plan, - row_id_index, - HashShard(vec![row_id_index]), - ) - .into(); + if let Some(row_id_index) = self.output_row_id_index { + plan = StreamRowIdGen::new_with_dist( + plan, + row_id_index, + HashShard(vec![row_id_index]), + ) + .into(); + } } - }; - + } Ok(plan) } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 42fdc83a3f93..57edea4116e2 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -25,6 +25,7 @@ use crate::binder::{ BoundWindowTableFunction, Relation, WindowTableFunctionKind, }; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; +use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{ LogicalApply, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef, @@ -85,7 +86,12 @@ impl Planner { } pub(super) fn 
plan_source(&mut self, source: BoundSource) -> Result { - Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx())?.into()) + Ok(LogicalSource::with_catalog( + Rc::new(source.catalog), + SourceNodeKind::CreateMView, + self.ctx(), + )? + .into()) } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result {