From 1f445a589110e3d734a3ad92b77bc8cbd6c656fd Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 18 Apr 2024 19:05:48 +0800
Subject: [PATCH 1/4] remove output_exprs from source

---
 src/frontend/src/handler/create_source.rs     |  2 +-
 src/frontend/src/handler/create_table.rs      |  5 +-
 src/frontend/src/optimizer/mod.rs             |  4 +-
 .../optimizer/plan_node/logical_kafka_scan.rs | 49 ++----------
 .../src/optimizer/plan_node/logical_source.rs | 75 ++++++------------
 src/frontend/src/planner/relation.rs          |  3 +-
 6 files changed, 33 insertions(+), 105 deletions(-)

diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 364c7aafed12b..4ca9e805b0577 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -78,7 +78,7 @@ use crate::handler::create_table::{
 use crate::handler::util::SourceSchemaCompatExt;
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::SourceNodeKind;
-use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
+use crate::optimizer::plan_node::{LogicalSource, ToStreamContext};
 use crate::session::SessionImpl;
 use crate::utils::resolve_privatelink_in_with_option;
 use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions};
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 9c1290b59ba50..019f555750f87 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -708,15 +708,14 @@ fn gen_table_plan_inner(
     });
 
     let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
-    let source_node: PlanRef = LogicalSource::new(
+    let source_node: PlanRef = LogicalSource::create(
         source_catalog.clone(),
         columns.clone(),
         row_id_index,
         SourceNodeKind::CreateTable,
         context.clone(),
         None,
-    )?
-    .into();
+    )?;
 
     let required_cols = FixedBitSet::with_capacity(columns.len());
     let mut plan_root = PlanRoot::new(
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index f68f2f3bc9e0c..e2fa157982ab0 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -77,7 +77,7 @@ use crate::expr::TimestamptzExprFinder;
 use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
 use crate::optimizer::plan_node::{
     BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
-    ToStream, VisitExprsRecursive,
+    VisitExprsRecursive,
 };
 use crate::optimizer::plan_visitor::TemporalJoinValidator;
 use crate::optimizer::property::Distribution;
@@ -637,7 +637,7 @@ impl PlanRoot {
             }
         };
 
-        let dummy_source_node = LogicalSource::new(
+        let dummy_source_node = LogicalSource::create(
             None,
             columns.clone(),
             row_id_index,
diff --git a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
index 18ae05a135ff9..7de8a8e2f51d8 100644
--- a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
@@ -24,12 +24,12 @@ use risingwave_connector::source::DataType;
 use super::generic::GenericPlanRef;
 use super::utils::{childless_record, Distill};
 use super::{
-    generic, BatchProject, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject,
-    PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
+    generic, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase,
+    PlanRef, PredicatePushdown, ToBatch, ToStream,
 };
 use crate::catalog::source_catalog::SourceCatalog;
 use crate::error::Result;
-use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor};
+use crate::expr::{Expr, ExprImpl, ExprType};
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::utils::column_names_pretty;
 use crate::optimizer::plan_node::{
@@ -44,10 +44,6 @@ pub struct LogicalKafkaScan {
     pub base: PlanBase<Logical>,
     pub core: generic::Source,
 
-    /// Expressions to output. This field presents and will be turned to a `Project` when
-    /// converting to a physical plan, only if there are generated columns.
-    output_exprs: Option<Vec<ExprImpl>>,
-
     /// Kafka timestamp range.
     kafka_timestamp_range: (Bound<i64>, Bound<i64>),
 }
@@ -63,7 +59,6 @@ impl LogicalKafkaScan {
         LogicalKafkaScan {
             base,
             core,
-            output_exprs: logical_source.output_exprs.clone(),
             kafka_timestamp_range,
         }
     }
@@ -76,7 +71,6 @@ impl LogicalKafkaScan {
         Self {
             base: self.base.clone(),
             core: self.core.clone(),
-            output_exprs: self.output_exprs.clone(),
             kafka_timestamp_range: range,
         }
     }
@@ -107,34 +101,9 @@ impl ColPrunable for LogicalKafkaScan {
     }
 }
 
-impl ExprRewritable for LogicalKafkaScan {
-    fn has_rewritable_expr(&self) -> bool {
-        self.output_exprs.is_some()
-    }
-
-    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
-        let mut output_exprs = self.output_exprs.clone();
+impl ExprRewritable for LogicalKafkaScan {}
 
-        for expr in output_exprs.iter_mut().flatten() {
-            *expr = r.rewrite_expr(expr.clone());
-        }
-
-        Self {
-            output_exprs,
-            ..self.clone()
-        }
-        .into()
-    }
-}
-
-impl ExprVisitable for LogicalKafkaScan {
-    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
-        self.output_exprs
-            .iter()
-            .flatten()
-            .for_each(|e| v.visit_expr(e));
-    }
-}
+impl ExprVisitable for LogicalKafkaScan {}
 
 /// A util function to extract kafka offset timestamp range.
 ///
@@ -320,14 +289,8 @@ impl PredicatePushdown for LogicalKafkaScan {
 
 impl ToBatch for LogicalKafkaScan {
     fn to_batch(&self) -> Result<PlanRef> {
-        let mut plan: PlanRef =
+        let plan: PlanRef =
             BatchKafkaScan::new(self.core.clone(), self.kafka_timestamp_range).into();
-
-        if let Some(exprs) = &self.output_exprs {
-            let logical_project = generic::Project::new(exprs.to_vec(), plan);
-            plan = BatchProject::new(logical_project).into();
-        }
-
         Ok(plan)
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 8a8fd37bd66b3..3b8320c774a82 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -28,13 +28,13 @@ use super::generic::{GenericPlanRef, SourceNodeKind};
 use super::stream_watermark_filter::StreamWatermarkFilter;
 use super::utils::{childless_record, Distill};
 use super::{
-    generic, BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter,
-    LogicalProject, PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen,
-    StreamSource, StreamSourceScan, ToBatch, ToStream,
+    generic, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject,
+    PlanBase, PlanRef, PredicatePushdown, StreamRowIdGen, StreamSource, StreamSourceScan, ToBatch,
+    ToStream,
 };
 use crate::catalog::source_catalog::SourceCatalog;
 use crate::error::Result;
-use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
+use crate::expr::{ExprImpl, ExprRewriter, InputRef};
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
@@ -53,23 +53,20 @@ pub struct LogicalSource {
     pub base: PlanBase<Logical>,
     pub core: generic::Source,
 
-    /// Expressions to output. This field presents and will be turned to a `Project` when
-    /// converting to a physical plan, only if there are generated columns.
-    pub(crate) output_exprs: Option<Vec<ExprImpl>>,
     /// When there are generated columns, the `StreamRowIdGen`'s `row_id_index` is different from
     /// the one in `core`. So we store the one in `output_exprs` here.
     pub(crate) output_row_id_index: Option<usize>,
 }
 
 impl LogicalSource {
-    pub fn new(
+    pub fn create(
         source_catalog: Option<Rc<SourceCatalog>>,
         column_catalog: Vec<ColumnCatalog>,
         row_id_index: Option<usize>,
         kind: SourceNodeKind,
         ctx: OptimizerContextRef,
         as_of: Option<AsOf>,
-    ) -> Result<Self> {
+    ) -> Result<PlanRef> {
         let core = generic::Source {
             catalog: source_catalog,
             column_catalog,
@@ -83,17 +80,22 @@ impl LogicalSource {
             bail!("Time travel is not supported for the source")
         }
 
-        let base = PlanBase::new_logical_with_core(&core);
-
         let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
         let (core, output_row_id_index) = core.exclude_generated_columns();
 
-        Ok(LogicalSource {
+        let base = PlanBase::new_logical_with_core(&core);
+
+        let source = LogicalSource {
             base,
             core,
-            output_exprs,
             output_row_id_index,
-        })
+        };
+
+        if let Some(exprs) = output_exprs {
+            Ok(LogicalProject::create(source.into(), exprs.to_vec()))
+        } else {
+            Ok(source.into())
+        }
     }
 
     pub fn with_catalog(
@@ -101,14 +103,14 @@ impl LogicalSource {
         kind: SourceNodeKind,
         ctx: OptimizerContextRef,
         as_of: Option<AsOf>,
-    ) -> Result<Self> {
+    ) -> Result<PlanRef> {
         let column_catalogs = source_catalog.columns.clone();
         let row_id_index = source_catalog.row_id_index;
         if !source_catalog.append_only {
             assert!(row_id_index.is_none());
         }
 
-        Self::new(
+        Self::create(
             Some(source_catalog),
             column_catalogs,
             row_id_index,
@@ -270,34 +272,9 @@ impl ColPrunable for LogicalSource {
     }
 }
 
-impl ExprRewritable for LogicalSource {
-    fn has_rewritable_expr(&self) -> bool {
-        self.output_exprs.is_some()
-    }
-
-    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
-        let mut output_exprs = self.output_exprs.clone();
+impl ExprRewritable for LogicalSource {}
 
-        for expr in output_exprs.iter_mut().flatten() {
-            *expr = r.rewrite_expr(expr.clone());
-        }
-
-        Self {
-            output_exprs,
-            ..self.clone()
-        }
-        .into()
-    }
-}
-
-impl ExprVisitable for LogicalSource {
-    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
-        self.output_exprs
-            .iter()
-            .flatten()
-            .for_each(|e| v.visit_expr(e));
-    }
-}
+impl ExprVisitable for LogicalSource {}
 
 impl PredicatePushdown for LogicalSource {
     fn predicate_pushdown(
@@ -315,12 +292,7 @@ impl ToBatch for LogicalSource {
             !self.core.is_kafka_connector(),
             "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
         );
-        let mut plan: PlanRef = BatchSource::new(self.core.clone()).into();
-
-        if let Some(exprs) = &self.output_exprs {
-            let logical_project = generic::Project::new(exprs.to_vec(), plan);
-            plan = BatchProject::new(logical_project).into();
-        }
+        let plan: PlanRef = BatchSource::new(self.core.clone()).into();
 
         Ok(plan)
     }
@@ -357,11 +329,6 @@ impl ToStream for LogicalSource {
         }
     }
 
-        if let Some(exprs) = &self.output_exprs {
-            let logical_project = generic::Project::new(exprs.to_vec(), plan);
-            plan = StreamProject::new(logical_project).into();
-        }
-
         if let Some(catalog) = self.source_catalog()
             && !catalog.watermark_descs.is_empty()
         {
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index d593a7308f39f..b2a6f608d9d9f 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -125,8 +125,7 @@ impl Planner {
             SourceNodeKind::CreateMViewOrBatch,
             self.ctx(),
             as_of,
-        )?
-        .into())
+        )?)
     }
 }

From 99f37d78fdbe1a19f1c6f5d615dac6b1e3ecebeb Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 18 Apr 2024 20:55:14 +0800
Subject: [PATCH 2/4] Revert "remove output_exprs from source"

This reverts commit 1f445a589110e3d734a3ad92b77bc8cbd6c656fd.
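For context on the revert: the reverted patch collapsed "construct the node, then wrap it in a Project during physical conversion" into a single factory that returns a finished plan. A minimal stand-alone Rust sketch of the two constructor shapes is below; PlanNode, PlanRef, and the String-typed output_exprs are simplified stand-ins, not RisingWave's actual planner API.

    use std::rc::Rc;

    // Simplified stand-ins: the real `PlanRef` is a reference-counted
    // handle to a plan node, which is all this sketch needs.
    trait PlanNode {}
    type PlanRef = Rc<dyn PlanNode>;

    struct LogicalSource {
        // Stand-in for `Option<Vec<ExprImpl>>`.
        output_exprs: Option<Vec<String>>,
    }
    impl PlanNode for LogicalSource {}

    struct LogicalProject {
        _input: PlanRef,
        _exprs: Vec<String>,
    }
    impl PlanNode for LogicalProject {}

    impl LogicalSource {
        // Pre-series shape: `new` returns the node itself and keeps
        // `output_exprs` around, so every `to_batch`/`to_stream` path
        // must remember to wrap the plan in a `Project` later.
        fn new(output_exprs: Option<Vec<String>>) -> Self {
            LogicalSource { output_exprs }
        }

        // Patch-1 shape: `create` returns a finished `PlanRef` and folds
        // the generated-column `Project` in at construction time, so no
        // physical conversion can forget it.
        fn create(output_exprs: Option<Vec<String>>) -> PlanRef {
            let source: PlanRef = Rc::new(LogicalSource { output_exprs: None });
            match output_exprs {
                Some(exprs) => Rc::new(LogicalProject {
                    _input: source,
                    _exprs: exprs,
                }),
                None => source,
            }
        }
    }

    fn main() {
        let node = LogicalSource::new(Some(vec!["generated_col".into()]));
        assert!(node.output_exprs.is_some());
        let _plan: PlanRef = LogicalSource::create(Some(vec!["generated_col".into()]));
    }

The revert restores the `new` shape for `LogicalSource`; patch 3 below re-applies the `create` shape to `LogicalKafkaScan` only.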
--- src/frontend/src/handler/create_source.rs | 2 +- src/frontend/src/handler/create_table.rs | 5 +- src/frontend/src/optimizer/mod.rs | 4 +- .../optimizer/plan_node/logical_kafka_scan.rs | 49 ++++++++++-- .../src/optimizer/plan_node/logical_source.rs | 75 +++++++++++++------ src/frontend/src/planner/relation.rs | 3 +- 6 files changed, 105 insertions(+), 33 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 4ca9e805b0577..364c7aafed12b 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -78,7 +78,7 @@ use crate::handler::create_table::{ use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; -use crate::optimizer::plan_node::{LogicalSource, ToStreamContext}; +use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; use crate::utils::resolve_privatelink_in_with_option; use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions}; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 019f555750f87..9c1290b59ba50 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -708,14 +708,15 @@ fn gen_table_plan_inner( }); let source_catalog = source.as_ref().map(|source| Rc::new((source).into())); - let source_node: PlanRef = LogicalSource::create( + let source_node: PlanRef = LogicalSource::new( source_catalog.clone(), columns.clone(), row_id_index, SourceNodeKind::CreateTable, context.clone(), None, - )?; + )? + .into(); let required_cols = FixedBitSet::with_capacity(columns.len()); let mut plan_root = PlanRoot::new( diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e2fa157982ab0..f68f2f3bc9e0c 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -77,7 +77,7 @@ use crate::expr::TimestamptzExprFinder; use crate::optimizer::plan_node::generic::{SourceNodeKind, Union}; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion, - VisitExprsRecursive, + ToStream, VisitExprsRecursive, }; use crate::optimizer::plan_visitor::TemporalJoinValidator; use crate::optimizer::property::Distribution; @@ -637,7 +637,7 @@ impl PlanRoot { } }; - let dummy_source_node = LogicalSource::create( + let dummy_source_node = LogicalSource::new( None, columns.clone(), row_id_index, diff --git a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs index 7de8a8e2f51d8..18ae05a135ff9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs @@ -24,12 +24,12 @@ use risingwave_connector::source::DataType; use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ - generic, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase, - PlanRef, PredicatePushdown, ToBatch, ToStream, + generic, BatchProject, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, + PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream, }; use crate::catalog::source_catalog::SourceCatalog; use crate::error::Result; -use crate::expr::{Expr, ExprImpl, ExprType}; +use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor}; 
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::utils::column_names_pretty;
 use crate::optimizer::plan_node::{
@@ -44,6 +44,10 @@ pub struct LogicalKafkaScan {
     pub base: PlanBase<Logical>,
     pub core: generic::Source,
 
+    /// Expressions to output. This field presents and will be turned to a `Project` when
+    /// converting to a physical plan, only if there are generated columns.
+    output_exprs: Option<Vec<ExprImpl>>,
+
     /// Kafka timestamp range.
     kafka_timestamp_range: (Bound<i64>, Bound<i64>),
 }
@@ -59,6 +63,7 @@ impl LogicalKafkaScan {
         LogicalKafkaScan {
             base,
             core,
+            output_exprs: logical_source.output_exprs.clone(),
             kafka_timestamp_range,
         }
     }
@@ -71,6 +76,7 @@ impl LogicalKafkaScan {
         Self {
             base: self.base.clone(),
             core: self.core.clone(),
+            output_exprs: self.output_exprs.clone(),
             kafka_timestamp_range: range,
         }
     }
@@ -101,9 +107,34 @@ impl ColPrunable for LogicalKafkaScan {
     }
 }
 
-impl ExprRewritable for LogicalKafkaScan {}
+impl ExprRewritable for LogicalKafkaScan {
+    fn has_rewritable_expr(&self) -> bool {
+        self.output_exprs.is_some()
+    }
+
+    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
+        let mut output_exprs = self.output_exprs.clone();
 
-impl ExprVisitable for LogicalKafkaScan {}
+        for expr in output_exprs.iter_mut().flatten() {
+            *expr = r.rewrite_expr(expr.clone());
+        }
+
+        Self {
+            output_exprs,
+            ..self.clone()
+        }
+        .into()
+    }
+}
+
+impl ExprVisitable for LogicalKafkaScan {
+    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
+        self.output_exprs
+            .iter()
+            .flatten()
+            .for_each(|e| v.visit_expr(e));
+    }
+}
 
 /// A util function to extract kafka offset timestamp range.
 ///
@@ -289,8 +320,14 @@ impl PredicatePushdown for LogicalKafkaScan {
 
 impl ToBatch for LogicalKafkaScan {
     fn to_batch(&self) -> Result<PlanRef> {
-        let plan: PlanRef =
+        let mut plan: PlanRef =
             BatchKafkaScan::new(self.core.clone(), self.kafka_timestamp_range).into();
+
+        if let Some(exprs) = &self.output_exprs {
+            let logical_project = generic::Project::new(exprs.to_vec(), plan);
+            plan = BatchProject::new(logical_project).into();
+        }
+
         Ok(plan)
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 3b8320c774a82..8a8fd37bd66b3 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -28,13 +28,13 @@ use super::generic::{GenericPlanRef, SourceNodeKind};
 use super::stream_watermark_filter::StreamWatermarkFilter;
 use super::utils::{childless_record, Distill};
 use super::{
-    generic, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject,
-    PlanBase, PlanRef, PredicatePushdown, StreamRowIdGen, StreamSource, StreamSourceScan, ToBatch,
-    ToStream,
+    generic, BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter,
+    LogicalProject, PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen,
+    StreamSource, StreamSourceScan, ToBatch, ToStream,
 };
 use crate::catalog::source_catalog::SourceCatalog;
 use crate::error::Result;
-use crate::expr::{ExprImpl, ExprRewriter, InputRef};
+use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
@@ -53,20 +53,23 @@ pub struct LogicalSource {
     pub base: PlanBase<Logical>,
     pub core: generic::Source,
 
+    /// Expressions to output. This field presents and will be turned to a `Project` when
+    /// converting to a physical plan, only if there are generated columns.
+    pub(crate) output_exprs: Option<Vec<ExprImpl>>,
     /// When there are generated columns, the `StreamRowIdGen`'s `row_id_index` is different from
     /// the one in `core`. So we store the one in `output_exprs` here.
     pub(crate) output_row_id_index: Option<usize>,
 }
 
 impl LogicalSource {
-    pub fn create(
+    pub fn new(
         source_catalog: Option<Rc<SourceCatalog>>,
         column_catalog: Vec<ColumnCatalog>,
         row_id_index: Option<usize>,
         kind: SourceNodeKind,
         ctx: OptimizerContextRef,
         as_of: Option<AsOf>,
-    ) -> Result<PlanRef> {
+    ) -> Result<Self> {
         let core = generic::Source {
             catalog: source_catalog,
             column_catalog,
@@ -80,22 +83,17 @@ impl LogicalSource {
             bail!("Time travel is not supported for the source")
         }
 
+        let base = PlanBase::new_logical_with_core(&core);
+
         let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
         let (core, output_row_id_index) = core.exclude_generated_columns();
 
-        let base = PlanBase::new_logical_with_core(&core);
-
-        let source = LogicalSource {
+        Ok(LogicalSource {
             base,
             core,
+            output_exprs,
             output_row_id_index,
-        };
-
-        if let Some(exprs) = output_exprs {
-            Ok(LogicalProject::create(source.into(), exprs.to_vec()))
-        } else {
-            Ok(source.into())
-        }
+        })
     }
 
     pub fn with_catalog(
@@ -103,14 +101,14 @@ impl LogicalSource {
         kind: SourceNodeKind,
         ctx: OptimizerContextRef,
         as_of: Option<AsOf>,
-    ) -> Result<PlanRef> {
+    ) -> Result<Self> {
         let column_catalogs = source_catalog.columns.clone();
         let row_id_index = source_catalog.row_id_index;
         if !source_catalog.append_only {
             assert!(row_id_index.is_none());
         }
 
-        Self::create(
+        Self::new(
             Some(source_catalog),
             column_catalogs,
             row_id_index,
@@ -272,9 +270,34 @@ impl ColPrunable for LogicalSource {
     }
 }
 
-impl ExprRewritable for LogicalSource {}
+impl ExprRewritable for LogicalSource {
+    fn has_rewritable_expr(&self) -> bool {
+        self.output_exprs.is_some()
+    }
+
+    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
+        let mut output_exprs = self.output_exprs.clone();
 
-impl ExprVisitable for LogicalSource {}
+        for expr in output_exprs.iter_mut().flatten() {
+            *expr = r.rewrite_expr(expr.clone());
+        }
+
+        Self {
+            output_exprs,
+            ..self.clone()
+        }
+        .into()
+    }
+}
+
+impl ExprVisitable for LogicalSource {
+    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
+        self.output_exprs
+            .iter()
+            .flatten()
+            .for_each(|e| v.visit_expr(e));
+    }
+}
 
 impl PredicatePushdown for LogicalSource {
     fn predicate_pushdown(
@@ -292,7 +315,12 @@ impl ToBatch for LogicalSource {
             !self.core.is_kafka_connector(),
             "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
         );
-        let plan: PlanRef = BatchSource::new(self.core.clone()).into();
+        let mut plan: PlanRef = BatchSource::new(self.core.clone()).into();
+
+        if let Some(exprs) = &self.output_exprs {
+            let logical_project = generic::Project::new(exprs.to_vec(), plan);
+            plan = BatchProject::new(logical_project).into();
+        }
 
         Ok(plan)
     }
@@ -329,6 +357,11 @@ impl ToStream for LogicalSource {
         }
     }
 
+        if let Some(exprs) = &self.output_exprs {
+            let logical_project = generic::Project::new(exprs.to_vec(), plan);
+            plan = StreamProject::new(logical_project).into();
+        }
+
         if let Some(catalog) = self.source_catalog()
             && !catalog.watermark_descs.is_empty()
         {
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index b2a6f608d9d9f..d593a7308f39f 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -125,7 +125,8 @@ impl Planner {
             SourceNodeKind::CreateMViewOrBatch,
             self.ctx(),
             as_of,
-        )?)
+        )?
+        .into())
     }
 }

From a2d6a7ba1605fb556b519f1c01d484e4d32e9b07 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 18 Apr 2024 21:02:25 +0800
Subject: [PATCH 3/4] refactor

---
 .../optimizer/plan_node/logical_kafka_scan.rs | 59 +++++--------------
 .../rule/source_to_kafka_scan_rule.rs         |  2 +-
 2 files changed, 15 insertions(+), 46 deletions(-)

diff --git a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
index 18ae05a135ff9..0dceebfdfc496 100644
--- a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
@@ -24,12 +24,12 @@ use risingwave_connector::source::DataType;
 use super::generic::GenericPlanRef;
 use super::utils::{childless_record, Distill};
 use super::{
-    generic, BatchProject, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject,
-    PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
+    generic, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase,
+    PlanRef, PredicatePushdown, ToBatch, ToStream,
 };
 use crate::catalog::source_catalog::SourceCatalog;
 use crate::error::Result;
-use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor};
+use crate::expr::{Expr, ExprImpl, ExprType};
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::utils::column_names_pretty;
 use crate::optimizer::plan_node::{
@@ -44,27 +44,28 @@ pub struct LogicalKafkaScan {
     pub base: PlanBase<Logical>,
     pub core: generic::Source,
 
-    /// Expressions to output. This field presents and will be turned to a `Project` when
-    /// converting to a physical plan, only if there are generated columns.
-    output_exprs: Option<Vec<ExprImpl>>,
-
     /// Kafka timestamp range.
     kafka_timestamp_range: (Bound<i64>, Bound<i64>),
 }
 
 impl LogicalKafkaScan {
-    pub fn new(logical_source: &LogicalSource) -> Self {
+    pub fn create(logical_source: &LogicalSource) -> PlanRef {
         assert!(logical_source.core.is_kafka_connector());
 
         let base = logical_source.base.clone_with_new_plan_id();
         let core = logical_source.core.clone();
         let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded);
 
-        LogicalKafkaScan {
+        let kafka_scan = LogicalKafkaScan {
             base,
             core,
-            output_exprs: logical_source.output_exprs.clone(),
             kafka_timestamp_range,
+        };
+
+        if let Some(exprs) = &logical_source.output_exprs {
+            LogicalProject::create(kafka_scan.into(), exprs.to_vec())
+        } else {
+            kafka_scan.into()
         }
     }
 
@@ -76,7 +77,6 @@ impl LogicalKafkaScan {
         Self {
             base: self.base.clone(),
             core: self.core.clone(),
-            output_exprs: self.output_exprs.clone(),
             kafka_timestamp_range: range,
         }
     }
@@ -107,34 +107,9 @@ impl ColPrunable for LogicalKafkaScan {
     }
 }
 
-impl ExprRewritable for LogicalKafkaScan {
-    fn has_rewritable_expr(&self) -> bool {
-        self.output_exprs.is_some()
-    }
-
-    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
-        let mut output_exprs = self.output_exprs.clone();
-
-        for expr in output_exprs.iter_mut().flatten() {
-            *expr = r.rewrite_expr(expr.clone());
-        }
-
-        Self {
-            output_exprs,
-            ..self.clone()
-        }
-        .into()
-    }
-}
+impl ExprRewritable for LogicalKafkaScan {}
 
-impl ExprVisitable for LogicalKafkaScan {
-    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
-        self.output_exprs
-            .iter()
-            .flatten()
-            .for_each(|e| v.visit_expr(e));
-    }
-}
+impl ExprVisitable for LogicalKafkaScan {}
 
 /// A util function to extract kafka offset timestamp range.
 ///
@@ -320,14 +295,8 @@ impl PredicatePushdown for LogicalKafkaScan {
 
 impl ToBatch for LogicalKafkaScan {
     fn to_batch(&self) -> Result<PlanRef> {
-        let mut plan: PlanRef =
+        let plan: PlanRef =
             BatchKafkaScan::new(self.core.clone(), self.kafka_timestamp_range).into();
-
-        if let Some(exprs) = &self.output_exprs {
-            let logical_project = generic::Project::new(exprs.to_vec(), plan);
-            plan = BatchProject::new(logical_project).into();
-        }
-
         Ok(plan)
     }
 }
diff --git a/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs b/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs
index 244278bdc33c4..bda4a3068e8bf 100644
--- a/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs
+++ b/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs
@@ -21,7 +21,7 @@ impl Rule for SourceToKafkaScanRule {
     fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
         let source: &LogicalSource = plan.as_logical_source()?;
         if source.core.is_kafka_connector() {
-            Some(LogicalKafkaScan::new(source).into())
+            Some(LogicalKafkaScan::create(source))
         } else {
             None
         }

From df4c84bd73bf855a1c06582c42a5ed68579fc65c Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 18 Apr 2024 21:07:53 +0800
Subject: [PATCH 4/4] fix

---
 src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
index 0dceebfdfc496..99333eef0779e 100644
--- a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
@@ -52,8 +52,8 @@ impl LogicalKafkaScan {
     pub fn create(logical_source: &LogicalSource) -> PlanRef {
         assert!(logical_source.core.is_kafka_connector());
 
-        let base = logical_source.base.clone_with_new_plan_id();
         let core = logical_source.core.clone();
+        let base = PlanBase::new_logical_with_core(&core);
         let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded);
 
         let kafka_scan = LogicalKafkaScan {
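A closing note on the final fix: it rebuilds the Kafka scan's base from the scan's own core (`PlanBase::new_logical_with_core(&core)`) instead of cloning the `LogicalSource`'s base, so the node's derived properties stay consistent with the node itself. Below is a minimal stand-alone sketch of that pattern, with simplified stand-in types rather than the real planner definitions (the real `PlanBase` also carries a plan id, context, stream key, and functional dependencies):

    // Derive a node's `base` from the `core` it owns, instead of cloning
    // the base of the node it was converted from.
    #[derive(Clone)]
    struct Core {
        columns: Vec<String>,
    }

    #[derive(Clone)]
    struct PlanBase {
        schema: Vec<String>,
    }

    impl PlanBase {
        // Analogue of `PlanBase::new_logical_with_core`: every derived
        // property is recomputed from the owned core.
        fn new_logical_with_core(core: &Core) -> Self {
            PlanBase {
                schema: core.columns.clone(),
            }
        }
    }

    fn main() {
        let core = Core {
            columns: vec!["id".into(), "payload".into()],
        };
        // Patch 4 swaps `logical_source.base.clone_with_new_plan_id()`
        // for this call, so the scan's base always matches its own core.
        let base = PlanBase::new_logical_with_core(&core);
        assert_eq!(base.schema, core.columns);
    }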