Skip to content

Commit

Permalink
Introduce OptimizerRule::rewrite to rewrite in place, Rewrite `Simp…
Browse files Browse the repository at this point in the history
…lifyExprs` to avoid copies (apache#9954)
  • Loading branch information
alamb authored Apr 11, 2024
1 parent b9759b9 commit 58e0b59
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 88 deletions.
25 changes: 13 additions & 12 deletions datafusion/core/tests/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_expr::{
LogicalPlanBuilder, ScalarUDF, Volatility,
};
use datafusion_functions::math;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyExpressions};
use datafusion_optimizer::{OptimizerContext, OptimizerRule};
use std::sync::Arc;
Expand Down Expand Up @@ -109,14 +110,14 @@ fn test_table_scan() -> LogicalPlan {
.expect("building plan")
}

fn get_optimized_plan_formatted(plan: &LogicalPlan, date_time: &DateTime<Utc>) -> String {
fn get_optimized_plan_formatted(plan: LogicalPlan, date_time: &DateTime<Utc>) -> String {
let config = OptimizerContext::new().with_query_execution_start_time(*date_time);
let rule = SimplifyExpressions::new();

let optimized_plan = rule
.try_optimize(plan, &config)
.unwrap()
.expect("failed to optimize plan");
// Use Optimizer to do plan traversal
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let optimizer = Optimizer::with_rules(vec![Arc::new(SimplifyExpressions::new())]);
let optimized_plan = optimizer.optimize(plan, &config, observe).unwrap();

format!("{optimized_plan:?}")
}

Expand Down Expand Up @@ -238,7 +239,7 @@ fn to_timestamp_expr_folded() -> Result<()> {
let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS to_timestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
\n TableScan: test"
.to_string();
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
let actual = get_optimized_plan_formatted(plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}
Expand All @@ -262,7 +263,7 @@ fn now_less_than_timestamp() -> Result<()> {
// expression down to a single constant (true)
let expected = "Filter: Boolean(true)\
\n TableScan: test";
let actual = get_optimized_plan_formatted(&plan, &time);
let actual = get_optimized_plan_formatted(plan, &time);

assert_eq!(expected, actual);
Ok(())
Expand Down Expand Up @@ -290,7 +291,7 @@ fn select_date_plus_interval() -> Result<()> {
// expression down to a single constant (true)
let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
TableScan: test"#;
let actual = get_optimized_plan_formatted(&plan, &time);
let actual = get_optimized_plan_formatted(plan, &time);

assert_eq!(expected, actual);
Ok(())
Expand All @@ -308,7 +309,7 @@ fn simplify_project_scalar_fn() -> Result<()> {
// after simplify: t.f as "power(t.f, 1.0)"
let expected = "Projection: test.f AS power(test.f,Float64(1))\
\n TableScan: test";
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
let actual = get_optimized_plan_formatted(plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}
Expand All @@ -330,7 +331,7 @@ fn simplify_scan_predicate() -> Result<()> {
// before simplify: t.g = power(t.f, 1.0)
// after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)"
let expected = "TableScan: test, full_filters=[g = f AS g = power(f,Float64(1))]";
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
let actual = get_optimized_plan_formatted(plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}
Expand Down Expand Up @@ -461,7 +462,7 @@ fn multiple_now() -> Result<()> {
.build()?;

// expect the same timestamp appears in both exprs
let actual = get_optimized_plan_formatted(&plan, &time);
let actual = get_optimized_plan_formatted(plan, &time);
let expected = format!(
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
\n TableScan: test",
Expand Down
50 changes: 40 additions & 10 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, Result};
use datafusion_common::{internal_err, DFSchema, DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;

use crate::common_subexpr_eliminate::CommonSubexprEliminate;
Expand Down Expand Up @@ -69,8 +69,12 @@ use crate::utils::log_plan;
/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionState.html#method.add_optimizer_rule
pub trait OptimizerRule {
/// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
/// optimized by this rule.
/// Try and rewrite `plan` to an optimized form, returning None if the plan
/// cannot be optimized by this rule.
///
/// Note this API will be deprecated in the future as it requires `clone`ing
/// the input plan, which can be expensive. OptimizerRules should implement
/// [`Self::rewrite`] instead.
fn try_optimize(
&self,
plan: &LogicalPlan,
Expand All @@ -80,12 +84,31 @@ pub trait OptimizerRule {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;

/// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
/// How should the rule be applied by the optimizer? See comments on
/// [`ApplyOrder`] for details.
///
/// If a rule use default None, it should traverse recursively plan inside itself
/// If returns `None`, the default, the rule must handle recursion itself
fn apply_order(&self) -> Option<ApplyOrder> {
None
}

/// Does this rule support rewriting owned plans (rather than by reference)?
fn supports_rewrite(&self) -> bool {
false
}

/// Try to rewrite `plan` to an optimized form, returning `Transformed::yes`
/// if the plan was rewritten and `Transformed::no` if it was not.
///
/// Note: this function is only called if [`Self::supports_rewrite`] returns
/// true. Otherwise the Optimizer calls [`Self::try_optimize`]
fn rewrite(
&self,
_plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
internal_err!("rewrite is not implemented for {}", self.name())
}
}

/// Options to control the DataFusion Optimizer.
Expand Down Expand Up @@ -298,12 +321,19 @@ fn optimize_plan_node(
rule: &dyn OptimizerRule,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// TODO: add API to OptimizerRule to allow rewriting by ownership
rule.try_optimize(&plan, config)
.map(|maybe_plan| match maybe_plan {
Some(new_plan) => Transformed::yes(new_plan),
if rule.supports_rewrite() {
return rule.rewrite(plan, config);
}

rule.try_optimize(&plan, config).map(|maybe_plan| {
match maybe_plan {
Some(new_plan) => {
// if the node was rewritten by the optimizer, replace the node
Transformed::yes(new_plan)
}
None => Transformed::no(plan),
})
}
})
}

impl Optimizer {
Expand Down
Loading

0 comments on commit 58e0b59

Please sign in to comment.