Skip to content

Commit

Permalink
Add a heuristic to skip second or third agg pass (NVIDIA#10950)
Browse files Browse the repository at this point in the history
* add a heristic to skip agg pass

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* commit doc change

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* refine naming

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix only reduction case

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix compile

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* clean

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix doc

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* reduce premergeci2

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* reduce premergeci2, 2

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* use test_parallel to workaround flaky array test

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* address review comment

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* remove comma

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* workaround for  ci_scala213

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* disable agg ratio heruistic by default

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix doc

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

---------

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
  • Loading branch information
binmahone authored Jun 29, 2024
1 parent dd62000 commit f954026
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 28 deletions.
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.skipAggPassReductionRatio"></a>spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime
<a name="sql.allowMultipleJars"></a>spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
<a name="sql.castDecimalToFloat.enabled"></a>spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
<a name="sql.castFloatToDecimal.enabled"></a>spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
Expand Down
11 changes: 7 additions & 4 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
pytestmark = pytest.mark.nightly_resource_consuming_test

_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}

_float_smallbatch_conf = copy_and_update(_float_conf,
{'spark.rapids.sql.batchSizeBytes' : '250'})

_float_conf_skipagg = copy_and_update(_float_smallbatch_conf,
{'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'})

_float_conf_partial = copy_and_update(_float_conf,
{'spark.rapids.sql.hashAgg.replaceMode': 'partial'})

Expand Down Expand Up @@ -221,8 +224,8 @@ def get_params(init_list, marked_params=[]):
return list


# Run these tests with in 4 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_final, _float_conf_partial]
# Run these tests with in 5 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial]

# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
Expand Down
130 changes: 106 additions & 24 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
import java.util

import scala.annotation.tailrec
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.mutable

import ai.rapids.cudf
Expand Down Expand Up @@ -549,7 +550,8 @@ object GpuAggregateIterator extends Logging {
object GpuAggFirstPassIterator {
def apply(cbIter: Iterator[ColumnarBatch],
aggHelper: AggHelper,
metrics: GpuHashAggregateMetrics): Iterator[SpillableColumnarBatch] = {
metrics: GpuHashAggregateMetrics
): Iterator[SpillableColumnarBatch] = {
val preprocessProjectIter = cbIter.map { cb =>
val sb = SpillableColumnarBatch (cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
aggHelper.preStepBound.projectAndCloseWithRetrySingleBatch (sb)
Expand Down Expand Up @@ -707,6 +709,9 @@ object GpuAggFinalPassIterator {
* @param metrics metrics that will be updated during aggregation
* @param configuredTargetBatchSize user-specified value for the targeted input batch size
* @param useTieredProject user-specified option to enable tiered projections
* @param allowNonFullyAggregatedOutput if allowed to skip third pass Agg
* @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value
* @param localInputRowsCount metric to track the number of input rows processed locally
*/
class GpuMergeAggregateIterator(
firstPassIter: Iterator[SpillableColumnarBatch],
Expand All @@ -718,15 +723,21 @@ class GpuMergeAggregateIterator(
modeInfo: AggregateModeInfo,
metrics: GpuHashAggregateMetrics,
configuredTargetBatchSize: Long,
useTieredProject: Boolean)
useTieredProject: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double,
localInputRowsCount: LocalGpuMetric)
extends Iterator[ColumnarBatch] with AutoCloseable with Logging {
private[this] val isReductionOnly = groupingExpressions.isEmpty
private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)
private[this] val aggregatedBatches = new util.ArrayDeque[SpillableColumnarBatch]
private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None

/** Iterator for fetching aggregated batches if a sort-based fallback has occurred */
private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None
/** Iterator for fetching aggregated batches either if:
* 1. a sort-based fallback has occurred
* 2. skip third pass agg has occurred
**/
private[this] var fallbackIter: Option[Iterator[ColumnarBatch]] = None

/** Whether a batch is pending for a reduction-only aggregation */
private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly
Expand All @@ -739,24 +750,61 @@ class GpuMergeAggregateIterator(
}

override def hasNext: Boolean = {
sortFallbackIter.map(_.hasNext).getOrElse {
fallbackIter.map(_.hasNext).getOrElse {
// reductions produce a result even if the input is empty
hasReductionOnlyBatch || !aggregatedBatches.isEmpty || firstPassIter.hasNext
}
}

override def next(): ColumnarBatch = {
sortFallbackIter.map(_.next()).getOrElse {
fallbackIter.map(_.next()).getOrElse {
var shouldSkipThirdPassAgg = false

// aggregate and merge all pending inputs
if (firstPassIter.hasNext) {
aggregateInputBatches()
tryMergeAggregatedBatches()
// first pass agg
val rowsAfterFirstPassAgg = aggregateInputBatches()

// by now firstPassIter has been traversed, so localInputRowsCount is finished updating
if (isReductionOnly ||
skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) {
// second pass agg
tryMergeAggregatedBatches()

val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) {
(totalRows, batch) => totalRows + batch.numRows()
}
shouldSkipThirdPassAgg =
rowsAfterSecondPassAgg > skipAggPassReductionRatio * rowsAfterFirstPassAgg
} else {
shouldSkipThirdPassAgg = true
logInfo(s"Rows after first pass aggregation $rowsAfterFirstPassAgg exceeds " +
s"${skipAggPassReductionRatio * 100}% of " +
s"localInputRowsCount ${localInputRowsCount.value}, skip the second pass agg")
}
}

if (aggregatedBatches.size() > 1) {
// Unable to merge to a single output, so must fall back to a sort-based approach.
sortFallbackIter = Some(buildSortFallbackIterator())
sortFallbackIter.get.next()
// Unable to merge to a single output, so must fall back
if (allowNonFullyAggregatedOutput && shouldSkipThirdPassAgg) {
// skip third pass agg, return the aggregated batches directly
logInfo(s"Rows after second pass aggregation exceeds " +
s"${skipAggPassReductionRatio * 100}% of " +
s"rows after first pass, skip the third pass agg")
fallbackIter = Some(new Iterator[ColumnarBatch] {
override def hasNext: Boolean = !aggregatedBatches.isEmpty

override def next(): ColumnarBatch = {
withResource(aggregatedBatches.pop()) { spillableBatch =>
spillableBatch.getColumnarBatch()
}
}
})
} else {
// fallback to sort agg, this is the third pass agg
fallbackIter = Some(buildSortFallbackIterator())
}
fallbackIter.get.next()
} else if (aggregatedBatches.isEmpty) {
if (hasReductionOnlyBatch) {
hasReductionOnlyBatch = false
Expand All @@ -779,7 +827,7 @@ class GpuMergeAggregateIterator(
aggregatedBatches.clear()
outOfCoreIter.foreach(_.close())
outOfCoreIter = None
sortFallbackIter = None
fallbackIter = None
hasReductionOnlyBatch = false
}

Expand All @@ -789,11 +837,15 @@ class GpuMergeAggregateIterator(
}

/** Aggregate all input batches and place the results in the aggregatedBatches queue. */
private def aggregateInputBatches(): Unit = {
private def aggregateInputBatches(): Long = {
var rowsAfter = 0L
// cache everything in the first pass
while (firstPassIter.hasNext) {
aggregatedBatches.add(firstPassIter.next())
val batch = firstPassIter.next()
rowsAfter += batch.numRows()
aggregatedBatches.add(batch)
}
rowsAfter
}

/**
Expand Down Expand Up @@ -1115,8 +1167,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](

/*
* Type inferencing by the Scala compiler will choose the most specific return type
* something like Array[Set[Product with Serializable with AggregateMode]] or with
* slight differences depending on Scala version. Here we ensure this is
* something like Array[Set[Product with Serializable with AggregateMode]] or with
* slight differences depending on Scala version. Here we ensure this is
* Array[Set[AggregateMode]] to perform the subsequent Set and Array operations properly.
*/
val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern =>
Expand Down Expand Up @@ -1189,6 +1241,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
mode == Partial || mode == PartialMerge
} && agg.groupingExpressions.nonEmpty // Don't do this for a reduce...

// for a aggregateExpressions.isEmpty case, we cannot distinguish between final and non-final,
// so don't allow it.
lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode =>
mode == Partial || mode == PartialMerge
} && agg.aggregateExpressions.nonEmpty

lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr =>
orderable.isSupportedByPlugin(expr.dataType)
}
Expand Down Expand Up @@ -1272,7 +1330,9 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
useTiered,
estimatedPreProcessGrowth,
conf.forceSinglePassPartialSortAgg,
allowSinglePassAgg)
allowSinglePassAgg,
allowNonFullyAggregatedOutput,
conf.skipAggPassReductionRatio)
}
}

Expand Down Expand Up @@ -1358,7 +1418,9 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega
// For now we are just going to go with the original hash aggregation
1.0,
false,
false)
false,
false,
1)
} else {
super.convertToGpu()
}
Expand Down Expand Up @@ -1707,6 +1769,10 @@ object GpuHashAggregateExecBase {
* @param child incoming plan (where we get input columns from)
* @param configuredTargetBatchSize user-configured maximum device memory size of a batch
* @param configuredTieredProjectEnabled configurable optimization to use tiered projections
* @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation
* (can omit non fully aggregated data for non-final
* stage of aggregation)
* @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value
*/
case class GpuHashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
Expand All @@ -1719,7 +1785,10 @@ case class GpuHashAggregateExec(
configuredTieredProjectEnabled: Boolean,
estimatedPreProcessGrowth: Double,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean) extends ShimUnaryExecNode with GpuExec {
allowSinglePassAgg: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double
) extends ShimUnaryExecNode with GpuExec {

// lifted directly from `BaseAggregateExec.inputAttributes`, edited comment.
def inputAttributes: Seq[Attribute] =
Expand Down Expand Up @@ -1804,7 +1873,7 @@ case class GpuHashAggregateExec(
boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo,
localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering,
postBoundReferences, targetBatchSize, aggMetrics, useTieredProject,
localForcePre, localAllowPre)
localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio)
}
}

Expand Down Expand Up @@ -1920,7 +1989,10 @@ class DynamicGpuPartialSortAggregateIterator(
metrics: GpuHashAggregateMetrics,
useTiered: Boolean,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean) extends Iterator[ColumnarBatch] {
allowSinglePassAgg: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double
) extends Iterator[ColumnarBatch] {
private var aggIter: Option[Iterator[ColumnarBatch]] = None
private[this] val isReductionOnly = boundGroupExprs.outputTypes.isEmpty

Expand Down Expand Up @@ -1998,7 +2070,14 @@ class DynamicGpuPartialSortAggregateIterator(
inputAttrs.map(_.dataType).toArray, preProcessAggHelper.preStepBound,
metrics.opTime, metrics.numPreSplits)

val firstPassIter = GpuAggFirstPassIterator(splitInputIter, preProcessAggHelper, metrics)
val localInputRowsMetrics = new LocalGpuMetric
val firstPassIter = GpuAggFirstPassIterator(
splitInputIter.map(cb => {
localInputRowsMetrics += cb.numRows()
cb
}),
preProcessAggHelper,
metrics)

val mergeIter = new GpuMergeAggregateIterator(
firstPassIter,
Expand All @@ -2010,7 +2089,10 @@ class DynamicGpuPartialSortAggregateIterator(
modeInfo,
metrics,
configuredTargetBatchSize,
useTiered)
useTiered,
allowNonFullyAggregatedOutput,
skipAggPassReductionRatio,
localInputRowsMetrics)

GpuAggFinalPassIterator.makeIter(mergeIter, postBoundReferences, metrics)
}
Expand Down
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio")
.doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " +
"greater than this value, the next aggregation pass will be skipped." +
"Setting this to 1 essentially disables this feature.")
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
.createWithDefault(1.0)

val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
.doc("Force a single pass partial sort agg to happen in all cases that it could, " +
Expand Down Expand Up @@ -3069,6 +3077,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG)

lazy val skipAggPassReductionRatio: Double = get(SKIP_AGG_PASS_REDUCTION_RATIO)

lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP)

lazy val maxRegExpStateMemory: Long = {
Expand Down

0 comments on commit f954026

Please sign in to comment.