Skip to content

Commit

Permalink
Velox backend support merge two aggregate to one complete mode aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf committed Nov 13, 2024
1 parent 106fd80 commit c431863
Show file tree
Hide file tree
Showing 37 changed files with 2,545 additions and 2,538 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private object VeloxRuleApi {
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ abstract class HashAggregateExecTransformer(
VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction)
)
aggregateNodeList.add(aggFunctionNode)
case Final =>
case Final | Complete =>
val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
VeloxAggregateFunctionsBuilder.create(args, aggregateFunction, aggregateMode),
childrenNodeList,
Expand Down Expand Up @@ -242,7 +242,7 @@ abstract class HashAggregateExecTransformer(
aggregateFunction.inputAggBufferAttributes.head.nullable)
)
aggregateNodeList.add(partialNode)
case Final =>
case Final | Complete =>
val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
VeloxAggregateFunctionsBuilder.create(args, aggregateFunction, aggregateMode),
childrenNodeList,
Expand Down Expand Up @@ -275,7 +275,7 @@ abstract class HashAggregateExecTransformer(
expression.mode match {
case Partial | PartialMerge =>
typeNodeList.add(VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction))
case Final =>
case Final | Complete =>
typeNodeList.add(
ConverterUtils
.getTypeNode(aggregateFunction.dataType, aggregateFunction.nullable))
Expand Down Expand Up @@ -356,7 +356,7 @@ abstract class HashAggregateExecTransformer(
// The process of handling the inconsistency in column types and order between
// Spark and Velox is exactly the opposite of applyExtractStruct.
aggregateExpression.mode match {
case PartialMerge | Final =>
case PartialMerge | Final | Complete =>
val newInputAttributes = new ArrayBuffer[Attribute]()
val childNodes = new JArrayList[ExpressionNode]()
val (sparkOrders, sparkTypes) =
Expand Down Expand Up @@ -467,7 +467,7 @@ abstract class HashAggregateExecTransformer(
// by previous projection.
childrenNodes.add(ExpressionBuilder.makeSelection(colIdx))
colIdx += 1
case Partial =>
case Partial | Complete =>
aggFunc.children.foreach {
_ =>
childrenNodes.add(ExpressionBuilder.makeSelection(colIdx))
Expand Down Expand Up @@ -600,7 +600,7 @@ abstract class HashAggregateExecTransformer(
}
val aggregateFunc = aggExpr.aggregateFunction
val childrenNodes = aggExpr.mode match {
case Partial =>
case Partial | Complete =>
aggregateFunc.children.toList.map(
expr => {
ExpressionConverter
Expand Down Expand Up @@ -784,7 +784,7 @@ case class HashAggregateExecPullOutHelper(
expr.mode match {
case Partial | PartialMerge =>
expr.aggregateFunction.aggBufferAttributes
case Final =>
case Final | Complete =>
Seq(aggregateAttributes(index))
case other =>
throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.")
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c431863

Please sign in to comment.