diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f261bd9825f94..d677d123812d8 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -71,6 +71,7 @@ use crate::handler::util::{ get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt, }; use crate::handler::HandlerArgs; +use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; use crate::utils::resolve_privatelink_in_with_option; @@ -1262,12 +1263,9 @@ pub async fn handle_create_source( let graph = { let context = OptimizerContext::from_handler_args(handler_args); // cdc source is an append-only source in plain json format - let source_node = LogicalSource::new( - Some(Rc::new(SourceCatalog::from(&source))), - columns.clone(), - row_id_index, - false, - false, + let source_node = LogicalSource::with_catalog( + Rc::new(SourceCatalog::from(&source)), + SourceNodeKind::CreateSourceWithStreamjob, context.into(), )?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index d92e474962958..d0aaa4c5c21a8 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -59,6 +59,7 @@ use crate::handler::create_source::{ check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; +use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot}; @@ -682,8 +683,7 @@ fn gen_table_plan_inner( source_catalog.clone(), columns.clone(), row_id_index, - false, - true, + SourceNodeKind::CreateTable, context.clone(), )? .into(); diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index cc66d1341b4f7..4e8caae1cc403 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -70,7 +70,7 @@ use self::property::{Cardinality, RequiredDist}; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; use crate::expr::TimestamptzExprFinder; -use crate::optimizer::plan_node::generic::Union; +use crate::optimizer::plan_node::generic::{SourceNodeKind, Union}; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion, ToStream, VisitExprsRecursive, @@ -622,8 +622,7 @@ impl PlanRoot { None, columns.clone(), row_id_index, - false, - true, + SourceNodeKind::CreateTable, context.clone(), ) .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?; diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index bbb1b1f48673a..803b74cd6fa11 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -20,6 +20,7 @@ use educe::Educe; use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; +use risingwave_connector::source::ConnectorProperties; use super::super::utils::TableCatalogBuilder; use super::GenericPlanNode; @@ -28,20 +29,37 @@ use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::FunctionalDependencySet; use crate::{TableCatalog, WithOptions}; +/// In which scnario the source node is created +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[expect(clippy::enum_variant_names)] +pub enum SourceNodeKind { + /// `CREATE TABLE` with a connector. + CreateTable, + /// `CREATE SOURCE` with a streaming job (backfill-able source). + CreateSourceWithStreamjob, + /// `CREATE MATERIALIZED VIEW` which selects from a source. + /// + /// Note: + /// - For non backfill-able source, `CREATE SOURCE` will not create a source node, and `CREATE MATERIALIZE VIEW` will create a `LogicalSource`. + /// - For backfill-able source, `CREATE MATERIALIZE VIEW` will create `LogicalSourceBackfill` instead of `LogicalSource`. + CreateMViewOrBatch, +} + /// [`Source`] returns contents of a table or other equivalent object #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] pub struct Source { /// If there is an external stream source, `catalog` will be `Some`. Otherwise, it is `None`. pub catalog: Option>, - /// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan - /// generating, even if there is no external stream source. + + // NOTE: Here we store `column_catalog` and `row_id_index` + // because they are needed when `catalog` is None. + // When `catalog` is Some, they are the same as these fields in `catalog`. pub column_catalog: Vec, pub row_id_index: Option, - /// Whether the "SourceNode" should generate the row id column for append only source - pub gen_row_id: bool, - /// True if it is a source created when creating table with a source. - pub for_table: bool, + + pub kind: SourceNodeKind, + #[educe(PartialEq(ignore))] #[educe(Hash(ignore))] pub ctx: OptimizerContextRef, @@ -80,6 +98,30 @@ impl GenericPlanNode for Source { } impl Source { + pub fn is_new_fs_connector(&self) -> bool { + self.catalog.as_ref().is_some_and(|catalog| { + ConnectorProperties::is_new_fs_connector_b_tree_map(&catalog.with_properties) + }) + } + + /// The columns in stream/batch source node indicate the actual columns it will produce, + /// instead of the columns defined in source catalog. The difference is generated columns. + pub fn exclude_generated_columns(mut self) -> (Self, Option) { + let original_row_id_index = self.row_id_index; + // minus the number of generated columns before row_id_index. + self.row_id_index = original_row_id_index.map(|idx| { + let mut cnt = 0; + for col in self.column_catalog.iter().take(idx + 1) { + if col.is_generated() { + cnt += 1; + } + } + idx - cnt + }); + self.column_catalog.retain(|c| !c.is_generated()); + (self, original_row_id_index) + } + pub fn kafka_timestamp_range_value(&self) -> (Option, Option) { let (lower_bound, upper_bound) = &self.kafka_timestamp_range; let lower_bound = match lower_bound { diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index f86f31c1e0765..fbc9fe7a40c8c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -18,18 +18,17 @@ use std::ops::Bound::{Excluded, Included, Unbounded}; use std::rc::Rc; use fixedbitset::FixedBitSet; -use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::error::Result; -use risingwave_connector::source::{ConnectorProperties, DataType}; +use risingwave_connector::source::DataType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; -use super::generic::GenericPlanRef; +use super::generic::{GenericPlanRef, SourceNodeKind}; use super::stream_watermark_filter::StreamWatermarkFilter; use super::utils::{childless_record, Distill}; use super::{ @@ -60,6 +59,9 @@ pub struct LogicalSource { /// Expressions to output. This field presents and will be turned to a `Project` when /// converting to a physical plan, only if there are generated columns. output_exprs: Option>, + /// When there are generated columns, the `StreamRowIdGen`'s row_id_index is different from + /// the one in `core`. So we store the one in `output_exprs` here. + output_row_id_index: Option, } impl LogicalSource { @@ -67,8 +69,7 @@ impl LogicalSource { source_catalog: Option>, column_catalog: Vec, row_id_index: Option, - gen_row_id: bool, - for_table: bool, + kind: SourceNodeKind, ctx: OptimizerContextRef, ) -> Result { let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded); @@ -76,8 +77,7 @@ impl LogicalSource { catalog: source_catalog, column_catalog, row_id_index, - gen_row_id, - for_table, + kind, ctx, kafka_timestamp_range, }; @@ -85,33 +85,40 @@ impl LogicalSource { let base = PlanBase::new_logical_with_core(&core); let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?; + let (core, output_row_id_index) = core.exclude_generated_columns(); Ok(LogicalSource { base, core, output_exprs, + output_row_id_index, }) } pub fn with_catalog( source_catalog: Rc, - for_table: bool, + kind: SourceNodeKind, ctx: OptimizerContextRef, ) -> Result { let column_catalogs = source_catalog.columns.clone(); let row_id_index = source_catalog.row_id_index; - let gen_row_id = source_catalog.append_only; + if !source_catalog.append_only { + assert!(row_id_index.is_none()); + } Self::new( Some(source_catalog), column_catalogs, row_id_index, - gen_row_id, - for_table, + kind, ctx, ) } + /// If there are no generated columns, returns `None`. + /// + /// Otherwise, the returned expressions correspond to all columns. + /// Non-generated columns are represented by `InputRef`. pub fn derive_output_exprs_from_generated_columns( columns: &[ColumnCatalog], ) -> Result>> { @@ -162,9 +169,9 @@ impl LogicalSource { Ok(Some(exprs)) } - fn rewrite_new_s3_plan(&self) -> Result { + /// `StreamSource` (list) -> shuffle -> `StreamDedup` + fn create_fs_list_plan(core: generic::Source) -> Result { let logical_source = generic::Source { - catalog: self.core.catalog.clone(), column_catalog: vec![ ColumnCatalog { column_desc: ColumnDesc::from_field_with_column_id( @@ -204,8 +211,7 @@ impl LogicalSource { }, ], row_id_index: None, - gen_row_id: false, - ..self.core.clone() + ..core }; let mut new_s3_plan: PlanRef = StreamSource { base: PlanBase::new_stream_with_core( @@ -229,20 +235,6 @@ impl LogicalSource { Ok(new_s3_plan) } - /// `row_id_index` in source node should rule out generated column - #[must_use] - fn rewrite_row_id_idx(columns: &[ColumnCatalog], row_id_index: Option) -> Option { - row_id_index.map(|idx| { - let mut cnt = 0; - for col in columns.iter().take(idx + 1) { - if col.is_generated() { - cnt += 1; - } - } - idx - cnt - }) - } - pub fn source_catalog(&self) -> Option> { self.core.catalog.clone() } @@ -254,52 +246,7 @@ impl LogicalSource { base: self.base.clone(), core, output_exprs: self.output_exprs.clone(), - } - } - - /// The columns in stream/batch source node indicate the actual columns it will produce, - /// instead of the columns defined in source catalog. The difference is generated columns. - #[must_use] - fn rewrite_to_stream_batch_source(&self) -> generic::Source { - let column_catalog = self.core.column_catalog.clone(); - // Filter out the generated columns. - let row_id_index = Self::rewrite_row_id_idx(&column_catalog, self.core.row_id_index); - let source_column_catalogs = column_catalog - .into_iter() - .filter(|c| !c.is_generated()) - .collect_vec(); - generic::Source { - catalog: self.core.catalog.clone(), - column_catalog: source_column_catalogs, - row_id_index, - ctx: self.core.ctx.clone(), - ..self.core - } - } - - fn wrap_with_optional_generated_columns_stream_proj( - &self, - input: Option, - ) -> Result { - if let Some(exprs) = &self.output_exprs { - let source: PlanRef = - dispatch_new_s3_plan(self.rewrite_to_stream_batch_source(), input); - let logical_project = generic::Project::new(exprs.to_vec(), source); - Ok(StreamProject::new(logical_project).into()) - } else { - let source = dispatch_new_s3_plan(self.core.clone(), input); - Ok(source) - } - } - - fn wrap_with_optional_generated_columns_batch_proj(&self) -> Result { - if let Some(exprs) = &self.output_exprs { - let source = BatchSource::new(self.rewrite_to_stream_batch_source()); - let logical_project = generic::Project::new(exprs.to_vec(), source.into()); - Ok(BatchProject::new(logical_project).into()) - } else { - let source = BatchSource::new(self.core.clone()); - Ok(source.into()) + output_row_id_index: self.output_row_id_index, } } } @@ -542,49 +489,62 @@ impl PredicatePushdown for LogicalSource { impl ToBatch for LogicalSource { fn to_batch(&self) -> Result { - if self.core.catalog.is_some() - && ConnectorProperties::is_new_fs_connector_b_tree_map( - &self.core.catalog.as_ref().unwrap().with_properties, - ) - { - bail_not_implemented!("New S3 connector for batch"); + if self.core.is_new_fs_connector() { + bail_not_implemented!("New fs connector for batch"); } - let source = self.wrap_with_optional_generated_columns_batch_proj()?; - Ok(source) + let mut plan: PlanRef = BatchSource::new(self.core.clone()).into(); + + if let Some(exprs) = &self.output_exprs { + let logical_project = generic::Project::new(exprs.to_vec(), plan); + plan = BatchProject::new(logical_project).into(); + } + + Ok(plan) } } impl ToStream for LogicalSource { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - let mut plan_prefix: Option = None; let mut plan: PlanRef; - if self.core.catalog.is_some() - && ConnectorProperties::is_new_fs_connector_b_tree_map( - &self.core.catalog.as_ref().unwrap().with_properties, - ) - { - plan_prefix = Some(self.rewrite_new_s3_plan()?); - } - plan = if self.core.for_table { - dispatch_new_s3_plan(self.rewrite_to_stream_batch_source(), plan_prefix) - } else { - // Create MV on source. - self.wrap_with_optional_generated_columns_stream_proj(plan_prefix)? - }; + match self.core.kind { + SourceNodeKind::CreateTable | SourceNodeKind::CreateSourceWithStreamjob => { + // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan + if self.core.is_new_fs_connector() { + plan = Self::create_fs_list_plan(self.core.clone())?; + plan = StreamFsFetch::new(plan, self.core.clone()).into(); + } else { + plan = StreamSource::new(self.core.clone()).into() + } + } + SourceNodeKind::CreateMViewOrBatch => { + // Create MV on source. + if self.core.is_new_fs_connector() { + plan = Self::create_fs_list_plan(self.core.clone())?; + plan = StreamFsFetch::new(plan, self.core.clone()).into(); + } else { + plan = StreamSource::new(self.core.clone()).into() + } - if let Some(catalog) = self.source_catalog() - && !catalog.watermark_descs.is_empty() - && !self.core.for_table - { - plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); - } + if let Some(exprs) = &self.output_exprs { + let logical_project = generic::Project::new(exprs.to_vec(), plan); + plan = StreamProject::new(logical_project).into(); + } + + if let Some(catalog) = self.source_catalog() + && !catalog.watermark_descs.is_empty() + { + plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); + } - assert!(!(self.core.gen_row_id && self.core.for_table)); - if let Some(row_id_index) = self.core.row_id_index - && self.core.gen_row_id - { - plan = StreamRowIdGen::new_with_dist(plan, row_id_index, HashShard(vec![row_id_index])) - .into(); + if let Some(row_id_index) = self.output_row_id_index { + plan = StreamRowIdGen::new_with_dist( + plan, + row_id_index, + HashShard(vec![row_id_index]), + ) + .into(); + } + } } Ok(plan) } @@ -599,12 +559,3 @@ impl ToStream for LogicalSource { )) } } - -#[inline] -fn dispatch_new_s3_plan(source: generic::Source, input: Option) -> PlanRef { - if let Some(input) = input { - StreamFsFetch::new(input, source).into() - } else { - StreamSource::new(source).into() - } -} diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 42fdc83a3f933..b411277761e7f 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -25,6 +25,7 @@ use crate::binder::{ BoundWindowTableFunction, Relation, WindowTableFunctionKind, }; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; +use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{ LogicalApply, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef, @@ -85,7 +86,12 @@ impl Planner { } pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { - Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx())?.into()) + Ok(LogicalSource::with_catalog( + Rc::new(source.catalog), + SourceNodeKind::CreateMViewOrBatch, + self.ctx(), + )? + .into()) } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result {