diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 968d8215ca4d..30ab9a339b54 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1362,6 +1362,8 @@ impl TableOptions { } } +/// Options that control how Parquet files are read, including global options +/// that apply to all columns and optional column-specific overrides #[derive(Clone, Default, Debug, PartialEq)] pub struct TableParquetOptions { /// Global Parquet options that propagates to all columns. diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 377dad5cee6c..c4a888f5462d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -89,9 +89,10 @@ pub struct ParquetExec { metadata_size_hint: Option<usize>, /// Optional user defined parquet file reader factory parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>, + /// Cached plan properties such as equivalence properties, ordering, partitioning, etc. cache: PlanProperties, - /// Parquet Options - parquet_options: TableParquetOptions, + /// Options for reading Parquet files + table_parquet_options: TableParquetOptions, } impl ParquetExec { @@ -100,7 +101,7 @@ impl ParquetExec { base_config: FileScanConfig, predicate: Option<Arc<dyn PhysicalExpr>>, metadata_size_hint: Option<usize>, - parquet_options: TableParquetOptions, + table_parquet_options: TableParquetOptions, ) -> Self { debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", base_config.file_groups, base_config.projection, predicate, base_config.limit); @@ -155,15 +156,20 @@ impl ParquetExec { metadata_size_hint, parquet_file_reader_factory: None, cache, - parquet_options, + table_parquet_options, } } - /// Ref to the base configs + /// [`FileScanConfig`] that controls this scan (such as which files to read) pub fn base_config(&self) -> &FileScanConfig { &self.base_config } + /// Options passed to the parquet reader for this scan + pub fn table_parquet_options(&self) -> &TableParquetOptions { + &self.table_parquet_options + } + /// Optional predicate. 
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> { self.predicate.as_ref() @@ -197,13 +203,13 @@ impl ParquetExec { /// /// [`Expr`]: datafusion_expr::Expr pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self { - self.parquet_options.global.pushdown_filters = pushdown_filters; + self.table_parquet_options.global.pushdown_filters = pushdown_filters; self } /// Return the value described in [`Self::with_pushdown_filters`] fn pushdown_filters(&self) -> bool { - self.parquet_options.global.pushdown_filters + self.table_parquet_options.global.pushdown_filters } /// If true, the `RowFilter` made by `pushdown_filters` may try to @@ -213,13 +219,13 @@ impl ParquetExec { /// /// [`Expr`]: datafusion_expr::Expr pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self { - self.parquet_options.global.reorder_filters = reorder_filters; + self.table_parquet_options.global.reorder_filters = reorder_filters; self } /// Return the value described in [`Self::with_reorder_filters`] fn reorder_filters(&self) -> bool { - self.parquet_options.global.reorder_filters + self.table_parquet_options.global.reorder_filters } /// If enabled, the reader will read the page index @@ -227,24 +233,24 @@ impl ParquetExec { /// via `RowSelector` and `RowFilter` by /// eliminating unnecessary IO and decoding pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self { - self.parquet_options.global.enable_page_index = enable_page_index; + self.table_parquet_options.global.enable_page_index = enable_page_index; self } /// Return the value described in [`Self::with_enable_page_index`] fn enable_page_index(&self) -> bool { - self.parquet_options.global.enable_page_index + self.table_parquet_options.global.enable_page_index } /// If enabled, the reader will read by the bloom filter pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self { - self.parquet_options.global.bloom_filter_enabled = enable_bloom_filter; + self.table_parquet_options.global.bloom_filter_enabled = enable_bloom_filter; self } /// Return the value described in [`Self::with_enable_bloom_filter`] fn enable_bloom_filter(&self) -> bool { - self.parquet_options.global.bloom_filter_enabled + self.table_parquet_options.global.bloom_filter_enabled } fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index a82c5d97a2b7..8df4925fc566 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -234,6 +234,10 @@ impl PruningStatistics for BloomFilterStatistics { ScalarValue::Int32(Some(v)) => sbbf.check(v), ScalarValue::Int16(Some(v)) => sbbf.check(v), ScalarValue::Int8(Some(v)) => sbbf.check(v), + ScalarValue::UInt64(Some(v)) => sbbf.check(v), + ScalarValue::UInt32(Some(v)) => sbbf.check(v), + ScalarValue::UInt16(Some(v)) => sbbf.check(v), + ScalarValue::UInt8(Some(v)) => sbbf.check(v), ScalarValue::Decimal128(Some(v), p, s) => match parquet_type { Type::INT32 => { //https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42 diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 1da86a0363a5..b4415d638ada 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -66,6 +66,7 @@ enum Scenario { Int, 
Int32Range, UInt, + UInt32Range, Float64, Decimal, DecimalBloomFilterInt32, @@ -455,6 +456,13 @@ fn make_int32_range(start: i32, end: i32) -> RecordBatch { RecordBatch::try_new(schema, vec![array.clone()]).unwrap() } +fn make_uint32_range(start: u32, end: u32) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("u", DataType::UInt32, true)])); + let v = vec![start, end]; + let array = Arc::new(UInt32Array::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +} + /// Return record batch with f64 vector /// /// Columns are named @@ -659,6 +667,9 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> { make_uint_batches(250, 255), ] } + Scenario::UInt32Range => { + vec![make_uint32_range(0, 10), make_uint32_range(200000, 300000)] + } Scenario::Float64 => { vec![ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index b70102f78a96..b7b434d1c3d3 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -339,7 +339,7 @@ macro_rules! int_tests { async fn [<prune_int $bits _scalar_fun_and_eq >]() { RowGroupPruningTest::new() .with_scenario(Scenario::Int) - .with_query(&format!("SELECT * FROM t where i{} = 1", $bits)) + .with_query(&format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits)) .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) @@ -452,6 +452,144 @@ int_tests!(16, correct_bloom_filters: false); int_tests!(32, correct_bloom_filters: true); int_tests!(64, correct_bloom_filters: true); +// $bits: number of bits of the integer to test (8, 16, 32, 64) +// $correct_bloom_filters: if false, replicates the +// https://github.com/apache/arrow-datafusion/issues/9779 bug so that tests pass +// if and only if Bloom filters on UInt8 and UInt16 columns are still buggy. +macro_rules! uint_tests { + ($bits:expr, correct_bloom_filters: $correct_bloom_filters:expr) => { + paste::item! 
{ + #[tokio::test] + async fn [<prune_uint $bits _lt >]() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} < 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(11) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn [<prune_uint $bits _eq >]() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} = 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + #[tokio::test] + async fn [<prune_uint $bits _scalar_fun_and_eq >]() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where power(u{}, 2) = 36 and u{} = 6", $bits, $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn [<prune_uint $bits _scalar_fun >]() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where power(u{}, 2) = 25", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn [<prune_uint $bits _complex_expr >]() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{}+1 = 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn [<prune_uint $bits _eq_in_list >]() { + // result of sql "SELECT * FROM t where in (1)" + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} in (6)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn [<prune_uint $bits _eq_in_list_2 >]() { + // result of sql "SELECT * FROM t where in (1000)", prune all + // test whether statistics works + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} in (100)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(4)) + .with_matched_by_bloom_filter(Some(0)) + 
.with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(0) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn [<prune_uint $bits _eq_in_list_negated >]() { + // result of sql "SELECT * FROM t where not in (1)" prune nothing + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} not in (6)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(4)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(4)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(19) + .test_row_group_prune() + .await; + } + } + }; +} + +uint_tests!(8, correct_bloom_filters: false); +uint_tests!(16, correct_bloom_filters: false); +uint_tests!(32, correct_bloom_filters: true); +uint_tests!(64, correct_bloom_filters: true); + #[tokio::test] async fn prune_int32_eq_large_in_list() { // result of sql "SELECT * FROM t where i in (2050...2582)", prune all @@ -474,6 +612,28 @@ async fn prune_int32_eq_large_in_list() { .await; } +#[tokio::test] +async fn prune_uint32_eq_large_in_list() { + // result of sql "SELECT * FROM t where i in (2050...2582)", prune all + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt32Range) + .with_query( + format!( + "SELECT * FROM t where u in ({})", + (200050..200082).join(",") + ) + .as_str(), + ) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(0) + .test_row_group_prune() + .await; +} + #[tokio::test] async fn prune_f64_lt() { RowGroupPruningTest::new() diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index a1b3b717392e..744192e9a3e1 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -69,18 +69,8 @@ pub enum BuiltinScalarFunction { Pi, /// power Power, - /// radians - Radians, /// round Round, - /// signum - Signum, - /// sin - Sin, - /// sinh - Sinh, - /// sqrt - Sqrt, /// trunc Trunc, /// cot @@ -165,10 +155,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Pi => Volatility::Immutable, BuiltinScalarFunction::Power => Volatility::Immutable, BuiltinScalarFunction::Round => Volatility::Immutable, - BuiltinScalarFunction::Signum => Volatility::Immutable, - BuiltinScalarFunction::Sin => Volatility::Immutable, - BuiltinScalarFunction::Sinh => Volatility::Immutable, - BuiltinScalarFunction::Sqrt => Volatility::Immutable, BuiltinScalarFunction::Cbrt => Volatility::Immutable, BuiltinScalarFunction::Cot => Volatility::Immutable, BuiltinScalarFunction::Trunc => Volatility::Immutable, @@ -176,7 +162,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable, BuiltinScalarFunction::EndsWith => Volatility::Immutable, BuiltinScalarFunction::InitCap => Volatility::Immutable, - BuiltinScalarFunction::Radians => Volatility::Immutable, // Volatile builtin functions BuiltinScalarFunction::Random => Volatility::Volatile, @@ -241,12 +226,7 @@ impl BuiltinScalarFunction { | BuiltinScalarFunction::Degrees | BuiltinScalarFunction::Exp | BuiltinScalarFunction::Floor - | BuiltinScalarFunction::Radians | BuiltinScalarFunction::Round - | BuiltinScalarFunction::Signum - | BuiltinScalarFunction::Sin - | BuiltinScalarFunction::Sinh - | BuiltinScalarFunction::Sqrt | BuiltinScalarFunction::Cbrt | BuiltinScalarFunction::Trunc | BuiltinScalarFunction::Cot => match input_expr_types[0] { @@ 
-335,11 +315,6 @@ impl BuiltinScalarFunction { | BuiltinScalarFunction::Degrees | BuiltinScalarFunction::Exp | BuiltinScalarFunction::Floor - | BuiltinScalarFunction::Radians - | BuiltinScalarFunction::Signum - | BuiltinScalarFunction::Sin - | BuiltinScalarFunction::Sinh - | BuiltinScalarFunction::Sqrt | BuiltinScalarFunction::Cot => { // math expressions expect 1 argument of type f64 or f32 // priority is given to f64 because e.g. `sqrt(1i32)` is in IR (real numbers) and thus we @@ -366,11 +341,7 @@ impl BuiltinScalarFunction { | BuiltinScalarFunction::Exp | BuiltinScalarFunction::Factorial | BuiltinScalarFunction::Floor - | BuiltinScalarFunction::Radians | BuiltinScalarFunction::Round - | BuiltinScalarFunction::Signum - | BuiltinScalarFunction::Sinh - | BuiltinScalarFunction::Sqrt | BuiltinScalarFunction::Cbrt | BuiltinScalarFunction::Trunc | BuiltinScalarFunction::Pi @@ -402,13 +373,8 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Nanvl => &["nanvl"], BuiltinScalarFunction::Pi => &["pi"], BuiltinScalarFunction::Power => &["power", "pow"], - BuiltinScalarFunction::Radians => &["radians"], BuiltinScalarFunction::Random => &["random"], BuiltinScalarFunction::Round => &["round"], - BuiltinScalarFunction::Signum => &["signum"], - BuiltinScalarFunction::Sin => &["sin"], - BuiltinScalarFunction::Sinh => &["sinh"], - BuiltinScalarFunction::Sqrt => &["sqrt"], BuiltinScalarFunction::Trunc => &["trunc"], // conditional functions diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index a2015787040f..1aa063b17539 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -534,12 +534,9 @@ macro_rules! nary_scalar_expr { // generate methods for creating the supported unary/binary expressions // math functions -scalar_expr!(Sqrt, sqrt, num, "square root of a number"); scalar_expr!(Cbrt, cbrt, num, "cube root of a number"); -scalar_expr!(Sin, sin, num, "sine"); scalar_expr!(Cos, cos, num, "cosine"); scalar_expr!(Cot, cot, num, "cotangent"); -scalar_expr!(Sinh, sinh, num, "hyperbolic sine"); scalar_expr!(Cosh, cosh, num, "hyperbolic cosine"); scalar_expr!(Factorial, factorial, num, "factorial"); scalar_expr!( @@ -555,14 +552,12 @@ scalar_expr!( "nearest integer greater than or equal to argument" ); scalar_expr!(Degrees, degrees, num, "converts radians to degrees"); -scalar_expr!(Radians, radians, num, "converts degrees to radians"); nary_scalar_expr!(Round, round, "round to nearest integer"); nary_scalar_expr!( Trunc, trunc, "truncate toward zero, with optional precision" ); -scalar_expr!(Signum, signum, num, "sign of the argument (-1, 0, +1) "); scalar_expr!(Exp, exp, num, "exponential"); scalar_expr!(Gcd, gcd, arg_1 arg_2, "greatest common divisor"); scalar_expr!(Lcm, lcm, arg_1 arg_2, "least common multiple"); @@ -885,8 +880,8 @@ impl WindowUDFImpl for SimpleWindowUDF { /// ``` /// use datafusion_expr::{col, lit, call_fn}; /// -/// // create the expression sin(x) < 0.2 -/// let expr = call_fn("sin", vec![col("x")]).unwrap().lt(lit(0.2)); +/// // create the expression trunc(x) < 0.2 +/// let expr = call_fn("trunc", vec![col("x")]).unwrap().lt(lit(0.2)); /// ``` pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> Result<Expr> { match name.as_ref().parse::<BuiltinScalarFunction>() { @@ -967,23 +962,18 @@ mod test { #[test] fn scalar_function_definitions() { - test_unary_scalar_expr!(Sqrt, sqrt); test_unary_scalar_expr!(Cbrt, cbrt); - test_unary_scalar_expr!(Sin, sin); test_unary_scalar_expr!(Cos, cos); test_unary_scalar_expr!(Cot, cot); - 
test_unary_scalar_expr!(Sinh, sinh); test_unary_scalar_expr!(Cosh, cosh); test_unary_scalar_expr!(Factorial, factorial); test_unary_scalar_expr!(Floor, floor); test_unary_scalar_expr!(Ceil, ceil); test_unary_scalar_expr!(Degrees, degrees); - test_unary_scalar_expr!(Radians, radians); test_nary_scalar_expr!(Round, round, input); test_nary_scalar_expr!(Round, round, input, decimal_places); test_nary_scalar_expr!(Trunc, trunc, num); test_nary_scalar_expr!(Trunc, trunc, num, precision); - test_unary_scalar_expr!(Signum, signum); test_unary_scalar_expr!(Exp, exp); test_scalar_expr!(Nanvl, nanvl, x, y); test_scalar_expr!(Iszero, iszero, input); diff --git a/datafusion/functions/src/math/mod.rs b/datafusion/functions/src/math/mod.rs index ee53fcf96a8b..d6a44cfbdbf5 100644 --- a/datafusion/functions/src/math/mod.rs +++ b/datafusion/functions/src/math/mod.rs @@ -39,6 +39,12 @@ make_math_unary_udf!(AcoshFunc, ACOSH, acosh, acosh, Some(vec![Some(true)])); make_math_unary_udf!(AtanFunc, ATAN, atan, atan, Some(vec![Some(true)])); make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, Some(vec![Some(true)])); +make_math_unary_udf!(RadiansFunc, RADIANS, radians, to_radians, None); +make_math_unary_udf!(SignumFunc, SIGNUM, signum, signum, None); +make_math_unary_udf!(SinFunc, SIN, sin, sin, None); +make_math_unary_udf!(SinhFunc, SINH, sinh, sinh, None); +make_math_unary_udf!(SqrtFunc, SQRT, sqrt, sqrt, None); + // Export the functions out of this package, both as expr_fn as well as a list of functions export_functions!( ( @@ -66,5 +72,10 @@ export_functions!( (asinh, num, "returns inverse hyperbolic sine"), (acosh, num, "returns inverse hyperbolic cosine"), (atan, num, "returns inverse tangent"), - (atan2, y x, "returns inverse tangent of a division given in the argument") + (atan2, y x, "returns inverse tangent of a division given in the argument"), + (radians, num, "converts degrees to radians"), + (signum, num, "sign of the argument (-1, 0, +1)"), + (sin, num, "sine"), + (sinh, num, "hyperbolic sine"), + (sqrt, num, "square root of a number") ); diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index a1e471bdd422..f7be2704ab79 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -200,15 +200,10 @@ pub fn create_physical_fun( BuiltinScalarFunction::Nanvl => { Arc::new(|args| make_scalar_function_inner(math_expressions::nanvl)(args)) } - BuiltinScalarFunction::Radians => Arc::new(math_expressions::to_radians), BuiltinScalarFunction::Random => Arc::new(math_expressions::random), BuiltinScalarFunction::Round => { Arc::new(|args| make_scalar_function_inner(math_expressions::round)(args)) } - BuiltinScalarFunction::Signum => Arc::new(math_expressions::signum), - BuiltinScalarFunction::Sin => Arc::new(math_expressions::sin), - BuiltinScalarFunction::Sinh => Arc::new(math_expressions::sinh), - BuiltinScalarFunction::Sqrt => Arc::new(math_expressions::sqrt), BuiltinScalarFunction::Cbrt => Arc::new(math_expressions::cbrt), BuiltinScalarFunction::Trunc => { Arc::new(|args| make_scalar_function_inner(math_expressions::trunc)(args)) diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index 5339c12f6e93..acccb9cb3cd3 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -155,11 +155,8 @@ macro_rules! 
make_function_scalar_inputs_return_type { }}; } -math_unary_function!("sqrt", sqrt); math_unary_function!("cbrt", cbrt); -math_unary_function!("sin", sin); math_unary_function!("cos", cos); -math_unary_function!("sinh", sinh); math_unary_function!("cosh", cosh); math_unary_function!("asin", asin); math_unary_function!("acos", acos); @@ -169,13 +166,11 @@ math_unary_function!("acosh", acosh); math_unary_function!("atanh", atanh); math_unary_function!("floor", floor); math_unary_function!("ceil", ceil); -math_unary_function!("signum", signum); math_unary_function!("exp", exp); math_unary_function!("ln", ln); math_unary_function!("log2", log2); math_unary_function!("log10", log10); math_unary_function!("degrees", to_degrees); -math_unary_function!("radians", to_radians); /// Factorial SQL function pub fn factorial(args: &[ArrayRef]) -> Result<ArrayRef> { diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs index 5e8ff72df35c..2037ddb70c2d 100644 --- a/datafusion/physical-plan/src/metrics/builder.rs +++ b/datafusion/physical-plan/src/metrics/builder.rs @@ -123,6 +123,15 @@ impl<'a> MetricBuilder<'a> { count } + /// Consume self and create a new counter for recording the total spilled rows + /// triggered by an operator + pub fn spilled_rows(self, partition: usize) -> Count { + let count = Count::new(); + self.with_partition(partition) + .build(MetricValue::SpilledRows(count.clone())); + count + } + /// Consume self and create a new gauge for reporting current memory usage pub fn mem_used(self, partition: usize) -> Gauge { let gauge = Gauge::new(); diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index b2e0086f69e9..9232865aa09c 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -70,7 +70,7 @@ pub struct Metric { /// The value of the metric value: MetricValue, - /// arbitrary name=value pairs identifiying this metric + /// arbitrary name=value pairs identifying this metric labels: Vec<Label>, /// To which partition of an operators output did this metric @@ -209,6 +209,13 @@ impl MetricsSet { .map(|v| v.as_usize()) } + /// Convenience: return the total rows of spills, aggregated + /// across partitions or `None` if no metric is present + pub fn spilled_rows(&self) -> Option<usize> { + self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_))) + .map(|v| v.as_usize()) + } + /// Convenience: return the amount of elapsed CPU time spent, /// aggregated across partitions or `None` if no metric is present pub fn elapsed_compute(&self) -> Option<usize> { @@ -251,6 +258,7 @@ impl MetricsSet { MetricValue::ElapsedCompute(_) => false, MetricValue::SpillCount(_) => false, MetricValue::SpilledBytes(_) => false, + MetricValue::SpilledRows(_) => false, MetricValue::CurrentMemoryUsage(_) => false, MetricValue::Gauge { name, .. } => name == metric_name, MetricValue::StartTimestamp(_) => false, diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index ab87cd9e3780..22db8f1e4e88 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -364,13 +364,15 @@ pub enum MetricValue { /// Note 2: *Does* includes time when the thread could have made /// progress but the OS did not schedule it (e.g. 
due to CPU /// contention), thus making this value different than the - /// classical defintion of "cpu_time", which is the time reported + /// classical definition of "cpu_time", which is the time reported /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`. ElapsedCompute(Time), /// Number of spills produced: "spill_count" metric SpillCount(Count), /// Total size of spilled bytes produced: "spilled_bytes" metric SpilledBytes(Count), + /// Total size of spilled rows produced: "spilled_rows" metric + SpilledRows(Count), /// Current memory used CurrentMemoryUsage(Gauge), /// Operator defined count. @@ -407,6 +409,7 @@ impl MetricValue { Self::OutputRows(_) => "output_rows", Self::SpillCount(_) => "spill_count", Self::SpilledBytes(_) => "spilled_bytes", + Self::SpilledRows(_) => "spilled_rows", Self::CurrentMemoryUsage(_) => "mem_used", Self::ElapsedCompute(_) => "elapsed_compute", Self::Count { name, .. } => name.borrow(), @@ -423,6 +426,7 @@ impl MetricValue { Self::OutputRows(count) => count.value(), Self::SpillCount(count) => count.value(), Self::SpilledBytes(bytes) => bytes.value(), + Self::SpilledRows(count) => count.value(), Self::CurrentMemoryUsage(used) => used.value(), Self::ElapsedCompute(time) => time.value(), Self::Count { count, .. } => count.value(), @@ -448,6 +452,7 @@ impl MetricValue { Self::OutputRows(_) => Self::OutputRows(Count::new()), Self::SpillCount(_) => Self::SpillCount(Count::new()), Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()), + Self::SpilledRows(_) => Self::SpilledRows(Count::new()), Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()), Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()), Self::Count { name, .. } => Self::Count { @@ -481,6 +486,7 @@ impl MetricValue { (Self::OutputRows(count), Self::OutputRows(other_count)) | (Self::SpillCount(count), Self::SpillCount(other_count)) | (Self::SpilledBytes(count), Self::SpilledBytes(other_count)) + | (Self::SpilledRows(count), Self::SpilledRows(other_count)) | ( Self::Count { count, .. }, Self::Count { @@ -526,12 +532,13 @@ impl MetricValue { Self::ElapsedCompute(_) => 1, // show second Self::SpillCount(_) => 2, Self::SpilledBytes(_) => 3, - Self::CurrentMemoryUsage(_) => 4, - Self::Count { .. } => 5, - Self::Gauge { .. } => 6, - Self::Time { .. } => 7, - Self::StartTimestamp(_) => 8, // show timestamps last - Self::EndTimestamp(_) => 9, + Self::SpilledRows(_) => 4, + Self::CurrentMemoryUsage(_) => 5, + Self::Count { .. } => 6, + Self::Gauge { .. } => 7, + Self::Time { .. } => 8, + Self::StartTimestamp(_) => 9, // show timestamps last + Self::EndTimestamp(_) => 10, } } @@ -541,13 +548,14 @@ impl MetricValue { } } -impl std::fmt::Display for MetricValue { +impl Display for MetricValue { /// Prints the value of this metric fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Self::OutputRows(count) | Self::SpillCount(count) | Self::SpilledBytes(count) + | Self::SpilledRows(count) | Self::Count { count, .. 
} => { write!(f, "{count}") } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a6f47d3d2fc9..2f4276c5ba77 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -71,6 +71,9 @@ struct ExternalSorterMetrics { /// total spilled bytes during the execution of the operator spilled_bytes: Count, + + /// total spilled rows during the execution of the operator + spilled_rows: Count, } impl ExternalSorterMetrics { @@ -79,6 +82,7 @@ impl ExternalSorterMetrics { baseline: BaselineMetrics::new(metrics, partition), spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), + spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition), } } } @@ -231,13 +235,13 @@ struct ExternalSorter { /// prior to spilling. sort_spill_reservation_bytes: usize, /// If the in size of buffered memory batches is below this size, - /// the data will be concated and sorted in place rather than + /// the data will be concatenated and sorted in place rather than /// sort/merged. sort_in_place_threshold_bytes: usize, } impl ExternalSorter { - // TOOD: make a builder or some other nicer API to avoid the + // TODO: make a builder or some other nicer API to avoid the // clippy warning #[allow(clippy::too_many_arguments)] pub fn new( @@ -371,13 +375,18 @@ impl ExternalSorter { self.metrics.spilled_bytes.value() } + /// How many rows have been spilled to disk? + fn spilled_rows(&self) -> usize { + self.metrics.spilled_rows.value() + } + /// How many spill files have been created? fn spill_count(&self) -> usize { self.metrics.spill_count.value() } /// Writes any `in_memory_batches` to a spill file and clears - /// the batches. The contents of the spil file are sorted. + /// the batches. The contents of the spill file are sorted. /// /// Returns the amount of memory freed. async fn spill(&mut self) -> Result<usize> { @@ -390,13 +399,15 @@ impl ExternalSorter { self.in_mem_sort().await?; - let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?; + let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; let batches = std::mem::take(&mut self.in_mem_batches); - spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?; + let spilled_rows = + spill_sorted_batches(batches, spill_file.path(), self.schema.clone()).await?; let used = self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(used); - self.spills.push(spillfile); + self.metrics.spilled_rows.add(spilled_rows as usize); + self.spills.push(spill_file); Ok(used) } @@ -576,6 +587,7 @@ impl Debug for ExternalSorter { f.debug_struct("ExternalSorter") .field("memory_used", &self.used()) .field("spilled_bytes", &self.spilled_bytes()) + .field("spilled_rows", &self.spilled_rows()) .field("spill_count", &self.spill_count()) .finish() } @@ -650,11 +662,14 @@ pub(crate) fn lexsort_to_indices_multi_columns( Ok(indices) } +/// Spills sorted `in_memory_batches` to disk. +/// +/// Returns number of the rows spilled to disk. 
async fn spill_sorted_batches( batches: Vec<RecordBatch>, path: &Path, schema: SchemaRef, -) -> Result<()> { +) -> Result<u64> { let path: PathBuf = path.into(); let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema)); match task.join().await { @@ -685,7 +700,7 @@ fn write_sorted( batches: Vec<RecordBatch>, path: PathBuf, schema: SchemaRef, -) -> Result<()> { +) -> Result<u64> { let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; for batch in batches { writer.write(&batch)?; @@ -697,7 +712,7 @@ fn write_sorted( writer.num_rows, human_readable_size(writer.num_bytes as usize), ); - Ok(()) + Ok(writer.num_rows) } fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> { @@ -1062,8 +1077,9 @@ mod tests { assert_eq!(metrics.output_rows().unwrap(), 10000); assert!(metrics.elapsed_compute().unwrap() > 0); - assert!(metrics.spill_count().unwrap() > 0); - assert!(metrics.spilled_bytes().unwrap() > 0); + assert_eq!(metrics.spill_count().unwrap(), 4); + assert_eq!(metrics.spilled_bytes().unwrap(), 38784); + assert_eq!(metrics.spilled_rows().unwrap(), 9600); let columns = result[0].columns(); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e959cad2a810..22a73aff1837 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -556,9 +556,9 @@ enum ScalarFunction { // 12 was Log10 // 13 was Log2 Round = 14; - Signum = 15; - Sin = 16; - Sqrt = 17; + // 15 was Signum + // 16 was Sin + // 17 was Sqrt // Tan = 18; Trunc = 19; // 20 was Array @@ -618,12 +618,12 @@ enum ScalarFunction { // 74 Acosh // 75 was Asinh // 76 was Atanh - Sinh = 77; + // 77 was Sinh Cosh = 78; // Tanh = 79; Pi = 80; Degrees = 81; - Radians = 82; + // 82 was Radians Factorial = 83; Lcm = 84; Gcd = 85; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index d900d0031df3..aafb5b535b09 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22920,9 +22920,6 @@ impl serde::Serialize for ScalarFunction { Self::Floor => "Floor", Self::Log => "Log", Self::Round => "Round", - Self::Signum => "Signum", - Self::Sin => "Sin", - Self::Sqrt => "Sqrt", Self::Trunc => "Trunc", Self::Concat => "Concat", Self::ConcatWithSeparator => "ConcatWithSeparator", @@ -22931,11 +22928,9 @@ impl serde::Serialize for ScalarFunction { Self::Coalesce => "Coalesce", Self::Power => "Power", Self::Cbrt => "Cbrt", - Self::Sinh => "Sinh", Self::Cosh => "Cosh", Self::Pi => "Pi", Self::Degrees => "Degrees", - Self::Radians => "Radians", Self::Factorial => "Factorial", Self::Lcm => "Lcm", Self::Gcd => "Gcd", @@ -22961,9 +22956,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Floor", "Log", "Round", - "Signum", - "Sin", - "Sqrt", "Trunc", "Concat", "ConcatWithSeparator", @@ -22972,11 +22964,9 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Coalesce", "Power", "Cbrt", - "Sinh", "Cosh", "Pi", "Degrees", - "Radians", "Factorial", "Lcm", "Gcd", @@ -23031,9 +23021,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Floor" => Ok(ScalarFunction::Floor), "Log" => Ok(ScalarFunction::Log), "Round" => Ok(ScalarFunction::Round), - "Signum" => Ok(ScalarFunction::Signum), - "Sin" => Ok(ScalarFunction::Sin), - "Sqrt" => Ok(ScalarFunction::Sqrt), "Trunc" => Ok(ScalarFunction::Trunc), "Concat" => Ok(ScalarFunction::Concat), "ConcatWithSeparator" => Ok(ScalarFunction::ConcatWithSeparator), @@ -23042,11 
+23029,9 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Coalesce" => Ok(ScalarFunction::Coalesce), "Power" => Ok(ScalarFunction::Power), "Cbrt" => Ok(ScalarFunction::Cbrt), - "Sinh" => Ok(ScalarFunction::Sinh), "Cosh" => Ok(ScalarFunction::Cosh), "Pi" => Ok(ScalarFunction::Pi), "Degrees" => Ok(ScalarFunction::Degrees), - "Radians" => Ok(ScalarFunction::Radians), "Factorial" => Ok(ScalarFunction::Factorial), "Lcm" => Ok(ScalarFunction::Lcm), "Gcd" => Ok(ScalarFunction::Gcd), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 753abb4e2756..81f390fff184 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2855,9 +2855,9 @@ pub enum ScalarFunction { /// 12 was Log10 /// 13 was Log2 Round = 14, - Signum = 15, - Sin = 16, - Sqrt = 17, + /// 15 was Signum + /// 16 was Sin + /// 17 was Sqrt /// Tan = 18; Trunc = 19, /// 20 was Array @@ -2917,12 +2917,12 @@ pub enum ScalarFunction { /// 74 Acosh /// 75 was Asinh /// 76 was Atanh - Sinh = 77, + /// 77 was Sinh Cosh = 78, /// Tanh = 79; Pi = 80, Degrees = 81, - Radians = 82, + /// 82 was Radians Factorial = 83, Lcm = 84, Gcd = 85, @@ -2993,9 +2993,6 @@ impl ScalarFunction { ScalarFunction::Floor => "Floor", ScalarFunction::Log => "Log", ScalarFunction::Round => "Round", - ScalarFunction::Signum => "Signum", - ScalarFunction::Sin => "Sin", - ScalarFunction::Sqrt => "Sqrt", ScalarFunction::Trunc => "Trunc", ScalarFunction::Concat => "Concat", ScalarFunction::ConcatWithSeparator => "ConcatWithSeparator", @@ -3004,11 +3001,9 @@ impl ScalarFunction { ScalarFunction::Coalesce => "Coalesce", ScalarFunction::Power => "Power", ScalarFunction::Cbrt => "Cbrt", - ScalarFunction::Sinh => "Sinh", ScalarFunction::Cosh => "Cosh", ScalarFunction::Pi => "Pi", ScalarFunction::Degrees => "Degrees", - ScalarFunction::Radians => "Radians", ScalarFunction::Factorial => "Factorial", ScalarFunction::Lcm => "Lcm", ScalarFunction::Gcd => "Gcd", @@ -3028,9 +3023,6 @@ impl ScalarFunction { "Floor" => Some(Self::Floor), "Log" => Some(Self::Log), "Round" => Some(Self::Round), - "Signum" => Some(Self::Signum), - "Sin" => Some(Self::Sin), - "Sqrt" => Some(Self::Sqrt), "Trunc" => Some(Self::Trunc), "Concat" => Some(Self::Concat), "ConcatWithSeparator" => Some(Self::ConcatWithSeparator), @@ -3039,11 +3031,9 @@ impl ScalarFunction { "Coalesce" => Some(Self::Coalesce), "Power" => Some(Self::Power), "Cbrt" => Some(Self::Cbrt), - "Sinh" => Some(Self::Sinh), "Cosh" => Some(Self::Cosh), "Pi" => Some(Self::Pi), "Degrees" => Some(Self::Degrees), - "Radians" => Some(Self::Radians), "Factorial" => Some(Self::Factorial), "Lcm" => Some(Self::Lcm), "Gcd" => Some(Self::Gcd), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index b39ce41bbe26..3694418412b1 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -42,9 +42,9 @@ use datafusion_expr::{ expr::{self, InList, Sort, WindowFunction}, factorial, floor, gcd, initcap, iszero, lcm, log, logical_plan::{PlanType, StringifiedPlan}, - nanvl, pi, power, radians, random, round, signum, sin, sinh, sqrt, trunc, - AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction, - Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet, + nanvl, pi, power, random, round, trunc, AggregateFunction, Between, BinaryExpr, + BuiltInWindowFunction, BuiltinScalarFunction, Case, Cast, Expr, 
GetFieldAccess, + GetIndexedField, GroupingSet, GroupingSet::GroupingSets, JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, @@ -421,17 +421,13 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { use protobuf::ScalarFunction; match f { ScalarFunction::Unknown => todo!(), - ScalarFunction::Sqrt => Self::Sqrt, ScalarFunction::Cbrt => Self::Cbrt, - ScalarFunction::Sin => Self::Sin, ScalarFunction::Cos => Self::Cos, ScalarFunction::Cot => Self::Cot, - ScalarFunction::Sinh => Self::Sinh, ScalarFunction::Cosh => Self::Cosh, ScalarFunction::Exp => Self::Exp, ScalarFunction::Log => Self::Log, ScalarFunction::Degrees => Self::Degrees, - ScalarFunction::Radians => Self::Radians, ScalarFunction::Factorial => Self::Factorial, ScalarFunction::Gcd => Self::Gcd, ScalarFunction::Lcm => Self::Lcm, @@ -440,7 +436,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Round => Self::Round, ScalarFunction::Trunc => Self::Trunc, ScalarFunction::Concat => Self::Concat, - ScalarFunction::Signum => Self::Signum, ScalarFunction::ConcatWithSeparator => Self::ConcatWithSeparator, ScalarFunction::EndsWith => Self::EndsWith, ScalarFunction::InitCap => Self::InitCap, @@ -1311,19 +1306,13 @@ pub fn parse_expr( match scalar_function { ScalarFunction::Unknown => Err(proto_error("Unknown scalar function")), - ScalarFunction::Sqrt => Ok(sqrt(parse_expr(&args[0], registry, codec)?)), ScalarFunction::Cbrt => Ok(cbrt(parse_expr(&args[0], registry, codec)?)), - ScalarFunction::Sin => Ok(sin(parse_expr(&args[0], registry, codec)?)), ScalarFunction::Cos => Ok(cos(parse_expr(&args[0], registry, codec)?)), - ScalarFunction::Sinh => Ok(sinh(parse_expr(&args[0], registry, codec)?)), ScalarFunction::Cosh => Ok(cosh(parse_expr(&args[0], registry, codec)?)), ScalarFunction::Exp => Ok(exp(parse_expr(&args[0], registry, codec)?)), ScalarFunction::Degrees => { Ok(degrees(parse_expr(&args[0], registry, codec)?)) } - ScalarFunction::Radians => { - Ok(radians(parse_expr(&args[0], registry, codec)?)) - } ScalarFunction::Floor => { Ok(floor(parse_expr(&args[0], registry, codec)?)) } @@ -1333,9 +1322,6 @@ pub fn parse_expr( ScalarFunction::Ceil => Ok(ceil(parse_expr(&args[0], registry, codec)?)), ScalarFunction::Round => Ok(round(parse_exprs(args, registry, codec)?)), ScalarFunction::Trunc => Ok(trunc(parse_exprs(args, registry, codec)?)), - ScalarFunction::Signum => { - Ok(signum(parse_expr(&args[0], registry, codec)?)) - } ScalarFunction::InitCap => { Ok(initcap(parse_expr(&args[0], registry, codec)?)) } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 39f8a913db94..ab488f3f551b 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1408,12 +1408,9 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { fn try_from(scalar: &BuiltinScalarFunction) -> Result<Self, Self::Error> { let scalar_function = match scalar { - BuiltinScalarFunction::Sqrt => Self::Sqrt, BuiltinScalarFunction::Cbrt => Self::Cbrt, - BuiltinScalarFunction::Sin => Self::Sin, BuiltinScalarFunction::Cos => Self::Cos, BuiltinScalarFunction::Cot => Self::Cot, - BuiltinScalarFunction::Sinh => Self::Sinh, BuiltinScalarFunction::Cosh => Self::Cosh, BuiltinScalarFunction::Exp => Self::Exp, BuiltinScalarFunction::Factorial => Self::Factorial, @@ -1421,13 +1418,11 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { 
BuiltinScalarFunction::Lcm => Self::Lcm, BuiltinScalarFunction::Log => Self::Log, BuiltinScalarFunction::Degrees => Self::Degrees, - BuiltinScalarFunction::Radians => Self::Radians, BuiltinScalarFunction::Floor => Self::Floor, BuiltinScalarFunction::Ceil => Self::Ceil, BuiltinScalarFunction::Round => Self::Round, BuiltinScalarFunction::Trunc => Self::Trunc, BuiltinScalarFunction::Concat => Self::Concat, - BuiltinScalarFunction::Signum => Self::Signum, BuiltinScalarFunction::ConcatWithSeparator => Self::ConcatWithSeparator, BuiltinScalarFunction::EndsWith => Self::EndsWith, BuiltinScalarFunction::InitCap => Self::InitCap, diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 22543c0dd1bf..4cd133dc21d4 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -45,11 +45,10 @@ use datafusion_expr::expr::{ }; use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore}; use datafusion_expr::{ - col, create_udaf, lit, Accumulator, AggregateFunction, BuiltinScalarFunction::Sqrt, - ColumnarValue, Expr, ExprSchemable, LogicalPlan, Operator, PartitionEvaluator, - ScalarUDF, ScalarUDFImpl, Signature, TryCast, Volatility, WindowFrame, - WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF, - WindowUDFImpl, + col, create_udaf, lit, Accumulator, AggregateFunction, ColumnarValue, Expr, + ExprSchemable, LogicalPlan, Operator, PartitionEvaluator, ScalarUDF, ScalarUDFImpl, + Signature, TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits, + WindowFunctionDefinition, WindowUDF, WindowUDFImpl, }; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec, @@ -1623,13 +1622,6 @@ fn roundtrip_qualified_wildcard() { roundtrip_expr_test(test_expr, ctx); } -#[test] -fn roundtrip_sqrt() { - let test_expr = Expr::ScalarFunction(ScalarFunction::new(Sqrt, vec![col("col")])); - let ctx = SessionContext::new(); - roundtrip_expr_test(test_expr, ctx); -} - #[test] fn roundtrip_like() { fn like(negated: bool, escape_char: Option<char>) { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 4924128ae190..0238291c77e1 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -609,14 +609,14 @@ fn roundtrip_builtin_scalar_function() -> Result<()> { let input = Arc::new(EmptyExec::new(schema.clone())); - let fun_def = ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Sin); + let fun_def = ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Trunc); let expr = ScalarFunctionExpr::new( - "sin", + "trunc", fun_def, vec![col("a", &schema)?], DataType::Float64, - None, + Some(vec![Some(true)]), false, ); diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index a34f8f07fe92..f2f188105faf 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2704,7 +2704,8 @@ fn logical_plan_with_dialect_and_options( "date_trunc", vec![DataType::Utf8, DataType::Timestamp(Nanosecond, None)], DataType::Int32, - )); + )) + .with_udf(make_udf("sqrt", vec![DataType::Int64], DataType::Int64)); let planner = SqlToRel::new_with_options(&context, options); let result = DFParser::parse_sql_with_dialect(sql, dialect); let mut ast = result?;
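// Reviewer sketch (not part of the patch): how an operator can register the new
// `spilled_rows` counter introduced above alongside the existing `spill_count` and
// `spilled_bytes` counters, and how the aggregated value is read back through the new
// `MetricsSet::spilled_rows`. The partition number and the counts are made up.
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn record_example_spill(metrics: &ExecutionPlanMetricsSet, partition: usize) {
    // Register one counter of each kind for this partition, as ExternalSorter now does.
    let spill_count = MetricBuilder::new(metrics).spill_count(partition);
    let spilled_bytes = MetricBuilder::new(metrics).spilled_bytes(partition);
    let spilled_rows = MetricBuilder::new(metrics).spilled_rows(partition);

    // Pretend a single spill wrote 8 KiB containing 1024 rows.
    spill_count.add(1);
    spilled_bytes.add(8 * 1024);
    spilled_rows.add(1024);

    // Aggregate across partitions; `None` would mean no such metric was registered.
    assert_eq!(metrics.clone_inner().spilled_rows(), Some(1024));
}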
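// Reviewer sketch (not part of the patch): with the new unsigned-integer arms in
// `BloomFilterStatistics::contains`, an equality predicate on a UInt8/16/32/64 column
// can prune row groups via the parquet bloom filter. The file path, column name and
// config key below are assumptions for illustration only.
use datafusion::prelude::*;

async fn scan_with_uint_bloom_pruning() -> datafusion::error::Result<()> {
    // Bloom-filter pruning is driven by `TableParquetOptions.global.bloom_filter_enabled`.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.bloom_filter_enabled", true);
    let ctx = SessionContext::new_with_config(config);

    ctx.register_parquet("t", "data/uint_table.parquet", ParquetReadOptions::default())
        .await?;

    // Row groups whose bloom filter rules out the literal 6 are skipped before decoding.
    ctx.sql("SELECT * FROM t WHERE u32_col = 6").await?.show().await?;
    Ok(())
}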
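// Reviewer sketch (not part of the patch): `sqrt`, `sin`, `sinh`, `signum` and
// `radians` are no longer `BuiltinScalarFunction` variants; they are ScalarUDFs
// provided by the `datafusion-functions` crate and registered by default, so SQL
// continues to resolve them unchanged.
use datafusion::prelude::*;

async fn math_udfs_still_resolve() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("SELECT sqrt(2.0), sin(0.5), sinh(1.0), signum(-3), radians(180.0)")
        .await?
        .show()
        .await?;
    Ok(())
}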