diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md
index 941ab4046e6..033e332b99c 100644
--- a/docs/additional-functionality/advanced_configs.md
+++ b/docs/additional-functionality/advanced_configs.md
@@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
+spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime
spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index e24a34ef3d5..d1cd70aa43c 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -29,12 +29,15 @@
pytestmark = pytest.mark.nightly_resource_consuming_test
_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
- 'spark.rapids.sql.castStringToFloat.enabled': 'true'
- }
+ 'spark.rapids.sql.castStringToFloat.enabled': 'true'
+ }
_float_smallbatch_conf = copy_and_update(_float_conf,
{'spark.rapids.sql.batchSizeBytes' : '250'})
+_float_conf_skipagg = copy_and_update(_float_smallbatch_conf,
+ {'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'})
+
_float_conf_partial = copy_and_update(_float_conf,
{'spark.rapids.sql.hashAgg.replaceMode': 'partial'})
@@ -221,8 +224,8 @@ def get_params(init_list, marked_params=[]):
return list
-# Run these tests with in 4 modes, all on the GPU
-_confs = [_float_conf, _float_smallbatch_conf, _float_conf_final, _float_conf_partial]
+# Run these tests with in 5 modes, all on the GPU
+_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial]
# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
index c58d9862be1..7e6a1056d01 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
import java.util
import scala.annotation.tailrec
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.mutable
import ai.rapids.cudf
@@ -549,7 +550,8 @@ object GpuAggregateIterator extends Logging {
object GpuAggFirstPassIterator {
def apply(cbIter: Iterator[ColumnarBatch],
aggHelper: AggHelper,
- metrics: GpuHashAggregateMetrics): Iterator[SpillableColumnarBatch] = {
+ metrics: GpuHashAggregateMetrics
+ ): Iterator[SpillableColumnarBatch] = {
val preprocessProjectIter = cbIter.map { cb =>
val sb = SpillableColumnarBatch (cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
aggHelper.preStepBound.projectAndCloseWithRetrySingleBatch (sb)
@@ -707,6 +709,9 @@ object GpuAggFinalPassIterator {
* @param metrics metrics that will be updated during aggregation
* @param configuredTargetBatchSize user-specified value for the targeted input batch size
* @param useTieredProject user-specified option to enable tiered projections
+ * @param allowNonFullyAggregatedOutput if allowed to skip third pass Agg
+ * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value
+ * @param localInputRowsCount metric to track the number of input rows processed locally
*/
class GpuMergeAggregateIterator(
firstPassIter: Iterator[SpillableColumnarBatch],
@@ -718,15 +723,21 @@ class GpuMergeAggregateIterator(
modeInfo: AggregateModeInfo,
metrics: GpuHashAggregateMetrics,
configuredTargetBatchSize: Long,
- useTieredProject: Boolean)
+ useTieredProject: Boolean,
+ allowNonFullyAggregatedOutput: Boolean,
+ skipAggPassReductionRatio: Double,
+ localInputRowsCount: LocalGpuMetric)
extends Iterator[ColumnarBatch] with AutoCloseable with Logging {
private[this] val isReductionOnly = groupingExpressions.isEmpty
private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)
private[this] val aggregatedBatches = new util.ArrayDeque[SpillableColumnarBatch]
private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None
- /** Iterator for fetching aggregated batches if a sort-based fallback has occurred */
- private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None
+ /** Iterator for fetching aggregated batches either if:
+ * 1. a sort-based fallback has occurred
+ * 2. skip third pass agg has occurred
+ **/
+ private[this] var fallbackIter: Option[Iterator[ColumnarBatch]] = None
/** Whether a batch is pending for a reduction-only aggregation */
private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly
@@ -739,24 +750,61 @@ class GpuMergeAggregateIterator(
}
override def hasNext: Boolean = {
- sortFallbackIter.map(_.hasNext).getOrElse {
+ fallbackIter.map(_.hasNext).getOrElse {
// reductions produce a result even if the input is empty
hasReductionOnlyBatch || !aggregatedBatches.isEmpty || firstPassIter.hasNext
}
}
override def next(): ColumnarBatch = {
- sortFallbackIter.map(_.next()).getOrElse {
+ fallbackIter.map(_.next()).getOrElse {
+ var shouldSkipThirdPassAgg = false
+
// aggregate and merge all pending inputs
if (firstPassIter.hasNext) {
- aggregateInputBatches()
- tryMergeAggregatedBatches()
+ // first pass agg
+ val rowsAfterFirstPassAgg = aggregateInputBatches()
+
+ // by now firstPassIter has been traversed, so localInputRowsCount is finished updating
+ if (isReductionOnly ||
+ skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) {
+ // second pass agg
+ tryMergeAggregatedBatches()
+
+ val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) {
+ (totalRows, batch) => totalRows + batch.numRows()
+ }
+ shouldSkipThirdPassAgg =
+ rowsAfterSecondPassAgg > skipAggPassReductionRatio * rowsAfterFirstPassAgg
+ } else {
+ shouldSkipThirdPassAgg = true
+ logInfo(s"Rows after first pass aggregation $rowsAfterFirstPassAgg exceeds " +
+ s"${skipAggPassReductionRatio * 100}% of " +
+ s"localInputRowsCount ${localInputRowsCount.value}, skip the second pass agg")
+ }
}
if (aggregatedBatches.size() > 1) {
- // Unable to merge to a single output, so must fall back to a sort-based approach.
- sortFallbackIter = Some(buildSortFallbackIterator())
- sortFallbackIter.get.next()
+ // Unable to merge to a single output, so must fall back
+ if (allowNonFullyAggregatedOutput && shouldSkipThirdPassAgg) {
+ // skip third pass agg, return the aggregated batches directly
+ logInfo(s"Rows after second pass aggregation exceeds " +
+ s"${skipAggPassReductionRatio * 100}% of " +
+ s"rows after first pass, skip the third pass agg")
+ fallbackIter = Some(new Iterator[ColumnarBatch] {
+ override def hasNext: Boolean = !aggregatedBatches.isEmpty
+
+ override def next(): ColumnarBatch = {
+ withResource(aggregatedBatches.pop()) { spillableBatch =>
+ spillableBatch.getColumnarBatch()
+ }
+ }
+ })
+ } else {
+ // fallback to sort agg, this is the third pass agg
+ fallbackIter = Some(buildSortFallbackIterator())
+ }
+ fallbackIter.get.next()
} else if (aggregatedBatches.isEmpty) {
if (hasReductionOnlyBatch) {
hasReductionOnlyBatch = false
@@ -779,7 +827,7 @@ class GpuMergeAggregateIterator(
aggregatedBatches.clear()
outOfCoreIter.foreach(_.close())
outOfCoreIter = None
- sortFallbackIter = None
+ fallbackIter = None
hasReductionOnlyBatch = false
}
@@ -789,11 +837,15 @@ class GpuMergeAggregateIterator(
}
/** Aggregate all input batches and place the results in the aggregatedBatches queue. */
- private def aggregateInputBatches(): Unit = {
+ private def aggregateInputBatches(): Long = {
+ var rowsAfter = 0L
// cache everything in the first pass
while (firstPassIter.hasNext) {
- aggregatedBatches.add(firstPassIter.next())
+ val batch = firstPassIter.next()
+ rowsAfter += batch.numRows()
+ aggregatedBatches.add(batch)
}
+ rowsAfter
}
/**
@@ -1115,8 +1167,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
/*
* Type inferencing by the Scala compiler will choose the most specific return type
- * something like Array[Set[Product with Serializable with AggregateMode]] or with
- * slight differences depending on Scala version. Here we ensure this is
+ * something like Array[Set[Product with Serializable with AggregateMode]] or with
+ * slight differences depending on Scala version. Here we ensure this is
* Array[Set[AggregateMode]] to perform the subsequent Set and Array operations properly.
*/
val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern =>
@@ -1189,6 +1241,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
mode == Partial || mode == PartialMerge
} && agg.groupingExpressions.nonEmpty // Don't do this for a reduce...
+ // for a aggregateExpressions.isEmpty case, we cannot distinguish between final and non-final,
+ // so don't allow it.
+ lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode =>
+ mode == Partial || mode == PartialMerge
+ } && agg.aggregateExpressions.nonEmpty
+
lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr =>
orderable.isSupportedByPlugin(expr.dataType)
}
@@ -1272,7 +1330,9 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
useTiered,
estimatedPreProcessGrowth,
conf.forceSinglePassPartialSortAgg,
- allowSinglePassAgg)
+ allowSinglePassAgg,
+ allowNonFullyAggregatedOutput,
+ conf.skipAggPassReductionRatio)
}
}
@@ -1358,7 +1418,9 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega
// For now we are just going to go with the original hash aggregation
1.0,
false,
- false)
+ false,
+ false,
+ 1)
} else {
super.convertToGpu()
}
@@ -1707,6 +1769,10 @@ object GpuHashAggregateExecBase {
* @param child incoming plan (where we get input columns from)
* @param configuredTargetBatchSize user-configured maximum device memory size of a batch
* @param configuredTieredProjectEnabled configurable optimization to use tiered projections
+ * @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation
+ * (can omit non fully aggregated data for non-final
+ * stage of aggregation)
+ * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value
*/
case class GpuHashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -1719,7 +1785,10 @@ case class GpuHashAggregateExec(
configuredTieredProjectEnabled: Boolean,
estimatedPreProcessGrowth: Double,
forceSinglePassAgg: Boolean,
- allowSinglePassAgg: Boolean) extends ShimUnaryExecNode with GpuExec {
+ allowSinglePassAgg: Boolean,
+ allowNonFullyAggregatedOutput: Boolean,
+ skipAggPassReductionRatio: Double
+) extends ShimUnaryExecNode with GpuExec {
// lifted directly from `BaseAggregateExec.inputAttributes`, edited comment.
def inputAttributes: Seq[Attribute] =
@@ -1804,7 +1873,7 @@ case class GpuHashAggregateExec(
boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo,
localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering,
postBoundReferences, targetBatchSize, aggMetrics, useTieredProject,
- localForcePre, localAllowPre)
+ localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio)
}
}
@@ -1920,7 +1989,10 @@ class DynamicGpuPartialSortAggregateIterator(
metrics: GpuHashAggregateMetrics,
useTiered: Boolean,
forceSinglePassAgg: Boolean,
- allowSinglePassAgg: Boolean) extends Iterator[ColumnarBatch] {
+ allowSinglePassAgg: Boolean,
+ allowNonFullyAggregatedOutput: Boolean,
+ skipAggPassReductionRatio: Double
+) extends Iterator[ColumnarBatch] {
private var aggIter: Option[Iterator[ColumnarBatch]] = None
private[this] val isReductionOnly = boundGroupExprs.outputTypes.isEmpty
@@ -1998,7 +2070,14 @@ class DynamicGpuPartialSortAggregateIterator(
inputAttrs.map(_.dataType).toArray, preProcessAggHelper.preStepBound,
metrics.opTime, metrics.numPreSplits)
- val firstPassIter = GpuAggFirstPassIterator(splitInputIter, preProcessAggHelper, metrics)
+ val localInputRowsMetrics = new LocalGpuMetric
+ val firstPassIter = GpuAggFirstPassIterator(
+ splitInputIter.map(cb => {
+ localInputRowsMetrics += cb.numRows()
+ cb
+ }),
+ preProcessAggHelper,
+ metrics)
val mergeIter = new GpuMergeAggregateIterator(
firstPassIter,
@@ -2010,7 +2089,10 @@ class DynamicGpuPartialSortAggregateIterator(
modeInfo,
metrics,
configuredTargetBatchSize,
- useTiered)
+ useTiered,
+ allowNonFullyAggregatedOutput,
+ skipAggPassReductionRatio,
+ localInputRowsMetrics)
GpuAggFinalPassIterator.makeIter(mergeIter, postBoundReferences, metrics)
}
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 5203e926efa..aad4f05b334 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1509,6 +1509,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)
+ val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio")
+ .doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " +
+ "greater than this value, the next aggregation pass will be skipped." +
+ "Setting this to 1 essentially disables this feature.")
+ .doubleConf
+ .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
+ .createWithDefault(1.0)
+
val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
.doc("Force a single pass partial sort agg to happen in all cases that it could, " +
@@ -3069,6 +3077,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG)
+ lazy val skipAggPassReductionRatio: Double = get(SKIP_AGG_PASS_REDUCTION_RATIO)
+
lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP)
lazy val maxRegExpStateMemory: Long = {