Skip to content

Commit

Permalink
Don't preserve functional dependency when generating UNION logical pl…
Browse files Browse the repository at this point in the history
…an (#44) (apache#12979)

* Don't preserve functional dependency when generating UNION logical plan

* Remove extra lines
  • Loading branch information
Sevenannn authored and alamb committed Dec 7, 2024
1 parent 577e4bb commit 63ef2ff
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
48 changes: 48 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2623,6 +2623,54 @@ mod tests {
Ok(())
}

/// Checks that aggregating on top of a UNION of two already-aggregated
/// DataFrames produces one summed row per `c1` group.
///
/// Both inputs are grouped by `c1`; the union is then grouped by `c1` again,
/// so every key appears once per input and the outer `sum` must combine the
/// `min(c2)` and `max(c3)` contributions for that key.
#[tokio::test]
async fn test_aggregate_with_union() -> Result<()> {
    let df = test_table().await?;

    // SELECT c1, min(c2) AS result FROM t GROUP BY c1
    let df1 = df
        .clone()
        .aggregate(vec![col("c1")], vec![min(col("c2"))])?
        .select(vec![col("c1"), min(col("c2")).alias("result")])?;

    // SELECT c1, max(c3) AS result FROM t GROUP BY c1
    // `df` is not used again below, so it is consumed here instead of cloned.
    let df2 = df
        .aggregate(vec![col("c1")], vec![max(col("c3"))])?
        .select(vec![col("c1"), max(col("c3")).alias("result")])?;

    // SELECT c1, sum(result) AS sum_result FROM (df1 UNION ALL df2) GROUP BY c1
    let df = df1
        .union(df2)?
        .aggregate(
            vec![col("c1")],
            vec![sum(col("result")).alias("sum_result")],
        )?
        .select(vec![col("c1"), col("sum_result")])?;

    let df_results = df.collect().await?;

    // Each data row must be padded to the column widths fixed by the border
    // and header rows, otherwise the textual comparison cannot succeed.
    #[rustfmt::skip]
    assert_batches_sorted_eq!(
        [
            "+----+------------+",
            "| c1 | sum_result |",
            "+----+------------+",
            "| a  | 84         |",
            "| b  | 69         |",
            "| c  | 124        |",
            "| d  | 126        |",
            "| e  | 121        |",
            "+----+------------+"
        ],
        &df_results
    );

    Ok(())
}

#[tokio::test]
async fn test_aggregate_subexpr() -> Result<()> {
let df = test_table().await?;
Expand Down
11 changes: 8 additions & 3 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{
get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
TableReference, ToDFSchema, UnnestOptions,
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies,
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;

Expand Down Expand Up @@ -1402,7 +1402,12 @@ pub fn validate_unique_names<'a>(
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
// Temporarily use the schema from the left input and later rely on the analyzer to
// coerce the two schemas into a common one.
let schema = Arc::clone(left_plan.schema());

// Functional dependencies are not preserved by a UNION operation
let schema = (**left_plan.schema()).clone();
let schema =
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);

Ok(LogicalPlan::Union(Union {
inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
schema,
Expand Down

0 comments on commit 63ef2ff

Please sign in to comment.