diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 704cb291335f..6c4bf156ce56 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -120,6 +120,13 @@ impl PhysicalSortExpr { } } +/// Access the PhysicalSortExpr as a PhysicalExpr +impl AsRef for PhysicalSortExpr { + fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) { + self.expr.as_ref() + } +} + impl PartialEq for PhysicalSortExpr { fn eq(&self, other: &PhysicalSortExpr) -> bool { self.options == other.options && self.expr.eq(&other.expr) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 00708b4540aa..c1851ddb22b5 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -30,7 +30,6 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; -#[derive(Debug, Clone)] /// A structure representing a expression known to be constant in a physical execution plan. /// /// The `ConstExpr` struct encapsulates an expression that is constant during the execution @@ -41,9 +40,10 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; /// /// - `expr`: Constant expression for a node in the physical plan. /// -/// - `across_partitions`: A boolean flag indicating whether the constant expression is -/// valid across partitions. If set to `true`, the constant expression has same value for all partitions. -/// If set to `false`, the constant expression may have different values for different partitions. +/// - `across_partitions`: A boolean flag indicating whether the constant +/// expression is the same across partitions. If set to `true`, the constant +/// expression has same value for all partitions. If set to `false`, the +/// constant expression may have different values for different partitions. /// /// # Example /// @@ -56,11 +56,22 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; /// // create a constant expression from a physical expression /// let const_expr = ConstExpr::from(col); /// ``` +#[derive(Debug, Clone)] pub struct ConstExpr { + /// The expression that is known to be constant (e.g. a `Column`) expr: Arc, + /// Does the constant have the same value across all partitions? See + /// struct docs for more details across_partitions: bool, } +impl PartialEq for ConstExpr { + fn eq(&self, other: &Self) -> bool { + self.across_partitions == other.across_partitions + && self.expr.eq(other.expr.as_any()) + } +} + impl ConstExpr { /// Create a new constant expression from a physical expression. /// @@ -74,11 +85,17 @@ impl ConstExpr { } } + /// Set the `across_partitions` flag + /// + /// See struct docs for more details pub fn with_across_partitions(mut self, across_partitions: bool) -> Self { self.across_partitions = across_partitions; self } + /// Is the expression the same across all partitions? + /// + /// See struct docs for more details pub fn across_partitions(&self) -> bool { self.across_partitions } @@ -101,6 +118,31 @@ impl ConstExpr { across_partitions: self.across_partitions, }) } + + /// Returns true if this constant expression is equal to the given expression + pub fn eq_expr(&self, other: impl AsRef) -> bool { + self.expr.eq(other.as_ref().as_any()) + } + + /// Returns a [`Display`]able list of `ConstExpr`. + pub fn format_list(input: &[ConstExpr]) -> impl Display + '_ { + struct DisplayableList<'a>(&'a [ConstExpr]); + impl<'a> Display for DisplayableList<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let mut first = true; + for const_expr in self.0 { + if first { + first = false; + } else { + write!(f, ",")?; + } + write!(f, "{}", const_expr)?; + } + Ok(()) + } + } + DisplayableList(input) + } } /// Display implementation for `ConstExpr` diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 1c12f4697005..4ab0cac58257 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -17,6 +17,8 @@ use std::fmt::Display; use std::hash::{Hash, Hasher}; +use std::iter::Peekable; +use std::slice::Iter; use std::sync::Arc; use super::ordering::collapse_lex_ordering; @@ -279,6 +281,12 @@ impl EquivalenceProperties { self.with_constants(constants) } + /// Remove the specified constant + pub fn remove_constant(mut self, c: &ConstExpr) -> Self { + self.constants.retain(|existing| existing != c); + self + } + /// Track/register physical expressions with constant values. pub fn with_constants( mut self, @@ -1120,15 +1128,7 @@ impl Display for EquivalenceProperties { write!(f, ", eq: {}", self.eq_group)?; } if !self.constants.is_empty() { - write!(f, ", const: [")?; - let mut iter = self.constants.iter(); - if let Some(c) = iter.next() { - write!(f, "{}", c)?; - } - for c in iter { - write!(f, ", {}", c)?; - } - write!(f, "]")?; + write!(f, ", const: [{}]", ConstExpr::format_list(&self.constants))?; } Ok(()) } @@ -1646,58 +1646,62 @@ impl Hash for ExprWrapper { /// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties` /// of `lhs` and `rhs` according to the schema of `lhs`. +/// +/// Rules: The UnionExec does not interleave its inputs: instead it passes each +/// input partition from the children as its own output. +/// +/// Since the output equivalence properties are properties that are true for +/// *all* output partitions, that is the same as being true for all *input* +/// partitions fn calculate_union_binary( - lhs: EquivalenceProperties, + mut lhs: EquivalenceProperties, mut rhs: EquivalenceProperties, ) -> Result { - // TODO: In some cases, we should be able to preserve some equivalence - // classes. Add support for such cases. - // Harmonize the schema of the rhs with the schema of the lhs (which is the accumulator schema): if !rhs.schema.eq(&lhs.schema) { rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?; } - // First, calculate valid constants for the union. A quantity is constant - // after the union if it is constant in both sides. - let constants = lhs + // First, calculate valid constants for the union. An expression is constant + // at the output of the union if it is constant in both sides. + let constants: Vec<_> = lhs .constants() .iter() .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr())) .map(|const_expr| { - // TODO: When both sides' constants are valid across partitions, - // the union's constant should also be valid if values are - // the same. However, we do not have the capability to - // check this yet. + // TODO: When both sides have a constant column, and the actual + // constant value is the same, then the output properties could + // reflect the constant is valid across all partitions. However we + // don't track the actual value that the ConstExpr takes on, so we + // can't determine that yet ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false) }) .collect(); + // remove any constants that are shared in both outputs (avoid double counting them) + for c in &constants { + lhs = lhs.remove_constant(c); + rhs = rhs.remove_constant(c); + } + // Next, calculate valid orderings for the union by searching for prefixes // in both sides. - let mut orderings = vec![]; - for mut ordering in lhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !rhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } - for mut ordering in rhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !lhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } - let mut eq_properties = EquivalenceProperties::new(lhs.schema); - eq_properties.constants = constants; + let mut orderings = UnionEquivalentOrderingBuilder::new(); + orderings.add_satisfied_orderings( + lhs.normalized_oeq_class().orderings, + lhs.constants(), + &rhs, + ); + orderings.add_satisfied_orderings( + rhs.normalized_oeq_class().orderings, + rhs.constants(), + &lhs, + ); + let orderings = orderings.build(); + + let mut eq_properties = + EquivalenceProperties::new(lhs.schema).with_constants(constants); + eq_properties.add_new_orderings(orderings); Ok(eq_properties) } @@ -1730,6 +1734,206 @@ pub fn calculate_union( Ok(acc) } +#[derive(Debug)] +enum AddedOrdering { + /// The ordering was added to the in progress result + Yes, + /// The ordering was not added + No(LexOrdering), +} + +/// Builds valid output orderings of a `UnionExec` +#[derive(Debug)] +struct UnionEquivalentOrderingBuilder { + orderings: Vec, +} + +impl UnionEquivalentOrderingBuilder { + fn new() -> Self { + Self { orderings: vec![] } + } + + /// Add all orderings from `orderings` that satisfy `properties`, + /// potentially augmented with`constants`. + /// + /// Note: any column that is known to be constant can be inserted into the + /// ordering without changing its meaning + /// + /// For example: + /// * `orderings` contains `[a ASC, c ASC]` and `constants` contains `b` + /// * `properties` has required ordering `[a ASC, b ASC]` + /// + /// Then this will add `[a ASC, b ASC]` to the `orderings` list (as `a` was + /// in the sort order and `b` was a constant). + fn add_satisfied_orderings( + &mut self, + orderings: impl IntoIterator, + constants: &[ConstExpr], + properties: &EquivalenceProperties, + ) { + for mut ordering in orderings.into_iter() { + // Progressively shorten the ordering to search for a satisfied prefix: + loop { + match self.try_add_ordering(ordering, constants, properties) { + AddedOrdering::Yes => break, + AddedOrdering::No(o) => { + ordering = o; + ordering.pop(); + } + } + } + } + } + + /// Adds `ordering`, potentially augmented with constants, if it satisfies + /// the target `properties` properties. + /// + /// Returns + /// + /// * [`AddedOrdering::Yes`] if the ordering was added (either directly or + /// augmented), or was empty. + /// + /// * [`AddedOrdering::No`] if the ordering was not added + fn try_add_ordering( + &mut self, + ordering: LexOrdering, + constants: &[ConstExpr], + properties: &EquivalenceProperties, + ) -> AddedOrdering { + if ordering.is_empty() { + AddedOrdering::Yes + } else if constants.is_empty() && properties.ordering_satisfy(&ordering) { + // If the ordering satisfies the target properties, no need to + // augment it with constants. + self.orderings.push(ordering); + AddedOrdering::Yes + } else { + // Did not satisfy target properties, try and augment with constants + // to match the properties + if self.try_find_augmented_ordering(&ordering, constants, properties) { + AddedOrdering::Yes + } else { + AddedOrdering::No(ordering) + } + } + } + + /// Attempts to add `constants` to `ordering` to satisfy the properties. + /// + /// returns true if any orderings were added, false otherwise + fn try_find_augmented_ordering( + &mut self, + ordering: &LexOrdering, + constants: &[ConstExpr], + properties: &EquivalenceProperties, + ) -> bool { + // can't augment if there is nothing to augment with + if constants.is_empty() { + return false; + } + let start_num_orderings = self.orderings.len(); + + // for each equivalent ordering in properties, try and augment + // `ordering` it with the constants to match + for existing_ordering in &properties.oeq_class.orderings { + if let Some(augmented_ordering) = self.augment_ordering( + ordering, + constants, + existing_ordering, + &properties.constants, + ) { + if !augmented_ordering.is_empty() { + assert!(properties.ordering_satisfy(&augmented_ordering)); + self.orderings.push(augmented_ordering); + } + } + } + + self.orderings.len() > start_num_orderings + } + + /// Attempts to augment the ordering with constants to match the + /// `existing_ordering` + /// + /// Returns Some(ordering) if an augmented ordering was found, None otherwise + fn augment_ordering( + &mut self, + ordering: &LexOrdering, + constants: &[ConstExpr], + existing_ordering: &LexOrdering, + existing_constants: &[ConstExpr], + ) -> Option { + let mut augmented_ordering = vec![]; + let mut sort_expr_iter = ordering.iter().peekable(); + let mut existing_sort_expr_iter = existing_ordering.iter().peekable(); + + // walk in parallel down the two orderings, trying to match them up + while sort_expr_iter.peek().is_some() || existing_sort_expr_iter.peek().is_some() + { + // If the next expressions are equal, add the next match + // otherwise try and match with a constant + if let Some(expr) = + advance_if_match(&mut sort_expr_iter, &mut existing_sort_expr_iter) + { + augmented_ordering.push(expr); + } else if let Some(expr) = + advance_if_matches_constant(&mut sort_expr_iter, existing_constants) + { + augmented_ordering.push(expr); + } else if let Some(expr) = + advance_if_matches_constant(&mut existing_sort_expr_iter, constants) + { + augmented_ordering.push(expr); + } else { + // no match, can't continue the ordering, return what we have + break; + } + } + + Some(augmented_ordering) + } + + fn build(self) -> Vec { + self.orderings + } +} + +/// Advances two iterators in parallel +/// +/// If the next expressions are equal, the iterators are advanced and returns +/// the matched expression . +/// +/// Otherwise, the iterators are left unchanged and return `None` +fn advance_if_match( + iter1: &mut Peekable>, + iter2: &mut Peekable>, +) -> Option { + if matches!((iter1.peek(), iter2.peek()), (Some(expr1), Some(expr2)) if expr1.eq(expr2)) + { + iter1.next().unwrap(); + iter2.next().cloned() + } else { + None + } +} + +/// Advances the iterator with a constant +/// +/// If the next expression matches one of the constants, advances the iterator +/// returning the matched expression +/// +/// Otherwise, the iterator is left unchanged and returns `None` +fn advance_if_matches_constant( + iter: &mut Peekable>, + constants: &[ConstExpr], +) -> Option { + let expr = iter.peek()?; + let const_expr = constants.iter().find(|c| c.eq_expr(expr))?; + let found_expr = PhysicalSortExpr::new(Arc::clone(const_expr.expr()), expr.options); + iter.next(); + Some(found_expr) +} + #[cfg(test)] mod tests { use std::ops::Not; @@ -2874,7 +3078,7 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_1() { + fn test_union_equivalence_properties_constants_common_constants() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -2898,10 +3102,9 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_2() { + fn test_union_equivalence_properties_constants_prefix() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) - // Meet ordering between [a ASC], [a ASC, b ASC] should be [a ASC] .with_child_sort_and_const_exprs( // First child: [a ASC], const [] vec![vec!["a"]], @@ -2923,10 +3126,9 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_3() { + fn test_union_equivalence_properties_constants_asc_desc_mismatch() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) - // Meet ordering between [a ASC], [a DESC] should be [] .with_child_sort_and_const_exprs( // First child: [a ASC], const [] vec![vec!["a"]], @@ -2948,7 +3150,7 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_4() { + fn test_union_equivalence_properties_constants_different_schemas() { let schema = create_test_schema().unwrap(); let schema2 = append_fields(&schema, "1"); UnionEquivalenceTest::new(&schema) @@ -2965,11 +3167,10 @@ mod tests { &schema2, ) .with_expected_sort_and_const_exprs( - // Union orderings: - // should be [a ASC] + // Union orderings: [a ASC] // - // Where a, and a1 ath the same index for their corresponding - // schemas. + // Note that a, and a1 are at the same index for their + // corresponding schemas. vec![vec!["a"]], vec![], ) @@ -2977,9 +3178,7 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 - fn test_union_equivalence_properties_constants() { + fn test_union_equivalence_properties_constants_fill_gaps() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -3006,13 +3205,58 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 - fn test_union_equivalence_properties_constants_desc() { + fn test_union_equivalence_properties_constants_no_fill_gaps() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + .with_child_sort_and_const_exprs( + // First child orderings: [a ASC, c ASC], const [d] // some other constant + vec![vec!["a", "c"]], + vec!["d"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings: [b ASC, c ASC], const [a] + vec![vec!["b", "c"]], + vec!["a"], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings: [[a]] (only a is constant) + vec![vec!["a"]], + vec![], + ) + .run() + } + + #[test] + fn test_union_equivalence_properties_constants_fill_some_gaps() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + .with_child_sort_and_const_exprs( + // First child orderings: [c ASC], const [a, b] // some other constant + vec![vec!["c"]], + vec!["a", "b"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings: [a DESC, b], const [] + vec![vec!["a DESC", "b"]], + vec![], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings: [[a, b]] (can fill in the a/b with constants) + vec![vec!["a DESC", "b"]], + vec![], + ) + .run() + } + + #[test] + fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( - // NB `b DESC` in the second child // First child orderings: [a ASC, c ASC], const [b] vec![vec!["a", "c"]], vec!["b"], @@ -3036,9 +3280,7 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 - fn test_union_equivalence_properties_constants_middle() { + fn test_union_equivalence_properties_constants_gap_fill_symmetric() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -3055,8 +3297,8 @@ mod tests { ) .with_expected_sort_and_const_exprs( // Union orderings: - // [a, b, d] (c constant) - // [a, c, d] (b constant) + // [a, b, c, d] + // [a, c, b, d] vec![vec!["a", "c", "b", "d"], vec!["a", "b", "c", "d"]], vec![], ) @@ -3064,8 +3306,31 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 + fn test_union_equivalence_properties_constants_gap_fill_and_common() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + .with_child_sort_and_const_exprs( + // First child: [a DESC, d ASC], const [b, c] + vec![vec!["a DESC", "d"]], + vec!["b", "c"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child: [a DESC, c ASC, d ASC], const [b] + vec![vec!["a DESC", "c", "d"]], + vec!["b"], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings: + // [a DESC, c, d] [b] + vec![vec!["a DESC", "c", "d"]], + vec!["b"], + ) + .run() + } + + #[test] fn test_union_equivalence_properties_constants_middle_desc() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -3176,18 +3441,32 @@ mod tests { child_properties, expected_properties, } = self; + let expected_properties = expected_properties.expect("expected_properties not set"); - let actual_properties = - calculate_union(child_properties, Arc::clone(&output_schema)) - .expect("failed to calculate union equivalence properties"); - assert_eq_properties_same( - &actual_properties, - &expected_properties, - format!( - "expected: {expected_properties:?}\nactual: {actual_properties:?}" - ), - ); + + // try all permutations of the children + // as the code treats lhs and rhs differently + for child_properties in child_properties + .iter() + .cloned() + .permutations(child_properties.len()) + { + println!("--- permutation ---"); + for c in &child_properties { + println!("{c}"); + } + let actual_properties = + calculate_union(child_properties, Arc::clone(&output_schema)) + .expect("failed to calculate union equivalence properties"); + assert_eq_properties_same( + &actual_properties, + &expected_properties, + format!( + "expected: {expected_properties:?}\nactual: {actual_properties:?}" + ), + ); + } } /// Make equivalence properties for the specified columns named in orderings and constants