Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#10950 for liyuan/0612-base-local #17

1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.skipAggPassReductionRatio"></a>spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|0.9|Runtime
<a name="sql.allowMultipleJars"></a>spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
<a name="sql.castDecimalToFloat.enabled"></a>spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
<a name="sql.castFloatToDecimal.enabled"></a>spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
Expand Down
11 changes: 7 additions & 4 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
pytestmark = pytest.mark.nightly_resource_consuming_test

_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}

_float_smallbatch_conf = copy_and_update(_float_conf,
{'spark.rapids.sql.batchSizeBytes' : '250'})

_float_conf_skipagg = copy_and_update(_float_smallbatch_conf,
{'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'})

_float_conf_partial = copy_and_update(_float_conf,
{'spark.rapids.sql.hashAgg.replaceMode': 'partial'})

Expand Down Expand Up @@ -221,8 +224,8 @@ def get_params(init_list, marked_params=[]):
return list


# Run these tests with in 4 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_final, _float_conf_partial]
# Run these tests with in 5 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial]

# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
Expand Down
130 changes: 106 additions & 24 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
import java.util

import scala.annotation.tailrec
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.mutable

import ai.rapids.cudf
Expand Down Expand Up @@ -549,7 +550,8 @@ object GpuAggregateIterator extends Logging {
object GpuAggFirstPassIterator {
def apply(cbIter: Iterator[ColumnarBatch],
aggHelper: AggHelper,
metrics: GpuHashAggregateMetrics): Iterator[SpillableColumnarBatch] = {
metrics: GpuHashAggregateMetrics
): Iterator[SpillableColumnarBatch] = {
val preprocessProjectIter = cbIter.map { cb =>
val sb = SpillableColumnarBatch (cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
aggHelper.preStepBound.projectAndCloseWithRetrySingleBatch (sb)
Expand Down Expand Up @@ -707,6 +709,9 @@ object GpuAggFinalPassIterator {
* @param metrics metrics that will be updated during aggregation
* @param configuredTargetBatchSize user-specified value for the targeted input batch size
* @param useTieredProject user-specified option to enable tiered projections
* @param allowNonFullyAggregatedOutput if allowed to skip third pass Agg
* @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value
* @param localInputRowsCount metric to track the number of input rows processed locally
*/
class GpuMergeAggregateIterator(
firstPassIter: Iterator[SpillableColumnarBatch],
Expand All @@ -718,15 +723,21 @@ class GpuMergeAggregateIterator(
modeInfo: AggregateModeInfo,
metrics: GpuHashAggregateMetrics,
configuredTargetBatchSize: Long,
useTieredProject: Boolean)
useTieredProject: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double,
localInputRowsCount: LocalGpuMetric)
extends Iterator[ColumnarBatch] with AutoCloseable with Logging {
private[this] val isReductionOnly = groupingExpressions.isEmpty
private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)
private[this] val aggregatedBatches = new util.ArrayDeque[SpillableColumnarBatch]
private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None

/** Iterator for fetching aggregated batches if a sort-based fallback has occurred */
private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None
/** Iterator for fetching aggregated batches either if:
* 1. a sort-based fallback has occurred
* 2. skip third pass agg has occurred
**/
private[this] var fallbackIter: Option[Iterator[ColumnarBatch]] = None

/** Whether a batch is pending for a reduction-only aggregation */
private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly
Expand All @@ -739,24 +750,61 @@ class GpuMergeAggregateIterator(
}

override def hasNext: Boolean = {
sortFallbackIter.map(_.hasNext).getOrElse {
fallbackIter.map(_.hasNext).getOrElse {
// reductions produce a result even if the input is empty
hasReductionOnlyBatch || !aggregatedBatches.isEmpty || firstPassIter.hasNext
}
}

override def next(): ColumnarBatch = {
sortFallbackIter.map(_.next()).getOrElse {
fallbackIter.map(_.next()).getOrElse {
var shouldSkipThirdPassAgg = false

// aggregate and merge all pending inputs
if (firstPassIter.hasNext) {
aggregateInputBatches()
tryMergeAggregatedBatches()
// first pass agg
val rowsAfterFirstPassAgg = aggregateInputBatches()

// by now firstPassIter has been traversed, so localInputRowsCount is finished updating
if (isReductionOnly ||
skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) {
// second pass agg
tryMergeAggregatedBatches()

val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) {
(totalRows, batch) => totalRows + batch.numRows()
}
shouldSkipThirdPassAgg =
rowsAfterSecondPassAgg > skipAggPassReductionRatio * rowsAfterFirstPassAgg
} else {
shouldSkipThirdPassAgg = true
logInfo(s"Rows after first pass aggregation $rowsAfterFirstPassAgg exceeds " +
s"${skipAggPassReductionRatio * 100}% of " +
s"localInputRowsCount ${localInputRowsCount.value}, skip the second pass agg")
}
}

if (aggregatedBatches.size() > 1) {
// Unable to merge to a single output, so must fall back to a sort-based approach.
sortFallbackIter = Some(buildSortFallbackIterator())
sortFallbackIter.get.next()
// Unable to merge to a single output, so must fall back
if (allowNonFullyAggregatedOutput && shouldSkipThirdPassAgg) {
// skip third pass agg, return the aggregated batches directly
logInfo(s"Rows after second pass aggregation exceeds " +
s"${skipAggPassReductionRatio * 100}% of " +
s"rows after first pass, skip the third pass agg")
fallbackIter = Some(new Iterator[ColumnarBatch] {
override def hasNext: Boolean = !aggregatedBatches.isEmpty

override def next(): ColumnarBatch = {
withResource(aggregatedBatches.pop()) { spillableBatch =>
spillableBatch.getColumnarBatch()
}
}
})
} else {
// fallback to sort agg, this is the third pass agg
fallbackIter = Some(buildSortFallbackIterator())
}
fallbackIter.get.next()
} else if (aggregatedBatches.isEmpty) {
if (hasReductionOnlyBatch) {
hasReductionOnlyBatch = false
Expand All @@ -779,7 +827,7 @@ class GpuMergeAggregateIterator(
aggregatedBatches.clear()
outOfCoreIter.foreach(_.close())
outOfCoreIter = None
sortFallbackIter = None
fallbackIter = None
hasReductionOnlyBatch = false
}

Expand All @@ -789,11 +837,15 @@ class GpuMergeAggregateIterator(
}

/** Aggregate all input batches and place the results in the aggregatedBatches queue. */
private def aggregateInputBatches(): Unit = {
private def aggregateInputBatches(): Long = {
var rowsAfter = 0L
// cache everything in the first pass
while (firstPassIter.hasNext) {
aggregatedBatches.add(firstPassIter.next())
val batch = firstPassIter.next()
rowsAfter += batch.numRows()
aggregatedBatches.add(batch)
}
rowsAfter
}

/**
Expand Down Expand Up @@ -1115,8 +1167,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](

/*
* Type inferencing by the Scala compiler will choose the most specific return type
* something like Array[Set[Product with Serializable with AggregateMode]] or with
* slight differences depending on Scala version. Here we ensure this is
* something like Array[Set[Product with Serializable with AggregateMode]] or with
* slight differences depending on Scala version. Here we ensure this is
* Array[Set[AggregateMode]] to perform the subsequent Set and Array operations properly.
*/
val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern =>
Expand Down Expand Up @@ -1189,6 +1241,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
mode == Partial || mode == PartialMerge
} && agg.groupingExpressions.nonEmpty // Don't do this for a reduce...

// for a aggregateExpressions.isEmpty case, we cannot distinguish between final and non-final,
// so don't allow it.
lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode =>
mode == Partial || mode == PartialMerge
} && agg.aggregateExpressions.nonEmpty

lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr =>
orderable.isSupportedByPlugin(expr.dataType)
}
Expand Down Expand Up @@ -1272,7 +1330,9 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
useTiered,
estimatedPreProcessGrowth,
conf.forceSinglePassPartialSortAgg,
allowSinglePassAgg)
allowSinglePassAgg,
allowNonFullyAggregatedOutput,
conf.skipAggPassReductionRatio)
}
}

Expand Down Expand Up @@ -1358,7 +1418,9 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega
// For now we are just going to go with the original hash aggregation
1.0,
false,
false)
false,
false,
1)
} else {
super.convertToGpu()
}
Expand Down Expand Up @@ -1707,6 +1769,10 @@ object GpuHashAggregateExecBase {
* @param child incoming plan (where we get input columns from)
* @param configuredTargetBatchSize user-configured maximum device memory size of a batch
* @param configuredTieredProjectEnabled configurable optimization to use tiered projections
* @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation
* (can omit non fully aggregated data for non-final
* stage of aggregation)
* @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value
*/
case class GpuHashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
Expand All @@ -1719,7 +1785,10 @@ case class GpuHashAggregateExec(
configuredTieredProjectEnabled: Boolean,
estimatedPreProcessGrowth: Double,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean) extends ShimUnaryExecNode with GpuExec {
allowSinglePassAgg: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double
) extends ShimUnaryExecNode with GpuExec {

// lifted directly from `BaseAggregateExec.inputAttributes`, edited comment.
def inputAttributes: Seq[Attribute] =
Expand Down Expand Up @@ -1804,7 +1873,7 @@ case class GpuHashAggregateExec(
boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo,
localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering,
postBoundReferences, targetBatchSize, aggMetrics, useTieredProject,
localForcePre, localAllowPre)
localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio)
}
}

Expand Down Expand Up @@ -1920,7 +1989,10 @@ class DynamicGpuPartialSortAggregateIterator(
metrics: GpuHashAggregateMetrics,
useTiered: Boolean,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean) extends Iterator[ColumnarBatch] {
allowSinglePassAgg: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double
) extends Iterator[ColumnarBatch] {
private var aggIter: Option[Iterator[ColumnarBatch]] = None
private[this] val isReductionOnly = boundGroupExprs.outputTypes.isEmpty

Expand Down Expand Up @@ -1998,7 +2070,14 @@ class DynamicGpuPartialSortAggregateIterator(
inputAttrs.map(_.dataType).toArray, preProcessAggHelper.preStepBound,
metrics.opTime, metrics.numPreSplits)

val firstPassIter = GpuAggFirstPassIterator(splitInputIter, preProcessAggHelper, metrics)
val localInputRowsMetrics = new LocalGpuMetric
val firstPassIter = GpuAggFirstPassIterator(
splitInputIter.map(cb => {
localInputRowsMetrics += cb.numRows()
cb
}),
preProcessAggHelper,
metrics)

val mergeIter = new GpuMergeAggregateIterator(
firstPassIter,
Expand All @@ -2010,7 +2089,10 @@ class DynamicGpuPartialSortAggregateIterator(
modeInfo,
metrics,
configuredTargetBatchSize,
useTiered)
useTiered,
allowNonFullyAggregatedOutput,
skipAggPassReductionRatio,
localInputRowsMetrics)

GpuAggFinalPassIterator.makeIter(mergeIter, postBoundReferences, metrics)
}
Expand Down
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio")
.doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " +
"greater than this value, the next aggregation pass will be skipped." +
"Setting this to 1 essentially disables this feature.")
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
.createWithDefault(0.9)

val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
.doc("Force a single pass partial sort agg to happen in all cases that it could, " +
Expand Down Expand Up @@ -3074,6 +3082,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG)

lazy val skipAggPassReductionRatio: Double = get(SKIP_AGG_PASS_REDUCTION_RATIO)

lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP)

lazy val maxRegExpStateMemory: Long = {
Expand Down
Loading