Skip to content

Commit

Permalink
refactor: remove ouput_exprs from LogicalKafkaScan (#16385)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Apr 19, 2024
1 parent c3b494c commit 43e8df9
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 47 deletions.
61 changes: 15 additions & 46 deletions src/frontend/src/optimizer/plan_node/logical_kafka_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use risingwave_connector::source::DataType;
use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{
generic, BatchProject, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject,
PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
generic, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase,
PlanRef, PredicatePushdown, ToBatch, ToStream,
};
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::Result;
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor};
use crate::expr::{Expr, ExprImpl, ExprType};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::plan_node::{
Expand All @@ -44,27 +44,28 @@ pub struct LogicalKafkaScan {
pub base: PlanBase<Logical>,
pub core: generic::Source,

/// Expressions to output. This field presents and will be turned to a `Project` when
/// converting to a physical plan, only if there are generated columns.
output_exprs: Option<Vec<ExprImpl>>,

/// Kafka timestamp range.
kafka_timestamp_range: (Bound<i64>, Bound<i64>),
}

impl LogicalKafkaScan {
pub fn new(logical_source: &LogicalSource) -> Self {
pub fn create(logical_source: &LogicalSource) -> PlanRef {
assert!(logical_source.core.is_kafka_connector());

let base = logical_source.base.clone_with_new_plan_id();
let core = logical_source.core.clone();
let base = PlanBase::new_logical_with_core(&core);
let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded);

LogicalKafkaScan {
let kafka_scan = LogicalKafkaScan {
base,
core,
output_exprs: logical_source.output_exprs.clone(),
kafka_timestamp_range,
};

if let Some(exprs) = &logical_source.output_exprs {
LogicalProject::create(kafka_scan.into(), exprs.to_vec())
} else {
kafka_scan.into()
}
}

Expand All @@ -76,7 +77,6 @@ impl LogicalKafkaScan {
Self {
base: self.base.clone(),
core: self.core.clone(),
output_exprs: self.output_exprs.clone(),
kafka_timestamp_range: range,
}
}
Expand Down Expand Up @@ -107,34 +107,9 @@ impl ColPrunable for LogicalKafkaScan {
}
}

impl ExprRewritable for LogicalKafkaScan {
fn has_rewritable_expr(&self) -> bool {
self.output_exprs.is_some()
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
let mut output_exprs = self.output_exprs.clone();

for expr in output_exprs.iter_mut().flatten() {
*expr = r.rewrite_expr(expr.clone());
}

Self {
output_exprs,
..self.clone()
}
.into()
}
}
impl ExprRewritable for LogicalKafkaScan {}

impl ExprVisitable for LogicalKafkaScan {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
self.output_exprs
.iter()
.flatten()
.for_each(|e| v.visit_expr(e));
}
}
impl ExprVisitable for LogicalKafkaScan {}

/// A util function to extract kafka offset timestamp range.
///
Expand Down Expand Up @@ -320,14 +295,8 @@ impl PredicatePushdown for LogicalKafkaScan {

impl ToBatch for LogicalKafkaScan {
fn to_batch(&self) -> Result<PlanRef> {
let mut plan: PlanRef =
let plan: PlanRef =
BatchKafkaScan::new(self.core.clone(), self.kafka_timestamp_range).into();

if let Some(exprs) = &self.output_exprs {
let logical_project = generic::Project::new(exprs.to_vec(), plan);
plan = BatchProject::new(logical_project).into();
}

Ok(plan)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl Rule for SourceToKafkaScanRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let source: &LogicalSource = plan.as_logical_source()?;
if source.core.is_kafka_connector() {
Some(LogicalKafkaScan::new(source).into())
Some(LogicalKafkaScan::create(source))
} else {
None
}
Expand Down

0 comments on commit 43e8df9

Please sign in to comment.