diff --git a/.github/actions/setup-rust-runtime/action.yaml b/.github/actions/setup-rust-runtime/action.yaml
index 27cdf9b97419..1d814055ae28 100644
--- a/.github/actions/setup-rust-runtime/action.yaml
+++ b/.github/actions/setup-rust-runtime/action.yaml
@@ -30,12 +30,9 @@ runs:
     #
     # Set debuginfo=line-tables-only as debuginfo=0 causes immensely slow build
     # See for more details: https://github.com/rust-lang/rust/issues/119560
-    #
-    # set RUST_MIN_STACK to avoid rust stack overflows on tpc-ds tests
    run: |
      echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV
      echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV
      echo "RUST_BACKTRACE=1" >> $GITHUB_ENV
-     echo "RUST_MIN_STACK=3000000" >> $GITHUB_ENV
      echo "RUSTFLAGS=-C debuginfo=line-tables-only -C incremental=false" >> $GITHUB_ENV
diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs
index 554722f37ba2..8e088e7a0b56 100644
--- a/datafusion/common/src/tree_node.rs
+++ b/datafusion/common/src/tree_node.rs
@@ -22,68 +22,25 @@ use std::sync::Arc;
 
 use crate::Result;
 
-/// This macro is used to control continuation behaviors during tree traversals
-/// based on the specified direction. Depending on `$DIRECTION` and the value of
-/// the given expression (`$EXPR`), which should be a variant of [`TreeNodeRecursion`],
-/// the macro results in the following behavior:
-///
-/// - If the expression returns [`TreeNodeRecursion::Continue`], normal execution
-///   continues.
-/// - If it returns [`TreeNodeRecursion::Stop`], recursion halts and propagates
-///   [`TreeNodeRecursion::Stop`].
-/// - If it returns [`TreeNodeRecursion::Jump`], the continuation behavior depends
-///   on the traversal direction:
-///   - For `UP` direction, the function returns with [`TreeNodeRecursion::Jump`],
-///     bypassing further bottom-up closures until the next top-down closure.
-///   - For `DOWN` direction, the function returns with [`TreeNodeRecursion::Continue`],
-///     skipping further exploration.
-///   - If no direction is specified, `Jump` is treated like `Continue`.
-#[macro_export]
-macro_rules! handle_visit_recursion {
-    // Internal helper macro for handling the `Jump` case based on the direction:
-    (@handle_jump UP) => {
-        return Ok(TreeNodeRecursion::Jump)
-    };
-    (@handle_jump DOWN) => {
-        return Ok(TreeNodeRecursion::Continue)
-    };
-    (@handle_jump) => {
-        {} // Treat `Jump` like `Continue`, do nothing and continue execution.
-    };
+/// These macros are used to determine continuation during transforming traversals.
+macro_rules! handle_transform_recursion {
+    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
+        #[allow(clippy::redundant_closure_call)]
+        $F_DOWN?
+            .transform_children(|n| n.map_children($F_CHILD))?
+            .transform_parent(|n| $F_UP(n))
+    }};
+}
 
-    // Main macro logic with variables to handle directionality.
-    ($EXPR:expr $(, $DIRECTION:ident)?) => {
-        match $EXPR {
-            TreeNodeRecursion::Continue => {}
-            TreeNodeRecursion::Jump => handle_visit_recursion!(@handle_jump $($DIRECTION)?),
-            TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop),
-        }
-    };
+macro_rules! handle_transform_recursion_down {
+    ($F_DOWN:expr, $F_CHILD:expr) => {{
+        $F_DOWN?.transform_children(|n| n.map_children($F_CHILD))
+    }};
 }
 
-/// This macro is used to determine continuation during combined transforming
-/// traversals.
-///
-/// Depending on the [`TreeNodeRecursion`] the bottom-up closure returns,
-/// [`Transformed::try_transform_node_with()`] decides recursion continuation
-/// and if state propagation is necessary. Then, the same procedure recursively
-/// applies to the children of the node in question.
-macro_rules! handle_transform_recursion {
-    ($F_DOWN:expr, $F_SELF:expr, $F_UP:expr) => {{
-        let pre_visited = $F_DOWN?;
-        match pre_visited.tnr {
-            TreeNodeRecursion::Continue => pre_visited
-                .data
-                .map_children($F_SELF)?
-                .try_transform_node_with($F_UP, TreeNodeRecursion::Jump),
-            #[allow(clippy::redundant_closure_call)]
-            TreeNodeRecursion::Jump => $F_UP(pre_visited.data),
-            TreeNodeRecursion::Stop => return Ok(pre_visited),
-        }
-        .map(|mut post_visited| {
-            post_visited.transformed |= pre_visited.transformed;
-            post_visited
-        })
+macro_rules! handle_transform_recursion_up {
+    ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
+        $SELF.map_children($F_CHILD)?.transform_parent(|n| $F_UP(n))
     }};
 }
 
@@ -128,17 +85,10 @@ pub trait TreeNode: Sized {
         &self,
         visitor: &mut V,
     ) -> Result<TreeNodeRecursion> {
-        match visitor.f_down(self)? {
-            TreeNodeRecursion::Continue => {
-                handle_visit_recursion!(
-                    self.apply_children(&mut |n| n.visit(visitor))?,
-                    UP
-                );
-                visitor.f_up(self)
-            }
-            TreeNodeRecursion::Jump => visitor.f_up(self),
-            TreeNodeRecursion::Stop => Ok(TreeNodeRecursion::Stop),
-        }
+        visitor
+            .f_down(self)?
+            .visit_children(|| self.apply_children(|c| c.visit(visitor)))?
+            .visit_parent(|| visitor.f_up(self))
     }
 
     /// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for
@@ -184,8 +134,7 @@ pub trait TreeNode: Sized {
         &self,
         f: &mut F,
     ) -> Result<TreeNodeRecursion> {
-        handle_visit_recursion!(f(self)?, DOWN);
-        self.apply_children(&mut |n| n.apply(f))
+        f(self)?.visit_children(|| self.apply_children(|c| c.apply(f)))
     }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
@@ -205,10 +154,7 @@ pub trait TreeNode: Sized {
         self,
         f: &F,
     ) -> Result<Transformed<Self>> {
-        f(self)?.try_transform_node_with(
-            |n| n.map_children(|c| c.transform_down(f)),
-            TreeNodeRecursion::Continue,
-        )
+        handle_transform_recursion_down!(f(self), |c| c.transform_down(f))
     }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
@@ -218,10 +164,7 @@ pub trait TreeNode: Sized {
         self,
         f: &mut F,
     ) -> Result<Transformed<Self>> {
-        f(self)?.try_transform_node_with(
-            |n| n.map_children(|c| c.transform_down_mut(f)),
-            TreeNodeRecursion::Continue,
-        )
+        handle_transform_recursion_down!(f(self), |c| c.transform_down_mut(f))
    }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
@@ -232,8 +175,7 @@ pub trait TreeNode: Sized {
         self,
         f: &F,
     ) -> Result<Transformed<Self>> {
-        self.map_children(|c| c.transform_up(f))?
-            .try_transform_node_with(f, TreeNodeRecursion::Jump)
+        handle_transform_recursion_up!(self, |c| c.transform_up(f), f)
     }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
@@ -244,8 +186,7 @@ pub trait TreeNode: Sized {
         self,
         f: &mut F,
     ) -> Result<Transformed<Self>> {
-        self.map_children(|c| c.transform_up_mut(f))?
-            .try_transform_node_with(f, TreeNodeRecursion::Jump)
+        handle_transform_recursion_up!(self, |c| c.transform_up_mut(f), f)
     }
 
     /// Transforms the tree using `f_down` while traversing the tree top-down
@@ -355,7 +296,7 @@ pub trait TreeNode: Sized {
     /// Apply the closure `F` to the node's children.
     fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
-        f: &mut F,
+        f: F,
     ) -> Result<TreeNodeRecursion>;
 
     /// Apply transform `F` to the node's children. Note that the transform `F`
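[Reviewer note, not part of the patch] With `try_transform_node_with` gone, the consolidated entry points above compose as a single chain. A minimal sketch of how a rewrite rule now drives `transform_down`, assuming some `TreeNode` implementor and hypothetical `is_foldable`/`fold` helpers:

```rust
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;

// Hypothetical constant-folding pass over any `TreeNode` implementor.
fn fold_constants<N: TreeNode>(
    node: N,
    is_foldable: impl Fn(&N) -> bool,
    fold: impl Fn(N) -> N,
) -> Result<N> {
    node.transform_down(&|n| {
        Ok(if is_foldable(&n) {
            // Records that a rewrite happened and keeps traversing
            // with `TreeNodeRecursion::Continue`.
            Transformed::yes(fold(n))
        } else {
            Transformed::no(n)
        })
    })
    // `Transformed::data` carries the rewritten tree back out.
    .map(|t| t.data)
}
```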
@@ -432,6 +373,45 @@ pub enum TreeNodeRecursion {
     Stop,
 }
 
+impl TreeNodeRecursion {
+    /// Continues visiting nodes with `f` depending on the current [`TreeNodeRecursion`]
+    /// value and the fact that `f` is visiting the current node's children.
+    pub fn visit_children<F: FnOnce() -> Result<TreeNodeRecursion>>(
+        self,
+        f: F,
+    ) -> Result<TreeNodeRecursion> {
+        match self {
+            TreeNodeRecursion::Continue => f(),
+            TreeNodeRecursion::Jump => Ok(TreeNodeRecursion::Continue),
+            TreeNodeRecursion::Stop => Ok(self),
+        }
+    }
+
+    /// Continues visiting nodes with `f` depending on the current [`TreeNodeRecursion`]
+    /// value and the fact that `f` is visiting the current node's sibling.
+    pub fn visit_sibling<F: FnOnce() -> Result<TreeNodeRecursion>>(
+        self,
+        f: F,
+    ) -> Result<TreeNodeRecursion> {
+        match self {
+            TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => f(),
+            TreeNodeRecursion::Stop => Ok(self),
+        }
+    }
+
+    /// Continues visiting nodes with `f` depending on the current [`TreeNodeRecursion`]
+    /// value and the fact that `f` is visiting the current node's parent.
+    pub fn visit_parent<F: FnOnce() -> Result<TreeNodeRecursion>>(
+        self,
+        f: F,
+    ) -> Result<TreeNodeRecursion> {
+        match self {
+            TreeNodeRecursion::Continue => f(),
+            TreeNodeRecursion::Jump | TreeNodeRecursion::Stop => Ok(self),
+        }
+    }
+}
+
 /// This struct is used by tree transformation APIs such as
 /// - [`TreeNode::rewrite`],
 /// - [`TreeNode::transform_down`],
@@ -489,15 +469,23 @@ impl<T> Transformed<T> {
         f(self.data).map(|data| Transformed::new(data, self.transformed, self.tnr))
     }
 
-    /// Handling [`TreeNodeRecursion::Continue`] and [`TreeNodeRecursion::Stop`]
-    /// is straightforward, but [`TreeNodeRecursion::Jump`] can behave differently
-    /// when we are traversing down or up on a tree. If [`TreeNodeRecursion`] of
-    /// the node is [`TreeNodeRecursion::Jump`], recursion stops with the given
-    /// `return_if_jump` value.
-    fn try_transform_node_with<F: FnOnce(T) -> Result<Transformed<T>>>(
+    /// Maps the [`Transformed`] object to the result of the given `f`.
+    pub fn transform_data<U, F: FnOnce(T) -> Result<Transformed<U>>>(
+        self,
+        f: F,
+    ) -> Result<Transformed<U>> {
+        f(self.data).map(|mut t| {
+            t.transformed |= self.transformed;
+            t
+        })
+    }
+
+    /// Maps the [`Transformed`] object to the result of the given `f` depending on the
+    /// current [`TreeNodeRecursion`] value and the fact that `f` is changing the current
+    /// node's children.
+    pub fn transform_children<F: FnOnce(T) -> Result<Transformed<T>>>(
         mut self,
         f: F,
-        return_if_jump: TreeNodeRecursion,
     ) -> Result<Transformed<T>> {
         match self.tnr {
             TreeNodeRecursion::Continue => {
@@ -507,37 +495,67 @@ impl<T> Transformed<T> {
                 });
             }
             TreeNodeRecursion::Jump => {
-                self.tnr = return_if_jump;
+                self.tnr = TreeNodeRecursion::Continue;
             }
             TreeNodeRecursion::Stop => {}
         }
         Ok(self)
     }
 
-    /// If [`TreeNodeRecursion`] of the node is [`TreeNodeRecursion::Continue`] or
-    /// [`TreeNodeRecursion::Jump`], transformation is applied to the node.
-    /// Otherwise, it remains as it is.
-    pub fn try_transform_node<F: FnOnce(T) -> Result<Transformed<T>>>(
+    /// Maps the [`Transformed`] object to the result of the given `f` depending on the
+    /// current [`TreeNodeRecursion`] value and the fact that `f` is changing the current
+    /// node's sibling.
+    pub fn transform_sibling<F: FnOnce(T) -> Result<Transformed<T>>>(
         self,
         f: F,
     ) -> Result<Transformed<T>> {
-        if self.tnr == TreeNodeRecursion::Stop {
-            Ok(self)
-        } else {
-            f(self.data).map(|mut t| {
+        match self.tnr {
+            TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
+                f(self.data).map(|mut t| {
+                    t.transformed |= self.transformed;
+                    t
+                })
+            }
+            TreeNodeRecursion::Stop => Ok(self),
+        }
+    }
+
+    /// Maps the [`Transformed`] object to the result of the given `f` depending on the
+    /// current [`TreeNodeRecursion`] value and the fact that `f` is changing the current
+    /// node's parent.
+    pub fn transform_parent<F: FnOnce(T) -> Result<Transformed<T>>>(
+        self,
+        f: F,
+    ) -> Result<Transformed<T>> {
+        match self.tnr {
+            TreeNodeRecursion::Continue => f(self.data).map(|mut t| {
                 t.transformed |= self.transformed;
                 t
-            })
+            }),
+            TreeNodeRecursion::Jump | TreeNodeRecursion::Stop => Ok(self),
         }
     }
 }
 
 /// Transformation helper to process a sequence of iterable tree nodes that are siblings.
-pub trait TransformedIterator: Iterator {
+pub trait TreeNodeIterator: Iterator {
     /// Apples `f` to each item in this iterator
     ///
     /// Visits all items in the iterator unless
-    /// `f` returns an error or `f` returns TreeNodeRecursion::stop.
+    /// `f` returns an error or `f` returns `TreeNodeRecursion::Stop`.
+    ///
+    /// # Returns
+    /// An error if `f` returns an error; otherwise the `TreeNodeRecursion` from
+    /// the last invocation of `f`, or `Continue` if the iterator is empty
+    fn apply_until_stop<F: FnMut(Self::Item) -> Result<TreeNodeRecursion>>(
+        self,
+        f: F,
+    ) -> Result<TreeNodeRecursion>;
+
+    /// Applies `f` to each item in this iterator
+    ///
+    /// Visits all items in the iterator unless
+    /// `f` returns an error or `f` returns `TreeNodeRecursion::Stop`.
     ///
     /// # Returns
     /// Error if `f` returns an error
@@ -554,7 +572,22 @@ pub trait TreeNodeIterator: Iterator {
     ) -> Result<Transformed<Vec<Self::Item>>>;
 }
 
-impl<I: Iterator> TransformedIterator for I {
+impl<I: Iterator> TreeNodeIterator for I {
+    fn apply_until_stop<F: FnMut(Self::Item) -> Result<TreeNodeRecursion>>(
+        self,
+        mut f: F,
+    ) -> Result<TreeNodeRecursion> {
+        let mut tnr = TreeNodeRecursion::Continue;
+        for i in self {
+            tnr = f(i)?;
+            match tnr {
+                TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {}
+                TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop),
+            }
+        }
+        Ok(tnr)
+    }
+
     fn map_until_stop_and_collect<
         F: FnMut(Self::Item) -> Result<Transformed<Self::Item>>,
     >(
@@ -580,7 +613,7 @@ impl<I: Iterator> TreeNodeIterator for I {
 
 /// Transformation helper to process a heterogeneous sequence of tree node containing
 /// expressions.
-/// This macro is very similar to [TransformedIterator::map_until_stop_and_collect] to
+/// This macro is very similar to [TreeNodeIterator::map_until_stop_and_collect] to
 /// process nodes that are siblings, but it accepts an initial transformation (`F0`) and
 /// a sequence of pairs. Each pair is made of an expression (`EXPR`) and its
 /// transformation (`F`).
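[Not part of the patch] A quick sketch of the sibling-traversal contract `apply_until_stop` adds. Thanks to the blanket `impl<I: Iterator> TreeNodeIterator for I`, it works on any iterator, so plain integers can stand in for child nodes here:

```rust
use datafusion_common::tree_node::{TreeNodeIterator, TreeNodeRecursion};
use datafusion_common::Result;

// Visit siblings left to right: `Stop` short-circuits the whole traversal,
// while `Jump` only affects the current subtree, so between siblings it is
// propagated like `Continue` (mirroring `visit_sibling` above).
fn find_first_negative(items: &[i64]) -> Result<Option<i64>> {
    let mut found = None;
    items.iter().apply_until_stop(|i| {
        Ok(if *i < 0 {
            found = Some(*i);
            TreeNodeRecursion::Stop
        } else {
            TreeNodeRecursion::Continue
        })
    })?;
    Ok(found)
}
```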
@@ -664,14 +697,9 @@ pub trait DynTreeNode {
 impl<T: DynTreeNode> TreeNode for Arc<T> {
     fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
-        f: &mut F,
+        f: F,
     ) -> Result<TreeNodeRecursion> {
-        let mut tnr = TreeNodeRecursion::Continue;
-        for child in self.arc_children() {
-            tnr = f(&child)?;
-            handle_visit_recursion!(tnr)
-        }
-        Ok(tnr)
+        self.arc_children().iter().apply_until_stop(f)
     }
 
     fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
@@ -714,14 +742,9 @@ pub trait ConcreteTreeNode: Sized {
 impl<T: ConcreteTreeNode> TreeNode for T {
     fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
-        f: &mut F,
+        f: F,
     ) -> Result<TreeNodeRecursion> {
-        let mut tnr = TreeNodeRecursion::Continue;
-        for child in self.children() {
-            tnr = f(child)?;
-            handle_visit_recursion!(tnr)
-        }
-        Ok(tnr)
+        self.children().into_iter().apply_until_stop(f)
     }
 
     fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
@@ -745,7 +768,7 @@ mod tests {
     use std::fmt::Display;
 
     use crate::tree_node::{
-        Transformed, TransformedIterator, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
+        Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, TreeNodeRewriter,
         TreeNodeVisitor,
     };
     use crate::Result;
@@ -763,22 +786,17 @@ mod tests {
     }
 
     impl<T> TreeNode for TestTreeNode<T> {
-        fn apply_children<F>(&self, f: &mut F) -> Result<TreeNodeRecursion>
-        where
-            F: FnMut(&Self) -> Result<TreeNodeRecursion>,
-        {
-            let mut tnr = TreeNodeRecursion::Continue;
-            for child in &self.children {
-                tnr = f(child)?;
-                handle_visit_recursion!(tnr);
-            }
-            Ok(tnr)
+        fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+            &self,
+            f: F,
+        ) -> Result<TreeNodeRecursion> {
+            self.children.iter().apply_until_stop(f)
         }
 
-        fn map_children<F>(self, f: F) -> Result<Transformed<Self>>
-        where
-            F: FnMut(Self) -> Result<Transformed<Self>>,
-        {
+        fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
+            self,
+            f: F,
+        ) -> Result<Transformed<Self>> {
             Ok(self
                 .children
                 .into_iter()
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 5ef7b6241b60..6625abd650d7 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -61,6 +61,7 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
 use object_store::ObjectStore;
 
 /// Configuration for creating a [`ListingTable`]
@@ -438,6 +439,112 @@ impl ListingOptions {
 
         self.format.infer_schema(state, &store, &files).await
     }
+
+    /// Infers the partition columns stored in `LOCATION` and compares
+    /// them with the columns provided in `PARTITIONED BY` to help prevent
+    /// accidental corruption of partitioned tables.
+    ///
+    /// Allows specifying partial partitions.
+    pub async fn validate_partitions(
+        &self,
+        state: &SessionState,
+        table_path: &ListingTableUrl,
+    ) -> Result<()> {
+        if self.table_partition_cols.is_empty() {
+            return Ok(());
+        }
+
+        if !table_path.is_collection() {
+            return plan_err!(
+                "Can't create a partitioned table backed by a single file, \
+                perhaps the URL is missing a trailing slash?"
+ ); + } + + let inferred = self.infer_partitions(state, table_path).await?; + + // no partitioned files found on disk + if inferred.is_empty() { + return Ok(()); + } + + let table_partition_names = self + .table_partition_cols + .iter() + .map(|(col_name, _)| col_name.clone()) + .collect_vec(); + + if inferred.len() < table_partition_names.len() { + return plan_err!( + "Inferred partitions to be {:?}, but got {:?}", + inferred, + table_partition_names + ); + } + + // match prefix to allow creating tables with partial partitions + for (idx, col) in table_partition_names.iter().enumerate() { + if &inferred[idx] != col { + return plan_err!( + "Inferred partitions to be {:?}, but got {:?}", + inferred, + table_partition_names + ); + } + } + + Ok(()) + } + + /// Infer the partitioning at the given path on the provided object store. + /// For performance reasons, it doesn't read all the files on disk + /// and therefore may fail to detect invalid partitioning. + async fn infer_partitions( + &self, + state: &SessionState, + table_path: &ListingTableUrl, + ) -> Result> { + let store = state.runtime_env().object_store(table_path)?; + + // only use 10 files for inference + // This can fail to detect inconsistent partition keys + // A DFS traversal approach of the store can help here + let files: Vec<_> = table_path + .list_all_files(state, store.as_ref(), &self.file_extension) + .await? + .take(10) + .try_collect() + .await?; + + let stripped_path_parts = files.iter().map(|file| { + table_path + .strip_prefix(&file.location) + .unwrap() + .collect_vec() + }); + + let partition_keys = stripped_path_parts + .map(|path_parts| { + path_parts + .into_iter() + .rev() + .skip(1) // get parents only; skip the file itself + .rev() + .map(|s| s.split('=').take(1).collect()) + .collect_vec() + }) + .collect_vec(); + + match partition_keys.into_iter().all_equal_value() { + Ok(v) => Ok(v), + Err(None) => Ok(vec![]), + Err(Some(diff)) => { + let mut sorted_diff = [diff.0, diff.1]; + sorted_diff.sort(); + plan_err!("Found mixed partition values on disk {:?}", sorted_diff) + } + } + } } /// Reads data from one or more files via an diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index cbadf163cecc..1a0eb34d1234 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -137,6 +137,8 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()); + options.validate_partitions(state, &table_path).await?; + let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, Some(s) => s, diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 370ca91a0b0e..1ea411cb6f59 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type}; use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, ColumnStatistics, Statistics}; +use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics}; use datafusion_physical_expr::LexOrdering; use 
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 370ca91a0b0e..1ea411cb6f59 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type};
 use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::stats::Precision;
-use datafusion_common::{exec_err, ColumnStatistics, Statistics};
+use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
 use datafusion_physical_expr::LexOrdering;
 use log::warn;
@@ -256,9 +256,17 @@ impl PartitionColumnProjector {
                 file_batch.columns().len()
             );
         }
+
         let mut cols = file_batch.columns().to_vec();
         for &(pidx, sidx) in &self.projected_partition_indexes {
-            let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
+            let p_value =
+                partition_values
+                    .get(pidx)
+                    .ok_or(DataFusionError::Execution(
+                        "Invalid partitioning found on disk".to_string(),
+                    ))?;
+
+            let mut partition_value = Cow::Borrowed(p_value);
 
             // check if user forgot to dict-encode the partition value
             let field = self.projected_schema.field(sidx);
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 1a582be3013d..31a474bd217c 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1113,7 +1113,7 @@ impl SessionContext {
         table_ref: impl Into<TableReference>,
         provider: Arc<dyn TableProvider>,
     ) -> Result<Option<Arc<dyn TableProvider>>> {
-        let table_ref = table_ref.into();
+        let table_ref: TableReference = table_ref.into();
         let table = table_ref.table().to_owned();
         self.state
             .read()
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index f6e2171d6b5f..feeace3b5cfd 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -435,10 +435,13 @@
 //! and improve compilation times. The crates are:
 //!
 //! * [datafusion_common]: Common traits and types
-//! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical planning structure
 //! * [datafusion_execution]: State and structures needed for execution
+//! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical planning structure
+//! * [datafusion_functions]: Scalar function packages
+//! * [datafusion_functions_array]: Scalar function packages for `ARRAY`s
 //! * [datafusion_optimizer]: [`OptimizerRule`]s and [`AnalyzerRule`]s
 //! * [datafusion_physical_expr]: [`PhysicalExpr`] and related expressions
+//! * [datafusion_physical_plan]: [`ExecutionPlan`] and related expressions
 //! * [datafusion_sql]: SQL planner ([`SqlToRel`])
 //!
 //! [sqlparser]: https://docs.rs/sqlparser/latest/sqlparser
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index c1067d75eddd..c25523c5ae33 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -972,67 +972,17 @@ impl DefaultPhysicalPlanner {
             }) => {
                 let null_equals_null = *null_equals_null;
 
-                // If join has expression equijoin keys, add physical projecton.
+                // If join has expression equijoin keys, add physical projection.
                 let has_expr_join_key = keys.iter().any(|(l, r)| {
                     !(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_)))
                 });
 
                 if has_expr_join_key {
-                    let left_keys = keys
-                        .iter()
-                        .map(|(l, _r)| l)
-                        .cloned()
-                        .collect::<Vec<_>>();
-                    let right_keys = keys
-                        .iter()
-                        .map(|(_l, r)| r)
-                        .cloned()
-                        .collect::<Vec<_>>();
-                    let (left, right, column_on, added_project) = {
-                        let (left, left_col_keys, left_projected) =
-                            wrap_projection_for_join_if_necessary(
-                                left_keys.as_slice(),
-                                left.as_ref().clone(),
-                            )?;
-                        let (right, right_col_keys, right_projected) =
-                            wrap_projection_for_join_if_necessary(
-                                &right_keys,
-                                right.as_ref().clone(),
-                            )?;
-                        (
-                            left,
-                            right,
-                            (left_col_keys, right_col_keys),
-                            left_projected || right_projected,
-                        )
-                    };
-
-                    let join_plan =
-                        LogicalPlan::Join(Join::try_new_with_project_input(
-                            logical_plan,
-                            Arc::new(left),
-                            Arc::new(right),
-                            column_on,
-                        )?);
-
-                    // Remove temporary projected columns
-                    let join_plan = if added_project {
-                        let final_join_result = join_schema
-                            .iter()
-                            .map(|(qualifier, field)| {
-                                Expr::Column(datafusion_common::Column::from((qualifier, field.as_ref())))
-                            })
-                            .collect::<Vec<_>>();
-                        let projection =
-                            Projection::try_new(
-                                final_join_result,
-                                Arc::new(join_plan),
-                            )?;
-                        LogicalPlan::Projection(projection)
-                    } else {
-                        join_plan
-                    };
-
+                    // Logic extracted into a function here because the subsequent recursive
+                    // create_initial_plan() call can cause a stack overflow for a large number of joins.
+                    //
+                    // See #9962 and #1047 for a detailed explanation.
+                    let join_plan = project_expr_join_keys(keys, left, right, logical_plan, join_schema)?;
                     return self
                         .create_initial_plan(&join_plan, session_state)
                         .await;
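[Not part of the patch] The comment above is the heart of the change: everything inlined in this match arm used to contribute to the frame of the recursive `create_initial_plan`, so deeply nested joins multiplied that cost until the stack overflowed. A toy, self-contained illustration of the extraction pattern (hypothetical types, not DataFusion code):

```rust
// Locals confined to a helper are popped before recursing, so deep trees no
// longer accumulate them across stack frames.
struct Node {
    payload: Vec<u8>,
    children: Vec<Node>,
}

// Extracted helper: its temporaries live only for the duration of this call.
fn rewrite_cost(payload: &[u8]) -> u64 {
    payload.iter().map(|b| *b as u64).sum()
}

fn plan(node: &Node) -> u64 {
    // Before such a refactor this logic (and its locals) would sit inline in
    // the recursive body, enlarging every frame on the way down.
    let cost = rewrite_cost(&node.payload);
    cost + node.children.iter().map(plan).sum::<u64>()
}
```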
@@ -2002,6 +1952,54 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
     }
 }
 
+/// Adds a physical projection to the join if it has expression equijoin keys.
+fn project_expr_join_keys(
+    keys: &[(Expr, Expr)],
+    left: &Arc<LogicalPlan>,
+    right: &Arc<LogicalPlan>,
+    logical_plan: &LogicalPlan,
+    join_schema: &Arc<DFSchema>,
+) -> Result<LogicalPlan> {
+    let left_keys = keys.iter().map(|(l, _r)| l).cloned().collect::<Vec<_>>();
+    let right_keys = keys.iter().map(|(_l, r)| r).cloned().collect::<Vec<_>>();
+    let (left, right, column_on, added_project) = {
+        let (left, left_col_keys, left_projected) =
+            wrap_projection_for_join_if_necessary(
+                left_keys.as_slice(),
+                left.as_ref().clone(),
+            )?;
+        let (right, right_col_keys, right_projected) =
+            wrap_projection_for_join_if_necessary(&right_keys, right.as_ref().clone())?;
+        (
+            left,
+            right,
+            (left_col_keys, right_col_keys),
+            left_projected || right_projected,
+        )
+    };
+
+    let join_plan = LogicalPlan::Join(Join::try_new_with_project_input(
+        logical_plan,
+        Arc::new(left),
+        Arc::new(right),
+        column_on,
+    )?);
+
+    // Remove temporary projected columns
+    if added_project {
+        let final_join_result = join_schema
+            .iter()
+            .map(|(qualifier, field)| {
+                Expr::Column(datafusion_common::Column::from((qualifier, field.as_ref())))
+            })
+            .collect::<Vec<_>>();
+        let projection = Projection::try_new(final_join_result, Arc::new(join_plan))?;
+        Ok(LogicalPlan::Projection(projection))
+    } else {
+        Ok(join_plan)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs
index df1585e5a598..97331720ce7d 100644
--- a/datafusion/expr/src/tree_node/expr.rs
+++ b/datafusion/expr/src/tree_node/expr.rs
@@ -25,16 +25,14 @@ use crate::expr::{
 use crate::{Expr, GetFieldAccess};
 
 use datafusion_common::tree_node::{
-    Transformed, TransformedIterator, TreeNode, TreeNodeRecursion,
-};
-use datafusion_common::{
-    handle_visit_recursion, internal_err, map_until_stop_and_collect, Result,
+    Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion,
 };
+use datafusion_common::{internal_err, map_until_stop_and_collect, Result};
 
 impl TreeNode for Expr {
     fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
-        f: &mut F,
+        f: F,
     ) -> Result<TreeNodeRecursion> {
         let children = match self {
             Expr::Alias(Alias{expr,..})
@@ -133,19 +131,13 @@
             }
         };
 
-        let mut tnr = TreeNodeRecursion::Continue;
-        for child in children {
-            tnr = f(child)?;
-            handle_visit_recursion!(tnr, DOWN);
-        }
-
-        Ok(tnr)
+        children.into_iter().apply_until_stop(f)
     }
 
-    fn map_children<F>(self, mut f: F) -> Result<Transformed<Self>>
-    where
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    {
+    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
+        self,
+        mut f: F,
+    ) -> Result<Transformed<Self>> {
         Ok(match self {
             Expr::Column(_)
             | Expr::Wildcard { .. }
diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs
index 02d5d1851289..7a6b1005fede 100644
--- a/datafusion/expr/src/tree_node/plan.rs
+++ b/datafusion/expr/src/tree_node/plan.rs
@@ -20,9 +20,9 @@
 use crate::LogicalPlan;
 
 use datafusion_common::tree_node::{
-    Transformed, TransformedIterator, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
+    Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, TreeNodeVisitor,
 };
-use datafusion_common::{handle_visit_recursion, Result};
+use datafusion_common::Result;
 
 impl TreeNode for LogicalPlan {
     fn apply<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
@@ -31,9 +31,10 @@ impl TreeNode for LogicalPlan {
         &self,
         f: &mut F,
     ) -> Result<TreeNodeRecursion> {
         // Compared to the default implementation, we need to invoke
         // [`Self::apply_subqueries`] before visiting its children
-        handle_visit_recursion!(f(self)?, DOWN);
-        self.apply_subqueries(f)?;
-        self.apply_children(&mut |n| n.apply(f))
+        f(self)?.visit_children(|| {
+            self.apply_subqueries(f)?;
+            self.apply_children(|n| n.apply(f))
+        })
     }
 
     /// To use, define a struct that implements the trait [`TreeNodeVisitor`] and then invoke
@@ -62,39 +63,26 @@ impl TreeNode for LogicalPlan {
     ) -> Result<TreeNodeRecursion> {
         // Compared to the default implementation, we need to invoke
         // [`Self::visit_subqueries`] before visiting its children
-        match visitor.f_down(self)? {
-            TreeNodeRecursion::Continue => {
+        visitor
+            .f_down(self)?
+            .visit_children(|| {
                 self.visit_subqueries(visitor)?;
-                handle_visit_recursion!(
-                    self.apply_children(&mut |n| n.visit(visitor))?,
-                    UP
-                );
-                visitor.f_up(self)
-            }
-            TreeNodeRecursion::Jump => {
-                self.visit_subqueries(visitor)?;
-                visitor.f_up(self)
-            }
-            TreeNodeRecursion::Stop => Ok(TreeNodeRecursion::Stop),
-        }
+                self.apply_children(|n| n.visit(visitor))
+            })?
+            .visit_parent(|| visitor.f_up(self))
     }
 
     fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
-        f: &mut F,
+        f: F,
     ) -> Result<TreeNodeRecursion> {
-        let mut tnr = TreeNodeRecursion::Continue;
-        for child in self.inputs() {
-            tnr = f(child)?;
-            handle_visit_recursion!(tnr, DOWN)
-        }
-        Ok(tnr)
+        self.inputs().into_iter().apply_until_stop(f)
     }
 
-    fn map_children<F>(self, f: F) -> Result<Transformed<Self>>
-    where
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    {
+    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: F,
+    ) -> Result<Transformed<Self>> {
         let new_children = self
             .inputs()
             .iter()
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs
index b7f513727d39..038361c3ee8c 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -146,7 +146,7 @@ fn check_inner_plan(
     // We want to support as many operators as possible inside the correlated subquery
     match inner_plan {
         LogicalPlan::Aggregate(_) => {
-            inner_plan.apply_children(&mut |plan| {
+            inner_plan.apply_children(|plan| {
                 check_inner_plan(plan, is_scalar, true, can_contain_outer_ref)?;
                 Ok(TreeNodeRecursion::Continue)
             })?;
@@ -171,7 +171,7 @@ fn check_inner_plan(
         }
         LogicalPlan::Window(window) => {
             check_mixed_out_refer_in_window(window)?;
-            inner_plan.apply_children(&mut |plan| {
+            inner_plan.apply_children(|plan| {
                 check_inner_plan(plan, is_scalar, is_aggregate, can_contain_outer_ref)?;
                 Ok(TreeNodeRecursion::Continue)
             })?;
@@ -188,7 +188,7 @@ fn check_inner_plan(
         | LogicalPlan::Values(_)
         | LogicalPlan::Subquery(_)
        | LogicalPlan::SubqueryAlias(_) => {
-            inner_plan.apply_children(&mut |plan| {
+            inner_plan.apply_children(|plan| {
                 check_inner_plan(plan, is_scalar, is_aggregate, can_contain_outer_ref)?;
                 Ok(TreeNodeRecursion::Continue)
             })?;
@@ -201,7 +201,7 @@ fn check_inner_plan(
             ..
         }) => match join_type {
            JoinType::Inner => {
-                inner_plan.apply_children(&mut |plan| {
+                inner_plan.apply_children(|plan| {
                     check_inner_plan(
                         plan,
                         is_scalar,
@@ -221,7 +221,7 @@ fn check_inner_plan(
                 check_inner_plan(right, is_scalar, is_aggregate, can_contain_outer_ref)
             }
             JoinType::Full => {
-                inner_plan.apply_children(&mut |plan| {
+                inner_plan.apply_children(|plan| {
                     check_inner_plan(plan, is_scalar, is_aggregate, false)?;
                     Ok(TreeNodeRecursion::Continue)
                 })?;
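[Not part of the patch] With `apply_children` now taking its closure by value, recursive checks in the style of `check_inner_plan` stay straightforward. A hypothetical height counter using only the public API:

```rust
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::Result;

// Compute the height of any tree via `apply_children`; the `FnMut` closure
// borrows `deepest` mutably and a fresh closure is built at each level.
fn height<N: TreeNode>(node: &N) -> Result<usize> {
    let mut deepest = 0;
    node.apply_children(|child| {
        deepest = deepest.max(height(child)?);
        Ok(TreeNodeRecursion::Continue)
    })?;
    Ok(deepest + 1)
}
```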
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index c3c0569df707..2fabd5de9282 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Eliminate common sub-expression.
+//! [`CommonSubexprEliminate`] to avoid redundant computation of common sub-expressions
 
 use std::collections::hash_map::Entry;
 use std::collections::{BTreeSet, HashMap};
@@ -126,7 +126,7 @@ type Identifier = String;
 /// same value
 ///
 /// Currently only common sub-expressions within a single `LogicalPlan` are
-/// be eliminated.
+/// eliminated.
 ///
 /// # Example
 ///
@@ -148,6 +148,12 @@ type Identifier = String;
 pub struct CommonSubexprEliminate {}
 
 impl CommonSubexprEliminate {
+    /// Rewrites `exprs_list` with common sub-expressions replaced with a new
+    /// column.
+    ///
+    /// `affected_id` is updated with any sub expressions that were replaced.
+    ///
+    /// Returns the rewritten expressions
     fn rewrite_exprs_list(
         &self,
         exprs_list: &[&[Expr]],
@@ -166,6 +172,14 @@ impl CommonSubexprEliminate {
             .collect::<Result<Vec<_>>>()
     }
 
+    /// Rewrites the expression in `exprs_list` with common sub-expressions
+    /// replaced with a new column and adds a `Projection` on top of `input`
+    /// which computes any replaced common sub-expressions.
+    ///
+    /// Returns a tuple of:
+    /// 1. The rewritten expressions
+    /// 2. A `LogicalPlan::Projection` with input of `input` that computes any
+    /// common sub-expressions that were used
     fn rewrite_expr(
         &self,
         exprs_list: &[&[Expr]],
@@ -458,7 +472,16 @@ fn pop_expr(new_expr: &mut Vec<Vec<Expr>>) -> Result<Vec<Expr>> {
         .ok_or_else(|| DataFusionError::Internal("Failed to pop expression".to_string()))
 }
 
-/// Build the "intermediate" projection plan that evaluates the extracted common expressions.
+/// Build the "intermediate" projection plan that evaluates the extracted common
+/// expressions.
+///
+/// # Arguments
+/// input: the input plan
+///
+/// affected_id: which common subexpressions were used (and thus are added to
+/// intermediate projection)
+///
+/// expr_set: the set of common subexpressions
 fn build_common_expr_project_plan(
     input: LogicalPlan,
     affected_id: BTreeSet<Identifier>,
@@ -493,10 +516,11 @@ fn build_common_expr_project_plan(
     )?))
 }
 
-/// Build the projection plan to eliminate unexpected columns produced by
+/// Build the projection plan to eliminate unnecessary columns produced by
 /// the "intermediate" projection plan built in [build_common_expr_project_plan].
 ///
-/// This is for those plans who don't keep its own output schema like `Filter` or `Sort`.
+/// This is required to keep the schema the same for plans that pass the input
+/// on to the output, such as `Filter` or `Sort`.
 fn build_recover_project_plan(
     schema: &DFSchema,
     input: LogicalPlan,
@@ -570,7 +594,7 @@ impl ExprMask {
     }
 }
 
-/// Go through an expression tree and generate identifier.
+/// Go through an expression tree and generate identifiers for each subexpression.
 ///
 /// An identifier contains information of the expression itself and its sub-expression.
 /// This visitor implementation use a stack `visit_stack` to track traversal, which
@@ -679,9 +703,10 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
     }
 }
 
-/// Rewrite expression by replacing detected common sub-expression with
-/// the corresponding temporary column name. That column contains the
-/// evaluate result of replaced expression.
+/// Rewrite an expression by replacing detected common sub-expressions with the
+/// corresponding temporary column name, which will compute the sub-expression.
+///
+/// `affected_id` is updated with any sub expressions that were replaced
 struct CommonSubexprRewriter<'a> {
     expr_set: &'a ExprSet,
     /// Which identifier is replaced.
@@ -726,6 +751,8 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
     }
 }
 
+/// Replace common sub-expression in `expr` with the corresponding temporary
+/// column name, updating `affected_id` with any replaced expressions
 fn replace_common_expr(
     expr: Expr,
     expr_set: &ExprSet,
diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt
index a200217af6e1..8aeeb06c1909 100644
--- a/datafusion/sqllogictest/test_files/create_external_table.slt
+++ b/datafusion/sqllogictest/test_files/create_external_table.slt
@@ -113,4 +113,95 @@ statement error DataFusion error: Invalid or Unsupported Configuration: Config v
 CREATE EXTERNAL TABLE csv_table (column1 int)
 STORED AS CSV
 LOCATION 'foo.csv'
-OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
\ No newline at end of file
+OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
+
+# Partitioned table on a single file
+query error DataFusion error: Error during planning: Can't create a partitioned table backed by a single file, perhaps the URL is missing a trailing slash\?
+CREATE EXTERNAL TABLE single_file_partition(c1 int)
+PARTITIONED BY (p2 string, p1 string)
+STORED AS CSV
+LOCATION 'foo.csv';
+
+# Wrong partition order error
+
+statement ok
+CREATE EXTERNAL TABLE partitioned (c1 int)
+PARTITIONED BY (p1 string, p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+query ITT
+INSERT INTO partitioned VALUES (1, 'x', 'y');
+----
+1
+
+query error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2", "p1"\]
+CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
+PARTITIONED BY (p2 string, p1 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+statement error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2"\]
+CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
+PARTITIONED BY (p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+# But allows partial partition selection
+
+statement ok
+CREATE EXTERNAL TABLE partial_partitioned (c1 int)
+PARTITIONED BY (p1 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+query IT
+SELECT * FROM partial_partitioned;
+----
+1 x
+
+statement ok
+CREATE EXTERNAL TABLE inner_partition (c1 int)
+PARTITIONED BY (p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/p1=x/';
+
+query IT
+SELECT * FROM inner_partition;
+----
+1 y
+
+# Simulate manual creation of invalid (mixed) partitions on disk
+
+statement ok
+-- passes the partition check since nothing has been written to disk yet
+CREATE EXTERNAL TABLE test(name string)
+PARTITIONED BY (year string, month string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
+
+statement ok
+-- passes the partition check since the previous statement didn't write to disk
+CREATE EXTERNAL TABLE test2(name string)
+PARTITIONED BY (month string, year string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
+
+query TTT
+-- creates year -> month partitions
+INSERT INTO test VALUES('name', '2024', '03');
+----
+1
+
+query TTT
+-- creates month -> year partitions.
+-- now the table has both layouts on disk (year -> month and month -> year)
+INSERT INTO test2 VALUES('name', '2024', '03');
+----
+1
+
+statement error DataFusion error: Error during planning: Found mixed partition values on disk \[\["month", "year"\], \["year", "month"\]\]
+-- fails to infer as partitions are not consistent
+CREATE EXTERNAL TABLE test3(name string)
+PARTITIONED BY (month string, year string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
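[Closing note, not part of the patch] The same validation these `.slt` cases exercise is reachable from the Rust API through `ListingOptions::validate_partitions`, which the `ListingTableFactory` change above wires in ahead of schema inference. A sketch, assuming a `SessionState` and table URL are already at hand (the helper name is hypothetical):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

// Hypothetical helper: fail fast if the declared partition columns are not a
// prefix of the layout actually found on disk.
async fn checked_listing_options(
    state: &SessionState,
    url: &ListingTableUrl,
) -> Result<ListingOptions> {
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_table_partition_cols(vec![
            ("year".to_string(), DataType::Utf8),
            ("month".to_string(), DataType::Utf8),
        ]);
    options.validate_partitions(state, url).await?;
    Ok(options)
}
```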