Skip to content

Commit

Permalink
Merge commit '215f30f74a12e91fd7dca0d30e37014c8c3493ed' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-1-3
  • Loading branch information
appletreeisyellow committed Apr 23, 2024
2 parents f7e4aa1 + 215f30f commit aacf194
Show file tree
Hide file tree
Showing 32 changed files with 261 additions and 110 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ pub trait PruningStatistics {
/// `x = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END`
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_null_count > 0 END`
/// `x IS NULL` | `x_null_count > 0`
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
///
/// ## Predicate Evaluation
Expand Down
13 changes: 7 additions & 6 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result<Vec<Expr>> {
/// Recursively walk an expression tree, collecting the unique set of columns
/// referenced in the expression
pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
inspect_expr_pre(expr, |expr| {
expr.apply(&mut |expr| {
match expr {
Expr::Column(qc) => {
accum.insert(qc.clone());
Expand Down Expand Up @@ -307,8 +307,9 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn { .. } => {}
}
Ok(())
Ok(TreeNodeRecursion::Continue)
})
.map(|_| ())
}

/// Find excluded columns in the schema, if any
Expand Down Expand Up @@ -838,11 +839,11 @@ pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {

pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
let mut exprs = vec![];
inspect_expr_pre(e, |expr| {
e.apply(&mut |expr| {
if let Expr::Column(c) = expr {
exprs.push(c.clone())
}
Ok(()) as Result<()>
Ok(TreeNodeRecursion::Continue)
})
// As the closure always returns Ok, this "can't" error
.expect("Unexpected error");
Expand All @@ -867,7 +868,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
schema: &DFSchemaRef,
) -> Vec<usize> {
let mut indexes = vec![];
inspect_expr_pre(e, |expr| {
e.apply(&mut |expr| {
match expr {
Expr::Column(qc) => {
if let Ok(idx) = schema.index_of_column(qc) {
Expand All @@ -879,7 +880,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
}
_ => {}
}
Ok(()) as Result<()>
Ok(TreeNodeRecursion::Continue)
})
.unwrap();
indexes
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`Analyzer`] and [`AnalyzerRule`]
use std::sync::Arc;

use log::debug;
Expand Down
13 changes: 10 additions & 3 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! [`PullUpCorrelatedExpr`] converts correlated subqueries to `Joins`
use std::collections::{BTreeSet, HashMap};
use std::ops::Deref;

Expand All @@ -31,8 +33,11 @@ use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_physical_expr::execution_props::ExecutionProps;

/// This struct rewrite the sub query plan by pull up the correlated expressions(contains outer reference columns) from the inner subquery's 'Filter'.
/// It adds the inner reference columns to the 'Projection' or 'Aggregate' of the subquery if they are missing, so that they can be evaluated by the parent operator as the join condition.
/// This struct rewrite the sub query plan by pull up the correlated
/// expressions(contains outer reference columns) from the inner subquery's
/// 'Filter'. It adds the inner reference columns to the 'Projection' or
/// 'Aggregate' of the subquery if they are missing, so that they can be
/// evaluated by the parent operator as the join condition.
pub struct PullUpCorrelatedExpr {
pub join_filters: Vec<Expr>,
// mapping from the plan to its holding correlated columns
Expand All @@ -54,7 +59,9 @@ pub struct PullUpCorrelatedExpr {
/// This is used to handle the Count bug
pub const UN_MATCHED_ROW_INDICATOR: &str = "__always_true";

/// Mapping from expr display name to its evaluation result on empty record batch (for example: 'count(*)' is 'ScalarValue(0)', 'count(*) + 2' is 'ScalarValue(2)')
/// Mapping from expr display name to its evaluation result on empty record
/// batch (for example: 'count(*)' is 'ScalarValue(0)', 'count(*) + 2' is
/// 'ScalarValue(2)')
pub type ExprResultMap = HashMap<String, Expr>;

impl TreeNodeRewriter for PullUpCorrelatedExpr {
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`DecorrelatePredicateSubquery`] converts `IN`/`EXISTS` subquery predicates to `SEMI`/`ANTI` joins
use std::collections::BTreeSet;
use std::ops::Deref;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to eliminate cross join to inner join if join predicates are available in filters.
//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
use std::collections::HashSet;
use std::sync::Arc;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/eliminate_duplicated_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! [`EliminateDuplicatedExpr`] Removes redundant expressions
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
Expand Down
11 changes: 7 additions & 4 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to replace `where false or null` on a plan with an empty relation.
//! This saves time in planning and executing the query.
//! Note that this rule should be applied after simplify expressions optimizer rule.
//! [`EliminateFilter`] replaces `where false` or `where null` with an empty relation.
use crate::optimizer::ApplyOrder;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
Expand All @@ -27,7 +26,11 @@ use datafusion_expr::{

use crate::{OptimizerConfig, OptimizerRule};

/// Optimization rule that eliminate the scalar value (true/false/null) filter with an [LogicalPlan::EmptyRelation]
/// Optimization rule that eliminate the scalar value (true/false/null) filter
/// with an [LogicalPlan::EmptyRelation]
///
/// This saves time in planning and executing the query.
/// Note that this rule should be applied after simplify expressions optimizer rule.
#[derive(Default)]
pub struct EliminateFilter;

Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/eliminate_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`EliminateJoin`] rewrites `INNER JOIN` with `true`/`null`
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{Result, ScalarValue};
Expand All @@ -24,7 +25,7 @@ use datafusion_expr::{
CrossJoin, Expr,
};

/// Eliminates joins when inner join condition is false.
/// Eliminates joins when join condition is false.
/// Replaces joins when inner join condition is true with a cross join.
#[derive(Default)]
pub struct EliminateJoin;
Expand Down
15 changes: 8 additions & 7 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to replace `LIMIT 0` or
//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch`
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
//! [`EliminateLimit`] eliminates `LIMIT` when possible
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};

/// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0, fetch:None).
/// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
/// Optimizer rule to replace `LIMIT 0` or `LIMIT` whose ancestor LIMIT's skip is
/// greater than or equal to current's fetch
///
/// It can cooperate with `propagate_empty_relation` and `limit_push_down`. on a
/// plan with an empty relation.
///
/// This rule also removes OFFSET 0 from the [LogicalPlan]
#[derive(Default)]
pub struct EliminateLimit;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/eliminate_nested_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to replace nested unions to single union.
//! [`EliminateNestedUnion`]: flattens nested `Union` to a single `Union`
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/eliminate_one_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to eliminate one union.
//! [`EliminateOneUnion`] eliminates single element `Union`
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{LogicalPlan, Union};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to eliminate left/right/full join to inner join if possible.
//! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{Column, DFSchema, Result};
use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`ExtractEquijoinPredicate`] rule that extracts equijoin predicates
//! [`ExtractEquijoinPredicate`] identifies equality join (equijoin) predicates
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::DFSchema;
Expand Down
5 changes: 1 addition & 4 deletions datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! The FilterNullJoinKeys rule will identify inner joins with equi-join conditions
//! where the join key is nullable on one side and non-nullable on the other side
//! and then insert an `IsNotNull` filter on the nullable side since null values
//! can never match.
//! [`FilterNullJoinKeys`] adds filters to join inputs when input isn't nullable
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
Expand Down
16 changes: 15 additions & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@
// specific language governing permissions and limitations
// under the License.

//! # DataFusion Optimizer
//!
//! Contains rules for rewriting [`LogicalPlan`]s
//!
//! 1. [`Analyzer`] applies [`AnalyzerRule`]s to transform `LogicalPlan`s
//! to make the plan valid prior to the rest of the DataFusion optimization
//! process (for example, [`TypeCoercion`]).
//!
//! 2. [`Optimizer`] applies [`OptimizerRule`]s to transform `LogicalPlan`s
//! into equivalent, but more efficient plans.
//!
//! [`LogicalPlan`]: datafusion_expr::LogicalPlan
//! [`TypeCoercion`]: analyzer::type_coercion::TypeCoercion
pub mod analyzer;
pub mod common_subexpr_eliminate;
pub mod decorrelate;
Expand Down Expand Up @@ -46,7 +59,8 @@ pub mod utils;
#[cfg(test)]
pub mod test;

pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
pub use analyzer::{Analyzer, AnalyzerRule};
pub use optimizer::{Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule};
pub use utils::optimize_children;

mod plan_signature;
Expand Down
22 changes: 11 additions & 11 deletions datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to prune unnecessary columns from intermediate schemas
//! inside the [`LogicalPlan`]. This rule:
//! - Removes unnecessary columns that do not appear at the output and/or are
//! not used during any computation step.
//! - Adds projections to decrease table column size before operators that
//! benefit from a smaller memory footprint at its input.
//! - Removes unnecessary [`LogicalPlan::Projection`]s from the [`LogicalPlan`].
//! [`OptimizeProjections`] identifies and eliminates unused columns
use std::collections::HashSet;
use std::sync::Arc;
Expand All @@ -40,11 +34,17 @@ use datafusion_expr::{
Expr, Projection, TableScan, Window,
};

use datafusion_expr::utils::inspect_expr_pre;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use hashbrown::HashMap;
use itertools::{izip, Itertools};

/// A rule for optimizing logical plans by removing unused columns/fields.
/// Optimizer rule to prune unnecessary columns from intermediate schemas
/// inside the [`LogicalPlan`]. This rule:
/// - Removes unnecessary columns that do not appear at the output and/or are
/// not used during any computation step.
/// - Adds projections to decrease table column size before operators that
/// benefit from a smaller memory footprint at its input.
/// - Removes unnecessary [`LogicalPlan::Projection`]s from the [`LogicalPlan`].
///
/// `OptimizeProjections` is an optimizer rule that identifies and eliminates
/// columns from a logical plan that are not used by downstream operations.
Expand Down Expand Up @@ -613,7 +613,7 @@ fn rewrite_expr(expr: &Expr, input: &Projection) -> Result<Option<Expr>> {
/// columns are collected.
fn outer_columns(expr: &Expr, columns: &mut HashSet<Column>) {
// inspect_expr_pre doesn't handle subquery references, so find them explicitly
inspect_expr_pre(expr, |expr| {
expr.apply(&mut |expr| {
match expr {
Expr::OuterReferenceColumn(_, col) => {
columns.insert(col.clone());
Expand All @@ -632,7 +632,7 @@ fn outer_columns(expr: &Expr, columns: &mut HashSet<Column>) {
}
_ => {}
};
Ok(()) as Result<()>
Ok(TreeNodeRecursion::Continue)
})
// unwrap: closure above never returns Err, so can not be Err here
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Query optimizer traits
//! [`Optimizer`] and [`OptimizerRule`]
use std::collections::HashSet;
use std::sync::Arc;
Expand Down Expand Up @@ -54,7 +54,7 @@ use datafusion_expr::logical_plan::LogicalPlan;
use chrono::{DateTime, Utc};
use log::{debug, warn};

/// `OptimizerRule` transforms one [`LogicalPlan`] into another which
/// `OptimizerRule`s transforms one [`LogicalPlan`] into another which
/// computes the same results, but in a potentially more efficient
/// way. If there are no suitable transformations for the input plan,
/// the optimizer should simply return it unmodified.
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`PropagateEmptyRelation`] eliminates nodes fed by `EmptyRelation`
use datafusion_common::{plan_err, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{EmptyRelation, JoinType, Projection, Union};
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`PushDownFilter`] Moves filters so they are applied as early as possible in
//! the plan.
//! [`PushDownFilter`] applies filters as early as possible
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/optimizer/src/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
//! [`PushDownLimit`] pushes `LIMIT` earlier in the query plan
use std::sync::Arc;

Expand All @@ -29,7 +28,9 @@ use datafusion_expr::logical_plan::{
};
use datafusion_expr::CrossJoin;

/// Optimization rule that tries to push down LIMIT.
/// Optimization rule that tries to push down `LIMIT`.
///
//. It will push down through projection, limits (taking the smaller limit)
#[derive(Default)]
pub struct PushDownLimit {}

Expand Down
3 changes: 0 additions & 3 deletions datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.

//! Projection Push Down optimizer rule ensures that only referenced columns are
//! loaded into memory
#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
Loading

0 comments on commit aacf194

Please sign in to comment.