diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 80bb5ad42e81..19e71a92a706 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -330,7 +330,7 @@ pub trait PruningStatistics { /// `x = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END` /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END` /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END` -/// `x IS NULL` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_null_count > 0 END` +/// `x IS NULL` | `x_null_count > 0` /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END` /// /// ## Predicate Evaluation diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index a93282574e8a..0d99d0b5028e 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -264,7 +264,7 @@ pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result> { /// Recursively walk an expression tree, collecting the unique set of columns /// referenced in the expression pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet) -> Result<()> { - inspect_expr_pre(expr, |expr| { + expr.apply(&mut |expr| { match expr { Expr::Column(qc) => { accum.insert(qc.clone()); @@ -307,8 +307,9 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet) -> Result<()> { | Expr::Placeholder(_) | Expr::OuterReferenceColumn { .. 
} => {} } - Ok(()) + Ok(TreeNodeRecursion::Continue) }) + .map(|_| ()) } /// Find excluded columns in the schema, if any @@ -838,11 +839,11 @@ pub fn find_column_exprs(exprs: &[Expr]) -> Vec { pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec { let mut exprs = vec![]; - inspect_expr_pre(e, |expr| { + e.apply(&mut |expr| { if let Expr::Column(c) = expr { exprs.push(c.clone()) } - Ok(()) as Result<()> + Ok(TreeNodeRecursion::Continue) }) // As the closure always returns Ok, this "can't" error .expect("Unexpected error"); @@ -867,7 +868,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr( schema: &DFSchemaRef, ) -> Vec { let mut indexes = vec![]; - inspect_expr_pre(e, |expr| { + e.apply(&mut |expr| { match expr { Expr::Column(qc) => { if let Ok(idx) = schema.index_of_column(qc) { @@ -879,7 +880,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr( } _ => {} } - Ok(()) as Result<()> + Ok(TreeNodeRecursion::Continue) }) .unwrap(); indexes diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index c7eb6e895d57..b446fe2f320e 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//! [`Analyzer`] and [`AnalyzerRule`] use std::sync::Arc; use log::debug; diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 12e84a63ea15..dbcf02b26ba6 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! 
[`PullUpCorrelatedExpr`] converts correlated subqueries to `Joins` + use std::collections::{BTreeSet, HashMap}; use std::ops::Deref; @@ -31,8 +33,11 @@ use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction}; use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_physical_expr::execution_props::ExecutionProps; -/// This struct rewrite the sub query plan by pull up the correlated expressions(contains outer reference columns) from the inner subquery's 'Filter'. -/// It adds the inner reference columns to the 'Projection' or 'Aggregate' of the subquery if they are missing, so that they can be evaluated by the parent operator as the join condition. +/// This struct rewrites the subquery plan by pulling up the correlated +/// expressions (containing outer reference columns) from the inner subquery's +/// 'Filter'. It adds the inner reference columns to the 'Projection' or +/// 'Aggregate' of the subquery if they are missing, so that they can be +/// evaluated by the parent operator as the join condition. 
pub struct PullUpCorrelatedExpr { pub join_filters: Vec, // mapping from the plan to its holding correlated columns @@ -54,7 +59,9 @@ pub struct PullUpCorrelatedExpr { /// This is used to handle the Count bug pub const UN_MATCHED_ROW_INDICATOR: &str = "__always_true"; -/// Mapping from expr display name to its evaluation result on empty record batch (for example: 'count(*)' is 'ScalarValue(0)', 'count(*) + 2' is 'ScalarValue(2)') +/// Mapping from expr display name to its evaluation result on empty record +/// batch (for example: 'count(*)' is 'ScalarValue(0)', 'count(*) + 2' is +/// 'ScalarValue(2)') pub type ExprResultMap = HashMap; impl TreeNodeRewriter for PullUpCorrelatedExpr { diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index b94cf37c5c12..019e7507b122 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//! [`DecorrelatePredicateSubquery`] converts `IN`/`EXISTS` subquery predicates to `SEMI`/`ANTI` joins use std::collections::BTreeSet; use std::ops::Deref; use std::sync::Arc; diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 7f65690a4a7c..18a9c05b9dc6 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to eliminate cross join to inner join if join predicates are available in filters. +//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available. 
use std::collections::HashSet; use std::sync::Arc; diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs index de05717a72e2..349d4d8878e0 100644 --- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs +++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! [`EliminateDuplicatedExpr`] Removes redundant expressions + use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index fea14342ca77..9411dc192beb 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to replace `where false or null` on a plan with an empty relation. -//! This saves time in planning and executing the query. -//! Note that this rule should be applied after simplify expressions optimizer rule. +//! [`EliminateFilter`] replaces `where false` or `where null` with an empty relation. + use crate::optimizer::ApplyOrder; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ @@ -27,7 +26,11 @@ use datafusion_expr::{ use crate::{OptimizerConfig, OptimizerRule}; -/// Optimization rule that eliminate the scalar value (true/false/null) filter with an [LogicalPlan::EmptyRelation] +/// Optimization rule that eliminate the scalar value (true/false/null) filter +/// with an [LogicalPlan::EmptyRelation] +/// +/// This saves time in planning and executing the query. +/// Note that this rule should be applied after simplify expressions optimizer rule. 
#[derive(Default)] pub struct EliminateFilter; diff --git a/datafusion/optimizer/src/eliminate_join.rs b/datafusion/optimizer/src/eliminate_join.rs index 0dbebcc8a051..e685229c61b2 100644 --- a/datafusion/optimizer/src/eliminate_join.rs +++ b/datafusion/optimizer/src/eliminate_join.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//! [`EliminateJoin`] rewrites `INNER JOIN` with `true`/`null` use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{Result, ScalarValue}; @@ -24,7 +25,7 @@ use datafusion_expr::{ CrossJoin, Expr, }; -/// Eliminates joins when inner join condition is false. +/// Eliminates joins when join condition is false. /// Replaces joins when inner join condition is true with a cross join. #[derive(Default)] pub struct EliminateJoin; diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index 4386253740aa..fb5d0d17b839 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -15,18 +15,19 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to replace `LIMIT 0` or -//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch` -//! on a plan with an empty relation. -//! This rule also removes OFFSET 0 from the [LogicalPlan] -//! This saves time in planning and executing the query. +//! [`EliminateLimit`] eliminates `LIMIT` when possible use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan}; -/// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0, fetch:None). -/// It can cooperate with `propagate_empty_relation` and `limit_push_down`. 
+/// Optimizer rule to replace `LIMIT 0` or `LIMIT` whose ancestor LIMIT's skip is +/// greater than or equal to current's fetch on a plan with an empty relation. +/// +/// It can cooperate with `propagate_empty_relation` and `limit_push_down`. +/// This saves time in planning and executing the query. +/// +/// This rule also removes OFFSET 0 from the [LogicalPlan] #[derive(Default)] pub struct EliminateLimit; diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index 5771ea2e19a2..924a0853418c 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to replace nested unions to single union. +//! [`EliminateNestedUnion`]: flattens nested `Union` to a single `Union` use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; diff --git a/datafusion/optimizer/src/eliminate_one_union.rs b/datafusion/optimizer/src/eliminate_one_union.rs index 70ee490346ff..63c3e789daa6 100644 --- a/datafusion/optimizer/src/eliminate_one_union.rs +++ b/datafusion/optimizer/src/eliminate_one_union.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to eliminate one union. +//! [`EliminateOneUnion`] eliminates single element `Union` use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::logical_plan::{LogicalPlan, Union}; diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 56a4a76987f7..a004da2bff19 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! 
Optimizer rule to eliminate left/right/full join to inner join if possible. +//! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{Column, DFSchema, Result}; use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan}; diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 24664d57c38d..4cfcd07b47d9 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ExtractEquijoinPredicate`] rule that extracts equijoin predicates +//! [`ExtractEquijoinPredicate`] identifies equality join (equijoin) predicates use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::DFSchema; diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs index 95cd8a9fd36c..16039b182bb2 100644 --- a/datafusion/optimizer/src/filter_null_join_keys.rs +++ b/datafusion/optimizer/src/filter_null_join_keys.rs @@ -15,10 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! The FilterNullJoinKeys rule will identify inner joins with equi-join conditions -//! where the join key is nullable on one side and non-nullable on the other side -//! and then insert an `IsNotNull` filter on the nullable side since null values -//! can never match. +//! 
[`FilterNullJoinKeys`] adds filters to join inputs when input isn't nullable use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index b54facc5d682..f1f49727c39c 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -15,6 +15,19 @@ // specific language governing permissions and limitations // under the License. +//! # DataFusion Optimizer +//! +//! Contains rules for rewriting [`LogicalPlan`]s +//! +//! 1. [`Analyzer`] applies [`AnalyzerRule`]s to transform `LogicalPlan`s +//! to make the plan valid prior to the rest of the DataFusion optimization +//! process (for example, [`TypeCoercion`]). +//! +//! 2. [`Optimizer`] applies [`OptimizerRule`]s to transform `LogicalPlan`s +//! into equivalent, but more efficient plans. +//! +//! [`LogicalPlan`]: datafusion_expr::LogicalPlan +//! [`TypeCoercion`]: analyzer::type_coercion::TypeCoercion pub mod analyzer; pub mod common_subexpr_eliminate; pub mod decorrelate; @@ -46,7 +59,8 @@ pub mod utils; #[cfg(test)] pub mod test; -pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule}; +pub use analyzer::{Analyzer, AnalyzerRule}; +pub use optimizer::{Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule}; pub use utils::optimize_children; mod plan_signature; diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index c40a9bb704eb..69905c990a7f 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -15,13 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to prune unnecessary columns from intermediate schemas -//! inside the [`LogicalPlan`]. This rule: -//! - Removes unnecessary columns that do not appear at the output and/or are -//! not used during any computation step. -//! 
- Adds projections to decrease table column size before operators that -//! benefit from a smaller memory footprint at its input. -//! - Removes unnecessary [`LogicalPlan::Projection`]s from the [`LogicalPlan`]. +//! [`OptimizeProjections`] identifies and eliminates unused columns use std::collections::HashSet; use std::sync::Arc; @@ -40,11 +34,17 @@ use datafusion_expr::{ Expr, Projection, TableScan, Window, }; -use datafusion_expr::utils::inspect_expr_pre; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use hashbrown::HashMap; use itertools::{izip, Itertools}; -/// A rule for optimizing logical plans by removing unused columns/fields. +/// Optimizer rule to prune unnecessary columns from intermediate schemas +/// inside the [`LogicalPlan`]. This rule: +/// - Removes unnecessary columns that do not appear at the output and/or are +/// not used during any computation step. +/// - Adds projections to decrease table column size before operators that +/// benefit from a smaller memory footprint at its input. +/// - Removes unnecessary [`LogicalPlan::Projection`]s from the [`LogicalPlan`]. /// /// `OptimizeProjections` is an optimizer rule that identifies and eliminates /// columns from a logical plan that are not used by downstream operations. @@ -613,7 +613,7 @@ fn rewrite_expr(expr: &Expr, input: &Projection) -> Result> { /// columns are collected. 
fn outer_columns(expr: &Expr, columns: &mut HashSet) { // inspect_expr_pre doesn't handle subquery references, so find them explicitly - inspect_expr_pre(expr, |expr| { + expr.apply(&mut |expr| { match expr { Expr::OuterReferenceColumn(_, col) => { columns.insert(col.clone()); @@ -632,7 +632,7 @@ fn outer_columns(expr: &Expr, columns: &mut HashSet) { } _ => {} }; - Ok(()) as Result<()> + Ok(TreeNodeRecursion::Continue) }) // unwrap: closure above never returns Err, so can not be Err here .unwrap(); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 3153f72d7ee7..03ff402c3e3f 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Query optimizer traits +//! [`Optimizer`] and [`OptimizerRule`] use std::collections::HashSet; use std::sync::Arc; @@ -54,7 +54,7 @@ use datafusion_expr::logical_plan::LogicalPlan; use chrono::{DateTime, Utc}; use log::{debug, warn}; -/// `OptimizerRule` transforms one [`LogicalPlan`] into another which +/// `OptimizerRule`s transform one [`LogicalPlan`] into another which /// computes the same results, but in a potentially more efficient /// way. If there are no suitable transformations for the input plan, /// the optimizer should simply return it unmodified. diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 55fb982d2a87..2aca6f93254a 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//! 
[`PropagateEmptyRelation`] eliminates nodes fed by `EmptyRelation` use datafusion_common::{plan_err, Result}; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{EmptyRelation, JoinType, Projection, Union}; diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 83db4b0640a4..ff24df259adf 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -12,8 +12,7 @@ // specific language governing permissions and limitations // under the License. -//! [`PushDownFilter`] Moves filters so they are applied as early as possible in -//! the plan. +//! [`PushDownFilter`] applies filters as early as possible use std::collections::{HashMap, HashSet}; use std::sync::Arc; diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 33d02d5c5628..cca6c3fd9bd1 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to push down LIMIT in the query plan -//! It will push down through projection, limits (taking the smaller limit) +//! [`PushDownLimit`] pushes `LIMIT` earlier in the query plan use std::sync::Arc; @@ -29,7 +28,9 @@ use datafusion_expr::logical_plan::{ }; use datafusion_expr::CrossJoin; -/// Optimization rule that tries to push down LIMIT. +/// Optimization rule that tries to push down `LIMIT`. +/// +/// 
It will push down through projection, limits (taking the smaller limit) #[derive(Default)] pub struct PushDownLimit {} diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index ccdcf2f65bc8..ae57ed9e5a34 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! Projection Push Down optimizer rule ensures that only referenced columns are -//! loaded into memory - #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index 0055e329c29d..752915be69c0 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//! [`ReplaceDistinctWithAggregate`] replaces `DISTINCT ...` with `GROUP BY ...` use crate::optimizer::{ApplyOrder, ApplyOrder::BottomUp}; use crate::{OptimizerConfig, OptimizerRule}; diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 90c96b4b8b8c..059b1452ff3d 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! 
[`RewriteDisjunctivePredicate`] rewrites predicates to reduce redundancy + use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 8acc36e479ca..a2c4eabcaae6 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! [`ScalarSubqueryToJoin`] rewriting scalar subquery filters to `JOIN`s + use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; diff --git a/datafusion/optimizer/src/simplify_expressions/mod.rs b/datafusion/optimizer/src/simplify_expressions/mod.rs index 5244f9a5af88..d0399fef07e6 100644 --- a/datafusion/optimizer/src/simplify_expressions/mod.rs +++ b/datafusion/optimizer/src/simplify_expressions/mod.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +//! [`SimplifyExpressions`] simplifies expressions in the logical plan, +//! [`ExprSimplifier`] simplifies individual `Expr`s. + pub mod expr_simplifier; mod guarantees; mod inlist_simplifier; diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 5b47abb308d0..076bf4e24296 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! single distinct to group by optimizer rule +//! [`SingleDistinctToGroupBy`] replaces `AGG(DISTINCT ..)` with `AGG(..) 
GROUP BY ..` use std::sync::Arc; diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index f573ac69377b..fda390f37961 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Unwrap-cast binary comparison rule can be used to the binary/inlist comparison expr now, and other type -//! of expr can be added if needed. -//! This rule can reduce adding the `Expr::Cast` the expr instead of adding the `Expr::Cast` to literal expr. +//! [`UnwrapCastInComparison`] rewrites `CAST(col) = lit` to `col = CAST(lit)` use std::cmp::Ordering; use std::sync::Arc; diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 0df79550f143..560c63b18882 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Collection of utility functions that are leveraged by the query optimizer rules +//! Utility functions leveraged by the query optimizer rules use std::collections::{BTreeSet, HashMap}; diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 324e2ea2d773..6ea1b3c40c83 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -364,32 +364,31 @@ fn unnest_generic_list>( options: &UnnestOptions, ) -> Result> { let values = list_array.values(); - if list_array.null_count() == 0 || !options.preserve_nulls { - Ok(values.clone()) - } else { - let mut take_indicies_builder = - PrimitiveArray::

::builder(values.len() + list_array.null_count()); - let mut take_offset = 0; + if list_array.null_count() == 0 { + return Ok(values.clone()); + } - list_array.iter().for_each(|elem| match elem { - Some(array) => { - for i in 0..array.len() { - // take_offset + i is always positive - let take_index = P::Native::from_usize(take_offset + i).unwrap(); - take_indicies_builder.append_value(take_index); - } - take_offset += array.len(); - } - None => { + let mut take_indicies_builder = + PrimitiveArray::

::builder(values.len() + list_array.null_count()); + let offsets = list_array.value_offsets(); + for row in 0..list_array.len() { + if list_array.is_null(row) { + if options.preserve_nulls { take_indicies_builder.append_null(); } - }); - Ok(kernels::take::take( - &values, - &take_indicies_builder.finish(), - None, - )?) + } else { + let start = offsets[row].as_usize(); + let end = offsets[row + 1].as_usize(); + for idx in start..end { + take_indicies_builder.append_value(P::Native::from_usize(idx).unwrap()); + } + } } + Ok(kernels::take::take( + &values, + &take_indicies_builder.finish(), + None, + )?) } fn build_batch_fixedsize_list( @@ -596,3 +595,99 @@ where Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?) } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::{ + array::AsArray, + datatypes::{DataType, Field}, + }; + use arrow_array::StringArray; + use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; + + // Create a ListArray with the following list values: + // [A, B, C], [], NULL, [D], NULL, [NULL, F] + fn make_test_array() -> ListArray { + let mut values = vec![]; + let mut offsets = vec![0]; + let mut valid = BooleanBufferBuilder::new(2); + + // [A, B, C] + values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]); + offsets.push(values.len() as i32); + valid.append(true); + + // [] + offsets.push(values.len() as i32); + valid.append(true); + + // NULL with non-zero value length + // Issue https://github.com/apache/arrow-datafusion/issues/9932 + values.push(Some("?")); + offsets.push(values.len() as i32); + valid.append(false); + + // [D] + values.push(Some("D")); + offsets.push(values.len() as i32); + valid.append(true); + + // Another NULL with zero value length + offsets.push(values.len() as i32); + valid.append(false); + + // [NULL, F] + values.extend_from_slice(&[None, Some("F")]); + offsets.push(values.len() as i32); + valid.append(true); + + let field = Arc::new(Field::new("item", DataType::Utf8, true)); + 
ListArray::new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(StringArray::from(values)), + Some(NullBuffer::new(valid.finish())), + ) + } + + #[test] + fn test_unnest_generic_list() -> datafusion_common::Result<()> { + let list_array = make_test_array(); + + // Test with preserve_nulls = false + let options = UnnestOptions { + preserve_nulls: false, + }; + let unnested_array = + unnest_generic_list::(&list_array, &options)?; + let strs = unnested_array.as_string::().iter().collect::>(); + assert_eq!( + strs, + vec![Some("A"), Some("B"), Some("C"), Some("D"), None, Some("F")] + ); + + // Test with preserve_nulls = true + let options = UnnestOptions { + preserve_nulls: true, + }; + let unnested_array = + unnest_generic_list::(&list_array, &options)?; + let strs = unnested_array.as_string::().iter().collect::>(); + assert_eq!( + strs, + vec![ + Some("A"), + Some("B"), + Some("C"), + None, + Some("D"), + None, + None, + Some("F") + ] + ); + + Ok(()) + } +} diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md index 79c36092fd3d..666e86b46002 100644 --- a/docs/source/user-guide/sql/dml.md +++ b/docs/source/user-guide/sql/dml.md @@ -35,8 +35,22 @@ TO 'file_name' [ OPTIONS( option [, ... ] ) ] +`STORED AS` specifies the file format the `COPY` command will write. If this +clause is not specified, it will be inferred from the file extension if possible. + +`PARTITIONED BY` specifies the columns to use for partitioning the output files into +separate hive-style directories. + +The output format is determined by the first match of the following rules: + +1. Value of `STORED AS` +2. Value of the `OPTION (FORMAT ..)` +3. Filename extension (e.g. `foo.parquet` implies `PARQUET` format) + For a detailed list of valid OPTIONS, see [Write Options](write_options). 
+### Examples + Copy the contents of `source_table` to `file_name.json` in JSON format: ```sql @@ -72,6 +86,23 @@ of hive-style partitioned parquet files: +-------+ ``` +If the data contains values of `x` and `y` in column1 and only `a` in +column2, output files will appear in the following directory structure: + +``` +dir_name/ + column1=x/ + column2=a/ + .parquet + .parquet + ... + column1=y/ + column2=a/ + .parquet + .parquet + ... +``` + Run the query `SELECT * from source ORDER BY time` and write the results (maintaining the order) to a parquet file named `output.parquet` with a maximum parquet row group size of 10MB: @@ -85,14 +116,10 @@ results (maintaining the order) to a parquet file named +-------+ ``` -The output format is determined by the first match of the following rules: - -1. Value of `STORED AS` -2. Value of the `OPTION (FORMAT ..)` -3. Filename extension (e.g. `foo.parquet` implies `PARQUET` format) - ## INSERT +### Examples + Insert values into a table.

diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index ac0a41a97f07..5c204d8fc0e6 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -35,44 +35,41 @@ If inserting to an external table, table specific write options can be specified
 
 ```sql
 CREATE EXTERNAL TABLE
-my_table(a bigint, b bigint)
-STORED AS csv
-COMPRESSION TYPE gzip
-WITH HEADER ROW
-DELIMITER ';'
-LOCATION '/test/location/my_csv_table/'
-OPTIONS(
-NULL_VALUE 'NAN'
-);
+  my_table(a bigint, b bigint)
+  STORED AS csv
+  COMPRESSION TYPE gzip
+  WITH HEADER ROW
+  DELIMITER ';'
+  LOCATION '/test/location/my_csv_table/'
+  OPTIONS(
+    NULL_VALUE 'NAN'
+  )
 ```
 
 When running `INSERT INTO my_table ...`, the options from the `CREATE TABLE` will be respected (gzip compression, special delimiter, and header row included). There will be a single output file if the output path doesn't have folder format, i.e. ending with a `\`. Note that compression, header, and delimiter settings can also be specified within the `OPTIONS` tuple list. Dedicated syntax within the SQL statement always takes precedence over arbitrary option tuples, so if both are specified the `OPTIONS` setting will be ignored. NULL_VALUE is a CSV format specific option that determines how null values should be encoded within the CSV file.
 
 Finally, options can be passed when running a `COPY` command.
 
+
+
 ```sql
 COPY source_table
-TO 'test/table_with_options'
-(format parquet,
-compression snappy,
-'compression::col1' 'zstd(5)',
-partition_by 'column3, column4'
-)
+  TO 'test/table_with_options'
+  PARTITIONED BY (column3, column4)
+  OPTIONS (
+    format parquet,
+    compression snappy,
+    'compression::col1' 'zstd(5)',
+  )
 ```
 
 In this example, we write the entirety of `source_table` out to a folder of parquet files. One parquet file will be written in parallel to the folder for each partition in the query. The next option `compression` set to `snappy` indicates that unless otherwise specified all columns should use the snappy compression codec. The option `compression::col1` sets an override, so that the column `col1` in the parquet file will use `ZSTD` compression codec with compression level `5`. In general, parquet options which support column specific settings can be specified with the syntax `OPTION::COLUMN.NESTED.PATH`.
 
 ## Available Options
 
-### COPY Specific Options
-
-The following special options are specific to the `COPY` command.
-
-| Option       | Description                                                                                                                                                                         | Default Value |
-| ------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- |
-| FORMAT       | Specifies the file format COPY query will write out. If there're more than one output file or the format cannot be inferred from the file extension, then FORMAT must be specified. | N/A           |
-| PARTITION_BY | Specifies the columns that the output files should be partitioned by into separate hive-style directories. Value should be a comma separated string literal, e.g. 'col1,col2'       | N/A           |
-
 ### JSON Format Specific Options
 
 The following options are available when writing JSON files. Note: If any unsupported option is specified, an error will be raised and the query will fail.