Fix wildcard expansion for HAVING clause (apache#12046)
* fix the wildcard expansion for the filter plan

* expand the wildcard for the error message

* add tests

* fix recompute_schema

* fix clippy

* cargo fmt

* change the check for the HAVING clause

* rename the function and move the tests

* fix check

* expand the schema for the aggregate plan

* reduce the time spent expanding the wildcard

* clean up the testing tables after the tests

* fmt and address review comments

* stop expanding the wildcard and add more checks for group-by and select lists

* simplify the HAVING check
goldmedal authored Aug 22, 2024
1 parent b2ac83f commit a50aeef
Showing 8 changed files with 157 additions and 15 deletions.
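
This commit makes wildcard expansion look through a HAVING filter: the `*` in a query such as `select * from t group by v1, v2 having max(v1) = 3` is now expanded against the aggregate's input (the base table's columns) rather than against the filter's immediate input. A minimal end-to-end sketch of the fixed behaviour (not part of the patch; it assumes the `datafusion` and `tokio` crates are available):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.sql("create table having_test(v1 int, v2 int)")
            .await?
            .collect()
            .await?;
        ctx.sql("insert into having_test values (1, 2), (2, 3), (3, 4)")
            .await?
            .collect()
            .await?;
        // `*` expands to [v1, v2], so this prints the single group that
        // satisfies the HAVING predicate: (3, 4).
        ctx.sql("select * from having_test group by v1, v2 having max(v1) = 3")
            .await?
            .show()
            .await?;
        Ok(())
    }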
8 changes: 8 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -385,6 +385,14 @@ impl LogicalPlanBuilder {
.map(Self::from)
}

/// Apply a filter that is used for a HAVING clause
pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
let expr = normalize_col(expr.into(), &self.plan)?;
Filter::try_new_with_having(expr, Arc::new(self.plan))
.map(LogicalPlan::Filter)
.map(Self::from)
}

/// Make a builder for a prepare logical plan from the builder's plan
pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
Ok(Self::from(LogicalPlan::Prepare(Prepare {
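For programmatic use, the new `having` method sits next to `filter` on `LogicalPlanBuilder`. A hedged sketch of how it might be driven (the table name, schema, and the import path for the `max` aggregate function are illustrative assumptions, not part of the patch):

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_expr::{col, lit, table_scan, LogicalPlan};
    // Assumed import path for the `max` expression function; it may differ
    // between DataFusion versions.
    use datafusion_functions_aggregate::min_max::max;

    fn having_plan() -> Result<LogicalPlan> {
        let schema = Schema::new(vec![
            Field::new("v1", DataType::Int32, false),
            Field::new("v2", DataType::Int32, false),
        ]);
        // Roughly: SELECT v1, v2 FROM t GROUP BY v1, v2 HAVING max(v1) = 3
        table_scan(Some("t"), &schema, None)?
            .aggregate(
                vec![col("v1"), col("v2")],
                vec![max(col("v1")).alias("max_v1")],
            )?
            // `having` builds the same Filter node as `filter`, but flags it
            // so that wildcard expansion can recognize it later.
            .having(col("max_v1").eq(lit(3)))?
            .build()
    }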
26 changes: 23 additions & 3 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -643,9 +643,12 @@ impl LogicalPlan {
// todo it isn't clear why the schema is not recomputed here
Ok(LogicalPlan::Values(Values { schema, values }))
}
-LogicalPlan::Filter(Filter { predicate, input }) => {
-    Filter::try_new(predicate, input).map(LogicalPlan::Filter)
-}
+LogicalPlan::Filter(Filter {
+    predicate,
+    input,
+    having,
+}) => Filter::try_new_internal(predicate, input, having)
+    .map(LogicalPlan::Filter),
LogicalPlan::Repartition(_) => Ok(self),
LogicalPlan::Window(Window {
input,
@@ -2080,6 +2083,8 @@ pub struct Filter {
pub predicate: Expr,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// Whether this filter was created for a HAVING clause
pub having: bool,
}

impl Filter {
@@ -2088,6 +2093,20 @@ impl Filter {
/// Notes: as Aliases have no effect on the output of a filter operator,
/// they are removed from the predicate expression.
pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
Self::try_new_internal(predicate, input, false)
}

/// Create a new filter operator for a HAVING clause.
/// This is similar to a regular filter, but its `having` flag is set to true.
pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
Self::try_new_internal(predicate, input, true)
}

fn try_new_internal(
predicate: Expr,
input: Arc<LogicalPlan>,
having: bool,
) -> Result<Self> {
// Filter predicates must return a boolean value so we try and validate that here.
// Note that it is not always possible to resolve the predicate expression during plan
// construction (such as with correlated subqueries) so we make a best effort here and
@@ -2104,6 +2123,7 @@
Ok(Self {
predicate: predicate.unalias_nested().data,
input,
having,
})
}

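Since `having` is an ordinary public field on `Filter`, downstream code can tell a HAVING filter apart from a WHERE filter with a plain pattern match; this is exactly the check `find_base_plan` performs below. An illustrative one-liner (not from the patch):

    use datafusion_expr::logical_plan::{Filter, LogicalPlan};

    /// True if `plan` is a filter that was created for a HAVING clause.
    fn is_having_filter(plan: &LogicalPlan) -> bool {
        matches!(plan, LogicalPlan::Filter(Filter { having: true, .. }))
    }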
28 changes: 22 additions & 6 deletions datafusion/expr/src/logical_plan/tree_node.rs
@@ -87,8 +87,17 @@ impl TreeNode for LogicalPlan {
schema,
})
}),
-LogicalPlan::Filter(Filter { predicate, input }) => rewrite_arc(input, f)?
-    .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
+LogicalPlan::Filter(Filter {
+    predicate,
+    input,
+    having,
+}) => rewrite_arc(input, f)?.update_data(|input| {
+    LogicalPlan::Filter(Filter {
+        predicate,
+        input,
+        having,
+    })
+}),
LogicalPlan::Repartition(Repartition {
input,
partitioning_scheme,
@@ -561,10 +570,17 @@
value.into_iter().map_until_stop_and_collect(&mut f)
})?
.update_data(|values| LogicalPlan::Values(Values { schema, values })),
-LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
-    .update_data(|predicate| {
-        LogicalPlan::Filter(Filter { predicate, input })
-    }),
+LogicalPlan::Filter(Filter {
+    predicate,
+    input,
+    having,
+}) => f(predicate)?.update_data(|predicate| {
+    LogicalPlan::Filter(Filter {
+        predicate,
+        input,
+        having,
+    })
+}),
LogicalPlan::Repartition(Repartition {
input,
partitioning_scheme,
9 changes: 9 additions & 0 deletions datafusion/expr/src/utils.rs
@@ -804,6 +804,15 @@ pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan {
match input {
LogicalPlan::Window(window) => find_base_plan(&window.input),
LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input),
LogicalPlan::Filter(filter) => {
if filter.having {
// If a filter is used for a having clause, its input plan is an aggregation.
// We should expand the wildcard expression based on the aggregation's input plan.
find_base_plan(&filter.input)
} else {
input
}
}
_ => input,
}
}
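Concretely, for the plan a HAVING query produces, `find_base_plan` now walks through both the flagged filter and the aggregate. A sketch under the assumption that the plan has the shape shown in the slt explain output below (the helper function is hypothetical):

    use datafusion_expr::logical_plan::LogicalPlan;
    use datafusion_expr::utils::find_base_plan;

    // For: select * from having_test group by v1, v2 having max(v1) = 3
    //
    //   Filter: max(having_test.v1) = Int32(3)        (having = true)
    //     Aggregate: groupBy=[[v1, v2]], aggr=[[max(having_test.v1)]]
    //       TableScan: having_test
    //
    // find_base_plan returns the TableScan, so `*` expands to [v1, v2]
    // rather than to the aggregate schema [v1, v2, max(having_test.v1)].
    fn wildcard_arity(plan: &LogicalPlan) -> usize {
        find_base_plan(plan).schema().fields().len()
    }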
5 changes: 2 additions & 3 deletions datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
@@ -160,14 +160,13 @@ fn replace_columns(
mod tests {
use arrow::datatypes::{DataType, Field, Schema};

+use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
+use crate::Analyzer;
use datafusion_common::{JoinType, TableReference};
use datafusion_expr::{
    col, in_subquery, qualified_wildcard, table_scan, wildcard, LogicalPlanBuilder,
};

-use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
-use crate::Analyzer;
-
use super::*;

fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
2 changes: 1 addition & 1 deletion datafusion/sql/src/select.rs
@@ -215,7 +215,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr {
LogicalPlanBuilder::from(plan)
-    .filter(having_expr_post_aggr)?
+    .having(having_expr_post_aggr)?
.build()?
} else {
plan
3 changes: 1 addition & 2 deletions datafusion/sql/src/utils.rs
@@ -17,8 +17,6 @@

//! SQL Utility Functions
-use std::collections::HashMap;
-
use arrow_schema::{
DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE,
};
@@ -33,6 +31,7 @@ use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction};
use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan};
use sqlparser::ast::{Ident, Value};
+use std::collections::HashMap;

/// Make a best-effort attempt at resolving all columns in the expression tree
pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
91 changes: 91 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -5655,6 +5655,97 @@ select count(null), min(null), max(null), bit_and(NULL), bit_or(NULL), bit_xor(N
----
0 NULL NULL NULL NULL NULL NULL NULL

statement ok
create table having_test(v1 int, v2 int)

statement ok
create table join_table(v1 int, v2 int)

statement ok
insert into having_test values (1, 2), (2, 3), (3, 4)

statement ok
insert into join_table values (1, 2), (2, 3), (3, 4)


query II
select * from having_test group by v1, v2 having max(v1) = 3
----
3 4

query TT
EXPLAIN select * from having_test group by v1, v2 having max(v1) = 3
----
logical_plan
01)Projection: having_test.v1, having_test.v2
02)--Filter: max(having_test.v1) = Int32(3)
03)----Aggregate: groupBy=[[having_test.v1, having_test.v2]], aggr=[[max(having_test.v1)]]
04)------TableScan: having_test projection=[v1, v2]
physical_plan
01)ProjectionExec: expr=[v1@0 as v1, v2@1 as v2]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----FilterExec: max(having_test.v1)@2 = 3
04)------AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=4
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
09)----------------MemoryExec: partitions=1, partition_sizes=[1]


# v1 and v2 are not in a group-by clause, so expanding the wildcard here is invalid
query error
select * from having_test having max(v1) = 3

query I
select max(v1) from having_test having max(v1) = 3
----
3

query I
select max(v1), * exclude (v1, v2) from having_test having max(v1) = 3
----
3

# a wildcard with a replace list is expanded the same way
query III
select max(v1), * replace ('v1' as v3) from having_test group by v1, v2 having max(v1) = 3
----
3 3 4

query III
select max(v1), t.* from having_test t group by v1, v2 having max(v1) = 3
----
3 3 4

# the columns expanded from j.* must also appear in the group-by clause
query error
select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by t.v1, t.v2 having max(t.v1) = 3

query III
select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by j.v1, j.v2 having max(t.v1) = 3
----
3 3 4

# If the select items only contain scalar expressions, the having clause is valid.
query P
select now() from having_test having max(v1) = 4
----

# If the select items only contain scalar expressions, the having clause is valid.
query I
select 0 from having_test having max(v1) = 4
----

# v2 must also appear in the group-by clause
query error
select * from having_test group by v1 having max(v1) = 3

statement ok
drop table having_test

statement ok
drop table join_table

# test min/max Float16 without group expression
query RRTT
WITH data AS (
