From 701abf72b118d2379c401de3bd12d3ce977d1bae Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Fri, 5 Apr 2024 09:18:37 +0300 Subject: [PATCH 1/2] Prune out constant expressions from output ordering. (#9947) * Initial commit * Add new unit test * Make requirement explicit * Minor changes * Minor changes --- .../enforce_distribution.rs | 96 ++++++++++++------- .../src/equivalence/properties.rs | 10 ++ datafusion/physical-plan/src/lib.rs | 4 +- 3 files changed, 76 insertions(+), 34 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index a58f8698d6ce..145f08af76dd 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1327,11 +1327,6 @@ pub(crate) mod tests { } impl SortRequiredExec { - fn new(input: Arc) -> Self { - let expr = input.output_ordering().unwrap_or(&[]).to_vec(); - Self::new_with_requirement(input, expr) - } - fn new_with_requirement( input: Arc, requirement: Vec, @@ -1391,10 +1386,11 @@ pub(crate) mod tests { // model that it requires the output ordering of its input fn required_input_ordering(&self) -> Vec>> { - vec![self - .properties() - .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs)] + if self.expr.is_empty() { + vec![None] + } else { + vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))] + } } fn with_new_children( @@ -1677,10 +1673,6 @@ pub(crate) mod tests { Arc::new(UnionExec::new(input)) } - fn sort_required_exec(input: Arc) -> Arc { - Arc::new(SortRequiredExec::new(input)) - } - fn sort_required_exec_with_req( input: Arc, sort_exprs: LexOrdering, @@ -3206,8 +3198,10 @@ pub(crate) mod tests { expr: col("c", &schema).unwrap(), options: SortOptions::default(), }]; - let plan = - sort_required_exec(filter_exec(sort_exec(sort_key, parquet_exec(), false))); + let plan = sort_required_exec_with_req( + filter_exec(sort_exec(sort_key.clone(), parquet_exec(), false)), + sort_key, + ); let expected = &[ "SortRequiredExec: [c@2 ASC]", @@ -3367,18 +3361,20 @@ pub(crate) mod tests { // Parquet(sorted) let schema = schema(); let sort_key = vec![PhysicalSortExpr { - expr: col("c", &schema).unwrap(), + expr: col("d", &schema).unwrap(), options: SortOptions::default(), }]; - let plan = - sort_required_exec(filter_exec(parquet_exec_with_sort(vec![sort_key]))); + let plan = sort_required_exec_with_req( + filter_exec(parquet_exec_with_sort(vec![sort_key.clone()])), + sort_key, + ); // during repartitioning ordering is preserved let expected = &[ - "SortRequiredExec: [c@2 ASC]", + "SortRequiredExec: [d@3 ASC]", "FilterExec: c@2 = 0", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC]", ]; assert_optimized!(expected, plan.clone(), true, true); @@ -3403,7 +3399,10 @@ pub(crate) mod tests { expr: col("c", &schema).unwrap(), options: SortOptions::default(), }]; - let input1 = sort_required_exec(parquet_exec_with_sort(vec![sort_key])); + let input1 = sort_required_exec_with_req( + parquet_exec_with_sort(vec![sort_key.clone()]), + sort_key, + ); let input2 = filter_exec(parquet_exec()); let plan = union_exec(vec![input1, input2]); @@ -3481,10 +3480,13 @@ pub(crate) mod tests { ("c".to_string(), 
"c".to_string()), ]; // sorted input - let plan = sort_required_exec(projection_exec_with_alias( - parquet_exec_multiple_sorted(vec![sort_key]), - alias, - )); + let plan = sort_required_exec_with_req( + projection_exec_with_alias( + parquet_exec_multiple_sorted(vec![sort_key.clone()]), + alias, + ), + sort_key, + ); let expected = &[ "SortRequiredExec: [c@2 ASC]", @@ -3639,8 +3641,8 @@ pub(crate) mod tests { options: SortOptions::default(), }]; - let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key])); - let plan = sort_required_exec(plan); + let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()])); + let plan = sort_required_exec_with_req(plan, sort_key); // The groups must have only contiguous ranges of rows from the same file // if any group has rows from multiple files, the data is no longer sorted destroyed @@ -4025,9 +4027,14 @@ pub(crate) mod tests { }]; // SortRequired // Parquet(sorted) - let plan_parquet = - sort_required_exec(parquet_exec_with_sort(vec![sort_key.clone()])); - let plan_csv = sort_required_exec(csv_exec_with_sort(vec![sort_key])); + let plan_parquet = sort_required_exec_with_req( + parquet_exec_with_sort(vec![sort_key.clone()]), + sort_key.clone(), + ); + let plan_csv = sort_required_exec_with_req( + csv_exec_with_sort(vec![sort_key.clone()]), + sort_key, + ); // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism let expected_parquet = &[ @@ -4150,7 +4157,7 @@ pub(crate) mod tests { } #[test] - fn preserve_ordering_through_repartition() -> Result<()> { + fn remove_unnecessary_spm_after_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { expr: col("c", &schema).unwrap(), @@ -4159,8 +4166,10 @@ pub(crate) mod tests { let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input)); + // Original plan expects its output to be ordered by c@2 ASC. + // This is still satisfied since, after filter that column is constant. 
         let expected = &[
-            "SortPreservingMergeExec: [c@2 ASC]",
+            "CoalescePartitionsExec",
             "FilterExec: c@2 = 0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
             "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
         ];
@@ -4172,6 +4181,29 @@ pub(crate) mod tests {
         Ok(())
     }
 
+    #[test]
+    fn preserve_ordering_through_repartition() -> Result<()> {
+        let schema = schema();
+        let sort_key = vec![PhysicalSortExpr {
+            expr: col("d", &schema).unwrap(),
+            options: SortOptions::default(),
+        }];
+        let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
+        let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
+
+        let expected = &[
+            "SortPreservingMergeExec: [d@3 ASC]",
+            "FilterExec: c@2 = 0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC]",
+        ];
+        // last flag sets config.optimizer.PREFER_EXISTING_SORT
+        assert_optimized!(expected, physical_plan.clone(), true, true);
+        assert_optimized!(expected, physical_plan, false, true);
+
+        Ok(())
+    }
+
     #[test]
     fn do_not_preserve_ordering_through_repartition() -> Result<()> {
         let schema = schema();
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index 5eb9d6eb1b86..7ce540b267b2 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -134,6 +134,16 @@ impl EquivalenceProperties {
         &self.constants
     }
 
+    /// Returns the output ordering implied by these properties, with constant
+    /// expressions pruned out.
+    pub fn output_ordering(&self) -> Option<LexOrdering> {
+        let constants = self.constants();
+        let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
+        // Prune out constant expressions
+        output_ordering
+            .retain(|sort_expr| !physical_exprs_contains(constants, &sort_expr.expr));
+        (!output_ordering.is_empty()).then_some(output_ordering)
+    }
+
     /// Returns the normalized version of the ordering equivalence class within.
     /// Normalization removes constants and duplicates as well as standardizing
     /// expressions according to the equivalence group within.
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 3e8e439c9a38..3decf2e34015 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -574,7 +574,7 @@ impl PlanProperties {
         execution_mode: ExecutionMode,
     ) -> Self {
         // Output ordering can be derived from `eq_properties`.
- let output_ordering = eq_properties.oeq_class().output_ordering(); + let output_ordering = eq_properties.output_ordering(); Self { eq_properties, partitioning, @@ -599,7 +599,7 @@ impl PlanProperties { pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { // Changing equivalence properties also changes output ordering, so // make sure to overwrite it: - self.output_ordering = eq_properties.oeq_class().output_ordering(); + self.output_ordering = eq_properties.output_ordering(); self.eq_properties = eq_properties; self } From 1dd535451198777693ac962a21b79a680bd4b7e8 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 5 Apr 2024 17:57:49 +0800 Subject: [PATCH 2/2] Move `AggregateExpr`, `PhysicalExpr` and `PhysicalSortExpr` to physical-expr-core (#9926) * move PhysicalExpr Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * move physical sort Signed-off-by: jayzhan211 * cleanup dependencies Signed-off-by: jayzhan211 * add readme Signed-off-by: jayzhan211 * disable doc test Signed-off-by: jayzhan211 * move column Signed-off-by: jayzhan211 * fmt Signed-off-by: jayzhan211 * move aggregatexp Signed-off-by: jayzhan211 * move other two utils Signed-off-by: jayzhan211 * license Signed-off-by: jayzhan211 * switch to ignore Signed-off-by: jayzhan211 * move reverse order Signed-off-by: jayzhan211 * rename to common Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 --- Cargo.toml | 2 + datafusion-cli/Cargo.lock | 10 + datafusion/physical-expr-common/Cargo.toml | 38 +++ datafusion/physical-expr-common/README.md | 27 ++ .../physical-expr-common/src/aggregate/mod.rs | 102 +++++++ .../src/aggregate/utils.rs | 69 +++++ .../src/expressions/column.rs | 137 ++++++++++ .../src/expressions/mod.rs | 18 ++ datafusion/physical-expr-common/src/lib.rs | 24 ++ .../physical-expr-common/src/physical_expr.rs | 211 +++++++++++++++ .../src/sort_expr.rs | 6 +- .../src/sort_properties.rs | 4 +- .../src/tree_node.rs | 0 datafusion/physical-expr-common/src/utils.rs | 156 +++++++++++ datafusion/physical-expr/Cargo.toml | 1 + datafusion/physical-expr/src/aggregate/mod.rs | 80 +----- .../physical-expr/src/aggregate/utils.rs | 50 +--- .../physical-expr/src/expressions/column.rs | 106 -------- .../physical-expr/src/expressions/mod.rs | 3 +- datafusion/physical-expr/src/lib.rs | 32 ++- datafusion/physical-expr/src/physical_expr.rs | 251 +----------------- datafusion/physical-expr/src/planner.rs | 61 +++++ datafusion/physical-expr/src/utils/mod.rs | 129 +-------- 23 files changed, 896 insertions(+), 621 deletions(-) create mode 100644 datafusion/physical-expr-common/Cargo.toml create mode 100644 datafusion/physical-expr-common/README.md create mode 100644 datafusion/physical-expr-common/src/aggregate/mod.rs create mode 100644 datafusion/physical-expr-common/src/aggregate/utils.rs create mode 100644 datafusion/physical-expr-common/src/expressions/column.rs create mode 100644 datafusion/physical-expr-common/src/expressions/mod.rs create mode 100644 datafusion/physical-expr-common/src/lib.rs create mode 100644 datafusion/physical-expr-common/src/physical_expr.rs rename datafusion/{physical-expr => physical-expr-common}/src/sort_expr.rs (99%) rename datafusion/{physical-expr => physical-expr-common}/src/sort_properties.rs (99%) rename datafusion/{physical-expr => physical-expr-common}/src/tree_node.rs (100%) create mode 100644 datafusion/physical-expr-common/src/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 9df489724d46..ca34ea9c2a24 100644 --- 
a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "datafusion/functions", "datafusion/functions-array", "datafusion/optimizer", + "datafusion/physical-expr-common", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", @@ -80,6 +81,7 @@ datafusion-functions = { path = "datafusion/functions", version = "37.0.0" } datafusion-functions-array = { path = "datafusion/functions-array", version = "37.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "37.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "37.0.0", default-features = false } +datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "37.0.0", default-features = false } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "37.0.0" } datafusion-proto = { path = "datafusion/proto", version = "37.0.0" } datafusion-sql = { path = "datafusion/sql", version = "37.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 3be92221d3ee..d744a891c6a6 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1331,6 +1331,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-physical-expr-common", "half", "hashbrown 0.14.3", "hex", @@ -1345,6 +1346,15 @@ dependencies = [ "sha2", ] +[[package]] +name = "datafusion-physical-expr-common" +version = "37.0.0" +dependencies = [ + "arrow", + "datafusion-common", + "datafusion-expr", +] + [[package]] name = "datafusion-physical-plan" version = "37.0.0" diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml new file mode 100644 index 000000000000..89a41a5d10ce --- /dev/null +++ b/datafusion/physical-expr-common/Cargo.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+[package]
+name = "datafusion-physical-expr-common"
+description = "Common functionality of physical expressions for the DataFusion query engine"
+keywords = ["arrow", "query", "sql"]
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+rust-version = { workspace = true }
+
+[lib]
+name = "datafusion_physical_expr_common"
+path = "src/lib.rs"
+
+[dependencies]
+arrow = { workspace = true }
+datafusion-common = { workspace = true, default-features = true }
+datafusion-expr = { workspace = true }
diff --git a/datafusion/physical-expr-common/README.md b/datafusion/physical-expr-common/README.md
new file mode 100644
index 000000000000..7a1eff77d3b4
--- /dev/null
+++ b/datafusion/physical-expr-common/README.md
@@ -0,0 +1,27 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Core Physical Expressions
+
+[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
+
+This crate is a submodule of DataFusion that provides shared APIs for implementing
+physical expressions such as `PhysicalExpr` and `PhysicalSortExpr`.
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs
new file mode 100644
index 000000000000..579f51815d84
--- /dev/null
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod utils;
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::physical_expr::PhysicalExpr;
+use crate::sort_expr::PhysicalSortExpr;
+
+use arrow::datatypes::Field;
+use datafusion_common::{not_impl_err, Result};
+use datafusion_expr::{Accumulator, GroupsAccumulator};
+
+/// An aggregate expression that:
+/// * knows its resulting field
+/// * knows how to create its accumulator
+/// * knows its accumulator's state's field
+/// * knows the expressions from which its accumulator will receive values
+///
+/// Any implementation of this trait also needs to implement
+/// `PartialEq<dyn Any>` to allow comparing equality between the
+/// trait objects.
+pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
+    /// Returns the aggregate expression as [`Any`] so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// The field of the final result of this aggregation.
+    fn field(&self) -> Result<Field>;
+
+    /// The accumulator used to accumulate values from the expressions.
+    /// The accumulator expects the same number of arguments as `expressions` and must
+    /// return states with the same description as `state_fields`.
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
+
+    /// The fields that encapsulate the Accumulator's state;
+    /// the number of fields here equals the number of states that the accumulator contains.
+    fn state_fields(&self) -> Result<Vec<Field>>;
+
+    /// Expressions that are passed to the Accumulator.
+    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Order by requirements for the aggregate function.
+    /// By default it is `None` (there is no requirement).
+    /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)`, should implement this.
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    /// Human readable name such as `"MIN(c2)"`. The default
+    /// implementation returns placeholder text.
+    fn name(&self) -> &str {
+        "AggregateExpr: default name"
+    }
+
+    /// Whether the aggregate expression has a specialized
+    /// [`GroupsAccumulator`] implementation. If this returns true,
+    /// [`Self::create_groups_accumulator`] will be called.
+    fn groups_accumulator_supported(&self) -> bool {
+        false
+    }
+
+    /// Return a specialized [`GroupsAccumulator`] that manages state
+    /// for all groups.
+    ///
+    /// For maximum performance, a [`GroupsAccumulator`] should be
+    /// implemented in addition to [`Accumulator`].
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet")
+    }
+
+    /// Construct an expression that calculates the aggregate in reverse.
+    /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
+    /// For aggregates that do not support calculation in reverse,
+    /// returns None (which is the default value).
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        None
+    }
+
+    /// Creates an accumulator implementation that supports retraction.
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet")
+    }
+}
diff --git a/datafusion/physical-expr-common/src/aggregate/utils.rs b/datafusion/physical-expr-common/src/aggregate/utils.rs
new file mode 100644
index 000000000000..9821ba626b18
--- /dev/null
+++ b/datafusion/physical-expr-common/src/aggregate/utils.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    compute::SortOptions,
+    datatypes::{DataType, Field},
+};
+
+use crate::sort_expr::PhysicalSortExpr;
+
+use super::AggregateExpr;
+
+/// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
+/// and return the inner trait object as [`Any`] so
+/// that it can be downcast to a specific implementation.
+///
+/// This method is used when implementing `PartialEq<dyn Any>`
+/// for [`AggregateExpr`] aggregation expressions and allows comparing equality
+/// between the trait objects.
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
+    if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
+        obj.as_any()
+    } else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
+        obj.as_any()
+    } else {
+        any
+    }
+}
+
+/// Construct corresponding fields for a lexicographical ordering requirement expression.
+pub fn ordering_fields(
+    ordering_req: &[PhysicalSortExpr],
+    // Data type of each expression in the ordering requirement
+    data_types: &[DataType],
+) -> Vec<Field> {
+    ordering_req
+        .iter()
+        .zip(data_types.iter())
+        .map(|(sort_expr, dtype)| {
+            Field::new(
+                sort_expr.expr.to_string().as_str(),
+                dtype.clone(),
+                // Multi partitions may be empty hence field should be nullable.
+                true,
+            )
+        })
+        .collect()
+}
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
+    ordering_req.iter().map(|item| item.options).collect()
+}
diff --git a/datafusion/physical-expr-common/src/expressions/column.rs b/datafusion/physical-expr-common/src/expressions/column.rs
new file mode 100644
index 000000000000..2cd52d6332fb
--- /dev/null
+++ b/datafusion/physical-expr-common/src/expressions/column.rs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Column expression
+
+use std::any::Any;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+use datafusion_common::{internal_err, Result};
+use datafusion_expr::ColumnarValue;
+
+use crate::physical_expr::{down_cast_any_ref, PhysicalExpr};
+
+/// Represents the column at a given index in a RecordBatch
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
+pub struct Column {
+    name: String,
+    index: usize,
+}
+
+impl Column {
+    /// Create a new column expression
+    pub fn new(name: &str, index: usize) -> Self {
+        Self {
+            name: name.to_owned(),
+            index,
+        }
+    }
+
+    /// Create a new column expression based on column name and schema
+    pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
+        Ok(Column::new(name, schema.index_of(name)?))
+    }
+
+    /// Get the column name
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Get the column index
+    pub fn index(&self) -> usize {
+        self.index
+    }
+}
+
+impl std::fmt::Display for Column {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}@{}", self.name, self.index)
+    }
+}
+
+impl PhysicalExpr for Column {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    /// Get the data type of this expression, given the schema of the input
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+        self.bounds_check(input_schema)?;
+        Ok(input_schema.field(self.index).data_type().clone())
+    }
+
+    /// Decide whether this expression is nullable, given the schema of the input
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.bounds_check(input_schema)?;
+        Ok(input_schema.field(self.index).is_nullable())
+    }
+
+    /// Evaluate the expression
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        self.bounds_check(batch.schema().as_ref())?;
+        Ok(ColumnarValue::Array(batch.column(self.index).clone()))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(self)
+    }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        let mut s = state;
+        self.hash(&mut s);
+    }
+}
+
+impl PartialEq<dyn Any> for Column {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| self == x)
+            .unwrap_or(false)
+    }
+}
+
+impl Column {
+    fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
+        if self.index < input_schema.fields.len() {
+            Ok(())
+        } else {
+            internal_err!(
+                "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
+                self.name,
+                self.index,
+                input_schema.fields.len(),
+                input_schema
+                    .fields()
+                    .iter()
+                    .map(|f| f.name().clone())
+                    .collect::<Vec<String>>()
+            )
+        }
+    }
+}
+
+/// Create a column expression
+pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
+    Ok(Arc::new(Column::new_with_schema(name, schema)?))
+}
diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs b/datafusion/physical-expr-common/src/expressions/mod.rs
new file mode 100644
index 000000000000..d102422081dc
--- /dev/null
+++ b/datafusion/physical-expr-common/src/expressions/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod column; diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs new file mode 100644 index 000000000000..53e3134a1b05 --- /dev/null +++ b/datafusion/physical-expr-common/src/lib.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod aggregate; +pub mod expressions; +pub mod physical_expr; +pub mod sort_expr; +pub mod sort_properties; +pub mod tree_node; +pub mod utils; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs new file mode 100644 index 000000000000..be6358e73c99 --- /dev/null +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::any::Any;
+use std::fmt::{Debug, Display};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use arrow::array::BooleanArray;
+use arrow::compute::filter_record_batch;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::utils::DataPtr;
+use datafusion_common::{internal_err, not_impl_err, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::ColumnarValue;
+
+use crate::sort_properties::SortProperties;
+use crate::utils::scatter;
+
+/// See [create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html)
+/// for examples of creating `PhysicalExpr` from `Expr`
+pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
+    /// Returns the physical expression as [`Any`] so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+    /// Get the data type of this expression, given the schema of the input
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+    /// Determine whether this expression is nullable, given the schema of the input
+    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
+    /// Evaluate an expression against a RecordBatch
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch after first applying a
+    /// validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let tmp_batch = filter_record_batch(batch, selection)?;
+
+        let tmp_result = self.evaluate(&tmp_batch)?;
+
+        if batch.num_rows() == tmp_batch.num_rows() {
+            // All values from the `selection` filter are true.
+            Ok(tmp_result)
+        } else if let ColumnarValue::Array(a) = tmp_result {
+            scatter(selection, a.as_ref()).map(ColumnarValue::Array)
+        } else {
+            Ok(tmp_result)
+        }
+    }
+
+    /// Get a list of child PhysicalExpr that provide the input for this expr.
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Returns a new PhysicalExpr where all children were replaced by new exprs.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>>;
+
+    /// Computes the output interval for the expression, given the input
+    /// intervals.
+    ///
+    /// # Arguments
+    ///
+    /// * `children` are the intervals for the children (inputs) of this
+    /// expression.
+    ///
+    /// # Example
+    ///
+    /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
+    /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
+    fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
+        not_impl_err!("Not implemented for {self}")
+    }
+
+    /// Updates bounds for child expressions, given a known interval for this
+    /// expression.
+    ///
+    /// This is used to propagate constraints down through an expression tree.
+    ///
+    /// # Arguments
+    ///
+    /// * `interval` is the currently known interval for this expression.
+    /// * `children` are the current intervals for the children of this expression.
+    ///
+    /// # Returns
+    ///
+    /// A `Vec` of new intervals for the children, in order.
+    ///
+    /// If constraint propagation reveals an infeasibility for any child, returns
+    /// [`None`]. If none of the children intervals change as a result of propagation,
+    /// may return an empty vector instead of cloning `children`. This is the default
+    /// (and conservative) return value.
+    ///
+    /// # Example
+    ///
+    /// If the expression is `a + b`, the current `interval` is `[4, 5]` and the
+    /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, then
+    /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at
+    /// least `2` to make the output at least `4`.
+    fn propagate_constraints(
+        &self,
+        _interval: &Interval,
+        _children: &[&Interval],
+    ) -> Result<Option<Vec<Interval>>> {
+        Ok(Some(vec![]))
+    }
+
+    /// Update the hash `state` with this expression's requirements from
+    /// [`Hash`].
+    ///
+    /// This method is required to support hashing [`PhysicalExpr`]s. To
+    /// implement it, typically the type implementing
+    /// [`PhysicalExpr`] implements [`Hash`] and
+    /// then the following boilerplate is used:
+    ///
+    /// # Example:
+    /// ```
+    /// // User defined expression that derives Hash
+    /// #[derive(Hash, Debug, PartialEq, Eq)]
+    /// struct MyExpr {
+    ///   val: u64
+    /// }
+    ///
+    /// // impl PhysicalExpr {
+    /// // ...
+    /// # impl MyExpr {
+    ///   // Boilerplate to call the derived Hash impl
+    ///   fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+    ///     use std::hash::Hash;
+    ///     let mut s = state;
+    ///     self.hash(&mut s);
+    ///   }
+    /// // }
+    /// # }
+    /// ```
+    /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
+    /// directly because it must remain object safe.
+    fn dyn_hash(&self, _state: &mut dyn Hasher);
+
+    /// The order information of a PhysicalExpr can be estimated from its children.
+    /// This is especially helpful for projection expressions. If we can ensure that the
+    /// order of a PhysicalExpr to project matches the order of a SortExec, we can
+    /// eliminate that SortExec.
+    ///
+    /// By recursively calling this function, we can obtain the overall order
+    /// information of the PhysicalExpr. Since `SortOptions` cannot fully handle
+    /// the propagation of unordered columns and literals, the `SortProperties`
+    /// struct is used.
+    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
+        SortProperties::Unordered
+    }
+}
+
+impl Hash for dyn PhysicalExpr {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.dyn_hash(state);
+    }
+}
+
+/// Returns a copy of this expr if we change any child according to the pointer comparison.
+/// The size of `children` must be equal to the size of `PhysicalExpr::children()`.
+pub fn with_new_children_if_necessary(
+    expr: Arc<dyn PhysicalExpr>,
+    children: Vec<Arc<dyn PhysicalExpr>>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    let old_children = expr.children();
+    if children.len() != old_children.len() {
+        internal_err!("PhysicalExpr: Wrong number of children")
+    } else if children.is_empty()
+        || children
+            .iter()
+            .zip(old_children.iter())
+            .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2))
+    {
+        Ok(expr.with_new_children(children)?)
+    } else {
+        Ok(expr)
+    }
+}
+
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
+    if any.is::<Arc<dyn PhysicalExpr>>() {
+        any.downcast_ref::<Arc<dyn PhysicalExpr>>()
+            .unwrap()
+            .as_any()
+    } else if any.is::<Box<dyn PhysicalExpr>>() {
+        any.downcast_ref::<Box<dyn PhysicalExpr>>()
+            .unwrap()
+            .as_any()
+    } else {
+        any
+    }
+}
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs
similarity index 99%
rename from datafusion/physical-expr/src/sort_expr.rs
rename to datafusion/physical-expr-common/src/sort_expr.rs
index 914d76f9261a..1e1187212d96 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -21,14 +21,14 @@ use std::fmt::Display;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
-use crate::PhysicalExpr;
-
 use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
-use arrow_schema::Schema;
 use datafusion_common::Result;
 use datafusion_expr::ColumnarValue;
 
+use crate::physical_expr::PhysicalExpr;
+
 /// Represents Sort operation for a column in a RecordBatch
 #[derive(Clone, Debug)]
 pub struct PhysicalSortExpr {
diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr-common/src/sort_properties.rs
similarity index 99%
rename from datafusion/physical-expr/src/sort_properties.rs
rename to datafusion/physical-expr-common/src/sort_properties.rs
index 4df29ced2f01..47a5d5ba5e3b 100644
--- a/datafusion/physical-expr/src/sort_properties.rs
+++ b/datafusion/physical-expr-common/src/sort_properties.rs
@@ -17,9 +17,9 @@
 
 use std::ops::Neg;
 
-use crate::tree_node::ExprContext;
+use arrow::compute::SortOptions;
 
-use arrow_schema::SortOptions;
+use crate::tree_node::ExprContext;
 
 /// To propagate [`SortOptions`] across the `PhysicalExpr`, it is insufficient
 /// to simply use `Option<SortOptions>`: There must be a differentiation between
diff --git a/datafusion/physical-expr/src/tree_node.rs b/datafusion/physical-expr-common/src/tree_node.rs
similarity index 100%
rename from datafusion/physical-expr/src/tree_node.rs
rename to datafusion/physical-expr-common/src/tree_node.rs
diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs
new file mode 100644
index 000000000000..459b5a4849cb
--- /dev/null
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+    array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData},
+    compute::{and_kleene, is_not_null, SlicesIterator},
+};
+use datafusion_common::Result;
+
+use crate::sort_expr::PhysicalSortExpr;
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, the next
+/// values of `truthy` are taken; when the mask evaluates `false`, null values are filled.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy` values
+/// * `truthy` - All values of this array are scattered according to `mask` into the final result.
+pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+    let truthy = truthy.to_data();
+
+    // update the mask so that any null values become false
+    // (SlicesIterator doesn't respect nulls)
+    let mask = and_kleene(mask, &is_not_null(mask)?)?;
+
+    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
+
+    // SlicesIterator slices only the true values, so we need to fill the gaps
+    // it leaves with null values
+
+    // keep track of how much is filled
+    let mut filled = 0;
+    // keep track of current position we have in truthy array
+    let mut true_pos = 0;
+
+    SlicesIterator::new(&mask).for_each(|(start, end)| {
+        // the gap needs to be filled with nulls
+        if start > filled {
+            mutable.extend_nulls(start - filled);
+        }
+        // fill with truthy values
+        let len = end - start;
+        mutable.extend(0, true_pos, true_pos + len);
+        true_pos += len;
+        filled = end;
+    });
+    // the remaining part is falsy
+    if filled < mask.len() {
+        mutable.extend_nulls(mask.len() - filled);
+    }
+
+    let data = mutable.freeze();
+    Ok(make_array(data))
+}
+
+/// Reverses the ORDER BY expression, which is useful during equivalent window
+/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
+/// 'ORDER BY a DESC, NULLS FIRST'.
+pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
+    order_bys
+        .iter()
+        .map(|e| PhysicalSortExpr {
+            expr: e.expr.clone(),
+            options: !e.options,
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::array::Int32Array;
+    use datafusion_common::cast::{as_boolean_array, as_int32_array};
+
+    use super::*;
+
+    #[test]
+    fn scatter_int() -> Result<()> {
+        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
+        let mask = BooleanArray::from(vec![true, true, false, false, true]);
+
+        // the output array is expected to be the same length as the mask array
+        let expected =
+            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
+        let result = scatter(&mask, truthy.as_ref())?;
+        let result = as_int32_array(&result)?;
+
+        assert_eq!(&expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_int_end_with_false() -> Result<()> {
+        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
+        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
+
+        // output should be same length as mask
+        let expected =
+            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
+        let result = scatter(&mask, truthy.as_ref())?;
+        let result = as_int32_array(&result)?;
+
+        assert_eq!(&expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_with_null_mask() -> Result<()> {
+        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
+        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
+            .into_iter()
+            .collect();
+
+        // output should treat nulls as though they are false
+        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
+        let result = scatter(&mask, truthy.as_ref())?;
+        let result = as_int32_array(&result)?;
+
+        assert_eq!(&expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_boolean() -> Result<()> {
+        let truthy =
Arc::new(BooleanArray::from(vec![false, false, false, true])); + let mask = BooleanArray::from(vec![true, true, false, false, true]); + + // the output array is expected to be the same length as the mask array + let expected = BooleanArray::from_iter(vec![ + Some(false), + Some(false), + None, + None, + Some(false), + ]); + let result = scatter(&mask, truthy.as_ref())?; + let result = as_boolean_array(&result)?; + + assert_eq!(&expected, result); + Ok(()) + } +} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 56b3f3c91eee..87d73183d0dd 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -59,6 +59,7 @@ chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } half = { workspace = true } hashbrown = { version = "0.14", features = ["raw"] } hex = { version = "0.4", optional = true } diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 893178f29d08..e176084ae6ec 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -15,16 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; -use std::fmt::Debug; use std::sync::Arc; use crate::expressions::{NthValueAgg, OrderSensitiveArrayAgg}; -use crate::{PhysicalExpr, PhysicalSortExpr}; -use arrow::datatypes::Field; -use datafusion_common::{not_impl_err, Result}; -use datafusion_expr::{Accumulator, GroupsAccumulator}; +pub use datafusion_physical_expr_common::aggregate::AggregateExpr; mod hyperloglog; mod tdigest; @@ -62,79 +57,6 @@ pub mod build_in; pub mod moving_min_max; pub mod utils; -/// An aggregate expression that: -/// * knows its resulting field -/// * knows how to create its accumulator -/// * knows its accumulator's state's field -/// * knows the expressions from whose its accumulator will receive values -/// -/// Any implementation of this trait also needs to implement the -/// `PartialEq` to allows comparing equality between the -/// trait objects. -pub trait AggregateExpr: Send + Sync + Debug + PartialEq { - /// Returns the aggregate expression as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this aggregation. - fn field(&self) -> Result; - - /// the accumulator used to accumulate values from the expressions. - /// the accumulator expects the same number of arguments as `expressions` and must - /// return states with the same description as `state_fields` - fn create_accumulator(&self) -> Result>; - - /// the fields that encapsulate the Accumulator's state - /// the number of fields here equals the number of states that the accumulator contains - fn state_fields(&self) -> Result>; - - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// Order by requirements for the aggregate function - /// By default it is `None` (there is no requirement) - /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - /// Human readable name such as `"MIN(c2)"`. 
The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "AggregateExpr: default name" - } - - /// If the aggregate expression has a specialized - /// [`GroupsAccumulator`] implementation. If this returns true, - /// `[Self::create_groups_accumulator`] will be called. - fn groups_accumulator_supported(&self) -> bool { - false - } - - /// Return a specialized [`GroupsAccumulator`] that manages state - /// for all groups. - /// - /// For maximum performance, a [`GroupsAccumulator`] should be - /// implemented in addition to [`Accumulator`]. - fn create_groups_accumulator(&self) -> Result> { - not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") - } - - /// Construct an expression that calculates the aggregate in reverse. - /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). - /// For aggregates that do not support calculation in reverse, - /// returns None (which is the default value). - fn reverse_expr(&self) -> Option> { - None - } - - /// Creates accumulator implementation that supports retract - fn create_sliding_accumulator(&self) -> Result> { - not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") - } -} - /// Checks whether the given aggregate expression is order-sensitive. /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. /// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering. diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs index 613f6118e907..d14a52f5752d 100644 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ b/datafusion/physical-expr/src/aggregate/utils.rs @@ -17,10 +17,12 @@ //! Utilities used in aggregates -use std::any::Any; use std::sync::Arc; -use crate::{AggregateExpr, PhysicalSortExpr}; +// For backwards compatibility +pub use datafusion_physical_expr_common::aggregate::utils::down_cast_any_ref; +pub use datafusion_physical_expr_common::aggregate::utils::get_sort_options; +pub use datafusion_physical_expr_common::aggregate::utils::ordering_fields; use arrow::array::{ArrayRef, ArrowNativeTypeOp}; use arrow_array::cast::AsArray; @@ -29,7 +31,7 @@ use arrow_array::types::{ TimestampNanosecondType, TimestampSecondType, }; use arrow_buffer::{ArrowNativeType, ToByteSlice}; -use arrow_schema::{DataType, Field, SortOptions}; +use arrow_schema::DataType; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::Accumulator; @@ -170,48 +172,6 @@ pub fn adjust_output_array( Ok(array) } -/// Downcast a `Box` or `Arc` -/// and return the inner trait object as [`Any`] so -/// that it can be downcast to a specific implementation. -/// -/// This method is used when implementing the `PartialEq` -/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality -/// between the trait objects. 
-pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { - if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else { - any - } -} - -/// Construct corresponding fields for lexicographical ordering requirement expression -pub fn ordering_fields( - ordering_req: &[PhysicalSortExpr], - // Data type of each expression in the ordering requirement - data_types: &[DataType], -) -> Vec { - ordering_req - .iter() - .zip(data_types.iter()) - .map(|(sort_expr, dtype)| { - Field::new( - sort_expr.expr.to_string().as_str(), - dtype.clone(), - // Multi partitions may be empty hence field should be nullable. - true, - ) - }) - .collect() -} - -/// Selects the sort option attribute from all the given `PhysicalSortExpr`s. -pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec { - ordering_req.iter().map(|item| item.options).collect() -} - /// A wrapper around a type to provide hash for floats #[derive(Copy, Clone, Debug)] pub(crate) struct Hashable(pub T); diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index a07f36e785e3..634a56d1d683 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -31,107 +31,6 @@ use arrow::{ use datafusion_common::{internal_err, Result}; use datafusion_expr::ColumnarValue; -/// Represents the column at a given index in a RecordBatch -#[derive(Debug, Hash, PartialEq, Eq, Clone)] -pub struct Column { - name: String, - index: usize, -} - -impl Column { - /// Create a new column expression - pub fn new(name: &str, index: usize) -> Self { - Self { - name: name.to_owned(), - index, - } - } - - /// Create a new column expression based on column name and schema - pub fn new_with_schema(name: &str, schema: &Schema) -> Result { - Ok(Column::new(name, schema.index_of(name)?)) - } - - /// Get the column name - pub fn name(&self) -> &str { - &self.name - } - - /// Get the column index - pub fn index(&self) -> usize { - self.index - } -} - -impl std::fmt::Display for Column { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}@{}", self.name, self.index) - } -} - -impl PhysicalExpr for Column { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn std::any::Any { - self - } - - /// Get the data type of this expression, given the schema of the input - fn data_type(&self, input_schema: &Schema) -> Result { - self.bounds_check(input_schema)?; - Ok(input_schema.field(self.index).data_type().clone()) - } - - /// Decide whehter this expression is nullable, given the schema of the input - fn nullable(&self, input_schema: &Schema) -> Result { - self.bounds_check(input_schema)?; - Ok(input_schema.field(self.index).is_nullable()) - } - - /// Evaluate the expression - fn evaluate(&self, batch: &RecordBatch) -> Result { - self.bounds_check(batch.schema().as_ref())?; - Ok(ColumnarValue::Array(batch.column(self.index).clone())) - } - - fn children(&self) -> Vec> { - vec![] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> Result> { - Ok(self) - } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.hash(&mut s); - } -} - -impl PartialEq for Column { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self == x) - .unwrap_or(false) - } -} - -impl Column { - fn bounds_check(&self, input_schema: &Schema) -> Result<()> { - if 
self.index < input_schema.fields.len() { - Ok(()) - } else { - internal_err!( - "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}", - self.name, - self.index, input_schema.fields.len(), input_schema.fields().iter().map(|f| f.name().clone()).collect::>()) - } - } -} - #[derive(Debug, Hash, PartialEq, Eq, Clone)] pub struct UnKnownColumn { name: String, @@ -204,11 +103,6 @@ impl PartialEq for UnKnownColumn { } } -/// Create a column expression -pub fn col(name: &str, schema: &Schema) -> Result> { - Ok(Arc::new(Column::new_with_schema(name, schema)?)) -} - #[cfg(test)] mod test { use crate::expressions::Column; diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 7c4ea07dfbcb..f0cc4b175ea5 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -80,7 +80,8 @@ pub use crate::PhysicalSortExpr; pub use binary::{binary, BinaryExpr}; pub use case::{case, CaseExpr}; pub use cast::{cast, cast_with_options, CastExpr}; -pub use column::{col, Column, UnKnownColumn}; +pub use column::UnKnownColumn; +pub use datafusion_physical_expr_common::expressions::column::{col, Column}; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 655771270a6b..c88f1b32bbc6 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -28,10 +28,7 @@ mod partitioning; mod physical_expr; pub mod planner; mod scalar_function; -mod sort_expr; -pub mod sort_properties; pub mod string_expressions; -pub mod tree_node; pub mod udf; pub mod utils; pub mod window; @@ -43,20 +40,37 @@ pub mod execution_props { } pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; -pub use aggregate::AggregateExpr; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; +pub use datafusion_physical_expr_common::aggregate::AggregateExpr; pub use equivalence::EquivalenceProperties; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, - PhysicalExpr, PhysicalExprRef, + PhysicalExprRef, }; -pub use planner::{create_physical_expr, create_physical_exprs}; -pub use scalar_function::ScalarFunctionExpr; -pub use sort_expr::{ + +pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +pub use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement, }; -pub use utils::{reverse_order_bys, split_conjunction}; + +pub use planner::{create_physical_expr, create_physical_exprs}; +pub use scalar_function::ScalarFunctionExpr; + +pub use datafusion_physical_expr_common::utils::reverse_order_bys; +pub use utils::split_conjunction; pub use aggregate::first_last::create_first_value_accumulator; + +// For backwards compatibility +pub mod sort_properties { + pub use datafusion_physical_expr_common::sort_properties::{ + ExprOrdering, SortProperties, + }; +} + +// For backwards compatibility +pub mod tree_node { + pub use datafusion_physical_expr_common::tree_node::ExprContext; +} diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 861a4ad02801..bc265d3819a5 100644 --- 
a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -15,263 +15,16 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; -use std::fmt::{Debug, Display}; -use std::hash::{Hash, Hasher}; use std::sync::Arc; -use crate::sort_properties::SortProperties; -use crate::utils::scatter; - -use arrow::array::BooleanArray; -use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Schema}; -use arrow::record_batch::RecordBatch; -use datafusion_common::utils::DataPtr; -use datafusion_common::{internal_err, not_impl_err, Result}; -use datafusion_expr::interval_arithmetic::Interval; -use datafusion_expr::ColumnarValue; - +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use itertools::izip; -/// `PhysicalExpr` evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 -/// AS int)`. -/// -/// `PhysicalExpr` are the physical counterpart to [`Expr`] used in logical -/// planning, and can be evaluated directly on a [`RecordBatch`]. They are -/// normally created from `Expr` by a [`PhysicalPlanner`] and can be created -/// directly using [`create_physical_expr`]. -/// -/// A Physical expression knows its type, nullability and how to evaluate itself. -/// -/// [`PhysicalPlanner`]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html -/// [`create_physical_expr`]: crate::create_physical_expr -/// [`Expr`]: datafusion_expr::Expr -/// -/// # Example: Create `PhysicalExpr` from `Expr` -/// ``` -/// # use arrow_schema::{DataType, Field, Schema}; -/// # use datafusion_common::DFSchema; -/// # use datafusion_expr::{Expr, col, lit}; -/// # use datafusion_physical_expr::create_physical_expr; -/// # use datafusion_expr::execution_props::ExecutionProps; -/// // For a logical expression `a = 1`, we can create a physical expression -/// let expr = col("a").eq(lit(1)); -/// // To create a PhysicalExpr we need 1. a schema -/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); -/// let df_schema = DFSchema::try_from(schema).unwrap(); -/// // 2. 
-/// // 2. ExecutionProps
-/// let props = ExecutionProps::new();
-/// // We can now create a PhysicalExpr:
-/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
-/// ```
-///
-/// # Example: Executing a PhysicalExpr to obtain [`ColumnarValue`]
-/// ```
-/// # use std::sync::Arc;
-/// # use arrow_array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
-/// # use arrow_schema::{DataType, Field, Schema};
-/// # use datafusion_common::{assert_batches_eq, DFSchema};
-/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
-/// # use datafusion_physical_expr::create_physical_expr;
-/// # use datafusion_expr::execution_props::ExecutionProps;
-/// # let expr = col("a").eq(lit(1));
-/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
-/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
-/// # let props = ExecutionProps::new();
-/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
-/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
-/// // Input of [1,2,3]
-/// let input_batch = RecordBatch::try_from_iter(vec![
-///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
-/// ]).unwrap();
-/// // The result is a ColumnarValue (either an Array or a Scalar)
-/// let result = physical_expr.evaluate(&input_batch).unwrap();
-/// // In this case, a BooleanArray with the result of the comparison
-/// let ColumnarValue::Array(arr) = result else {
-///   panic!("Expected an array")
-/// };
-/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
-/// ```
-pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
-    /// Returns the physical expression as [`Any`] so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-    /// Get the data type of this expression, given the schema of the input
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
-    /// Determine whether this expression is nullable, given the schema of the input
-    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
-    /// Evaluate an expression against a RecordBatch
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
-    /// Evaluate an expression against a RecordBatch after first applying a
-    /// validity array
-    fn evaluate_selection(
-        &self,
-        batch: &RecordBatch,
-        selection: &BooleanArray,
-    ) -> Result<ColumnarValue> {
-        let tmp_batch = filter_record_batch(batch, selection)?;
-
-        let tmp_result = self.evaluate(&tmp_batch)?;
-
-        if batch.num_rows() == tmp_batch.num_rows() {
-            // All values from the `selection` filter are true.
-            Ok(tmp_result)
-        } else if let ColumnarValue::Array(a) = tmp_result {
-            scatter(selection, a.as_ref()).map(ColumnarValue::Array)
-        } else {
-            Ok(tmp_result)
-        }
-    }
-
-    /// Get a list of child PhysicalExpr that provide the input for this expr.
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
-    /// Returns a new PhysicalExpr where all children were replaced by new exprs.
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn PhysicalExpr>>,
-    ) -> Result<Arc<dyn PhysicalExpr>>;
-
-    /// Computes the output interval for the expression, given the input
-    /// intervals.
-    ///
-    /// # Arguments
-    ///
-    /// * `children` are the intervals for the children (inputs) of this
-    /// expression.
-    ///
-    /// # Example
-    ///
-    /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
-    /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
-    fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
-        not_impl_err!("Not implemented for {self}")
-    }
-
-    /// Updates bounds for child expressions, given a known interval for this
-    /// expression.
-    ///
-    /// This is used to propagate constraints down through an expression tree.
-    ///
-    /// # Arguments
-    ///
-    /// * `interval` is the currently known interval for this expression.
-    /// * `children` are the current intervals for the children of this expression.
-    ///
-    /// # Returns
-    ///
-    /// A `Vec` of new intervals for the children, in order.
-    ///
-    /// If constraint propagation reveals an infeasibility for any child, returns
-    /// [`None`]. If none of the children intervals change as a result of propagation,
-    /// may return an empty vector instead of cloning `children`. This is the default
-    /// (and conservative) return value.
-    ///
-    /// # Example
-    ///
-    /// If the expression is `a + b`, the current `interval` is `[4, 5]` and the
-    /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, then
-    /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at
-    /// least `2` to make the output at least `4`.
-    fn propagate_constraints(
-        &self,
-        _interval: &Interval,
-        _children: &[&Interval],
-    ) -> Result<Option<Vec<Interval>>> {
-        Ok(Some(vec![]))
-    }
-
-    /// Update the hash `state` with this expression requirements from
-    /// [`Hash`].
-    ///
-    /// This method is required to support hashing [`PhysicalExpr`]s. To
-    /// implement it, typically the type implementing
-    /// [`PhysicalExpr`] implements [`Hash`] and
-    /// then the following boiler plate is used:
-    ///
-    /// # Example:
-    /// ```
-    /// // User defined expression that derives Hash
-    /// #[derive(Hash, Debug, PartialEq, Eq)]
-    /// struct MyExpr {
-    ///   val: u64
-    /// }
-    ///
-    /// // impl PhysicalExpr {
-    /// // ...
-    /// # impl MyExpr {
-    ///   // Boiler plate to call the derived Hash impl
-    ///   fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
-    ///     use std::hash::Hash;
-    ///     let mut s = state;
-    ///     self.hash(&mut s);
-    ///   }
-    /// // }
-    /// # }
-    /// ```
-    /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
-    /// directly because it must remain object safe.
-    fn dyn_hash(&self, _state: &mut dyn Hasher);
-
-    /// The order information of a PhysicalExpr can be estimated from its children.
-    /// This is especially helpful for projection expressions. If we can ensure that the
-    /// order of a PhysicalExpr to project matches with the order of SortExec, we can
-    /// eliminate that SortExec.
-    ///
-    /// By recursively calling this function, we can obtain the overall order
-    /// information of the PhysicalExpr. Since `SortOptions` cannot fully handle
-    /// the propagation of unordered columns and literals, the `SortProperties`
-    /// struct is used.
-    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
-        SortProperties::Unordered
-    }
-}
-
-impl Hash for dyn PhysicalExpr {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.dyn_hash(state);
-    }
-}
+pub use datafusion_physical_expr_common::physical_expr::down_cast_any_ref;
 
 /// Shared [`PhysicalExpr`].
 pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
 
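The `evaluate_bounds` and `propagate_constraints` docs above compress a lot of interval arithmetic into two examples. Here is a self-contained toy model of the `a + b` case using plain f64 bounds (illustration only, not DataFusion's `Interval` API):

```
// Toy interval, not datafusion_expr::interval_arithmetic::Interval.
#[derive(Clone, Copy, Debug)]
struct Interval {
    lo: f64,
    hi: f64,
}

// evaluate_bounds for `a + b`: output bounds from child bounds.
fn add_bounds(a: Interval, b: Interval) -> Interval {
    Interval { lo: a.lo + b.lo, hi: a.hi + b.hi }
}

// propagate_constraints for `a + b`: shrink each child using
// a = sum - b (and symmetrically b = sum - a), then intersect
// with the child's old bounds.
fn propagate_add(parent: Interval, a: Interval, b: Interval) -> (Interval, Interval) {
    let new_a = Interval {
        lo: a.lo.max(parent.lo - b.hi),
        hi: a.hi.min(parent.hi - b.lo),
    };
    let new_b = Interval {
        lo: b.lo.max(parent.lo - a.hi),
        hi: b.hi.min(parent.hi - a.lo),
    };
    (new_a, new_b)
}

fn main() {
    // Doc example 1: a: [1, 2], b: [3, 4] => a + b: [4, 6]
    let sum = add_bounds(Interval { lo: 1.0, hi: 2.0 }, Interval { lo: 3.0, hi: 4.0 });
    println!("a + b is in [{}, {}]", sum.lo, sum.hi);

    // Doc example 2: a + b in [4, 5], a in [0, 2], b in [-inf, 4]
    let (a, b) = propagate_add(
        Interval { lo: 4.0, hi: 5.0 },
        Interval { lo: 0.0, hi: 2.0 },
        Interval { lo: f64::NEG_INFINITY, hi: 4.0 },
    );
    // a stays [0, 2]; b tightens to [2, 4] since b >= 4 - max(a) = 2.
    println!("a in [{}, {}], b in [{}, {}]", a.lo, a.hi, b.lo, b.hi);
}
```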
-/// Returns a copy of this expr if we change any child according to the pointer comparison.
-/// The size of `children` must be equal to the size of `PhysicalExpr::children()`.
-pub fn with_new_children_if_necessary(
-    expr: Arc<dyn PhysicalExpr>,
-    children: Vec<Arc<dyn PhysicalExpr>>,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    let old_children = expr.children();
-    if children.len() != old_children.len() {
-        internal_err!("PhysicalExpr: Wrong number of children")
-    } else if children.is_empty()
-        || children
-            .iter()
-            .zip(old_children.iter())
-            .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2))
-    {
-        Ok(expr.with_new_children(children)?)
-    } else {
-        Ok(expr)
-    }
-}
-
-pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
-    if any.is::<Arc<dyn PhysicalExpr>>() {
-        any.downcast_ref::<Arc<dyn PhysicalExpr>>()
-            .unwrap()
-            .as_any()
-    } else if any.is::<Box<dyn PhysicalExpr>>() {
-        any.downcast_ref::<Box<dyn PhysicalExpr>>()
-            .unwrap()
-            .as_any()
-    } else {
-        any
-    }
-}
-
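`with_new_children_if_necessary` above rebuilds an expression only when some child pointer actually changed. The same idea with `std::sync::Arc::ptr_eq` and a toy node type (sketch only; the real code also rebuilds when `children` is empty and errors on a length mismatch):

```
use std::sync::Arc;

// Toy stand-in for an expression node.
struct Node {
    children: Vec<Arc<Node>>,
}

// Rebuild `node` only if some child was replaced, pointer-wise.
fn with_new_children_if_necessary(node: Arc<Node>, children: Vec<Arc<Node>>) -> Arc<Node> {
    let changed = node
        .children
        .iter()
        .zip(children.iter())
        .any(|(old, new)| !Arc::ptr_eq(old, new));
    if changed {
        Arc::new(Node { children })
    } else {
        node // reuse the existing node, no new allocation
    }
}

fn main() {
    let leaf = Arc::new(Node { children: vec![] });
    let parent = Arc::new(Node { children: vec![leaf.clone()] });

    // Same child pointer: the parent is reused as-is.
    let same = with_new_children_if_necessary(parent.clone(), vec![leaf.clone()]);
    assert!(Arc::ptr_eq(&same, &parent));

    // New child pointer: a new parent is built.
    let new_leaf = Arc::new(Node { children: vec![] });
    let rebuilt = with_new_children_if_necessary(parent.clone(), vec![new_leaf]);
    assert!(!Arc::ptr_eq(&rebuilt, &parent));
    println!("ok");
}
```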
 /// This function is similar to the `contains` method of `Vec`. It finds
 /// whether `expr` is among `physical_exprs`.
 pub fn physical_exprs_contains(
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 0dbea09ffb51..44c9d33d6052 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -33,6 +33,67 @@ use datafusion_expr::{
 };
 use std::sync::Arc;
 
+/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
+/// AS int)`.
+///
+/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
+/// planning, and can be evaluated directly on a [RecordBatch]. They are
+/// normally created from [Expr] by a [PhysicalPlanner] and can be created
+/// directly using [create_physical_expr].
+///
+/// A Physical expression knows its type, nullability and how to evaluate itself.
+///
+/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
+/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
+///
+/// # Example: Create `PhysicalExpr` from `Expr`
+/// ```
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::DFSchema;
+/// # use datafusion_expr::{Expr, col, lit};
+/// # use datafusion_physical_expr::create_physical_expr;
+/// # use datafusion_expr::execution_props::ExecutionProps;
+/// // For a logical expression `a = 1`, we can create a physical expression
+/// let expr = col("a").eq(lit(1));
+/// // To create a PhysicalExpr we need 1. a schema
+/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+/// let df_schema = DFSchema::try_from(schema).unwrap();
+/// // 2. ExecutionProps
+/// let props = ExecutionProps::new();
+/// // We can now create a PhysicalExpr:
+/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
+/// ```
+///
+/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::{assert_batches_eq, DFSchema};
+/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
+/// # use datafusion_physical_expr::create_physical_expr;
+/// # use datafusion_expr::execution_props::ExecutionProps;
+/// # let expr = col("a").eq(lit(1));
+/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
+/// # let props = ExecutionProps::new();
+/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
+/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
+/// // Input of [1,2,3]
+/// let input_batch = RecordBatch::try_from_iter(vec![
+///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
+/// ]).unwrap();
+/// // The result is a ColumnarValue (either an Array or a Scalar)
+/// let result = physical_expr.evaluate(&input_batch).unwrap();
+/// // In this case, a BooleanArray with the result of the comparison
+/// let ColumnarValue::Array(arr) = result else {
+///   panic!("Expected an array")
+/// };
+/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
+/// ```
+///
+/// [ColumnarValue]: datafusion_expr::ColumnarValue
+///
 /// Create a physical expression from a logical expression ([Expr]).
 ///
 /// # Arguments
diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs
index b8e99403d695..e55bc3d15665 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -24,10 +24,9 @@ use std::sync::Arc;
 
 use crate::expressions::{BinaryExpr, Column};
 use crate::tree_node::ExprContext;
-use crate::{PhysicalExpr, PhysicalSortExpr};
+use crate::PhysicalExpr;
+use crate::PhysicalSortExpr;
 
-use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
-use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
@@ -244,62 +243,6 @@ pub fn reassign_predicate_columns(
         .data()
 }
 
-/// Reverses the ORDER BY expression, which is useful during equivalent window
-/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
-/// 'ORDER BY a DESC, NULLS FIRST'.
-pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
-    order_bys
-        .iter()
-        .map(|e| PhysicalSortExpr {
-            expr: e.expr.clone(),
-            options: !e.options,
-        })
-        .collect()
-}
-
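`reverse_order_bys` negates each sort expression's options, so `ORDER BY a ASC NULLS LAST` becomes `ORDER BY a DESC NULLS FIRST`. A stand-alone sketch of the per-expression flip, using a stand-in struct rather than arrow's `SortOptions`:

```
// Stand-in for arrow's SortOptions; only the two flags matter here.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

// What reverse_order_bys does for each expression: negate both flags.
fn reverse(options: SortOptions) -> SortOptions {
    SortOptions {
        descending: !options.descending,
        nulls_first: !options.nulls_first,
    }
}

fn main() {
    // ORDER BY a ASC NULLS LAST  ->  ORDER BY a DESC NULLS FIRST
    let asc_nulls_last = SortOptions { descending: false, nulls_first: false };
    let reversed = reverse(asc_nulls_last);
    assert_eq!(reversed, SortOptions { descending: true, nulls_first: true });
    println!("{:?}", reversed);
}
```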
-/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
-/// are taken, when the mask evaluates `false` values null values are filled.
-///
-/// # Arguments
-/// * `mask` - Boolean values used to determine where to put the `truthy` values
-/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
-pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
-    let truthy = truthy.to_data();
-
-    // update the mask so that any null values become false
-    // (SlicesIterator doesn't respect nulls)
-    let mask = and_kleene(mask, &is_not_null(mask)?)?;
-
-    let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
-
-    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
-    // fill with falsy values
-
-    // keep track of how much is filled
-    let mut filled = 0;
-    // keep track of current position we have in truthy array
-    let mut true_pos = 0;
-
-    SlicesIterator::new(&mask).for_each(|(start, end)| {
-        // the gap needs to be filled with nulls
-        if start > filled {
-            mutable.extend_nulls(start - filled);
-        }
-        // fill with truthy values
-        let len = end - start;
-        mutable.extend(0, true_pos, true_pos + len);
-        true_pos += len;
-        filled = end;
-    });
-    // the remaining part is falsy
-    if filled < mask.len() {
-        mutable.extend_nulls(mask.len() - filled);
-    }
-
-    let data = mutable.freeze();
-    Ok(make_array(data))
-}
-
 /// Merge left and right sort expressions, checking for duplicates.
 pub fn merge_vectors(
     left: &[PhysicalSortExpr],
     right: &[PhysicalSortExpr],
@@ -321,9 +264,7 @@ mod tests {
     use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
     use crate::PhysicalSortExpr;
 
-    use arrow_array::Int32Array;
     use arrow_schema::{DataType, Field, Schema};
-    use datafusion_common::cast::{as_boolean_array, as_int32_array};
     use datafusion_common::{Result, ScalarValue};
 
     use petgraph::visit::Bfs;
@@ -517,70 +458,4 @@ mod tests {
         assert_eq!(collect_columns(&expr3), expected);
         Ok(())
     }
-
-    #[test]
-    fn scatter_int() -> Result<()> {
-        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
-        let mask = BooleanArray::from(vec![true, true, false, false, true]);
-
-        // the output array is expected to be the same length as the mask array
-        let expected =
-            Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_int32_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
-
-    #[test]
-    fn scatter_int_end_with_false() -> Result<()> {
-        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
-        let mask = BooleanArray::from(vec![true, false, true, false, false, false]);
-
-        // output should be same length as mask
-        let expected =
-            Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_int32_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
-
-    #[test]
-    fn scatter_with_null_mask() -> Result<()> {
-        let truthy = Arc::new(Int32Array::from(vec![1, 10, 11]));
-        let mask: BooleanArray = vec![Some(false), None, Some(true), Some(true), None]
-            .into_iter()
-            .collect();
-
-        // output should treat nulls as though they are false
-        let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_int32_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
-
-    #[test]
-    fn scatter_boolean() -> Result<()> {
-        let truthy = Arc::new(BooleanArray::from(vec![false, false, false, true]));
-        let mask = BooleanArray::from(vec![true, true, false, false, true]);
-
-        // the output array is expected to be the same length as the mask array
-        let expected = BooleanArray::from_iter(vec![
-            Some(false),
-            Some(false),
-            None,
-            None,
-            Some(false),
-        ]);
-        let result = scatter(&mask, truthy.as_ref())?;
-        let result = as_boolean_array(&result)?;
-
-        assert_eq!(&expected, result);
-        Ok(())
-    }
 }
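The deleted tests pin down `scatter`'s contract: the output has the mask's length, mask nulls count as false, and truthy values are consumed left to right. A toy re-implementation over plain vectors (illustration only, not the Arrow-based version removed above) that reproduces the `scatter_int` case:

```
// Toy scatter over Vec<Option<i32>> rather than Arrow arrays.
fn scatter_toy(mask: &[Option<bool>], truthy: &[i32]) -> Vec<Option<i32>> {
    let mut out = Vec::with_capacity(mask.len());
    let mut next = 0; // next unconsumed value from `truthy`
    for m in mask {
        if *m == Some(true) {
            // true slot: consume the next truthy value
            out.push(Some(truthy[next]));
            next += 1;
        } else {
            // false or null slot: fill with null
            out.push(None);
        }
    }
    out
}

fn main() {
    // Mirrors scatter_int: [1, 10, 11, 100] scattered by
    // [T, T, F, F, T] => [Some(1), Some(10), None, None, Some(11)]
    let mask = [Some(true), Some(true), Some(false), Some(false), Some(true)];
    let result = scatter_toy(&mask, &[1, 10, 11, 100]);
    assert_eq!(result, vec![Some(1), Some(10), None, None, Some(11)]);
    println!("{:?}", result);
}
```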