diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bf5dbb8200e87..37f6465f8817c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2381,13 +2381,16 @@ class Analyzer(override val catalogManager: CatalogManager) val unresolvedSortOrders = sortOrder.filter { s => !s.resolved || !s.references.subsetOf(aggregate.outputSet) || containsAggregate(s) } - val aliasedOrdering = - unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")()) - val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) + val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")()) + + val aggregateWithExtraOrdering = aggregate.copy( + aggregateExpressions = aggregate.aggregateExpressions ++ aliasedOrdering) + val resolvedAggregate: Aggregate = - executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate] - val resolvedAliasedOrdering: Seq[Alias] = - resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]] + executeSameContext(aggregateWithExtraOrdering).asInstanceOf[Aggregate] + + val (reResolvedAggExprs, resolvedAliasedOrdering) = + resolvedAggregate.aggregateExpressions.splitAt(aggregate.aggregateExpressions.length) // If we pass the analysis check, then the ordering expressions should only reference to // aggregate expressions or grouping expressions, and it's safe to push them down to @@ -2401,24 +2404,25 @@ class Analyzer(override val catalogManager: CatalogManager) // expression instead. val needsPushDown = ArrayBuffer.empty[NamedExpression] val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering) - val evaluatedOrderings = resolvedAliasedOrdering.zip(orderToAlias).map { - case (evaluated, (order, aliasOrder)) => - val index = originalAggExprs.indexWhere { - case Alias(child, _) => child semanticEquals evaluated.child - case other => other semanticEquals evaluated.child - } + val evaluatedOrderings = + resolvedAliasedOrdering.asInstanceOf[Seq[Alias]].zip(orderToAlias).map { + case (evaluated, (order, aliasOrder)) => + val index = reResolvedAggExprs.indexWhere { + case Alias(child, _) => child semanticEquals evaluated.child + case other => other semanticEquals evaluated.child + } - if (index == -1) { - if (CharVarcharUtils.getRawType(evaluated.metadata).nonEmpty) { - needsPushDown += aliasOrder - order.copy(child = aliasOrder) + if (index == -1) { + if (hasCharVarchar(evaluated)) { + needsPushDown += aliasOrder + order.copy(child = aliasOrder) + } else { + needsPushDown += evaluated + order.copy(child = evaluated.toAttribute) + } } else { - needsPushDown += evaluated - order.copy(child = evaluated.toAttribute) + order.copy(child = originalAggExprs(index).toAttribute) } - } else { - order.copy(child = originalAggExprs(index).toAttribute) - } } val sortOrdersMap = unresolvedSortOrders @@ -2443,6 +2447,13 @@ class Analyzer(override val catalogManager: CatalogManager) } } + def hasCharVarchar(expr: Alias): Boolean = { + expr.find { + case ne: NamedExpression => CharVarcharUtils.getRawType(ne.metadata).nonEmpty + case _ => false + }.nonEmpty + } + def containsAggregate(condition: Expression): Boolean = { condition.find(_.isInstanceOf[AggregateExpression]).isDefined } diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt index 7c3f00d33f24e..cdef75fa0fa47 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt @@ -101,7 +101,7 @@ Input [8]: [ws_sold_date_sk#1, ws_item_sk#2, ws_web_page_sk#3, ws_order_number#4 (10) Exchange Input [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7] -Arguments: hashpartitioning(cast(ws_item_sk#2 as bigint), cast(ws_order_number#4 as bigint), 5), true, [id=#10] +Arguments: hashpartitioning(cast(ws_item_sk#2 as bigint), cast(ws_order_number#4 as bigint), 5), ENSURE_REQUIREMENTS, [id=#10] (11) Sort [codegen id : 3] Input [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7] @@ -123,7 +123,7 @@ Condition : (((((isnotnull(wr_item_sk#11) AND isnotnull(wr_order_number#16)) AND (15) Exchange Input [8]: [wr_item_sk#11, wr_refunded_cdemo_sk#12, wr_refunded_addr_sk#13, wr_returning_cdemo_sk#14, wr_reason_sk#15, wr_order_number#16, wr_fee#17, wr_refunded_cash#18] -Arguments: hashpartitioning(wr_item_sk#11, wr_order_number#16, 5), true, [id=#19] +Arguments: hashpartitioning(wr_item_sk#11, wr_order_number#16, 5), ENSURE_REQUIREMENTS, [id=#19] (16) Sort [codegen id : 5] Input [8]: [wr_item_sk#11, wr_refunded_cdemo_sk#12, wr_refunded_addr_sk#13, wr_returning_cdemo_sk#14, wr_reason_sk#15, wr_order_number#16, wr_fee#17, wr_refunded_cash#18] @@ -229,7 +229,7 @@ Input [11]: [ws_quantity#5, ws_sales_price#6, ws_net_profit#7, wr_refunded_cdemo (39) Exchange Input [7]: [ws_quantity#5, ws_sales_price#6, wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, wr_fee#17, wr_refunded_cash#18, r_reason_desc#24] -Arguments: hashpartitioning(wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, 5), true, [id=#30] +Arguments: hashpartitioning(wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, 5), ENSURE_REQUIREMENTS, [id=#30] (40) Sort [codegen id : 10] Input [7]: [ws_quantity#5, ws_sales_price#6, wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, wr_fee#17, wr_refunded_cash#18, r_reason_desc#24] @@ -278,7 +278,7 @@ Input [6]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo (50) Exchange Input [4]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo_sk#35] -Arguments: hashpartitioning(cast(cd_demo_sk#31 as bigint), cast(cd_demo_sk#35 as bigint), 5), true, [id=#38] +Arguments: hashpartitioning(cast(cd_demo_sk#31 as bigint), cast(cd_demo_sk#35 as bigint), 5), ENSURE_REQUIREMENTS, [id=#38] (51) Sort [codegen id : 13] Input [4]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo_sk#35] @@ -302,16 +302,16 @@ Results [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, coun (55) Exchange Input [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, count#50] -Arguments: hashpartitioning(r_reason_desc#24, 5), true, [id=#51] +Arguments: hashpartitioning(r_reason_desc#24, 5), ENSURE_REQUIREMENTS, [id=#51] (56) HashAggregate [codegen id : 15] Input [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, count#50] Keys [1]: [r_reason_desc#24] Functions [3]: [avg(cast(ws_quantity#5 as bigint)), avg(UnscaledValue(wr_refunded_cash#18)), avg(UnscaledValue(wr_fee#17))] Aggregate Attributes [3]: [avg(cast(ws_quantity#5 as bigint))#52, avg(UnscaledValue(wr_refunded_cash#18))#53, avg(UnscaledValue(wr_fee#17))#54] -Results [5]: [substr(r_reason_desc#24, 1, 20) AS substr(r_reason_desc, 1, 20)#55, avg(cast(ws_quantity#5 as bigint))#52 AS avg(ws_quantity)#56, cast((avg(UnscaledValue(wr_refunded_cash#18))#53 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#57, cast((avg(UnscaledValue(wr_fee#17))#54 / 100.0) as decimal(11,6)) AS avg(wr_fee)#58, avg(cast(ws_quantity#5 as bigint))#52 AS aggOrder#59] +Results [4]: [substr(r_reason_desc#24, 1, 20) AS substr(r_reason_desc, 1, 20)#55, avg(cast(ws_quantity#5 as bigint))#52 AS avg(ws_quantity)#56, cast((avg(UnscaledValue(wr_refunded_cash#18))#53 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#57, cast((avg(UnscaledValue(wr_fee#17))#54 / 100.0) as decimal(11,6)) AS avg(wr_fee)#58] (57) TakeOrderedAndProject -Input [5]: [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58, aggOrder#59] -Arguments: 100, [substr(r_reason_desc, 1, 20)#55 ASC NULLS FIRST, aggOrder#59 ASC NULLS FIRST, avg(wr_refunded_cash)#57 ASC NULLS FIRST, avg(wr_fee)#58 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58] +Input [4]: [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58] +Arguments: 100, [substr(r_reason_desc, 1, 20)#55 ASC NULLS FIRST, avg(ws_quantity)#56 ASC NULLS FIRST, avg(wr_refunded_cash)#57 ASC NULLS FIRST, avg(wr_fee)#58 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt index 3fa7d84f55966..236afa30baf23 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject [substr(r_reason_desc, 1, 20),aggOrder,avg(wr_refunded_cash),avg(wr_fee),avg(ws_quantity)] +TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee)] WholeStageCodegen (15) - HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),aggOrder,sum,count,sum,count,sum,count] + HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),sum,count,sum,count,sum,count] InputAdapter Exchange [r_reason_desc] #1 WholeStageCodegen (14) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/explain.txt index 626d1a71e579f..23598c2e72708 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/explain.txt @@ -272,16 +272,16 @@ Results [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, coun (49) Exchange Input [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, count#48] -Arguments: hashpartitioning(r_reason_desc#35, 5), true, [id=#49] +Arguments: hashpartitioning(r_reason_desc#35, 5), ENSURE_REQUIREMENTS, [id=#49] (50) HashAggregate [codegen id : 9] Input [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, count#48] Keys [1]: [r_reason_desc#35] Functions [3]: [avg(cast(ws_quantity#5 as bigint)), avg(UnscaledValue(wr_refunded_cash#15)), avg(UnscaledValue(wr_fee#14))] Aggregate Attributes [3]: [avg(cast(ws_quantity#5 as bigint))#50, avg(UnscaledValue(wr_refunded_cash#15))#51, avg(UnscaledValue(wr_fee#14))#52] -Results [5]: [substr(r_reason_desc#35, 1, 20) AS substr(r_reason_desc, 1, 20)#53, avg(cast(ws_quantity#5 as bigint))#50 AS avg(ws_quantity)#54, cast((avg(UnscaledValue(wr_refunded_cash#15))#51 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#55, cast((avg(UnscaledValue(wr_fee#14))#52 / 100.0) as decimal(11,6)) AS avg(wr_fee)#56, avg(cast(ws_quantity#5 as bigint))#50 AS aggOrder#57] +Results [4]: [substr(r_reason_desc#35, 1, 20) AS substr(r_reason_desc, 1, 20)#53, avg(cast(ws_quantity#5 as bigint))#50 AS avg(ws_quantity)#54, cast((avg(UnscaledValue(wr_refunded_cash#15))#51 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#55, cast((avg(UnscaledValue(wr_fee#14))#52 / 100.0) as decimal(11,6)) AS avg(wr_fee)#56] (51) TakeOrderedAndProject -Input [5]: [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56, aggOrder#57] -Arguments: 100, [substr(r_reason_desc, 1, 20)#53 ASC NULLS FIRST, aggOrder#57 ASC NULLS FIRST, avg(wr_refunded_cash)#55 ASC NULLS FIRST, avg(wr_fee)#56 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56] +Input [4]: [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56] +Arguments: 100, [substr(r_reason_desc, 1, 20)#53 ASC NULLS FIRST, avg(ws_quantity)#54 ASC NULLS FIRST, avg(wr_refunded_cash)#55 ASC NULLS FIRST, avg(wr_fee)#56 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/simplified.txt index 93c319a615566..b958737162784 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/simplified.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject [substr(r_reason_desc, 1, 20),aggOrder,avg(wr_refunded_cash),avg(wr_fee),avg(ws_quantity)] +TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee)] WholeStageCodegen (9) - HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),aggOrder,sum,count,sum,count,sum,count] + HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),sum,count,sum,count,sum,count] InputAdapter Exchange [r_reason_desc] #1 WholeStageCodegen (8) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala index fb35d6cf8dacb..7546e888075d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala @@ -474,6 +474,17 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { checkAnswer(sql("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v"), Row("c", 1)) } } + + test("SPARK-34003: fix char/varchar fails w/ order by functions") { + withTable("t") { + sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format") + sql("INSERT INTO t VALUES ('c', 1)") + checkAnswer(sql("SELECT substr(v, 1, 2), sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"), + Row("c", 1)) + checkAnswer(sql("SELECT sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"), + Row(1)) + } + } } // Some basic char/varchar tests which doesn't rely on table implementation.