Skip to content

Commit

Permalink
Merge commit '118eecdc8384805cae752dac0c4ccc768cc9629b' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-2
  • Loading branch information
appletreeisyellow committed Apr 24, 2024
2 parents 78c0fbf + 118eecd commit 4f283e1
Show file tree
Hide file tree
Showing 19 changed files with 353 additions and 215 deletions.
4 changes: 2 additions & 2 deletions benchmarks/queries/clickbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ LIMIT 10;
Here are some interesting statistics about the data used in the queries
Max length of `"SearchPhrase"` is 1113 characters
```sql
select min(length("SearchPhrase")) as "SearchPhrase_len_min", max(length("SearchPhrase")) "SearchPhrase_len_max" from 'hits.parquet' limit 10;
> select min(length("SearchPhrase")) as "SearchPhrase_len_min", max(length("SearchPhrase")) "SearchPhrase_len_max" from 'hits.parquet' limit 10;
+----------------------+----------------------+
| SearchPhrase_len_min | SearchPhrase_len_max |
+----------------------+----------------------+
Expand All @@ -74,7 +74,7 @@ Max length of `"SearchPhrase"` is 1113 characters

Here is the schema of the data
```sql
describe 'hits.parquet';
> describe 'hits.parquet';
+-----------------------+-----------+-------------+
| column_name | data_type | is_nullable |
+-----------------------+-----------+-------------+
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub async fn exec_from_repl(
rl.load_history(".history").ok();

loop {
match rl.readline(" ") {
match rl.readline("> ") {
Ok(line) if line.starts_with('\\') => {
rl.add_history_entry(line.trim_end())?;
let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ mod tests {
/// Return a test for data_index_bloom_encoding_stats.parquet
/// Note the values in the `String` column are:
/// ```sql
/// select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet';
/// > select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet';
/// +-----------+
/// | String |
/// +-----------+
Expand Down
25 changes: 13 additions & 12 deletions datafusion/core/tests/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_expr::{
LogicalPlanBuilder, ScalarUDF, Volatility,
};
use datafusion_functions::math;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyExpressions};
use datafusion_optimizer::{OptimizerContext, OptimizerRule};
use std::sync::Arc;
Expand Down Expand Up @@ -109,14 +110,14 @@ fn test_table_scan() -> LogicalPlan {
.expect("building plan")
}

fn get_optimized_plan_formatted(plan: &LogicalPlan, date_time: &DateTime<Utc>) -> String {
fn get_optimized_plan_formatted(plan: LogicalPlan, date_time: &DateTime<Utc>) -> String {
let config = OptimizerContext::new().with_query_execution_start_time(*date_time);
let rule = SimplifyExpressions::new();

let optimized_plan = rule
.try_optimize(plan, &config)
.unwrap()
.expect("failed to optimize plan");
// Use Optimizer to do plan traversal
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let optimizer = Optimizer::with_rules(vec![Arc::new(SimplifyExpressions::new())]);
let optimized_plan = optimizer.optimize(plan, &config, observe).unwrap();

format!("{optimized_plan:?}")
}

Expand Down Expand Up @@ -238,7 +239,7 @@ fn to_timestamp_expr_folded() -> Result<()> {
let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS to_timestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
\n TableScan: test"
.to_string();
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
let actual = get_optimized_plan_formatted(plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}
Expand All @@ -262,7 +263,7 @@ fn now_less_than_timestamp() -> Result<()> {
// expression down to a single constant (true)
let expected = "Filter: Boolean(true)\
\n TableScan: test";
let actual = get_optimized_plan_formatted(&plan, &time);
let actual = get_optimized_plan_formatted(plan, &time);

assert_eq!(expected, actual);
Ok(())
Expand Down Expand Up @@ -290,7 +291,7 @@ fn select_date_plus_interval() -> Result<()> {
// expression down to a single constant (true)
let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
TableScan: test"#;
let actual = get_optimized_plan_formatted(&plan, &time);
let actual = get_optimized_plan_formatted(plan, &time);

assert_eq!(expected, actual);
Ok(())
Expand All @@ -308,7 +309,7 @@ fn simplify_project_scalar_fn() -> Result<()> {
// after simplify: t.f as "power(t.f, 1.0)"
let expected = "Projection: test.f AS power(test.f,Float64(1))\
\n TableScan: test";
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
let actual = get_optimized_plan_formatted(plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}
Expand All @@ -330,7 +331,7 @@ fn simplify_scan_predicate() -> Result<()> {
// before simplify: t.g = power(t.f, 1.0)
// after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)"
let expected = "TableScan: test, full_filters=[g = f AS g = power(f,Float64(1))]";
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
let actual = get_optimized_plan_formatted(plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}
Expand Down Expand Up @@ -461,7 +462,7 @@ fn multiple_now() -> Result<()> {
.build()?;

// expect the same timestamp appears in both exprs
let actual = get_optimized_plan_formatted(&plan, &time);
let actual = get_optimized_plan_formatted(plan, &time);
let expected = format!(
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
\n TableScan: test",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2127,7 +2127,7 @@ pub struct Prepare {
/// # Example output:
///
/// ```sql
/// describe traces;
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name | data_type | is_nullable |
/// +--------------------+-----------------------------+-------------+
Expand Down
47 changes: 35 additions & 12 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,10 @@ macro_rules! handle_transform_recursion_up {
impl LogicalPlan {
/// Calls `f` on all expressions in the current `LogicalPlan` node.
///
/// Note this does not include expressions in child `LogicalPlan` nodes.
/// # Notes
/// * Similar to [`TreeNode::apply`] but for this node's expressions.
/// * Does not include expressions in input `LogicalPlan` nodes
/// * Visits only the top level expressions (Does not recurse into each expression)
pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
&self,
mut f: F,
Expand Down Expand Up @@ -541,7 +544,9 @@ impl LogicalPlan {
///
/// Returns the current node.
///
/// Note this does not include expressions in child `LogicalPlan` nodes.
/// # Notes
/// * Similar to [`TreeNode::map_children`] but for this node's expressions.
/// * Visits only the top level expressions (Does not recurse into each expression)
pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
self,
mut f: F,
Expand Down Expand Up @@ -757,7 +762,8 @@ impl LogicalPlan {
})
}

/// Visits a plan similarly to [`Self::visit`], but including embedded subqueries.
/// Visits a plan similarly to [`Self::visit`], including subqueries that
/// may appear in expressions such as `IN (SELECT ...)`.
pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
&self,
visitor: &mut V,
Expand All @@ -771,7 +777,9 @@ impl LogicalPlan {
.visit_parent(|| visitor.f_up(self))
}

/// Rewrites a plan similarly t [`Self::visit`], but including embedded subqueries.
/// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -783,10 +791,9 @@ impl LogicalPlan {
)
}

/// Calls `f` recursively on all children of the `LogicalPlan` node.
///
/// Unlike [`Self::apply`], this method *does* includes `LogicalPlan`s that
/// are referenced in `Expr`s
/// Similarly to [`Self::apply`], calls `f` on this node and all its inputs,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
&self,
f: &mut F,
Expand All @@ -796,20 +803,29 @@ impl LogicalPlan {
.visit_sibling(|| self.apply_children(|c| c.apply_with_subqueries(f)))
}

/// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn transform_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
self,
f: &F,
) -> Result<Transformed<Self>> {
self.transform_up_with_subqueries(f)
}

/// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn transform_down_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
self,
f: &F,
) -> Result<Transformed<Self>> {
handle_transform_recursion_down!(f(self), |c| c.transform_down_with_subqueries(f))
}

/// Similarly to [`Self::transform_down_mut`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn transform_down_mut_with_subqueries<
F: FnMut(Self) -> Result<Transformed<Self>>,
>(
Expand All @@ -820,6 +836,9 @@ impl LogicalPlan {
.transform_down_mut_with_subqueries(f))
}

/// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn transform_up_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
self,
f: &F,
Expand All @@ -836,6 +855,9 @@ impl LogicalPlan {
handle_transform_recursion_up!(self, |c| c.transform_up_mut_with_subqueries(f), f)
}

/// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn transform_down_up_with_subqueries<
FD: FnMut(Self) -> Result<Transformed<Self>>,
FU: FnMut(Self) -> Result<Transformed<Self>>,
Expand All @@ -851,8 +873,9 @@ impl LogicalPlan {
)
}

/// Calls `f` on all subqueries referenced in expressions of the current
/// `LogicalPlan` node.
/// Similarly to [`Self::apply`], calls `f` on this node and its inputs
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
&self,
mut f: F,
Expand All @@ -872,8 +895,8 @@ impl LogicalPlan {
})
}

/// Rewrites all subquery `LogicalPlan` in the current `LogicalPlan` node
/// using `f`.
/// Similarly to [`Self::map_children`], rewrites all subqueries that may
/// appear in expressions such as `IN (SELECT ...)` using `f`.
///
/// Returns the current node.
pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
Expand Down
10 changes: 5 additions & 5 deletions datafusion/optimizer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Every expression in DataFusion has a name, which is used as the column name. For
contains a single column with the name `"COUNT(aggregate_test_100.c9)"`:

```text
select count(c9) from aggregate_test_100;
> select count(c9) from aggregate_test_100;
+------------------------------+
| COUNT(aggregate_test_100.c9) |
+------------------------------+
Expand All @@ -116,7 +116,7 @@ These names are used to refer to the columns in both subqueries as well as inter
to another. For example:

```text
select "COUNT(aggregate_test_100.c9)" + 1 from (select count(c9) from aggregate_test_100) as sq;
> select "COUNT(aggregate_test_100.c9)" + 1 from (select count(c9) from aggregate_test_100) as sq;
+--------------------------------------------+
| sq.COUNT(aggregate_test_100.c9) + Int64(1) |
+--------------------------------------------+
Expand All @@ -134,7 +134,7 @@ Here is a simple example of such a rewrite. The expression `1 + 2` can be intern
displayed the same as `1 + 2`:

```text
select 1 + 2;
> select 1 + 2;
+---------------------+
| Int64(1) + Int64(2) |
+---------------------+
Expand All @@ -146,7 +146,7 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re
`3 as "1 + 2"`:

```text
explain select 1 + 2;
> explain select 1 + 2;
+---------------+-------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------+
Expand Down Expand Up @@ -289,7 +289,7 @@ The `EXPLAIN VERBOSE` command can be used to show the effect of each optimizatio
In the following example, the `type_coercion` and `simplify_expressions` passes have simplified the plan so that it returns the constant `"3.2"` rather than doing a computation at execution time.

```text
explain verbose select cast(1 + 2.2 as string) as foo;
> explain verbose select cast(1 + 2.2 as string) as foo;
+------------------------------------------------------------+---------------------------------------------------------------------------+
| plan_type | plan |
+------------------------------------------------------------+---------------------------------------------------------------------------+
Expand Down
50 changes: 40 additions & 10 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, Result};
use datafusion_common::{internal_err, DFSchema, DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;

use crate::common_subexpr_eliminate::CommonSubexprEliminate;
Expand Down Expand Up @@ -69,8 +69,12 @@ use crate::utils::log_plan;
/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionState.html#method.add_optimizer_rule
pub trait OptimizerRule {
/// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
/// optimized by this rule.
/// Try and rewrite `plan` to an optimized form, returning None if the plan
/// cannot be optimized by this rule.
///
/// Note this API will be deprecated in the future as it requires `clone`ing
/// the input plan, which can be expensive. OptimizerRules should implement
/// [`Self::rewrite`] instead.
fn try_optimize(
&self,
plan: &LogicalPlan,
Expand All @@ -80,12 +84,31 @@ pub trait OptimizerRule {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;

/// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
/// How should the rule be applied by the optimizer? See comments on
/// [`ApplyOrder`] for details.
///
/// If a rule use default None, it should traverse recursively plan inside itself
/// If returns `None`, the default, the rule must handle recursion itself
fn apply_order(&self) -> Option<ApplyOrder> {
None
}

/// Does this rule support rewriting owned plans (rather than by reference)?
fn supports_rewrite(&self) -> bool {
false
}

/// Try to rewrite `plan` to an optimized form, returning `Transformed::yes`
/// if the plan was rewritten and `Transformed::no` if it was not.
///
/// Note: this function is only called if [`Self::supports_rewrite`] returns
/// true. Otherwise the Optimizer calls [`Self::try_optimize`]
fn rewrite(
&self,
_plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
internal_err!("rewrite is not implemented for {}", self.name())
}
}

/// Options to control the DataFusion Optimizer.
Expand Down Expand Up @@ -298,12 +321,19 @@ fn optimize_plan_node(
rule: &dyn OptimizerRule,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// TODO: add API to OptimizerRule to allow rewriting by ownership
rule.try_optimize(&plan, config)
.map(|maybe_plan| match maybe_plan {
Some(new_plan) => Transformed::yes(new_plan),
if rule.supports_rewrite() {
return rule.rewrite(plan, config);
}

rule.try_optimize(&plan, config).map(|maybe_plan| {
match maybe_plan {
Some(new_plan) => {
// if the node was rewritten by the optimizer, replace the node
Transformed::yes(new_plan)
}
None => Transformed::no(plan),
})
}
})
}

impl Optimizer {
Expand Down
Loading

0 comments on commit 4f283e1

Please sign in to comment.