diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 941ab4046e6..033e332b99c 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup +spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index e24a34ef3d5..d1cd70aa43c 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -29,12 +29,15 @@ pytestmark = pytest.mark.nightly_resource_consuming_test _float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true', - 'spark.rapids.sql.castStringToFloat.enabled': 'true' - } + 'spark.rapids.sql.castStringToFloat.enabled': 'true' + } _float_smallbatch_conf = copy_and_update(_float_conf, {'spark.rapids.sql.batchSizeBytes' : '250'}) +_float_conf_skipagg = copy_and_update(_float_smallbatch_conf, + {'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'}) + _float_conf_partial = copy_and_update(_float_conf, {'spark.rapids.sql.hashAgg.replaceMode': 'partial'}) @@ -221,8 +224,8 @@ def get_params(init_list, marked_params=[]): return list -# Run these tests with in 4 modes, all on the GPU -_confs = [_float_conf, _float_smallbatch_conf, _float_conf_final, _float_conf_partial] +# Run these tests with in 5 modes, all on the GPU +_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial] # Pytest marker for list of operators allowed to run on the CPU, # esp. useful in partial and final only modes. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index c58d9862be1..7e6a1056d01 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids import java.util import scala.annotation.tailrec +import scala.collection.JavaConverters.collectionAsScalaIterableConverter import scala.collection.mutable import ai.rapids.cudf @@ -549,7 +550,8 @@ object GpuAggregateIterator extends Logging { object GpuAggFirstPassIterator { def apply(cbIter: Iterator[ColumnarBatch], aggHelper: AggHelper, - metrics: GpuHashAggregateMetrics): Iterator[SpillableColumnarBatch] = { + metrics: GpuHashAggregateMetrics + ): Iterator[SpillableColumnarBatch] = { val preprocessProjectIter = cbIter.map { cb => val sb = SpillableColumnarBatch (cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) aggHelper.preStepBound.projectAndCloseWithRetrySingleBatch (sb) @@ -707,6 +709,9 @@ object GpuAggFinalPassIterator { * @param metrics metrics that will be updated during aggregation * @param configuredTargetBatchSize user-specified value for the targeted input batch size * @param useTieredProject user-specified option to enable tiered projections + * @param allowNonFullyAggregatedOutput if allowed to skip third pass Agg + * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value + * @param localInputRowsCount metric to track the number of input rows processed locally */ class GpuMergeAggregateIterator( firstPassIter: Iterator[SpillableColumnarBatch], @@ -718,15 +723,21 @@ class GpuMergeAggregateIterator( modeInfo: AggregateModeInfo, metrics: GpuHashAggregateMetrics, configuredTargetBatchSize: Long, - useTieredProject: Boolean) + useTieredProject: Boolean, + allowNonFullyAggregatedOutput: Boolean, + skipAggPassReductionRatio: Double, + localInputRowsCount: LocalGpuMetric) extends Iterator[ColumnarBatch] with AutoCloseable with Logging { private[this] val isReductionOnly = groupingExpressions.isEmpty private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize) private[this] val aggregatedBatches = new util.ArrayDeque[SpillableColumnarBatch] private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None - /** Iterator for fetching aggregated batches if a sort-based fallback has occurred */ - private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None + /** Iterator for fetching aggregated batches either if: + * 1. a sort-based fallback has occurred + * 2. skip third pass agg has occurred + **/ + private[this] var fallbackIter: Option[Iterator[ColumnarBatch]] = None /** Whether a batch is pending for a reduction-only aggregation */ private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly @@ -739,24 +750,61 @@ class GpuMergeAggregateIterator( } override def hasNext: Boolean = { - sortFallbackIter.map(_.hasNext).getOrElse { + fallbackIter.map(_.hasNext).getOrElse { // reductions produce a result even if the input is empty hasReductionOnlyBatch || !aggregatedBatches.isEmpty || firstPassIter.hasNext } } override def next(): ColumnarBatch = { - sortFallbackIter.map(_.next()).getOrElse { + fallbackIter.map(_.next()).getOrElse { + var shouldSkipThirdPassAgg = false + // aggregate and merge all pending inputs if (firstPassIter.hasNext) { - aggregateInputBatches() - tryMergeAggregatedBatches() + // first pass agg + val rowsAfterFirstPassAgg = aggregateInputBatches() + + // by now firstPassIter has been traversed, so localInputRowsCount is finished updating + if (isReductionOnly || + skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) { + // second pass agg + tryMergeAggregatedBatches() + + val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) { + (totalRows, batch) => totalRows + batch.numRows() + } + shouldSkipThirdPassAgg = + rowsAfterSecondPassAgg > skipAggPassReductionRatio * rowsAfterFirstPassAgg + } else { + shouldSkipThirdPassAgg = true + logInfo(s"Rows after first pass aggregation $rowsAfterFirstPassAgg exceeds " + + s"${skipAggPassReductionRatio * 100}% of " + + s"localInputRowsCount ${localInputRowsCount.value}, skip the second pass agg") + } } if (aggregatedBatches.size() > 1) { - // Unable to merge to a single output, so must fall back to a sort-based approach. - sortFallbackIter = Some(buildSortFallbackIterator()) - sortFallbackIter.get.next() + // Unable to merge to a single output, so must fall back + if (allowNonFullyAggregatedOutput && shouldSkipThirdPassAgg) { + // skip third pass agg, return the aggregated batches directly + logInfo(s"Rows after second pass aggregation exceeds " + + s"${skipAggPassReductionRatio * 100}% of " + + s"rows after first pass, skip the third pass agg") + fallbackIter = Some(new Iterator[ColumnarBatch] { + override def hasNext: Boolean = !aggregatedBatches.isEmpty + + override def next(): ColumnarBatch = { + withResource(aggregatedBatches.pop()) { spillableBatch => + spillableBatch.getColumnarBatch() + } + } + }) + } else { + // fallback to sort agg, this is the third pass agg + fallbackIter = Some(buildSortFallbackIterator()) + } + fallbackIter.get.next() } else if (aggregatedBatches.isEmpty) { if (hasReductionOnlyBatch) { hasReductionOnlyBatch = false @@ -779,7 +827,7 @@ class GpuMergeAggregateIterator( aggregatedBatches.clear() outOfCoreIter.foreach(_.close()) outOfCoreIter = None - sortFallbackIter = None + fallbackIter = None hasReductionOnlyBatch = false } @@ -789,11 +837,15 @@ class GpuMergeAggregateIterator( } /** Aggregate all input batches and place the results in the aggregatedBatches queue. */ - private def aggregateInputBatches(): Unit = { + private def aggregateInputBatches(): Long = { + var rowsAfter = 0L // cache everything in the first pass while (firstPassIter.hasNext) { - aggregatedBatches.add(firstPassIter.next()) + val batch = firstPassIter.next() + rowsAfter += batch.numRows() + aggregatedBatches.add(batch) } + rowsAfter } /** @@ -1115,8 +1167,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( /* * Type inferencing by the Scala compiler will choose the most specific return type - * something like Array[Set[Product with Serializable with AggregateMode]] or with - * slight differences depending on Scala version. Here we ensure this is + * something like Array[Set[Product with Serializable with AggregateMode]] or with + * slight differences depending on Scala version. Here we ensure this is * Array[Set[AggregateMode]] to perform the subsequent Set and Array operations properly. */ val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern => @@ -1189,6 +1241,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( mode == Partial || mode == PartialMerge } && agg.groupingExpressions.nonEmpty // Don't do this for a reduce... + // for a aggregateExpressions.isEmpty case, we cannot distinguish between final and non-final, + // so don't allow it. + lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode => + mode == Partial || mode == PartialMerge + } && agg.aggregateExpressions.nonEmpty + lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr => orderable.isSupportedByPlugin(expr.dataType) } @@ -1272,7 +1330,9 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( useTiered, estimatedPreProcessGrowth, conf.forceSinglePassPartialSortAgg, - allowSinglePassAgg) + allowSinglePassAgg, + allowNonFullyAggregatedOutput, + conf.skipAggPassReductionRatio) } } @@ -1358,7 +1418,9 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega // For now we are just going to go with the original hash aggregation 1.0, false, - false) + false, + false, + 1) } else { super.convertToGpu() } @@ -1707,6 +1769,10 @@ object GpuHashAggregateExecBase { * @param child incoming plan (where we get input columns from) * @param configuredTargetBatchSize user-configured maximum device memory size of a batch * @param configuredTieredProjectEnabled configurable optimization to use tiered projections + * @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation + * (can omit non fully aggregated data for non-final + * stage of aggregation) + * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value */ case class GpuHashAggregateExec( requiredChildDistributionExpressions: Option[Seq[Expression]], @@ -1719,7 +1785,10 @@ case class GpuHashAggregateExec( configuredTieredProjectEnabled: Boolean, estimatedPreProcessGrowth: Double, forceSinglePassAgg: Boolean, - allowSinglePassAgg: Boolean) extends ShimUnaryExecNode with GpuExec { + allowSinglePassAgg: Boolean, + allowNonFullyAggregatedOutput: Boolean, + skipAggPassReductionRatio: Double +) extends ShimUnaryExecNode with GpuExec { // lifted directly from `BaseAggregateExec.inputAttributes`, edited comment. def inputAttributes: Seq[Attribute] = @@ -1804,7 +1873,7 @@ case class GpuHashAggregateExec( boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo, localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering, postBoundReferences, targetBatchSize, aggMetrics, useTieredProject, - localForcePre, localAllowPre) + localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio) } } @@ -1920,7 +1989,10 @@ class DynamicGpuPartialSortAggregateIterator( metrics: GpuHashAggregateMetrics, useTiered: Boolean, forceSinglePassAgg: Boolean, - allowSinglePassAgg: Boolean) extends Iterator[ColumnarBatch] { + allowSinglePassAgg: Boolean, + allowNonFullyAggregatedOutput: Boolean, + skipAggPassReductionRatio: Double +) extends Iterator[ColumnarBatch] { private var aggIter: Option[Iterator[ColumnarBatch]] = None private[this] val isReductionOnly = boundGroupExprs.outputTypes.isEmpty @@ -1998,7 +2070,14 @@ class DynamicGpuPartialSortAggregateIterator( inputAttrs.map(_.dataType).toArray, preProcessAggHelper.preStepBound, metrics.opTime, metrics.numPreSplits) - val firstPassIter = GpuAggFirstPassIterator(splitInputIter, preProcessAggHelper, metrics) + val localInputRowsMetrics = new LocalGpuMetric + val firstPassIter = GpuAggFirstPassIterator( + splitInputIter.map(cb => { + localInputRowsMetrics += cb.numRows() + cb + }), + preProcessAggHelper, + metrics) val mergeIter = new GpuMergeAggregateIterator( firstPassIter, @@ -2010,7 +2089,10 @@ class DynamicGpuPartialSortAggregateIterator( modeInfo, metrics, configuredTargetBatchSize, - useTiered) + useTiered, + allowNonFullyAggregatedOutput, + skipAggPassReductionRatio, + localInputRowsMetrics) GpuAggFinalPassIterator.makeIter(mergeIter, postBoundReferences, metrics) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 5203e926efa..aad4f05b334 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1509,6 +1509,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(true) + val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio") + .doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " + + "greater than this value, the next aggregation pass will be skipped." + + "Setting this to 1 essentially disables this feature.") + .doubleConf + .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].") + .createWithDefault(1.0) + val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] = conf("spark.rapids.sql.agg.forceSinglePassPartialSort") .doc("Force a single pass partial sort agg to happen in all cases that it could, " + @@ -3069,6 +3077,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG) + lazy val skipAggPassReductionRatio: Double = get(SKIP_AGG_PASS_REDUCTION_RATIO) + lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP) lazy val maxRegExpStateMemory: Long = {