diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 35b9237938a8..49bedf420abf 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -662,12 +662,16 @@ mod tests {
 
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::Result;
+    use datafusion_common::{Result, ScalarValue};
     use datafusion_expr::JoinType;
     use datafusion_physical_expr::expressions::{col, Column, NotExpr};
+    use datafusion_physical_expr::PhysicalExpr;
     use datafusion_physical_optimizer::PhysicalOptimizerRule;
     use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use datafusion_physical_plan::memory::MemoryExec;
+    use datafusion_physical_plan::projection::ProjectionExec;
 
+    use itertools::Itertools;
     use rstest::rstest;
 
     fn create_test_schema() -> Result<SchemaRef> {
@@ -1052,6 +1056,131 @@ mod tests {
         Ok(())
     }
 
+    /// Return a `null` literal representing a struct type like: `{ name: Utf8 }`
+    fn struct_literal(name: &str) -> Arc<dyn PhysicalExpr> {
+        let null_struct = ScalarValue::try_from(DataType::Struct(
+            vec![Field::new(name, DataType::Utf8, false)].into(),
+        ))
+        .unwrap();
+
+        datafusion_physical_expr::expressions::lit(null_struct)
+    }
+
+    /// Build the nullable struct tag field `col{i}`; its only child is a
+    /// non-nullable `Utf8` field with the same name.
+    fn build_tag_field(i: &usize) -> Field {
+        let name = format!("col{i}");
+        let child = Field::new(&name, DataType::Utf8, false);
+        Field::new(name, DataType::Struct(vec![child].into()), true)
+    }
+
+    /// Convert each `(expr, options)` tuple to a `PhysicalSortExpr`.
+    fn convert_to_sort_exprs(
+        in_data: &[(&Arc<dyn PhysicalExpr>, SortOptions)],
+    ) -> Vec<PhysicalSortExpr> {
+        in_data
+            .iter()
+            .map(|(expr, options)| PhysicalSortExpr {
+                expr: Arc::clone(*expr),
+                options: *options,
+            })
+            .collect()
+    }
+
+    use rand::distributions::DistString;
+
+    /// Runs `EnforceDistribution` and `EnforceSorting` over a `Sort(Union(..))`
+    /// plan whose union inputs are projections made of mostly NULL struct
+    /// literals. There are no assertions on the optimized plan: the test only
+    /// requires that both rules return without an error.
+    #[tokio::test]
+    async fn test_dlw() -> Result<()> {
+        // `col0: Utf8` followed by one struct tag field per requested index
+        let build_schema = |included_tag_idx: Vec<usize>| -> Arc<Schema> {
+            let mut fields = vec![Field::new("col0", DataType::Utf8, false)];
+            fields.extend(included_tag_idx.iter().map(build_tag_field));
+            Arc::new(Schema::new(fields))
+        };
+
+        /* 1. schema containing all fields */
+        let schema = build_schema((1..=27).collect_vec());
+
+        // default-ordered sort expressions over `col{i}` for each listed index
+        let build_sort_expr = |cols_included: &[usize]| -> Vec<PhysicalSortExpr> {
+            let options = SortOptions::default();
+            let cols = cols_included
+                .iter()
+                .map(|i| col(&format!("col{i}"), &schema).unwrap())
+                .collect::<Vec<_>>();
+            let pairs = cols.iter().map(|c| (c, options)).collect::<Vec<_>>();
+            convert_to_sort_exprs(&pairs)
+        };
+
+        // one union input: a projection over a `MemoryExec` built from no partitions
+        let make_child = |not_null: Vec<usize>| -> Result<Arc<dyn ExecutionPlan>> {
+            // the input is declared as sorted on the non-null columns
+            let sort_orderings = build_sort_expr(&not_null);
+
+            /* build the union's input projection */
+            // `col0` is a literal constant, freshly generated for every child
+            let constant = rand::distributions::Alphanumeric
+                .sample_string(&mut rand::thread_rng(), 16);
+            let mut projection = vec![(
+                datafusion_physical_expr::expressions::lit(constant),
+                "col0".to_string(),
+            )];
+            // `col1..=col27` are each either the input column or a NULL literal
+            projection.extend((1..=27).map(|i| {
+                let name = format!("col{i}");
+                let expr = if not_null.contains(&i) {
+                    col(&name, &schema).unwrap()
+                } else {
+                    struct_literal(&name)
+                };
+                (expr, name)
+            }));
+
+            Ok(Arc::new(ProjectionExec::try_new(
+                projection,
+                Arc::new(
+                    MemoryExec::try_new(&[], Arc::clone(&schema), None)?
+                        .with_sort_information(vec![sort_orderings]),
+                ),
+            )?))
+        };
+
+        /* 2. build union children */
+        // first 4 children are built from the same columns, but each call to
+        // `make_child` gives them a different constant
+        let mut union_children = (0..4)
+            .map(|_| make_child(vec![9, 15, 19, 25, 26, 27]))
+            .collect::<Result<Vec<_>>>()?;
+        // then a few children with some of the fields
+        union_children.push(make_child(vec![5, 9, 25, 26, 27])?);
+        union_children.push(make_child(vec![9, 25, 26, 27])?);
+        union_children.push(make_child(vec![9, 15, 19, 25, 26, 27])?);
+        // then the last child has all of the fields
+        union_children.push(make_child((0..=27).collect())?);
+
+        /* 3. create the `Sort(Union(children))` */
+        let final_sort = build_sort_expr(&(1..=27).collect::<Vec<usize>>());
+        let sort_union_plan = sort_exec(final_sort, union_exec(union_children));
+
+        /* 4. run the enforce sorting rule */
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let rules: Vec<Arc<dyn PhysicalOptimizerRule>> = vec![
+            Arc::new(EnforceDistribution::new()),
+            Arc::new(EnforceSorting::new()),
+        ];
+        let mut plan = sort_union_plan;
+        for rule in rules {
+            plan = rule.optimize(plan, state.config_options())?;
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_remove_unnecessary_sort5() -> Result<()> {
         let left_schema = create_test_schema2()?;
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index 4ab0cac58257..22f71bfef1a7 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -1767,7 +1767,7 @@ impl UnionEquivalentOrderingBuilder {
     /// in the sort order and `b` was a constant).
     fn add_satisfied_orderings(
         &mut self,
-        orderings: impl IntoIterator<Item = LexOrdering>,
+        orderings: impl IntoIterator<Item = LexOrdering> + std::fmt::Debug,
         constants: &[ConstExpr],
         properties: &EquivalenceProperties,
     ) {
@@ -2047,6 +2047,92 @@ mod tests {
         Ok(())
     }
 
+    // Since our example case was UNIONing many partitions which were mostly
+    // constants (due to gap filling), make a test case with many more constants,
+    // where the sort orders also contain many more constants. The constants are
+    // also listed in a different order in each sort order.
+    #[test]
+    fn project_equivalence_properties_test_multi_with_constants() -> Result<()> {
+        // test multiple input orderings with equivalence properties
+        let input_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Int64, true),
+            Field::new("c1", DataType::Int64, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int64, true),
+            Field::new("c4", DataType::Int64, true),
+            Field::new("c5", DataType::Int64, true),
+            Field::new("c6", DataType::Int64, true),
+            Field::new("d", DataType::Int64, true),
+            Field::new("e", DataType::Int64, true),
+        ]));
+
+        // c1..=c6 are constants
+        let constants = vec![
+            ConstExpr::from(col("c1", &input_schema)?),
+            ConstExpr::from(col("c2", &input_schema)?),
+            ConstExpr::from(col("c3", &input_schema)?),
+            ConstExpr::from(col("c4", &input_schema)?),
+            ConstExpr::from(col("c5", &input_schema)?),
+            ConstExpr::from(col("c6", &input_schema)?),
+        ];
+
+        let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema))
+            .with_constants(constants);
+
+        // add ordering [a, d, b, e], interleaved with the constants
+        input_properties.add_new_ordering(vec![
+            parse_sort_expr("a", &input_schema),
+            parse_sort_expr("d", &input_schema),
+            parse_sort_expr("c2", &input_schema), // constant
+            parse_sort_expr("c1", &input_schema), // constant
+            parse_sort_expr("c3", &input_schema), // constant
+            parse_sort_expr("b", &input_schema),  // NB b and the c's are swapped
+            parse_sort_expr("e", &input_schema),
+            parse_sort_expr("c6", &input_schema), // constant
+            parse_sort_expr("c5", &input_schema), // constant
+            parse_sort_expr("c4", &input_schema), // constant
+        ]);
+
+        // add ordering [a, b, d, e], interleaved with the constants
+        input_properties.add_new_ordering(vec![
+            parse_sort_expr("a", &input_schema),
+            parse_sort_expr("b", &input_schema),
+            parse_sort_expr("c1", &input_schema),
+            parse_sort_expr("c2", &input_schema),
+            parse_sort_expr("c3", &input_schema),
+            parse_sort_expr("d", &input_schema),
+            parse_sort_expr("c4", &input_schema), // NB e and the c's are swapped
+            parse_sort_expr("c5", &input_schema),
+            parse_sort_expr("c6", &input_schema),
+            parse_sort_expr("e", &input_schema),
+        ]);
+
+        // simply project all the columns in order, each under its own name, so
+        // the mapping agrees with `input_schema`, which is reused as the output schema
+        let proj_exprs = vec![
+            (col("a", &input_schema)?, "a".to_string()),
+            (col("b", &input_schema)?, "b".to_string()),
+            (col("c1", &input_schema)?, "c1".to_string()),
+            (col("c2", &input_schema)?, "c2".to_string()),
+            (col("c3", &input_schema)?, "c3".to_string()),
+            (col("c4", &input_schema)?, "c4".to_string()),
+            (col("c5", &input_schema)?, "c5".to_string()),
+            (col("c6", &input_schema)?, "c6".to_string()),
+            (col("d", &input_schema)?, "d".to_string()),
+            (col("e", &input_schema)?, "e".to_string()),
+        ];
+        let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
+        let out_properties = input_properties.project(&projection_mapping, input_schema);
+
+        assert_eq!(
+            out_properties.to_string(),
+            "order: [[a@0 ASC,b@1 ASC,d@8 ASC,e@9 ASC], [a@0 ASC,d@8 ASC,b@1 ASC,e@9 ASC]], const: [c1@2,c2@3,c3@4,c4@5,c5@6,c6@7]"
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn test_join_equivalence_properties() -> Result<()> {
         let schema = create_test_schema()?;
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 895647864384..4ed85127a13f 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -600,6 +600,7 @@ mod tests {
     use super::*;
     use crate::collect;
     use crate::memory::MemoryExec;
+    use crate::projection::ProjectionExec;
     use crate::test;
 
     use arrow_schema::{DataType, SortOptions};
@@ -740,6 +741,107 @@ mod tests {
         assert_eq!(result, expected);
     }
 
+    /// Return a `null` literal representing a struct type like: `{ name: Utf8 }`
+    fn struct_literal(name: &str) -> Arc<dyn PhysicalExpr> {
+        let null_struct = ScalarValue::try_from(DataType::Struct(
+            vec![Field::new(name, DataType::Utf8, false)].into(),
+        ))
+        .unwrap();
+
+        datafusion_physical_expr::expressions::lit(null_struct)
+    }
+
+    /// Build the nullable struct tag field `col{i}`; its only child is a
+    /// non-nullable `Utf8` field with the same name.
+    fn build_tag_field(i: &usize) -> Field {
+        let name = format!("col{i}");
+        let child = Field::new(&name, DataType::Utf8, false);
+        Field::new(name, DataType::Struct(vec![child].into()), true)
+    }
+
+    use rand::distributions::DistString;
+
+    /// Computes the `UnionExec` properties for union inputs that are projections
+    /// made of mostly NULL struct literals. There are no assertions on the
+    /// result: the test only requires that the computation returns without an
+    /// error.
+    #[tokio::test]
+    async fn test_dlw() -> Result<()> {
+        // `col0: Utf8` followed by one struct tag field per requested index
+        let build_schema = |included_tag_idx: Vec<usize>| -> Arc<Schema> {
+            let mut fields = vec![Field::new("col0", DataType::Utf8, false)];
+            fields.extend(included_tag_idx.iter().map(build_tag_field));
+            Arc::new(Schema::new(fields))
+        };
+
+        // schema containing all fields
+        let schema = build_schema((1..=27).collect());
+        let options = SortOptions::default();
+
+        // one union input: a projection over a `MemoryExec` built from no partitions
+        let make_child = |not_null: Vec<usize>| -> Result<Arc<dyn ExecutionPlan>> {
+            // the input is declared as sorted on the non-null columns
+            let not_null_cols = not_null
+                .iter()
+                .map(|i| col(&format!("col{i}"), &schema).unwrap())
+                .collect::<Vec<_>>();
+            let sort_pairs = not_null_cols
+                .iter()
+                .map(|c| (c, options))
+                .collect::<Vec<_>>();
+            let sort_orderings = convert_to_sort_exprs(&sort_pairs);
+
+            /* build the union's input projection */
+            // `col0` is a literal constant, freshly generated for every child
+            let constant = rand::distributions::Alphanumeric
+                .sample_string(&mut rand::thread_rng(), 16);
+            let mut projection = vec![(
+                datafusion_physical_expr::expressions::lit(constant),
+                "col0".to_string(),
+            )];
+            // `col1..=col27` are each either the input column or a NULL literal
+            projection.extend((1..=27).map(|i| {
+                let name = format!("col{i}");
+                let expr = if not_null.contains(&i) {
+                    col(&name, &schema).unwrap()
+                } else {
+                    struct_literal(&name)
+                };
+                (expr, name)
+            }));
+
+            Ok(Arc::new(ProjectionExec::try_new(
+                projection,
+                Arc::new(
+                    MemoryExec::try_new(&[], Arc::clone(&schema), None)?
+                        .with_sort_information(vec![sort_orderings]),
+                ),
+            )?))
+        };
+
+        // first 4 children are built from the same columns, but each call to
+        // `make_child` gives them a different constant
+        let mut union_children = (0..4)
+            .map(|_| make_child(vec![9, 15, 19, 25, 26, 27]))
+            .collect::<Result<Vec<_>>>()?;
+
+        // then a few children with some of the fields
+        union_children.push(make_child(vec![5, 9, 25, 26, 27])?);
+        union_children.push(make_child(vec![9, 25, 26, 27])?);
+        union_children.push(make_child(vec![9, 15, 19, 25, 26, 27])?);
+
+        // then the last child has all of the fields
+        union_children.push(make_child((0..=27).collect())?);
+
+        let props = UnionExec::compute_properties(&union_children, schema)?;
+        println!(
+            "equivalence_properties: {:?}",
+            props.equivalence_properties()
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_union_equivalence_properties() -> Result<()> {
         let schema = create_test_schema()?;