diff --git a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs index 18ae05a135ff9..99333eef0779e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs @@ -24,12 +24,12 @@ use risingwave_connector::source::DataType; use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ - generic, BatchProject, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, - PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream, + generic, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase, + PlanRef, PredicatePushdown, ToBatch, ToStream, }; use crate::catalog::source_catalog::SourceCatalog; use crate::error::Result; -use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor}; +use crate::expr::{Expr, ExprImpl, ExprType}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; use crate::optimizer::plan_node::{ @@ -44,27 +44,28 @@ pub struct LogicalKafkaScan { pub base: PlanBase<Logical>, pub core: generic::Source, - /// Expressions to output. This field presents and will be turned to a `Project` when - /// converting to a physical plan, only if there are generated columns. - output_exprs: Option<Vec<ExprImpl>>, - /// Kafka timestamp range. 
kafka_timestamp_range: (Bound<i64>, Bound<i64>), } impl LogicalKafkaScan { - pub fn new(logical_source: &LogicalSource) -> Self { + pub fn create(logical_source: &LogicalSource) -> PlanRef { assert!(logical_source.core.is_kafka_connector()); - let base = logical_source.base.clone_with_new_plan_id(); let core = logical_source.core.clone(); + let base = PlanBase::new_logical_with_core(&core); let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded); - LogicalKafkaScan { + let kafka_scan = LogicalKafkaScan { base, core, - output_exprs: logical_source.output_exprs.clone(), kafka_timestamp_range, + }; + + if let Some(exprs) = &logical_source.output_exprs { + LogicalProject::create(kafka_scan.into(), exprs.to_vec()) + } else { + kafka_scan.into() } } @@ -76,7 +77,6 @@ impl LogicalKafkaScan { Self { base: self.base.clone(), core: self.core.clone(), - output_exprs: self.output_exprs.clone(), kafka_timestamp_range: range, } } @@ -107,34 +107,9 @@ impl ColPrunable for LogicalKafkaScan { } } -impl ExprRewritable for LogicalKafkaScan { - fn has_rewritable_expr(&self) -> bool { - self.output_exprs.is_some() - } - - fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut output_exprs = self.output_exprs.clone(); - - for expr in output_exprs.iter_mut().flatten() { - *expr = r.rewrite_expr(expr.clone()); - } - - Self { - output_exprs, - ..self.clone() - } - .into() - } -} +impl ExprRewritable for LogicalKafkaScan {} -impl ExprVisitable for LogicalKafkaScan { - fn visit_exprs(&self, v: &mut dyn ExprVisitor) { - self.output_exprs - .iter() - .flatten() - .for_each(|e| v.visit_expr(e)); - } -} +impl ExprVisitable for LogicalKafkaScan {} /// A util function to extract kafka offset timestamp range. 
/// @@ -320,14 +295,8 @@ impl PredicatePushdown for LogicalKafkaScan { impl ToBatch for LogicalKafkaScan { fn to_batch(&self) -> Result<PlanRef> { - let mut plan: PlanRef = + let plan: PlanRef = BatchKafkaScan::new(self.core.clone(), self.kafka_timestamp_range).into(); - - if let Some(exprs) = &self.output_exprs { - let logical_project = generic::Project::new(exprs.to_vec(), plan); - plan = BatchProject::new(logical_project).into(); - } - - Ok(plan) } } diff --git a/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs b/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs index 244278bdc33c4..bda4a3068e8bf 100644 --- a/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs +++ b/src/frontend/src/optimizer/rule/source_to_kafka_scan_rule.rs @@ -21,7 +21,7 @@ impl Rule for SourceToKafkaScanRule { fn apply(&self, plan: PlanRef) -> Option<PlanRef> { let source: &LogicalSource = plan.as_logical_source()?; if source.core.is_kafka_connector() { - Some(LogicalKafkaScan::new(source).into()) + Some(LogicalKafkaScan::create(source)) } else { None }