From c352dce54f3e343b0b7b4d4386968208b8af55ce Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Dec 2024 16:20:29 -0800 Subject: [PATCH] Revert "Account for constant equivalence properties in union, tests (#12562)" This reverts commit 577e4bba0f5838846862621e1f5318c949cff2cb. --- .../physical-expr-common/src/sort_expr.rs | 7 - .../physical-expr/src/equivalence/class.rs | 50 +- .../src/equivalence/properties.rs | 433 ++++-------------- 3 files changed, 81 insertions(+), 409 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 6c4bf156ce56..704cb291335f 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -120,13 +120,6 @@ impl PhysicalSortExpr { } } -/// Access the PhysicalSortExpr as a PhysicalExpr -impl AsRef for PhysicalSortExpr { - fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) { - self.expr.as_ref() - } -} - impl PartialEq for PhysicalSortExpr { fn eq(&self, other: &PhysicalSortExpr) -> bool { self.options == other.options && self.expr.eq(&other.expr) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index c1851ddb22b5..00708b4540aa 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -30,6 +30,7 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; +#[derive(Debug, Clone)] /// A structure representing a expression known to be constant in a physical execution plan. /// /// The `ConstExpr` struct encapsulates an expression that is constant during the execution @@ -40,10 +41,9 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; /// /// - `expr`: Constant expression for a node in the physical plan. /// -/// - `across_partitions`: A boolean flag indicating whether the constant -/// expression is the same across partitions. If set to `true`, the constant -/// expression has same value for all partitions. If set to `false`, the -/// constant expression may have different values for different partitions. +/// - `across_partitions`: A boolean flag indicating whether the constant expression is +/// valid across partitions. If set to `true`, the constant expression has same value for all partitions. +/// If set to `false`, the constant expression may have different values for different partitions. /// /// # Example /// @@ -56,22 +56,11 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; /// // create a constant expression from a physical expression /// let const_expr = ConstExpr::from(col); /// ``` -#[derive(Debug, Clone)] pub struct ConstExpr { - /// The expression that is known to be constant (e.g. a `Column`) expr: Arc, - /// Does the constant have the same value across all partitions? See - /// struct docs for more details across_partitions: bool, } -impl PartialEq for ConstExpr { - fn eq(&self, other: &Self) -> bool { - self.across_partitions == other.across_partitions - && self.expr.eq(other.expr.as_any()) - } -} - impl ConstExpr { /// Create a new constant expression from a physical expression. /// @@ -85,17 +74,11 @@ impl ConstExpr { } } - /// Set the `across_partitions` flag - /// - /// See struct docs for more details pub fn with_across_partitions(mut self, across_partitions: bool) -> Self { self.across_partitions = across_partitions; self } - /// Is the expression the same across all partitions? - /// - /// See struct docs for more details pub fn across_partitions(&self) -> bool { self.across_partitions } @@ -118,31 +101,6 @@ impl ConstExpr { across_partitions: self.across_partitions, }) } - - /// Returns true if this constant expression is equal to the given expression - pub fn eq_expr(&self, other: impl AsRef) -> bool { - self.expr.eq(other.as_ref().as_any()) - } - - /// Returns a [`Display`]able list of `ConstExpr`. - pub fn format_list(input: &[ConstExpr]) -> impl Display + '_ { - struct DisplayableList<'a>(&'a [ConstExpr]); - impl<'a> Display for DisplayableList<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let mut first = true; - for const_expr in self.0 { - if first { - first = false; - } else { - write!(f, ",")?; - } - write!(f, "{}", const_expr)?; - } - Ok(()) - } - } - DisplayableList(input) - } } /// Display implementation for `ConstExpr` diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index a0cc29685f77..184a817297c4 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -18,8 +18,6 @@ use std::fmt; use std::fmt::Display; use std::hash::{Hash, Hasher}; -use std::iter::Peekable; -use std::slice::Iter; use std::sync::Arc; use super::ordering::collapse_lex_ordering; @@ -282,12 +280,6 @@ impl EquivalenceProperties { self.with_constants(constants) } - /// Remove the specified constant - pub fn remove_constant(mut self, c: &ConstExpr) -> Self { - self.constants.retain(|existing| existing != c); - self - } - /// Track/register physical expressions with constant values. pub fn with_constants( mut self, @@ -1127,7 +1119,15 @@ impl Display for EquivalenceProperties { write!(f, ", eq: {}", self.eq_group)?; } if !self.constants.is_empty() { - write!(f, ", const: [{}]", ConstExpr::format_list(&self.constants))?; + write!(f, ", const: [")?; + let mut iter = self.constants.iter(); + if let Some(c) = iter.next() { + write!(f, "{}", c)?; + } + for c in iter { + write!(f, ", {}", c)?; + } + write!(f, "]")?; } Ok(()) } @@ -1805,62 +1805,58 @@ impl Hash for ExprWrapper { /// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties` /// of `lhs` and `rhs` according to the schema of `lhs`. -/// -/// Rules: The UnionExec does not interleave its inputs: instead it passes each -/// input partition from the children as its own output. -/// -/// Since the output equivalence properties are properties that are true for -/// *all* output partitions, that is the same as being true for all *input* -/// partitions fn calculate_union_binary( - mut lhs: EquivalenceProperties, + lhs: EquivalenceProperties, mut rhs: EquivalenceProperties, ) -> Result { + // TODO: In some cases, we should be able to preserve some equivalence + // classes. Add support for such cases. + // Harmonize the schema of the rhs with the schema of the lhs (which is the accumulator schema): if !rhs.schema.eq(&lhs.schema) { rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?; } - // First, calculate valid constants for the union. An expression is constant - // at the output of the union if it is constant in both sides. - let constants: Vec<_> = lhs + // First, calculate valid constants for the union. A quantity is constant + // after the union if it is constant in both sides. + let constants = lhs .constants() .iter() .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr())) .map(|const_expr| { - // TODO: When both sides have a constant column, and the actual - // constant value is the same, then the output properties could - // reflect the constant is valid across all partitions. However we - // don't track the actual value that the ConstExpr takes on, so we - // can't determine that yet + // TODO: When both sides' constants are valid across partitions, + // the union's constant should also be valid if values are + // the same. However, we do not have the capability to + // check this yet. ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false) }) .collect(); - // remove any constants that are shared in both outputs (avoid double counting them) - for c in &constants { - lhs = lhs.remove_constant(c); - rhs = rhs.remove_constant(c); - } - // Next, calculate valid orderings for the union by searching for prefixes // in both sides. - let mut orderings = UnionEquivalentOrderingBuilder::new(); - orderings.add_satisfied_orderings( - lhs.normalized_oeq_class().orderings, - lhs.constants(), - &rhs, - ); - orderings.add_satisfied_orderings( - rhs.normalized_oeq_class().orderings, - rhs.constants(), - &lhs, - ); - let orderings = orderings.build(); - - let mut eq_properties = - EquivalenceProperties::new(lhs.schema).with_constants(constants); - + let mut orderings = vec![]; + for mut ordering in lhs.normalized_oeq_class().orderings { + // Progressively shorten the ordering to search for a satisfied prefix: + while !rhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + for mut ordering in rhs.normalized_oeq_class().orderings { + // Progressively shorten the ordering to search for a satisfied prefix: + while !lhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + let mut eq_properties = EquivalenceProperties::new(lhs.schema); + eq_properties.constants = constants; eq_properties.add_new_orderings(orderings); Ok(eq_properties) } @@ -1893,206 +1889,6 @@ pub fn calculate_union( Ok(acc) } -#[derive(Debug)] -enum AddedOrdering { - /// The ordering was added to the in progress result - Yes, - /// The ordering was not added - No(LexOrdering), -} - -/// Builds valid output orderings of a `UnionExec` -#[derive(Debug)] -struct UnionEquivalentOrderingBuilder { - orderings: Vec, -} - -impl UnionEquivalentOrderingBuilder { - fn new() -> Self { - Self { orderings: vec![] } - } - - /// Add all orderings from `orderings` that satisfy `properties`, - /// potentially augmented with`constants`. - /// - /// Note: any column that is known to be constant can be inserted into the - /// ordering without changing its meaning - /// - /// For example: - /// * `orderings` contains `[a ASC, c ASC]` and `constants` contains `b` - /// * `properties` has required ordering `[a ASC, b ASC]` - /// - /// Then this will add `[a ASC, b ASC]` to the `orderings` list (as `a` was - /// in the sort order and `b` was a constant). - fn add_satisfied_orderings( - &mut self, - orderings: impl IntoIterator, - constants: &[ConstExpr], - properties: &EquivalenceProperties, - ) { - for mut ordering in orderings.into_iter() { - // Progressively shorten the ordering to search for a satisfied prefix: - loop { - match self.try_add_ordering(ordering, constants, properties) { - AddedOrdering::Yes => break, - AddedOrdering::No(o) => { - ordering = o; - ordering.pop(); - } - } - } - } - } - - /// Adds `ordering`, potentially augmented with constants, if it satisfies - /// the target `properties` properties. - /// - /// Returns - /// - /// * [`AddedOrdering::Yes`] if the ordering was added (either directly or - /// augmented), or was empty. - /// - /// * [`AddedOrdering::No`] if the ordering was not added - fn try_add_ordering( - &mut self, - ordering: LexOrdering, - constants: &[ConstExpr], - properties: &EquivalenceProperties, - ) -> AddedOrdering { - if ordering.is_empty() { - AddedOrdering::Yes - } else if constants.is_empty() && properties.ordering_satisfy(&ordering) { - // If the ordering satisfies the target properties, no need to - // augment it with constants. - self.orderings.push(ordering); - AddedOrdering::Yes - } else { - // Did not satisfy target properties, try and augment with constants - // to match the properties - if self.try_find_augmented_ordering(&ordering, constants, properties) { - AddedOrdering::Yes - } else { - AddedOrdering::No(ordering) - } - } - } - - /// Attempts to add `constants` to `ordering` to satisfy the properties. - /// - /// returns true if any orderings were added, false otherwise - fn try_find_augmented_ordering( - &mut self, - ordering: &LexOrdering, - constants: &[ConstExpr], - properties: &EquivalenceProperties, - ) -> bool { - // can't augment if there is nothing to augment with - if constants.is_empty() { - return false; - } - let start_num_orderings = self.orderings.len(); - - // for each equivalent ordering in properties, try and augment - // `ordering` it with the constants to match - for existing_ordering in &properties.oeq_class.orderings { - if let Some(augmented_ordering) = self.augment_ordering( - ordering, - constants, - existing_ordering, - &properties.constants, - ) { - if !augmented_ordering.is_empty() { - assert!(properties.ordering_satisfy(&augmented_ordering)); - self.orderings.push(augmented_ordering); - } - } - } - - self.orderings.len() > start_num_orderings - } - - /// Attempts to augment the ordering with constants to match the - /// `existing_ordering` - /// - /// Returns Some(ordering) if an augmented ordering was found, None otherwise - fn augment_ordering( - &mut self, - ordering: &LexOrdering, - constants: &[ConstExpr], - existing_ordering: &LexOrdering, - existing_constants: &[ConstExpr], - ) -> Option { - let mut augmented_ordering = vec![]; - let mut sort_expr_iter = ordering.iter().peekable(); - let mut existing_sort_expr_iter = existing_ordering.iter().peekable(); - - // walk in parallel down the two orderings, trying to match them up - while sort_expr_iter.peek().is_some() || existing_sort_expr_iter.peek().is_some() - { - // If the next expressions are equal, add the next match - // otherwise try and match with a constant - if let Some(expr) = - advance_if_match(&mut sort_expr_iter, &mut existing_sort_expr_iter) - { - augmented_ordering.push(expr); - } else if let Some(expr) = - advance_if_matches_constant(&mut sort_expr_iter, existing_constants) - { - augmented_ordering.push(expr); - } else if let Some(expr) = - advance_if_matches_constant(&mut existing_sort_expr_iter, constants) - { - augmented_ordering.push(expr); - } else { - // no match, can't continue the ordering, return what we have - break; - } - } - - Some(augmented_ordering) - } - - fn build(self) -> Vec { - self.orderings - } -} - -/// Advances two iterators in parallel -/// -/// If the next expressions are equal, the iterators are advanced and returns -/// the matched expression . -/// -/// Otherwise, the iterators are left unchanged and return `None` -fn advance_if_match( - iter1: &mut Peekable>, - iter2: &mut Peekable>, -) -> Option { - if matches!((iter1.peek(), iter2.peek()), (Some(expr1), Some(expr2)) if expr1.eq(expr2)) - { - iter1.next().unwrap(); - iter2.next().cloned() - } else { - None - } -} - -/// Advances the iterator with a constant -/// -/// If the next expression matches one of the constants, advances the iterator -/// returning the matched expression -/// -/// Otherwise, the iterator is left unchanged and returns `None` -fn advance_if_matches_constant( - iter: &mut Peekable>, - constants: &[ConstExpr], -) -> Option { - let expr = iter.peek()?; - let const_expr = constants.iter().find(|c| c.eq_expr(expr))?; - let found_expr = PhysicalSortExpr::new(Arc::clone(const_expr.expr()), expr.options); - iter.next(); - Some(found_expr) -} - #[cfg(test)] mod tests { use std::ops::Not; @@ -3157,7 +2953,7 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_common_constants() { + fn test_union_equivalence_properties_constants_1() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -3181,9 +2977,10 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_prefix() { + fn test_union_equivalence_properties_constants_2() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) + // Meet ordering between [a ASC], [a ASC, b ASC] should be [a ASC] .with_child_sort_and_const_exprs( // First child: [a ASC], const [] vec![vec!["a"]], @@ -3205,9 +3002,10 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_asc_desc_mismatch() { + fn test_union_equivalence_properties_constants_3() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) + // Meet ordering between [a ASC], [a DESC] should be [] .with_child_sort_and_const_exprs( // First child: [a ASC], const [] vec![vec!["a"]], @@ -3229,7 +3027,7 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_different_schemas() { + fn test_union_equivalence_properties_constants_4() { let schema = create_test_schema().unwrap(); let schema2 = append_fields(&schema, "1"); UnionEquivalenceTest::new(&schema) @@ -3246,10 +3044,11 @@ mod tests { &schema2, ) .with_expected_sort_and_const_exprs( - // Union orderings: [a ASC] + // Union orderings: + // should be [a ASC] // - // Note that a, and a1 are at the same index for their - // corresponding schemas. + // Where a, and a1 ath the same index for their corresponding + // schemas. vec![vec!["a"]], vec![], ) @@ -3257,7 +3056,9 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_fill_gaps() { + #[ignore] + // ignored due to https://github.com/apache/datafusion/issues/12446 + fn test_union_equivalence_properties_constants() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -3284,58 +3085,13 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_no_fill_gaps() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child orderings: [a ASC, c ASC], const [d] // some other constant - vec![vec!["a", "c"]], - vec!["d"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child orderings: [b ASC, c ASC], const [a] - vec![vec!["b", "c"]], - vec!["a"], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: [[a]] (only a is constant) - vec![vec!["a"]], - vec![], - ) - .run() - } - - #[test] - fn test_union_equivalence_properties_constants_fill_some_gaps() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child orderings: [c ASC], const [a, b] // some other constant - vec![vec!["c"]], - vec!["a", "b"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child orderings: [a DESC, b], const [] - vec![vec!["a DESC", "b"]], - vec![], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: [[a, b]] (can fill in the a/b with constants) - vec![vec!["a DESC", "b"]], - vec![], - ) - .run() - } - - #[test] - fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() { + #[ignore] + // ignored due to https://github.com/apache/datafusion/issues/12446 + fn test_union_equivalence_properties_constants_desc() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( + // NB `b DESC` in the second child // First child orderings: [a ASC, c ASC], const [b] vec![vec!["a", "c"]], vec!["b"], @@ -3359,7 +3115,9 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_gap_fill_symmetric() { + #[ignore] + // ignored due to https://github.com/apache/datafusion/issues/12446 + fn test_union_equivalence_properties_constants_middle() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -3376,8 +3134,8 @@ mod tests { ) .with_expected_sort_and_const_exprs( // Union orderings: - // [a, b, c, d] - // [a, c, b, d] + // [a, b, d] (c constant) + // [a, c, d] (b constant) vec![vec!["a", "c", "b", "d"], vec!["a", "b", "c", "d"]], vec![], ) @@ -3385,31 +3143,8 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_gap_fill_and_common() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child: [a DESC, d ASC], const [b, c] - vec![vec!["a DESC", "d"]], - vec!["b", "c"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child: [a DESC, c ASC, d ASC], const [b] - vec![vec!["a DESC", "c", "d"]], - vec!["b"], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: - // [a DESC, c, d] [b] - vec![vec!["a DESC", "c", "d"]], - vec!["b"], - ) - .run() - } - - #[test] + #[ignore] + // ignored due to https://github.com/apache/datafusion/issues/12446 fn test_union_equivalence_properties_constants_middle_desc() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -3520,32 +3255,18 @@ mod tests { child_properties, expected_properties, } = self; - let expected_properties = expected_properties.expect("expected_properties not set"); - - // try all permutations of the children - // as the code treats lhs and rhs differently - for child_properties in child_properties - .iter() - .cloned() - .permutations(child_properties.len()) - { - println!("--- permutation ---"); - for c in &child_properties { - println!("{c}"); - } - let actual_properties = - calculate_union(child_properties, Arc::clone(&output_schema)) - .expect("failed to calculate union equivalence properties"); - assert_eq_properties_same( - &actual_properties, - &expected_properties, - format!( - "expected: {expected_properties:?}\nactual: {actual_properties:?}" - ), - ); - } + let actual_properties = + calculate_union(child_properties, Arc::clone(&output_schema)) + .expect("failed to calculate union equivalence properties"); + assert_eq_properties_same( + &actual_properties, + &expected_properties, + format!( + "expected: {expected_properties:?}\nactual: {actual_properties:?}" + ), + ); } /// Make equivalence properties for the specified columns named in orderings and constants