diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index a96caa03d611..d8e5d8bbdc0c 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -385,6 +385,14 @@ impl LogicalPlanBuilder { .map(Self::from) } + /// Apply a filter which is used for a having clause + pub fn having(self, expr: impl Into) -> Result { + let expr = normalize_col(expr.into(), &self.plan)?; + Filter::try_new_with_having(expr, Arc::new(self.plan)) + .map(LogicalPlan::Filter) + .map(Self::from) + } + /// Make a builder for a prepare logical plan from the builder's plan pub fn prepare(self, name: String, data_types: Vec) -> Result { Ok(Self::from(LogicalPlan::Prepare(Prepare { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index f93b7c0fedd0..ca7d04b9b03e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -643,9 +643,12 @@ impl LogicalPlan { // todo it isn't clear why the schema is not recomputed here Ok(LogicalPlan::Values(Values { schema, values })) } - LogicalPlan::Filter(Filter { predicate, input }) => { - Filter::try_new(predicate, input).map(LogicalPlan::Filter) - } + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) => Filter::try_new_internal(predicate, input, having) + .map(LogicalPlan::Filter), LogicalPlan::Repartition(_) => Ok(self), LogicalPlan::Window(Window { input, @@ -2080,6 +2083,8 @@ pub struct Filter { pub predicate: Expr, /// The incoming logical plan pub input: Arc, + /// The flag to indicate if the filter is a having clause + pub having: bool, } impl Filter { @@ -2088,6 +2093,20 @@ impl Filter { /// Notes: as Aliases have no effect on the output of a filter operator, /// they are removed from the predicate expression. 
pub fn try_new(predicate: Expr, input: Arc) -> Result { + Self::try_new_internal(predicate, input, false) + } + + /// Create a new filter operator for a having clause. + /// This is similar to a filter, but its having flag is set to true. + pub fn try_new_with_having(predicate: Expr, input: Arc) -> Result { + Self::try_new_internal(predicate, input, true) + } + + fn try_new_internal( + predicate: Expr, + input: Arc, + having: bool, + ) -> Result { // Filter predicates must return a boolean value so we try and validate that here. // Note that it is not always possible to resolve the predicate expression during plan // construction (such as with correlated subqueries) so we make a best effort here and @@ -2104,6 +2123,7 @@ impl Filter { Ok(Self { predicate: predicate.unalias_nested().data, input, + having, }) } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index dbe43128fd38..539cb1cf5fb2 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -87,8 +87,17 @@ impl TreeNode for LogicalPlan { schema, }) }), - LogicalPlan::Filter(Filter { predicate, input }) => rewrite_arc(input, f)? - .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })), + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) + }), LogicalPlan::Repartition(Repartition { input, partitioning_scheme, @@ -561,10 +570,17 @@ impl LogicalPlan { value.into_iter().map_until_stop_and_collect(&mut f) })? .update_data(|values| LogicalPlan::Values(Values { schema, values })), - LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)? 
- .update_data(|predicate| { - LogicalPlan::Filter(Filter { predicate, input }) - }), + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) => f(predicate)?.update_data(|predicate| { + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) + }), LogicalPlan::Repartition(Repartition { input, partitioning_scheme, diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 5f5c468fa2f5..11a244a944f8 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -804,6 +804,15 @@ pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan { match input { LogicalPlan::Window(window) => find_base_plan(&window.input), LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input), + LogicalPlan::Filter(filter) => { + if filter.having { + // If a filter is used for a having clause, its input plan is an aggregation. + // We should expand the wildcard expression based on the aggregation's input plan. + find_base_plan(&filter.input) + } else { + input + } + } _ => input, } } diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs index 53ba3042f522..dd422f7aab95 100644 --- a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs @@ -160,14 +160,13 @@ fn replace_columns( mod tests { use arrow::datatypes::{DataType, Field, Schema}; + use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan}; + use crate::Analyzer; use datafusion_common::{JoinType, TableReference}; use datafusion_expr::{ col, in_subquery, qualified_wildcard, table_scan, wildcard, LogicalPlanBuilder, }; - use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan}; - use crate::Analyzer; - use super::*; fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 4e0ce33f1334..45fda094557b 100644 --- 
a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -215,7 +215,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr { LogicalPlanBuilder::from(plan) - .filter(having_expr_post_aggr)? + .having(having_expr_post_aggr)? .build()? } else { plan diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index af161bba45c1..c32acecaae5f 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -17,8 +17,6 @@ //! SQL Utility Functions -use std::collections::HashMap; - use arrow_schema::{ DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, }; @@ -33,6 +31,7 @@ use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction}; use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan}; use sqlparser::ast::{Ident, Value}; +use std::collections::HashMap; /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index d39bf6538ecb..09fc397bf915 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5655,6 +5655,97 @@ select count(null), min(null), max(null), bit_and(NULL), bit_or(NULL), bit_xor(N ---- 0 NULL NULL NULL NULL NULL NULL NULL +statement ok +create table having_test(v1 int, v2 int) + +statement ok +create table join_table(v1 int, v2 int) + +statement ok +insert into having_test values (1, 2), (2, 3), (3, 4) + +statement ok +insert into join_table values (1, 2), (2, 3), (3, 4) + + +query II +select * from having_test group by v1, v2 having max(v1) = 3 +---- +3 4 + +query TT +EXPLAIN select * from having_test group by v1, v2 having max(v1) = 3 +---- +logical_plan 
+01)Projection: having_test.v1, having_test.v2 +02)--Filter: max(having_test.v1) = Int32(3) +03)----Aggregate: groupBy=[[having_test.v1, having_test.v2]], aggr=[[max(having_test.v1)]] +04)------TableScan: having_test projection=[v1, v2] +physical_plan +01)ProjectionExec: expr=[v1@0 as v1, v2@1 as v2] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----FilterExec: max(having_test.v1)@2 = 3 +04)------AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=4 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +08)--------------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)] +09)----------------MemoryExec: partitions=1, partition_sizes=[1] + + +query error +select * from having_test having max(v1) = 3 + +query I +select max(v1) from having_test having max(v1) = 3 +---- +3 + +query I +select max(v1), * exclude (v1, v2) from having_test having max(v1) = 3 +---- +3 + +# v1 and v2 are both in the group-by clause, so the wildcard is valid; v3 is not a column of having_test, so the replace has no visible effect on the output +query III +select max(v1), * replace ('v1' as v3) from having_test group by v1, v2 having max(v1) = 3 +---- +3 3 4 + +query III +select max(v1), t.* from having_test t group by v1, v2 having max(v1) = 3 +---- +3 3 4 + +# j.* should also be included in the group-by clause +query error +select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by t.v1, t.v2 having max(t.v1) = 3 + +query III +select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by j.v1, j.v2 having max(t.v1) = 3 +---- +3 3 4 + +# If the select items only contain scalar expressions, the having clause is valid. +query P +select now() from having_test having max(v1) = 4 +---- + +# If the select items only contain scalar expressions, the having clause is valid. 
+query I +select 0 from having_test having max(v1) = 4 +---- + +# v2 should also be included in group-by clause +query error +select * from having_test group by v1 having max(v1) = 3 + +statement ok +drop table having_test + +statement ok +drop table join_table + # test min/max Float16 without group expression query RRTT WITH data AS (