From 58e0b599a2b7def75ce7da29632f6a859abef551 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Apr 2024 03:27:07 -0400 Subject: [PATCH 1/6] Introduce `OptimizerRule::rewrite` to rewrite in place, Rewrite `SimplifyExprs` to avoid copies (#9954) --- datafusion/core/tests/simplification.rs | 25 +-- datafusion/optimizer/src/optimizer.rs | 50 ++++-- .../simplify_expressions/simplify_exprs.rs | 152 ++++++++++-------- 3 files changed, 139 insertions(+), 88 deletions(-) diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs index a0bcdda84d64..dc075e669564 100644 --- a/datafusion/core/tests/simplification.rs +++ b/datafusion/core/tests/simplification.rs @@ -32,6 +32,7 @@ use datafusion_expr::{ LogicalPlanBuilder, ScalarUDF, Volatility, }; use datafusion_functions::math; +use datafusion_optimizer::optimizer::Optimizer; use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyExpressions}; use datafusion_optimizer::{OptimizerContext, OptimizerRule}; use std::sync::Arc; @@ -109,14 +110,14 @@ fn test_table_scan() -> LogicalPlan { .expect("building plan") } -fn get_optimized_plan_formatted(plan: &LogicalPlan, date_time: &DateTime) -> String { +fn get_optimized_plan_formatted(plan: LogicalPlan, date_time: &DateTime) -> String { let config = OptimizerContext::new().with_query_execution_start_time(*date_time); - let rule = SimplifyExpressions::new(); - let optimized_plan = rule - .try_optimize(plan, &config) - .unwrap() - .expect("failed to optimize plan"); + // Use Optimizer to do plan traversal + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + let optimizer = Optimizer::with_rules(vec![Arc::new(SimplifyExpressions::new())]); + let optimized_plan = optimizer.optimize(plan, &config, observe).unwrap(); + format!("{optimized_plan:?}") } @@ -238,7 +239,7 @@ fn to_timestamp_expr_folded() -> Result<()> { let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS to_timestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\ \n TableScan: test" .to_string(); - let actual = get_optimized_plan_formatted(&plan, &Utc::now()); + let actual = get_optimized_plan_formatted(plan, &Utc::now()); assert_eq!(expected, actual); Ok(()) } @@ -262,7 +263,7 @@ fn now_less_than_timestamp() -> Result<()> { // expression down to a single constant (true) let expected = "Filter: Boolean(true)\ \n TableScan: test"; - let actual = get_optimized_plan_formatted(&plan, &time); + let actual = get_optimized_plan_formatted(plan, &time); assert_eq!(expected, actual); Ok(()) @@ -290,7 +291,7 @@ fn select_date_plus_interval() -> Result<()> { // expression down to a single constant (true) let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408") TableScan: test"#; - let actual = get_optimized_plan_formatted(&plan, &time); + let actual = get_optimized_plan_formatted(plan, &time); assert_eq!(expected, actual); Ok(()) @@ -308,7 +309,7 @@ fn simplify_project_scalar_fn() -> Result<()> { // after simplify: t.f as "power(t.f, 1.0)" let expected = "Projection: test.f AS power(test.f,Float64(1))\ \n TableScan: test"; - let actual = get_optimized_plan_formatted(&plan, &Utc::now()); + let actual = get_optimized_plan_formatted(plan, &Utc::now()); assert_eq!(expected, actual); Ok(()) } @@ -330,7 +331,7 @@ fn simplify_scan_predicate() -> Result<()> { // before simplify: t.g = power(t.f, 1.0) // after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)" let expected = "TableScan: test, full_filters=[g = f AS g = power(f,Float64(1))]"; - let actual = get_optimized_plan_formatted(&plan, &Utc::now()); + let actual = get_optimized_plan_formatted(plan, &Utc::now()); assert_eq!(expected, actual); Ok(()) } @@ -461,7 +462,7 @@ fn multiple_now() -> Result<()> { .build()?; // expect the same timestamp appears in both exprs - let actual = get_optimized_plan_formatted(&plan, &time); + let actual = get_optimized_plan_formatted(plan, &time); let expected = format!( "Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\ \n TableScan: test", diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 032f9c57321c..ff692681ccd6 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -27,7 +27,7 @@ use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; -use datafusion_common::{DFSchema, DataFusionError, Result}; +use datafusion_common::{internal_err, DFSchema, DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use crate::common_subexpr_eliminate::CommonSubexprEliminate; @@ -69,8 +69,12 @@ use crate::utils::log_plan; /// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionState.html#method.add_optimizer_rule pub trait OptimizerRule { - /// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be - /// optimized by this rule. + /// Try and rewrite `plan` to an optimized form, returning None if the plan + /// cannot be optimized by this rule. + /// + /// Note this API will be deprecated in the future as it requires `clone`ing + /// the input plan, which can be expensive. OptimizerRules should implement + /// [`Self::rewrite`] instead. fn try_optimize( &self, plan: &LogicalPlan, @@ -80,12 +84,31 @@ pub trait OptimizerRule { /// A human readable name for this optimizer rule fn name(&self) -> &str; - /// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details. + /// How should the rule be applied by the optimizer? See comments on + /// [`ApplyOrder`] for details. /// - /// If a rule use default None, it should traverse recursively plan inside itself + /// If returns `None`, the default, the rule must handle recursion itself fn apply_order(&self) -> Option { None } + + /// Does this rule support rewriting owned plans (rather than by reference)? + fn supports_rewrite(&self) -> bool { + false + } + + /// Try to rewrite `plan` to an optimized form, returning `Transformed::yes` + /// if the plan was rewritten and `Transformed::no` if it was not. + /// + /// Note: this function is only called if [`Self::supports_rewrite`] returns + /// true. Otherwise the Optimizer calls [`Self::try_optimize`] + fn rewrite( + &self, + _plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result, DataFusionError> { + internal_err!("rewrite is not implemented for {}", self.name()) + } } /// Options to control the DataFusion Optimizer. @@ -298,12 +321,19 @@ fn optimize_plan_node( rule: &dyn OptimizerRule, config: &dyn OptimizerConfig, ) -> Result> { - // TODO: add API to OptimizerRule to allow rewriting by ownership - rule.try_optimize(&plan, config) - .map(|maybe_plan| match maybe_plan { - Some(new_plan) => Transformed::yes(new_plan), + if rule.supports_rewrite() { + return rule.rewrite(plan, config); + } + + rule.try_optimize(&plan, config).map(|maybe_plan| { + match maybe_plan { + Some(new_plan) => { + // if the node was rewritten by the optimizer, replace the node + Transformed::yes(new_plan) + } None => Transformed::no(plan), - }) + } + }) } impl Optimizer { diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 4e06730133d9..17312fa6548a 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,12 +19,14 @@ use std::sync::Arc; -use datafusion_common::{DFSchema, DFSchemaRef, Result}; +use datafusion_common::tree_node::Transformed; +use datafusion_common::{internal_err, DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::utils::merge_schema; +use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use super::ExprSimplifier; @@ -46,29 +48,47 @@ use super::ExprSimplifier; pub struct SimplifyExpressions {} impl OptimizerRule for SimplifyExpressions { + fn try_optimize( + &self, + _plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + internal_err!("Should have called SimplifyExpressions::try_optimize_owned") + } + fn name(&self) -> &str { "simplify_expressions" } - fn try_optimize( + fn apply_order(&self) -> Option { + Some(ApplyOrder::BottomUp) + } + + fn supports_rewrite(&self) -> bool { + true + } + + /// if supports_owned returns true, the Optimizer calls + /// [`Self::rewrite`] instead of [`Self::try_optimize`] + fn rewrite( &self, - plan: &LogicalPlan, + plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Result> { + ) -> Result, DataFusionError> { let mut execution_props = ExecutionProps::new(); execution_props.query_execution_start_time = config.query_execution_start_time(); - Ok(Some(Self::optimize_internal(plan, &execution_props)?)) + Self::optimize_internal(plan, &execution_props) } } impl SimplifyExpressions { fn optimize_internal( - plan: &LogicalPlan, + plan: LogicalPlan, execution_props: &ExecutionProps, - ) -> Result { + ) -> Result> { let schema = if !plan.inputs().is_empty() { DFSchemaRef::new(merge_schema(plan.inputs())) - } else if let LogicalPlan::TableScan(scan) = plan { + } else if let LogicalPlan::TableScan(scan) = &plan { // When predicates are pushed into a table scan, there is no input // schema to resolve predicates against, so it must be handled specially // @@ -86,13 +106,11 @@ impl SimplifyExpressions { } else { Arc::new(DFSchema::empty()) }; + let info = SimplifyContext::new(execution_props).with_schema(schema); - let new_inputs = plan - .inputs() - .iter() - .map(|input| Self::optimize_internal(input, execution_props)) - .collect::>>()?; + // Inputs have already been rewritten (due to bottom-up traversal handled by Optimizer) + // Just need to rewrite our own expressions let simplifier = ExprSimplifier::new(info); @@ -109,18 +127,22 @@ impl SimplifyExpressions { simplifier }; - let exprs = plan - .expressions() - .into_iter() - .map(|e| { + // the output schema of a filter or join is the input schema. Thus they + // can't handle aliased expressions + let use_alias = !matches!(plan, LogicalPlan::Filter(_) | LogicalPlan::Join(_)); + plan.map_expressions(|e| { + let new_e = if use_alias { // TODO: unify with `rewrite_preserving_name` let original_name = e.name_for_alias()?; - let new_e = simplifier.simplify(e)?; - new_e.alias_if_changed(original_name) - }) - .collect::>>()?; + simplifier.simplify(e)?.alias_if_changed(original_name) + } else { + simplifier.simplify(e) + }?; - plan.with_new_exprs(exprs, new_inputs) + // TODO it would be nice to have a way to know if the expression was simplified + // or not. For now conservatively return Transformed::yes + Ok(Transformed::yes(new_e)) + }) } } @@ -138,6 +160,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use chrono::{DateTime, Utc}; + use crate::optimizer::Optimizer; use datafusion_expr::logical_plan::builder::table_scan_with_filters; use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ @@ -165,12 +188,12 @@ mod tests { .expect("building plan") } - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let rule = SimplifyExpressions::new(); - let optimized_plan = rule - .try_optimize(plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); + fn assert_optimized_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> { + // Use Optimizer to do plan traversal + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + let optimizer = Optimizer::with_rules(vec![Arc::new(SimplifyExpressions::new())]); + let optimized_plan = + optimizer.optimize(plan, &OptimizerContext::new(), observe)?; let formatted_plan = format!("{optimized_plan:?}"); assert_eq!(formatted_plan, expected); Ok(()) @@ -198,7 +221,7 @@ mod tests { let expected = "TableScan: test projection=[a], full_filters=[Boolean(true) AS b IS NOT NULL]"; - assert_optimized_plan_eq(&table_scan, expected) + assert_optimized_plan_eq(table_scan, expected) } #[test] @@ -210,7 +233,7 @@ mod tests { .build()?; assert_optimized_plan_eq( - &plan, + plan, "\ Filter: test.b > Int32(1)\ \n Projection: test.a\ @@ -227,7 +250,7 @@ mod tests { .build()?; assert_optimized_plan_eq( - &plan, + plan, "\ Filter: test.b > Int32(1)\ \n Projection: test.a\ @@ -244,7 +267,7 @@ mod tests { .build()?; assert_optimized_plan_eq( - &plan, + plan, "\ Filter: test.b > Int32(1)\ \n Projection: test.a\ @@ -265,7 +288,7 @@ mod tests { .build()?; assert_optimized_plan_eq( - &plan, + plan, "\ Filter: test.a > Int32(5) AND test.b < Int32(6)\ \n Projection: test.a, test.b\ @@ -288,7 +311,7 @@ mod tests { \n Filter: test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -308,7 +331,7 @@ mod tests { \n Filter: NOT test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -324,7 +347,7 @@ mod tests { \n Filter: NOT test.b AND test.c\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -340,7 +363,7 @@ mod tests { \n Filter: NOT test.b OR NOT test.c\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -356,7 +379,7 @@ mod tests { \n Filter: test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -370,7 +393,7 @@ mod tests { Projection: test.a, test.d, NOT test.b AS test.b = Boolean(false)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -392,7 +415,7 @@ mod tests { \n Projection: test.a, test.c, test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -413,20 +436,17 @@ mod tests { let expected = "\ Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } fn get_optimized_plan_formatted( - plan: &LogicalPlan, + plan: LogicalPlan, date_time: &DateTime, ) -> String { let config = OptimizerContext::new().with_query_execution_start_time(*date_time); let rule = SimplifyExpressions::new(); - let optimized_plan = rule - .try_optimize(plan, &config) - .unwrap() - .expect("failed to optimize plan"); + let optimized_plan = rule.rewrite(plan, &config).unwrap().data; format!("{optimized_plan:?}") } @@ -440,7 +460,7 @@ mod tests { let expected = "Projection: Int32(0) AS Utf8(\"0\")\ \n TableScan: test"; - let actual = get_optimized_plan_formatted(&plan, &Utc::now()); + let actual = get_optimized_plan_formatted(plan, &Utc::now()); assert_eq!(expected, actual); Ok(()) } @@ -457,7 +477,7 @@ mod tests { .project(proj)? .build()?; - let actual = get_optimized_plan_formatted(&plan, &time); + let actual = get_optimized_plan_formatted(plan, &time); let expected = "Projection: NOT test.a AS Boolean(true) OR Boolean(false) != test.a\ \n TableScan: test"; @@ -476,7 +496,7 @@ mod tests { let expected = "Filter: test.d <= Int32(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -489,7 +509,7 @@ mod tests { let expected = "Filter: test.d <= Int32(10) OR test.d >= Int32(100)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -502,7 +522,7 @@ mod tests { let expected = "Filter: test.d <= Int32(10) AND test.d >= Int32(100)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -515,7 +535,7 @@ mod tests { let expected = "Filter: test.d > Int32(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -528,7 +548,7 @@ mod tests { let expected = "Filter: test.e IS NOT NULL\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -541,7 +561,7 @@ mod tests { let expected = "Filter: test.e IS NULL\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -555,7 +575,7 @@ mod tests { "Filter: test.d != Int32(1) AND test.d != Int32(2) AND test.d != Int32(3)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -569,7 +589,7 @@ mod tests { "Filter: test.d = Int32(1) OR test.d = Int32(2) OR test.d = Int32(3)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -583,7 +603,7 @@ mod tests { let expected = "Filter: test.d < Int32(1) OR test.d > Int32(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -597,7 +617,7 @@ mod tests { let expected = "Filter: test.d >= Int32(1) AND test.d <= Int32(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -617,7 +637,7 @@ mod tests { let expected = "Filter: test.a NOT LIKE test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -637,7 +657,7 @@ mod tests { let expected = "Filter: test.a LIKE test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -657,7 +677,7 @@ mod tests { let expected = "Filter: test.a NOT ILIKE test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -670,7 +690,7 @@ mod tests { let expected = "Filter: test.d IS NOT DISTINCT FROM Int32(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -683,7 +703,7 @@ mod tests { let expected = "Filter: test.d IS DISTINCT FROM Int32(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -709,7 +729,7 @@ mod tests { \n TableScan: t1\ \n TableScan: t2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -722,7 +742,7 @@ mod tests { let expected = "Filter: Boolean(true)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } #[test] @@ -735,6 +755,6 @@ mod tests { let expected = "Filter: Boolean(false)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_eq(plan, expected) } } From e24b0589112edd65f2652d2bba9766c3cc18bc97 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Thu, 11 Apr 2024 15:36:02 +0300 Subject: [PATCH 2/6] Fix DistinctCount for timestamps with time zone (#10043) * Fix DistinctCount for timestamps with time zone Preserve the original data type in the aggregation state * Add tests for decimal count distinct --- .../src/aggregate/count_distinct/mod.rs | 42 +++++++++++-------- .../src/aggregate/count_distinct/native.rs | 15 +++++-- .../sqllogictest/test_files/aggregate.slt | 37 +++++++++++++--- .../sqllogictest/test_files/decimal.slt | 11 +++++ 4 files changed, 79 insertions(+), 26 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs index 9c5605f495ea..ee63945eb249 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs @@ -109,12 +109,14 @@ impl AggregateExpr for DistinctCount { UInt16 => Box::new(PrimitiveDistinctCountAccumulator::::new()), UInt32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), UInt64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Decimal128(_, _) => { - Box::new(PrimitiveDistinctCountAccumulator::::new()) - } - Decimal256(_, _) => { - Box::new(PrimitiveDistinctCountAccumulator::::new()) - } + dt @ Decimal128(_, _) => Box::new( + PrimitiveDistinctCountAccumulator::::new() + .with_data_type(dt.clone()), + ), + dt @ Decimal256(_, _) => Box::new( + PrimitiveDistinctCountAccumulator::::new() + .with_data_type(dt.clone()), + ), Date32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), Date64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), @@ -130,18 +132,22 @@ impl AggregateExpr for DistinctCount { Time64(Nanosecond) => { Box::new(PrimitiveDistinctCountAccumulator::::new()) } - Timestamp(Microsecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< - TimestampMicrosecondType, - >::new()), - Timestamp(Millisecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< - TimestampMillisecondType, - >::new()), - Timestamp(Nanosecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< - TimestampNanosecondType, - >::new()), - Timestamp(Second, _) => { - Box::new(PrimitiveDistinctCountAccumulator::::new()) - } + dt @ Timestamp(Microsecond, _) => Box::new( + PrimitiveDistinctCountAccumulator::::new() + .with_data_type(dt.clone()), + ), + dt @ Timestamp(Millisecond, _) => Box::new( + PrimitiveDistinctCountAccumulator::::new() + .with_data_type(dt.clone()), + ), + dt @ Timestamp(Nanosecond, _) => Box::new( + PrimitiveDistinctCountAccumulator::::new() + .with_data_type(dt.clone()), + ), + dt @ Timestamp(Second, _) => Box::new( + PrimitiveDistinctCountAccumulator::::new() + .with_data_type(dt.clone()), + ), Float16 => Box::new(FloatDistinctCountAccumulator::::new()), Float32 => Box::new(FloatDistinctCountAccumulator::::new()), diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs index a44e8b772e5a..8f3ce8acfe07 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs @@ -30,6 +30,7 @@ use ahash::RandomState; use arrow::array::ArrayRef; use arrow_array::types::ArrowPrimitiveType; use arrow_array::PrimitiveArray; +use arrow_schema::DataType; use datafusion_common::cast::{as_list_array, as_primitive_array}; use datafusion_common::utils::array_into_list_array; @@ -45,6 +46,7 @@ where T::Native: Eq + Hash, { values: HashSet, + data_type: DataType, } impl PrimitiveDistinctCountAccumulator @@ -55,8 +57,14 @@ where pub(super) fn new() -> Self { Self { values: HashSet::default(), + data_type: T::DATA_TYPE, } } + + pub(super) fn with_data_type(mut self, data_type: DataType) -> Self { + self.data_type = data_type; + self + } } impl Accumulator for PrimitiveDistinctCountAccumulator @@ -65,9 +73,10 @@ where T::Native: Eq + Hash, { fn state(&mut self) -> datafusion_common::Result> { - let arr = Arc::new(PrimitiveArray::::from_iter_values( - self.values.iter().cloned(), - )) as ArrayRef; + let arr = Arc::new( + PrimitiveArray::::from_iter_values(self.values.iter().cloned()) + .with_data_type(self.data_type.clone()), + ); let list = Arc::new(array_into_list_array(arr)); Ok(vec![ScalarValue::List(list)]) } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 4929ab485d6d..966236db2732 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1876,18 +1876,22 @@ select arrow_cast(column1, 'Timestamp(Microsecond, None)') as micros, arrow_cast(column1, 'Timestamp(Millisecond, None)') as millis, arrow_cast(column1, 'Timestamp(Second, None)') as secs, + arrow_cast(column1, 'Timestamp(Nanosecond, Some("UTC"))') as nanos_utc, + arrow_cast(column1, 'Timestamp(Microsecond, Some("UTC"))') as micros_utc, + arrow_cast(column1, 'Timestamp(Millisecond, Some("UTC"))') as millis_utc, + arrow_cast(column1, 'Timestamp(Second, Some("UTC"))') as secs_utc, column2 as names, column3 as tag from t_source; # Demonstate the contents -query PPPPTT +query PPPPPPPPTT select * from t; ---- -2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 X -2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 X -NULL NULL NULL NULL Row 2 Y -2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 Y +2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 2018-11-13T17:11:10.011375885Z 2018-11-13T17:11:10.011375Z 2018-11-13T17:11:10.011Z 2018-11-13T17:11:10Z Row 0 X +2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 2011-12-13T11:13:10.123450Z 2011-12-13T11:13:10.123450Z 2011-12-13T11:13:10.123Z 2011-12-13T11:13:10Z Row 1 X +NULL NULL NULL NULL NULL NULL NULL NULL Row 2 Y +2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 2021-01-01T05:11:10.432Z 2021-01-01T05:11:10.432Z 2021-01-01T05:11:10.432Z 2021-01-01T05:11:10Z Row 3 Y # aggregate_timestamps_sum @@ -1933,6 +1937,17 @@ SELECT tag, max(nanos), max(micros), max(millis), max(secs) FROM t GROUP BY tag X 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Y 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 +# aggregate_timestamps_count_distinct_with_tz +query IIII +SELECT count(DISTINCT nanos_utc), count(DISTINCT micros_utc), count(DISTINCT millis_utc), count(DISTINCT secs_utc) FROM t; +---- +3 3 3 3 + +query TIIII +SELECT tag, count(DISTINCT nanos_utc), count(DISTINCT micros_utc), count(DISTINCT millis_utc), count(DISTINCT secs_utc) FROM t GROUP BY tag ORDER BY tag; +---- +X 2 2 2 2 +Y 1 1 1 1 # aggregate_timestamps_avg statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Timestamp\(Nanosecond, None\)\)'\. You might need to add explicit type casts\. @@ -2285,6 +2300,18 @@ select c2, avg(c1), arrow_typeof(avg(c1)) from d_table GROUP BY c2 ORDER BY c2 A 110.0045 Decimal128(14, 7) B -100.0045 Decimal128(14, 7) +# aggregate_decimal_count_distinct +query I +select count(DISTINCT cast(c1 AS DECIMAL(10, 2))) from d_table +---- +4 + +query TI +select c2, count(DISTINCT cast(c1 AS DECIMAL(10, 2))) from d_table GROUP BY c2 ORDER BY c2 +---- +A 2 +B 2 + # Use PostgresSQL dialect statement ok set datafusion.sql_parser.dialect = 'Postgres'; diff --git a/datafusion/sqllogictest/test_files/decimal.slt b/datafusion/sqllogictest/test_files/decimal.slt index c220a5fc9a52..3f75e42d9304 100644 --- a/datafusion/sqllogictest/test_files/decimal.slt +++ b/datafusion/sqllogictest/test_files/decimal.slt @@ -720,5 +720,16 @@ select count(*),c1 from decimal256_simple group by c1 order by c1; 4 0.00004 5 0.00005 +query I +select count(DISTINCT cast(c1 AS DECIMAL(42, 4))) from decimal256_simple; +---- +2 + +query BI +select c4, count(DISTINCT cast(c1 AS DECIMAL(42, 4))) from decimal256_simple GROUP BY c4 ORDER BY c4; +---- +false 2 +true 2 + statement ok drop table decimal256_simple; From f55c1d90215614ce531a4103c7dbebf318de1cfd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Apr 2024 10:59:44 -0400 Subject: [PATCH 3/6] Improve documentation on `LogicalPlan` TreeNode methods (#10037) --- datafusion/expr/src/logical_plan/tree_node.rs | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 1eb9d50277dd..3644f89e8b42 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -451,7 +451,10 @@ macro_rules! handle_transform_recursion_up { impl LogicalPlan { /// Calls `f` on all expressions in the current `LogicalPlan` node. /// - /// Note this does not include expressions in child `LogicalPlan` nodes. + /// # Notes + /// * Similar to [`TreeNode::apply`] but for this node's expressions. + /// * Does not include expressions in input `LogicalPlan` nodes + /// * Visits only the top level expressions (Does not recurse into each expression) pub fn apply_expressions Result>( &self, mut f: F, @@ -541,7 +544,9 @@ impl LogicalPlan { /// /// Returns the current node. /// - /// Note this does not include expressions in child `LogicalPlan` nodes. + /// # Notes + /// * Similar to [`TreeNode::map_children`] but for this node's expressions. + /// * Visits only the top level expressions (Does not recurse into each expression) pub fn map_expressions Result>>( self, mut f: F, @@ -757,7 +762,8 @@ impl LogicalPlan { }) } - /// Visits a plan similarly to [`Self::visit`], but including embedded subqueries. + /// Visits a plan similarly to [`Self::visit`], including subqueries that + /// may appear in expressions such as `IN (SELECT ...)`. pub fn visit_with_subqueries>( &self, visitor: &mut V, @@ -771,7 +777,9 @@ impl LogicalPlan { .visit_parent(|| visitor.f_up(self)) } - /// Rewrites a plan similarly t [`Self::visit`], but including embedded subqueries. + /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`, + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn rewrite_with_subqueries>( self, rewriter: &mut R, @@ -783,10 +791,9 @@ impl LogicalPlan { ) } - /// Calls `f` recursively on all children of the `LogicalPlan` node. - /// - /// Unlike [`Self::apply`], this method *does* includes `LogicalPlan`s that - /// are referenced in `Expr`s + /// Similarly to [`Self::apply`], calls `f` on this node and all its inputs, + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn apply_with_subqueries Result>( &self, f: &mut F, @@ -796,6 +803,9 @@ impl LogicalPlan { .visit_sibling(|| self.apply_children(|c| c.apply_with_subqueries(f))) } + /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`, + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn transform_with_subqueries Result>>( self, f: &F, @@ -803,6 +813,9 @@ impl LogicalPlan { self.transform_up_with_subqueries(f) } + /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`, + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn transform_down_with_subqueries Result>>( self, f: &F, @@ -810,6 +823,9 @@ impl LogicalPlan { handle_transform_recursion_down!(f(self), |c| c.transform_down_with_subqueries(f)) } + /// Similarly to [`Self::transform_down_mut`], rewrites this node and its inputs using `f`, + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn transform_down_mut_with_subqueries< F: FnMut(Self) -> Result>, >( @@ -820,6 +836,9 @@ impl LogicalPlan { .transform_down_mut_with_subqueries(f)) } + /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`, + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn transform_up_with_subqueries Result>>( self, f: &F, @@ -836,6 +855,9 @@ impl LogicalPlan { handle_transform_recursion_up!(self, |c| c.transform_up_mut_with_subqueries(f), f) } + /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`, + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn transform_down_up_with_subqueries< FD: FnMut(Self) -> Result>, FU: FnMut(Self) -> Result>, @@ -851,8 +873,9 @@ impl LogicalPlan { ) } - /// Calls `f` on all subqueries referenced in expressions of the current - /// `LogicalPlan` node. + /// Similarly to [`Self::apply`], calls `f` on this node and its inputs + /// including subqueries that may appear in expressions such as `IN (SELECT + /// ...)`. pub fn apply_subqueries Result>( &self, mut f: F, @@ -872,8 +895,8 @@ impl LogicalPlan { }) } - /// Rewrites all subquery `LogicalPlan` in the current `LogicalPlan` node - /// using `f`. + /// Similarly to [`Self::map_children`], rewrites all subqueries that may + /// appear in expressions such as `IN (SELECT ...)` using `f`. /// /// Returns the current node. pub fn map_subqueries Result>>( From 7dc20b81de8cf50bdb2b90e5f216a6324b79e340 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 11 Apr 2024 18:22:01 +0200 Subject: [PATCH 4/6] chore(deps): update prost-build requirement from =0.12.3 to =0.12.4 (#10045) * chore(deps): update prost-build requirement from =0.12.3 to =0.12.4 Updates the requirements on [prost-build](https://github.com/tokio-rs/prost) to permit the latest version. - [Release notes](https://github.com/tokio-rs/prost/releases) - [Commits](https://github.com/tokio-rs/prost/compare/v0.12.3...v0.12.4) --- updated-dependencies: - dependency-name: prost-build dependency-type: direct:production ... Signed-off-by: dependabot[bot] * re-gen protobuf code --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- datafusion/proto/gen/Cargo.toml | 2 +- datafusion/proto/src/generated/prost.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/gen/Cargo.toml b/datafusion/proto/gen/Cargo.toml index e843827a91ac..01ce92ee9e8e 100644 --- a/datafusion/proto/gen/Cargo.toml +++ b/datafusion/proto/gen/Cargo.toml @@ -32,4 +32,4 @@ publish = false [dependencies] # Pin these dependencies so that the generated output is deterministic pbjson-build = "=0.6.2" -prost-build = "=0.12.3" +prost-build = "=0.12.4" diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 042c794e19de..d6a27dbc5652 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnRelation { From feb9100432d453f19d40428265d2aa9a9f942d5d Mon Sep 17 00:00:00 2001 From: colommar Date: Fri, 12 Apr 2024 02:55:25 +0800 Subject: [PATCH 5/6] fix#9501 (#10028) --- benchmarks/queries/clickbench/README.md | 4 +- datafusion-cli/src/exec.rs | 2 +- .../physical_plan/parquet/row_groups.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 2 +- datafusion/optimizer/README.md | 10 +- docs/source/library-user-guide/adding-udfs.md | 2 +- docs/source/user-guide/cli.md | 20 ++-- docs/source/user-guide/example-usage.md | 2 +- .../source/user-guide/sql/scalar_functions.md | 102 +++++++++--------- 9 files changed, 73 insertions(+), 73 deletions(-) diff --git a/benchmarks/queries/clickbench/README.md b/benchmarks/queries/clickbench/README.md index ef540ccf9c91..29b1a7588f17 100644 --- a/benchmarks/queries/clickbench/README.md +++ b/benchmarks/queries/clickbench/README.md @@ -63,7 +63,7 @@ LIMIT 10; Here are some interesting statistics about the data used in the queries Max length of `"SearchPhrase"` is 1113 characters ```sql -❯ select min(length("SearchPhrase")) as "SearchPhrase_len_min", max(length("SearchPhrase")) "SearchPhrase_len_max" from 'hits.parquet' limit 10; +> select min(length("SearchPhrase")) as "SearchPhrase_len_min", max(length("SearchPhrase")) "SearchPhrase_len_max" from 'hits.parquet' limit 10; +----------------------+----------------------+ | SearchPhrase_len_min | SearchPhrase_len_max | +----------------------+----------------------+ @@ -74,7 +74,7 @@ Max length of `"SearchPhrase"` is 1113 characters Here is the schema of the data ```sql -❯ describe 'hits.parquet'; +> describe 'hits.parquet'; +-----------------------+-----------+-------------+ | column_name | data_type | is_nullable | +-----------------------+-----------+-------------+ diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index be7e5275e8b2..2072cc7df002 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -131,7 +131,7 @@ pub async fn exec_from_repl( rl.load_history(".history").ok(); loop { - match rl.readline("❯ ") { + match rl.readline("> ") { Ok(line) if line.starts_with('\\') => { rl.add_history_entry(line.trim_end())?; let command = line.split_whitespace().collect::>().join(" "); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 2b9665954842..a4dfd9b96870 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -1212,7 +1212,7 @@ mod tests { /// Return a test for data_index_bloom_encoding_stats.parquet /// Note the values in the `String` column are: /// ```sql - /// ❯ select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet'; + /// > select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet'; /// +-----------+ /// | String | /// +-----------+ diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d16dfb140353..cb8c97c71e38 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2127,7 +2127,7 @@ pub struct Prepare { /// # Example output: /// /// ```sql -/// ❯ describe traces; +/// > describe traces; /// +--------------------+-----------------------------+-------------+ /// | column_name | data_type | is_nullable | /// +--------------------+-----------------------------+-------------+ diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 4f9e0fb98526..b0f4c5a72014 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -104,7 +104,7 @@ Every expression in DataFusion has a name, which is used as the column name. For contains a single column with the name `"COUNT(aggregate_test_100.c9)"`: ```text -❯ select count(c9) from aggregate_test_100; +> select count(c9) from aggregate_test_100; +------------------------------+ | COUNT(aggregate_test_100.c9) | +------------------------------+ @@ -116,7 +116,7 @@ These names are used to refer to the columns in both subqueries as well as inter to another. For example: ```text -❯ select "COUNT(aggregate_test_100.c9)" + 1 from (select count(c9) from aggregate_test_100) as sq; +> select "COUNT(aggregate_test_100.c9)" + 1 from (select count(c9) from aggregate_test_100) as sq; +--------------------------------------------+ | sq.COUNT(aggregate_test_100.c9) + Int64(1) | +--------------------------------------------+ @@ -134,7 +134,7 @@ Here is a simple example of such a rewrite. The expression `1 + 2` can be intern displayed the same as `1 + 2`: ```text -❯ select 1 + 2; +> select 1 + 2; +---------------------+ | Int64(1) + Int64(2) | +---------------------+ @@ -146,7 +146,7 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re `3 as "1 + 2"`: ```text -❯ explain select 1 + 2; +> explain select 1 + 2; +---------------+-------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------+ @@ -289,7 +289,7 @@ The `EXPLAIN VERBOSE` command can be used to show the effect of each optimizatio In the following example, the `type_coercion` and `simplify_expressions` passes have simplified the plan so that it returns the constant `"3.2"` rather than doing a computation at execution time. ```text -❯ explain verbose select cast(1 + 2.2 as string) as foo; +> explain verbose select cast(1 + 2.2 as string) as foo; +------------------------------------------------------------+---------------------------------------------------------------------------+ | plan_type | plan | +------------------------------------------------------------+---------------------------------------------------------------------------+ diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index ad210724103d..653c1f9d3784 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -536,7 +536,7 @@ Because we're returning a `TableProvider`, in this example we'll use the `MemTab While this is a simple example for illustrative purposes, UDTFs have a lot of potential use cases. And can be particularly useful for reading data from external sources and interactive analysis. For example, see the [example][4] for a working example that reads from a CSV file. As another example, you could use the built-in UDTF `parquet_metadata` in the CLI to read the metadata from a Parquet file. ```console -❯ select filename, row_group_id, row_group_num_rows, row_group_bytes, stats_min, stats_max from parquet_metadata('./benchmarks/data/hits.parquet') where column_id = 17 limit 10; +> select filename, row_group_id, row_group_num_rows, row_group_bytes, stats_min, stats_max from parquet_metadata('./benchmarks/data/hits.parquet') where column_id = 17 limit 10; +--------------------------------+--------------+--------------------+-----------------+-----------+-----------+ | filename | row_group_id | row_group_num_rows | row_group_bytes | stats_min | stats_max | +--------------------------------+--------------+--------------------+-----------------+-----------+-----------+ diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index da4c9870545a..9c3fc8bd60c8 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -165,7 +165,7 @@ Query that single file (the CLI also supports parquet, compressed csv, avro, jso ```shell $ datafusion-cli DataFusion CLI v17.0.0 -❯ select * from 'data.csv'; +> select * from 'data.csv'; +---+---+ | a | b | +---+---+ @@ -184,7 +184,7 @@ data.csv data2.csv ```shell $ datafusion-cli DataFusion CLI v16.0.0 -❯ select * from 'data_dir'; +> select * from 'data_dir'; +---+---+ | a | b | +---+---+ @@ -335,9 +335,9 @@ $ export AWS_ACCESS_KEY_ID=****** $ datafusion-cli DataFusion CLI v21.0.0 -❯ create external table test stored as parquet location 's3://bucket/path/file.parquet'; +> create external table test stored as parquet location 's3://bucket/path/file.parquet'; 0 rows in set. Query took 0.374 seconds. -❯ select * from test; +> select * from test; +----------+----------+ | column_1 | column_2 | +----------+----------+ @@ -429,9 +429,9 @@ $ export GOOGLE_SERVICE_ACCOUNT=/tmp/gcs.json $ datafusion-cli DataFusion CLI v21.0.0 -❯ create external table test stored as parquet location 'gs://bucket/path/file.parquet'; +> create external table test stored as parquet location 'gs://bucket/path/file.parquet'; 0 rows in set. Query took 0.374 seconds. -❯ select * from test; +> select * from test; +----------+----------+ | column_1 | column_2 | +----------+----------+ @@ -619,7 +619,7 @@ appropriately: ```shell $ DATAFUSION_EXECUTION_BATCH_SIZE=1024 datafusion-cli DataFusion CLI v12.0.0 -❯ show all; +> show all; +-------------------------------------------------+---------+ | name | value | +-------------------------------------------------+---------+ @@ -639,7 +639,7 @@ You can change the configuration options using `SET` statement as well ```shell $ datafusion-cli DataFusion CLI v13.0.0 -❯ show datafusion.execution.batch_size; +> show datafusion.execution.batch_size; +---------------------------------+---------+ | name | value | +---------------------------------+---------+ @@ -647,10 +647,10 @@ DataFusion CLI v13.0.0 +---------------------------------+---------+ 1 row in set. Query took 0.011 seconds. -❯ set datafusion.execution.batch_size to 1024; +> set datafusion.execution.batch_size to 1024; 0 rows in set. Query took 0.000 seconds. -❯ show datafusion.execution.batch_size; +> show datafusion.execution.batch_size; +---------------------------------+---------+ | name | value | +---------------------------------+---------+ diff --git a/docs/source/user-guide/example-usage.md b/docs/source/user-guide/example-usage.md index 31b599ac3308..6e4bf68fa018 100644 --- a/docs/source/user-guide/example-usage.md +++ b/docs/source/user-guide/example-usage.md @@ -261,7 +261,7 @@ Set environment [variables](https://doc.rust-lang.org/std/backtrace/index.html#e ```bash RUST_BACKTRACE=1 ./target/debug/datafusion-cli DataFusion CLI v31.0.0 -❯ select row_numer() over (partition by a order by a) from (select 1 a); +> select row_numer() over (partition by a order by a) from (select 1 a); Error during planning: Invalid function 'row_numer'. Did you mean 'ROW_NUMBER'? diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 62b81ea7ea4b..217bd5f05a86 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -1650,13 +1650,13 @@ make_date(year, month, day) #### Example ``` -❯ select make_date(2023, 1, 31); +> select make_date(2023, 1, 31); +-------------------------------------------+ | make_date(Int64(2023),Int64(1),Int64(31)) | +-------------------------------------------+ | 2023-01-31 | +-------------------------------------------+ -❯ select make_date('2023', '01', '31'); +> select make_date('2023', '01', '31'); +-----------------------------------------------+ | make_date(Utf8("2023"),Utf8("01"),Utf8("31")) | +-----------------------------------------------+ @@ -1686,7 +1686,7 @@ to_char(expression, format) #### Example ``` -❯ ❯ select to_char('2023-03-01'::date, '%d-%m-%Y'); +> > select to_char('2023-03-01'::date, '%d-%m-%Y'); +----------------------------------------------+ | to_char(Utf8("2023-03-01"),Utf8("%d-%m-%Y")) | +----------------------------------------------+ @@ -1731,13 +1731,13 @@ to_timestamp(expression[, ..., format_n]) #### Example ``` -❯ select to_timestamp('2023-01-31T09:26:56.123456789-05:00'); +> select to_timestamp('2023-01-31T09:26:56.123456789-05:00'); +-----------------------------------------------------------+ | to_timestamp(Utf8("2023-01-31T09:26:56.123456789-05:00")) | +-----------------------------------------------------------+ | 2023-01-31T14:26:56.123456789 | +-----------------------------------------------------------+ -❯ select to_timestamp('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +> select to_timestamp('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +--------------------------------------------------------------------------------------------------------+ | to_timestamp(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) | +--------------------------------------------------------------------------------------------------------+ @@ -1770,13 +1770,13 @@ to_timestamp_millis(expression[, ..., format_n]) #### Example ``` -❯ select to_timestamp_millis('2023-01-31T09:26:56.123456789-05:00'); +> select to_timestamp_millis('2023-01-31T09:26:56.123456789-05:00'); +------------------------------------------------------------------+ | to_timestamp_millis(Utf8("2023-01-31T09:26:56.123456789-05:00")) | +------------------------------------------------------------------+ | 2023-01-31T14:26:56.123 | +------------------------------------------------------------------+ -❯ select to_timestamp_millis('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +> select to_timestamp_millis('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +---------------------------------------------------------------------------------------------------------------+ | to_timestamp_millis(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) | +---------------------------------------------------------------------------------------------------------------+ @@ -1809,13 +1809,13 @@ to_timestamp_micros(expression[, ..., format_n]) #### Example ``` -❯ select to_timestamp_micros('2023-01-31T09:26:56.123456789-05:00'); +> select to_timestamp_micros('2023-01-31T09:26:56.123456789-05:00'); +------------------------------------------------------------------+ | to_timestamp_micros(Utf8("2023-01-31T09:26:56.123456789-05:00")) | +------------------------------------------------------------------+ | 2023-01-31T14:26:56.123456 | +------------------------------------------------------------------+ -❯ select to_timestamp_micros('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +> select to_timestamp_micros('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +---------------------------------------------------------------------------------------------------------------+ | to_timestamp_micros(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) | +---------------------------------------------------------------------------------------------------------------+ @@ -1848,13 +1848,13 @@ to_timestamp_nanos(expression[, ..., format_n]) #### Example ``` -❯ select to_timestamp_nanos('2023-01-31T09:26:56.123456789-05:00'); +> select to_timestamp_nanos('2023-01-31T09:26:56.123456789-05:00'); +-----------------------------------------------------------------+ | to_timestamp_nanos(Utf8("2023-01-31T09:26:56.123456789-05:00")) | +-----------------------------------------------------------------+ | 2023-01-31T14:26:56.123456789 | +-----------------------------------------------------------------+ -❯ select to_timestamp_nanos('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +> select to_timestamp_nanos('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +--------------------------------------------------------------------------------------------------------------+ | to_timestamp_nanos(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) | +--------------------------------------------------------------------------------------------------------------+ @@ -1887,13 +1887,13 @@ to_timestamp_seconds(expression[, ..., format_n]) #### Example ``` -❯ select to_timestamp_seconds('2023-01-31T09:26:56.123456789-05:00'); +> select to_timestamp_seconds('2023-01-31T09:26:56.123456789-05:00'); +-------------------------------------------------------------------+ | to_timestamp_seconds(Utf8("2023-01-31T09:26:56.123456789-05:00")) | +-------------------------------------------------------------------+ | 2023-01-31T14:26:56 | +-------------------------------------------------------------------+ -❯ select to_timestamp_seconds('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +> select to_timestamp_seconds('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +----------------------------------------------------------------------------------------------------------------+ | to_timestamp_seconds(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) | +----------------------------------------------------------------------------------------------------------------+ @@ -2022,7 +2022,7 @@ array_append(array, element) #### Example ``` -❯ select array_append([1, 2, 3], 4); +> select array_append([1, 2, 3], 4); +--------------------------------------+ | array_append(List([1,2,3]),Int64(4)) | +--------------------------------------+ @@ -2054,7 +2054,7 @@ array_sort(array, desc, nulls_first) #### Example ``` -❯ select array_sort([3, 1, 2]); +> select array_sort([3, 1, 2]); +-----------------------------+ | array_sort(List([3,1,2])) | +-----------------------------+ @@ -2084,7 +2084,7 @@ array_resize(array, size, value) #### Example ``` -❯ select array_resize([1, 2, 3], 5, 0); +> select array_resize([1, 2, 3], 5, 0); +-------------------------------------+ | array_resize(List([1,2,3],5,0)) | +-------------------------------------+ @@ -2117,7 +2117,7 @@ array_concat(array[, ..., array_n]) #### Example ``` -❯ select array_concat([1, 2], [3, 4], [5, 6]); +> select array_concat([1, 2], [3, 4], [5, 6]); +---------------------------------------------------+ | array_concat(List([1,2]),List([3,4]),List([5,6])) | +---------------------------------------------------+ @@ -2208,7 +2208,7 @@ array_dims(array) #### Example ``` -❯ select array_dims([[1, 2, 3], [4, 5, 6]]); +> select array_dims([[1, 2, 3], [4, 5, 6]]); +---------------------------------+ | array_dims(List([1,2,3,4,5,6])) | +---------------------------------+ @@ -2236,7 +2236,7 @@ array_distinct(array) #### Example ``` -❯ select array_distinct([1, 3, 2, 3, 1, 2, 4]); +> select array_distinct([1, 3, 2, 3, 1, 2, 4]); +---------------------------------+ | array_distinct(List([1,2,3,4])) | +---------------------------------+ @@ -2265,7 +2265,7 @@ array_element(array, index) #### Example ``` -❯ select array_element([1, 2, 3, 4], 3); +> select array_element([1, 2, 3, 4], 3); +-----------------------------------------+ | array_element(List([1,2,3,4]),Int64(3)) | +-----------------------------------------+ @@ -2339,13 +2339,13 @@ array_intersect(array1, array2) #### Example ``` -❯ select array_intersect([1, 2, 3, 4], [5, 6, 3, 4]); +> select array_intersect([1, 2, 3, 4], [5, 6, 3, 4]); +----------------------------------------------------+ | array_intersect([1, 2, 3, 4], [5, 6, 3, 4]); | +----------------------------------------------------+ | [3, 4] | +----------------------------------------------------+ -❯ select array_intersect([1, 2, 3, 4], [5, 6, 7, 8]); +> select array_intersect([1, 2, 3, 4], [5, 6, 7, 8]); +----------------------------------------------------+ | array_intersect([1, 2, 3, 4], [5, 6, 7, 8]); | +----------------------------------------------------+ @@ -2380,7 +2380,7 @@ array_length(array, dimension) #### Example ``` -❯ select array_length([1, 2, 3, 4, 5]); +> select array_length([1, 2, 3, 4, 5]); +---------------------------------+ | array_length(List([1,2,3,4,5])) | +---------------------------------+ @@ -2408,7 +2408,7 @@ array_ndims(array, element) #### Example ``` -❯ select array_ndims([[1, 2, 3], [4, 5, 6]]); +> select array_ndims([[1, 2, 3], [4, 5, 6]]); +----------------------------------+ | array_ndims(List([1,2,3,4,5,6])) | +----------------------------------+ @@ -2437,7 +2437,7 @@ array_prepend(element, array) #### Example ``` -❯ select array_prepend(1, [2, 3, 4]); +> select array_prepend(1, [2, 3, 4]); +---------------------------------------+ | array_prepend(Int64(1),List([2,3,4])) | +---------------------------------------+ @@ -2467,7 +2467,7 @@ array_pop_front(array) #### Example ``` -❯ select array_pop_front([1, 2, 3]); +> select array_pop_front([1, 2, 3]); +-------------------------------+ | array_pop_front(List([1,2,3])) | +-------------------------------+ @@ -2495,7 +2495,7 @@ array_pop_back(array) #### Example ``` -❯ select array_pop_back([1, 2, 3]); +> select array_pop_back([1, 2, 3]); +-------------------------------+ | array_pop_back(List([1,2,3])) | +-------------------------------+ @@ -2526,7 +2526,7 @@ array_position(array, element, index) #### Example ``` -❯ select array_position([1, 2, 2, 3, 1, 4], 2); +> select array_position([1, 2, 2, 3, 1, 4], 2); +----------------------------------------------+ | array_position(List([1,2,2,3,1,4]),Int64(2)) | +----------------------------------------------+ @@ -2557,7 +2557,7 @@ array_positions(array, element) #### Example ``` -❯ select array_positions([1, 2, 2, 3, 1, 4], 2); +> select array_positions([1, 2, 2, 3, 1, 4], 2); +-----------------------------------------------+ | array_positions(List([1,2,2,3,1,4]),Int64(2)) | +-----------------------------------------------+ @@ -2594,7 +2594,7 @@ array_repeat(element, count) #### Example ``` -❯ select array_repeat(1, 3); +> select array_repeat(1, 3); +---------------------------------+ | array_repeat(Int64(1),Int64(3)) | +---------------------------------+ @@ -2603,7 +2603,7 @@ array_repeat(element, count) ``` ``` -❯ select array_repeat([1, 2], 2); +> select array_repeat([1, 2], 2); +------------------------------------+ | array_repeat(List([1,2]),Int64(2)) | +------------------------------------+ @@ -2632,7 +2632,7 @@ array_remove(array, element) #### Example ``` -❯ select array_remove([1, 2, 2, 3, 2, 1, 4], 2); +> select array_remove([1, 2, 2, 3, 2, 1, 4], 2); +----------------------------------------------+ | array_remove(List([1,2,2,3,2,1,4]),Int64(2)) | +----------------------------------------------+ @@ -2662,7 +2662,7 @@ array_remove_n(array, element, max) #### Example ``` -❯ select array_remove_n([1, 2, 2, 3, 2, 1, 4], 2, 2); +> select array_remove_n([1, 2, 2, 3, 2, 1, 4], 2, 2); +---------------------------------------------------------+ | array_remove_n(List([1,2,2,3,2,1,4]),Int64(2),Int64(2)) | +---------------------------------------------------------+ @@ -2691,7 +2691,7 @@ array_remove_all(array, element) #### Example ``` -❯ select array_remove_all([1, 2, 2, 3, 2, 1, 4], 2); +> select array_remove_all([1, 2, 2, 3, 2, 1, 4], 2); +--------------------------------------------------+ | array_remove_all(List([1,2,2,3,2,1,4]),Int64(2)) | +--------------------------------------------------+ @@ -2721,7 +2721,7 @@ array_replace(array, from, to) #### Example ``` -❯ select array_replace([1, 2, 2, 3, 2, 1, 4], 2, 5); +> select array_replace([1, 2, 2, 3, 2, 1, 4], 2, 5); +--------------------------------------------------------+ | array_replace(List([1,2,2,3,2,1,4]),Int64(2),Int64(5)) | +--------------------------------------------------------+ @@ -2752,7 +2752,7 @@ array_replace_n(array, from, to, max) #### Example ``` -❯ select array_replace_n([1, 2, 2, 3, 2, 1, 4], 2, 5, 2); +> select array_replace_n([1, 2, 2, 3, 2, 1, 4], 2, 5, 2); +-------------------------------------------------------------------+ | array_replace_n(List([1,2,2,3,2,1,4]),Int64(2),Int64(5),Int64(2)) | +-------------------------------------------------------------------+ @@ -2782,7 +2782,7 @@ array_replace_all(array, from, to) #### Example ``` -❯ select array_replace_all([1, 2, 2, 3, 2, 1, 4], 2, 5); +> select array_replace_all([1, 2, 2, 3, 2, 1, 4], 2, 5); +------------------------------------------------------------+ | array_replace_all(List([1,2,2,3,2,1,4]),Int64(2),Int64(5)) | +------------------------------------------------------------+ @@ -2810,7 +2810,7 @@ array_reverse(array) #### Example ``` -❯ select array_reverse([1, 2, 3, 4]); +> select array_reverse([1, 2, 3, 4]); +------------------------------------------------------------+ | array_reverse(List([1, 2, 3, 4])) | +------------------------------------------------------------+ @@ -2843,7 +2843,7 @@ array_slice(array, begin, end) #### Example ``` -❯ select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6); +> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6); +--------------------------------------------------------+ | array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) | +--------------------------------------------------------+ @@ -2872,7 +2872,7 @@ array_to_string(array, delimiter) #### Example ``` -❯ select array_to_string([[1, 2, 3, 4], [5, 6, 7, 8]], ','); +> select array_to_string([[1, 2, 3, 4], [5, 6, 7, 8]], ','); +----------------------------------------------------+ | array_to_string(List([1,2,3,4,5,6,7,8]),Utf8(",")) | +----------------------------------------------------+ @@ -2904,13 +2904,13 @@ array_union(array1, array2) #### Example ``` -❯ select array_union([1, 2, 3, 4], [5, 6, 3, 4]); +> select array_union([1, 2, 3, 4], [5, 6, 3, 4]); +----------------------------------------------------+ | array_union([1, 2, 3, 4], [5, 6, 3, 4]); | +----------------------------------------------------+ | [1, 2, 3, 4, 5, 6] | +----------------------------------------------------+ -❯ select array_union([1, 2, 3, 4], [5, 6, 7, 8]); +> select array_union([1, 2, 3, 4], [5, 6, 7, 8]); +----------------------------------------------------+ | array_union([1, 2, 3, 4], [5, 6, 7, 8]); | +----------------------------------------------------+ @@ -2942,13 +2942,13 @@ array_except(array1, array2) #### Example ``` -❯ select array_except([1, 2, 3, 4], [5, 6, 3, 4]); +> select array_except([1, 2, 3, 4], [5, 6, 3, 4]); +----------------------------------------------------+ | array_except([1, 2, 3, 4], [5, 6, 3, 4]); | +----------------------------------------------------+ | [1, 2] | +----------------------------------------------------+ -❯ select array_except([1, 2, 3, 4], [3, 4, 5, 6]); +> select array_except([1, 2, 3, 4], [3, 4, 5, 6]); +----------------------------------------------------+ | array_except([1, 2, 3, 4], [3, 4, 5, 6]); | +----------------------------------------------------+ @@ -2978,7 +2978,7 @@ cardinality(array) #### Example ``` -❯ select cardinality([[1, 2, 3, 4], [5, 6, 7, 8]]); +> select cardinality([[1, 2, 3, 4], [5, 6, 7, 8]]); +--------------------------------------+ | cardinality(List([1,2,3,4,5,6,7,8])) | +--------------------------------------+ @@ -3002,7 +3002,7 @@ empty(array) #### Example ``` -❯ select empty([1]); +> select empty([1]); +------------------+ | empty(List([1])) | +------------------+ @@ -3032,7 +3032,7 @@ generate_series(start, stop, step) #### Example ``` -❯ select generate_series(1,3); +> select generate_series(1,3); +------------------------------------+ | generate_series(Int64(1),Int64(3)) | +------------------------------------+ @@ -3209,7 +3209,7 @@ _Alias of [empty](#empty)._ #### Example ``` -❯ select make_array(1, 2, 3, 4, 5); +> select make_array(1, 2, 3, 4, 5); +----------------------------------------------------------+ | make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) | +----------------------------------------------------------+ @@ -3337,7 +3337,7 @@ select * from t; +---+---+ -- use default names `c0`, `c1` -❯ select struct(a, b) from t; +> select struct(a, b) from t; +-----------------+ | struct(t.a,t.b) | +-----------------+ @@ -3520,7 +3520,7 @@ arrow_cast(expression, datatype) #### Example ``` -❯ select arrow_cast(-5, 'Int8') as a, +> select arrow_cast(-5, 'Int8') as a, arrow_cast('foo', 'Dictionary(Int32, Utf8)') as b, arrow_cast('bar', 'LargeUtf8') as c, arrow_cast('2023-01-02T12:53:02', 'Timestamp(Microsecond, Some("+08:00"))') as d @@ -3550,7 +3550,7 @@ arrow_typeof(expression) #### Example ``` -❯ select arrow_typeof('foo'), arrow_typeof(1); +> select arrow_typeof('foo'), arrow_typeof(1); +---------------------------+------------------------+ | arrow_typeof(Utf8("foo")) | arrow_typeof(Int64(1)) | +---------------------------+------------------------+ From 118eecdc8384805cae752dac0c4ccc768cc9629b Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Thu, 11 Apr 2024 22:41:46 +0300 Subject: [PATCH 6/6] Always pass DataType to PrimitiveDistinctCountAccumulator (#10047) --- .../src/aggregate/count_distinct/mod.rs | 103 ++++++++++-------- .../src/aggregate/count_distinct/native.rs | 9 +- 2 files changed, 61 insertions(+), 51 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs index ee63945eb249..ae3370df723a 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs @@ -99,55 +99,70 @@ impl AggregateExpr for DistinctCount { use DataType::*; use TimeUnit::*; - Ok(match &self.state_data_type { + let data_type = &self.state_data_type; + Ok(match data_type { // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator - Int8 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Int16 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Int32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Int64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt8 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt16 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - dt @ Decimal128(_, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Decimal256(_, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - - Date32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Date64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + Int8 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Int16 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Int32 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Int64 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt8 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt16 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt32 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt64 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Decimal128(_, _) => Box::new(PrimitiveDistinctCountAccumulator::< + Decimal128Type, + >::new(data_type)), + Decimal256(_, _) => Box::new(PrimitiveDistinctCountAccumulator::< + Decimal256Type, + >::new(data_type)), + + Date32 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Date64 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), Time32(Millisecond) => Box::new(PrimitiveDistinctCountAccumulator::< Time32MillisecondType, - >::new()), - Time32(Second) => { - Box::new(PrimitiveDistinctCountAccumulator::::new()) - } + >::new(data_type)), + Time32(Second) => Box::new(PrimitiveDistinctCountAccumulator::< + Time32SecondType, + >::new(data_type)), Time64(Microsecond) => Box::new(PrimitiveDistinctCountAccumulator::< Time64MicrosecondType, - >::new()), - Time64(Nanosecond) => { - Box::new(PrimitiveDistinctCountAccumulator::::new()) - } - dt @ Timestamp(Microsecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Timestamp(Millisecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Timestamp(Nanosecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Timestamp(Second, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), + >::new(data_type)), + Time64(Nanosecond) => Box::new(PrimitiveDistinctCountAccumulator::< + Time64NanosecondType, + >::new(data_type)), + Timestamp(Microsecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampMicrosecondType, + >::new(data_type)), + Timestamp(Millisecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampMillisecondType, + >::new(data_type)), + Timestamp(Nanosecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampNanosecondType, + >::new(data_type)), + Timestamp(Second, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampSecondType, + >::new(data_type)), Float16 => Box::new(FloatDistinctCountAccumulator::::new()), Float32 => Box::new(FloatDistinctCountAccumulator::::new()), diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs index 8f3ce8acfe07..97ff1ef257b4 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs @@ -54,17 +54,12 @@ where T: ArrowPrimitiveType + Send, T::Native: Eq + Hash, { - pub(super) fn new() -> Self { + pub(super) fn new(data_type: &DataType) -> Self { Self { values: HashSet::default(), - data_type: T::DATA_TYPE, + data_type: data_type.clone(), } } - - pub(super) fn with_data_type(mut self, data_type: DataType) -> Self { - self.data_type = data_type; - self - } } impl Accumulator for PrimitiveDistinctCountAccumulator