From bb35e1e133b7a445e52b4d09c56bebad1840256e Mon Sep 17 00:00:00 2001
From: "Hongbin Ma (Mahone)"
Date: Thu, 30 May 2024 11:55:07 +0800
Subject: [PATCH 01/16] add a heuristic to skip agg pass

Signed-off-by: Hongbin Ma (Mahone)
---
 .../spark/rapids/GpuAggregateExec.scala       | 122 ++++++++++++++----
 .../com/nvidia/spark/rapids/RapidsConf.scala  |  10 ++
 2 files changed, 109 insertions(+), 23 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
index c58d9862be1..c469f4e30cf 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
@@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
 import java.util
 
 import scala.annotation.tailrec
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 import scala.collection.mutable
 
 import ai.rapids.cudf
@@ -549,7 +550,8 @@ object GpuAggregateIterator extends Logging {
 object GpuAggFirstPassIterator {
   def apply(cbIter: Iterator[ColumnarBatch],
       aggHelper: AggHelper,
-      metrics: GpuHashAggregateMetrics): Iterator[SpillableColumnarBatch] = {
+      metrics: GpuHashAggregateMetrics
+  ): Iterator[SpillableColumnarBatch] = {
     val preprocessProjectIter = cbIter.map { cb =>
       val sb = SpillableColumnarBatch (cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
       aggHelper.preStepBound.projectAndCloseWithRetrySingleBatch (sb)
@@ -707,6 +709,9 @@ object GpuAggFinalPassIterator {
  * @param metrics metrics that will be updated during aggregation
  * @param configuredTargetBatchSize user-specified value for the targeted input batch size
  * @param useTieredProject user-specified option to enable tiered projections
+ * @param canSkipThirdPassAgg if allowed to skip third pass Agg
+ * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value
+ * @param localInputRowsCount metric to track the number of input rows processed locally
  */
 class GpuMergeAggregateIterator(
     firstPassIter: Iterator[SpillableColumnarBatch],
@@ -718,15 +723,21 @@ class GpuMergeAggregateIterator(
     modeInfo: AggregateModeInfo,
     metrics: GpuHashAggregateMetrics,
     configuredTargetBatchSize: Long,
-    useTieredProject: Boolean)
+    useTieredProject: Boolean,
+    canSkipThirdPassAgg: Boolean,
+    skipAggPassReductionRatio: Double,
+    localInputRowsCount: LocalGpuMetric)
   extends Iterator[ColumnarBatch] with AutoCloseable with Logging {
   private[this] val isReductionOnly = groupingExpressions.isEmpty
   private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)
   private[this] val aggregatedBatches = new util.ArrayDeque[SpillableColumnarBatch]
   private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None
 
-  /** Iterator for fetching aggregated batches if a sort-based fallback has occurred */
-  private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None
+  /** Iterator for fetching aggregated batches either if:
+   *  1. a sort-based fallback has occurred
+   *  2. skip third pass agg has occurred
+   **/
+  private[this] var fallbackIter: Option[Iterator[ColumnarBatch]] = None
 
   /** Whether a batch is pending for a reduction-only aggregation */
   private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly
@@ -739,24 +750,60 @@ class GpuMergeAggregateIterator(
   }
 
   override def hasNext: Boolean = {
-    sortFallbackIter.map(_.hasNext).getOrElse {
+    fallbackIter.map(_.hasNext).getOrElse {
       // reductions produce a result even if the input is empty
       hasReductionOnlyBatch || !aggregatedBatches.isEmpty || firstPassIter.hasNext
     }
   }
 
   override def next(): ColumnarBatch = {
-    sortFallbackIter.map(_.next()).getOrElse {
+    fallbackIter.map(_.next()).getOrElse {
+      var shouldSkipThirdPassAgg = false
+
       // aggregate and merge all pending inputs
       if (firstPassIter.hasNext) {
-        aggregateInputBatches()
-        tryMergeAggregatedBatches()
+        // first pass agg
+        val rowsAfterFirstPassAgg = aggregateInputBatches()
+
+        // by now firstPassIter has been traversed, so localInputRowsCount is finished updating
+        if (skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) {
+          // second pass agg
+          tryMergeAggregatedBatches()
+
+          val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) {
+            (totalRows, batch) => totalRows + batch.numRows()
+          }
+          shouldSkipThirdPassAgg =
+            rowsAfterSecondPassAgg > skipAggPassReductionRatio * rowsAfterFirstPassAgg
+        } else {
+          shouldSkipThirdPassAgg = true
+          logInfo(s"Rows after first pass aggregation $rowsAfterFirstPassAgg exceeds " +
+            s"${skipAggPassReductionRatio * 100}% of " +
+            s"localInputRowsCount ${localInputRowsCount.value}, skip the second pass agg")
+        }
       }
 
       if (aggregatedBatches.size() > 1) {
-        // Unable to merge to a single output, so must fall back to a sort-based approach.
-        sortFallbackIter = Some(buildSortFallbackIterator())
-        sortFallbackIter.get.next()
+        // Unable to merge to a single output, so must fall back
+        if (canSkipThirdPassAgg && shouldSkipThirdPassAgg) {
+          // skip third pass agg, return the aggregated batches directly
+          logInfo(s"Rows after second pass aggregation exceeds " +
+            s"${skipAggPassReductionRatio * 100}% of " +
+            s"rows after first pass, skip the third pass agg")
+          fallbackIter = Some(new Iterator[ColumnarBatch] {
+            override def hasNext: Boolean = !aggregatedBatches.isEmpty
+
+            override def next(): ColumnarBatch = {
+              withResource(aggregatedBatches.pop()) { spillableBatch =>
+                spillableBatch.getColumnarBatch()
+              }
+            }
+          })
+        } else {
+          // fallback to sort agg, this is the third pass agg
+          fallbackIter = Some(buildSortFallbackIterator())
+        }
+        fallbackIter.get.next()
       } else if (aggregatedBatches.isEmpty) {
         if (hasReductionOnlyBatch) {
           hasReductionOnlyBatch = false
@@ -779,7 +826,7 @@
     aggregatedBatches.clear()
     outOfCoreIter.foreach(_.close())
     outOfCoreIter = None
-    sortFallbackIter = None
+    fallbackIter = None
     hasReductionOnlyBatch = false
   }
 
@@ -789,11 +836,15 @@
   }
 
   /** Aggregate all input batches and place the results in the aggregatedBatches queue.
*/ - private def aggregateInputBatches(): Unit = { + private def aggregateInputBatches(): Long = { + var rowsAfter = 0L // cache everything in the first pass while (firstPassIter.hasNext) { - aggregatedBatches.add(firstPassIter.next()) + val batch = firstPassIter.next() + rowsAfter += batch.numRows() + aggregatedBatches.add(batch) } + rowsAfter } /** @@ -1115,8 +1166,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( /* * Type inferencing by the Scala compiler will choose the most specific return type - * something like Array[Set[Product with Serializable with AggregateMode]] or with - * slight differences depending on Scala version. Here we ensure this is + * something like Array[Set[Product with Serializable with AggregateMode]] or with + * slight differences depending on Scala version. Here we ensure this is * Array[Set[AggregateMode]] to perform the subsequent Set and Array operations properly. */ val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern => @@ -1189,6 +1240,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( mode == Partial || mode == PartialMerge } && agg.groupingExpressions.nonEmpty // Don't do this for a reduce... + val canSkipThirdPassAgg = canUsePartialSortAgg + lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr => orderable.isSupportedByPlugin(expr.dataType) } @@ -1272,7 +1325,9 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( useTiered, estimatedPreProcessGrowth, conf.forceSinglePassPartialSortAgg, - allowSinglePassAgg) + allowSinglePassAgg, + canSkipThirdPassAgg, + conf.skipAggPassReductionRatio) } } @@ -1358,7 +1413,9 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega // For now we are just going to go with the original hash aggregation 1.0, false, - false) + false, + false, + 1) } else { super.convertToGpu() } @@ -1707,6 +1764,9 @@ object GpuHashAggregateExecBase { * @param child incoming plan (where we get input columns from) * @param configuredTargetBatchSize user-configured maximum device memory size of a batch * @param configuredTieredProjectEnabled configurable optimization to use tiered projections + * @param canSkipThirdPassAgg whether we can skip the third pass of aggregation (can omit non fully + * aggregated data for non-final stage of aggregation) + * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value */ case class GpuHashAggregateExec( requiredChildDistributionExpressions: Option[Seq[Expression]], @@ -1719,7 +1779,10 @@ case class GpuHashAggregateExec( configuredTieredProjectEnabled: Boolean, estimatedPreProcessGrowth: Double, forceSinglePassAgg: Boolean, - allowSinglePassAgg: Boolean) extends ShimUnaryExecNode with GpuExec { + allowSinglePassAgg: Boolean, + canSkipThirdPassAgg: Boolean, + skipAggPassReductionRatio: Double +) extends ShimUnaryExecNode with GpuExec { // lifted directly from `BaseAggregateExec.inputAttributes`, edited comment. 
def inputAttributes: Seq[Attribute] = @@ -1804,7 +1867,7 @@ case class GpuHashAggregateExec( boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo, localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering, postBoundReferences, targetBatchSize, aggMetrics, useTieredProject, - localForcePre, localAllowPre) + localForcePre, localAllowPre, canSkipThirdPassAgg, skipAggPassReductionRatio) } } @@ -1920,7 +1983,10 @@ class DynamicGpuPartialSortAggregateIterator( metrics: GpuHashAggregateMetrics, useTiered: Boolean, forceSinglePassAgg: Boolean, - allowSinglePassAgg: Boolean) extends Iterator[ColumnarBatch] { + allowSinglePassAgg: Boolean, + canSkipThirdPassAgg: Boolean, + skipAggPassReductionRatio: Double +) extends Iterator[ColumnarBatch] { private var aggIter: Option[Iterator[ColumnarBatch]] = None private[this] val isReductionOnly = boundGroupExprs.outputTypes.isEmpty @@ -1998,7 +2064,14 @@ class DynamicGpuPartialSortAggregateIterator( inputAttrs.map(_.dataType).toArray, preProcessAggHelper.preStepBound, metrics.opTime, metrics.numPreSplits) - val firstPassIter = GpuAggFirstPassIterator(splitInputIter, preProcessAggHelper, metrics) + val localInputRowsMetrics = new LocalGpuMetric + val firstPassIter = GpuAggFirstPassIterator( + splitInputIter.map(cb => { + localInputRowsMetrics += cb.numRows() + cb + }), + preProcessAggHelper, + metrics) val mergeIter = new GpuMergeAggregateIterator( firstPassIter, @@ -2010,7 +2083,10 @@ class DynamicGpuPartialSortAggregateIterator( modeInfo, metrics, configuredTargetBatchSize, - useTiered) + useTiered, + canSkipThirdPassAgg, + skipAggPassReductionRatio, + localInputRowsMetrics) GpuAggFinalPassIterator.makeIter(mergeIter, postBoundReferences, metrics) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 8ea1641fb4a..b9c90808e11 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1509,6 +1509,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(true) + val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio") + .doc("In non-final aggregation stages, if the previous agg pass has a reduction ratio " + + "greater than this value, the next aggregation pass will be skipped." 
+ + "Setting this to 1 essentially disables this feature.") + .doubleConf + .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].") + .createWithDefault(0.9) + val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] = conf("spark.rapids.sql.agg.forceSinglePassPartialSort") .doc("Force a single pass partial sort agg to happen in all cases that it could, " + @@ -3043,6 +3051,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG) + lazy val skipAggPassReductionRatio: Double = get(SKIP_AGG_PASS_REDUCTION_RATIO) + lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP) lazy val maxRegExpStateMemory: Long = { From 8010c019cfb804174828629e5480408fc80a4347 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Thu, 30 May 2024 13:41:55 +0800 Subject: [PATCH 02/16] commit doc change Signed-off-by: Hongbin Ma (Mahone) --- docs/additional-functionality/advanced_configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 3231b7b3069..935444a3cca 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup +spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous agg pass has a reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|0.9|Runtime spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. 
Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime From e5b2feff6a31f26dc96a495eeb4a4dbe7f1021d5 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Thu, 30 May 2024 13:46:08 +0800 Subject: [PATCH 03/16] refine naming Signed-off-by: Hongbin Ma (Mahone) --- .../spark/rapids/GpuAggregateExec.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index c469f4e30cf..8caee0e0a6c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -709,7 +709,7 @@ object GpuAggFinalPassIterator { * @param metrics metrics that will be updated during aggregation * @param configuredTargetBatchSize user-specified value for the targeted input batch size * @param useTieredProject user-specified option to enable tiered projections - * @param canSkipThirdPassAgg if allowed to skip third pass Agg + * @param allowNonFullyAggregatedOutput if allowed to skip third pass Agg * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value * @param localInputRowsCount metric to track the number of input rows processed locally */ @@ -724,7 +724,7 @@ class GpuMergeAggregateIterator( metrics: GpuHashAggregateMetrics, configuredTargetBatchSize: Long, useTieredProject: Boolean, - canSkipThirdPassAgg: Boolean, + allowNonFullyAggregatedOutput: Boolean, skipAggPassReductionRatio: Double, localInputRowsCount: LocalGpuMetric) extends Iterator[ColumnarBatch] with AutoCloseable with Logging { @@ -785,7 +785,7 @@ class GpuMergeAggregateIterator( if (aggregatedBatches.size() > 1) { // Unable to merge to a single output, so must fall back - if (canSkipThirdPassAgg && shouldSkipThirdPassAgg) { + if (allowNonFullyAggregatedOutput && shouldSkipThirdPassAgg) { // skip third pass agg, return the aggregated batches directly logInfo(s"Rows after second pass aggregation exceeds " + s"${skipAggPassReductionRatio * 100}% of " + @@ -1240,7 +1240,7 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( mode == Partial || mode == PartialMerge } && agg.groupingExpressions.nonEmpty // Don't do this for a reduce... 
- val canSkipThirdPassAgg = canUsePartialSortAgg + val allowNonFullyAggregatedOutput = canUsePartialSortAgg lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr => orderable.isSupportedByPlugin(expr.dataType) @@ -1326,7 +1326,7 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( estimatedPreProcessGrowth, conf.forceSinglePassPartialSortAgg, allowSinglePassAgg, - canSkipThirdPassAgg, + allowNonFullyAggregatedOutput, conf.skipAggPassReductionRatio) } } @@ -1764,7 +1764,7 @@ object GpuHashAggregateExecBase { * @param child incoming plan (where we get input columns from) * @param configuredTargetBatchSize user-configured maximum device memory size of a batch * @param configuredTieredProjectEnabled configurable optimization to use tiered projections - * @param canSkipThirdPassAgg whether we can skip the third pass of aggregation (can omit non fully + * @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation (can omit non fully * aggregated data for non-final stage of aggregation) * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value */ @@ -1780,7 +1780,7 @@ case class GpuHashAggregateExec( estimatedPreProcessGrowth: Double, forceSinglePassAgg: Boolean, allowSinglePassAgg: Boolean, - canSkipThirdPassAgg: Boolean, + allowNonFullyAggregatedOutput: Boolean, skipAggPassReductionRatio: Double ) extends ShimUnaryExecNode with GpuExec { @@ -1867,7 +1867,7 @@ case class GpuHashAggregateExec( boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo, localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering, postBoundReferences, targetBatchSize, aggMetrics, useTieredProject, - localForcePre, localAllowPre, canSkipThirdPassAgg, skipAggPassReductionRatio) + localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio) } } @@ -1984,7 +1984,7 @@ class DynamicGpuPartialSortAggregateIterator( useTiered: Boolean, forceSinglePassAgg: Boolean, allowSinglePassAgg: Boolean, - canSkipThirdPassAgg: Boolean, + allowNonFullyAggregatedOutput: Boolean, skipAggPassReductionRatio: Double ) extends Iterator[ColumnarBatch] { private var aggIter: Option[Iterator[ColumnarBatch]] = None @@ -2084,7 +2084,7 @@ class DynamicGpuPartialSortAggregateIterator( metrics, configuredTargetBatchSize, useTiered, - canSkipThirdPassAgg, + allowNonFullyAggregatedOutput, skipAggPassReductionRatio, localInputRowsMetrics) From 7451f841a0246d451c3b60b242bc52286f6d0e05 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Thu, 30 May 2024 14:07:05 +0800 Subject: [PATCH 04/16] fix only reduction case Signed-off-by: Hongbin Ma (Mahone) --- .../main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index 8caee0e0a6c..0bb67d42819 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -766,7 +766,8 @@ class GpuMergeAggregateIterator( val rowsAfterFirstPassAgg = aggregateInputBatches() // by now firstPassIter has been traversed, so localInputRowsCount is finished updating - if (skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) { + if (isReductionOnly || + skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) { // second pass 
agg tryMergeAggregatedBatches() From 4af0c728ca99483257f9b0fca225c5dbaccfdcdf Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Thu, 30 May 2024 14:15:18 +0800 Subject: [PATCH 05/16] fix compile Signed-off-by: Hongbin Ma (Mahone) --- .../scala/com/nvidia/spark/rapids/GpuAggregateExec.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index 0bb67d42819..f2f28e29e41 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -1765,8 +1765,9 @@ object GpuHashAggregateExecBase { * @param child incoming plan (where we get input columns from) * @param configuredTargetBatchSize user-configured maximum device memory size of a batch * @param configuredTieredProjectEnabled configurable optimization to use tiered projections - * @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation (can omit non fully - * aggregated data for non-final stage of aggregation) + * @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation + * (can omit non fully aggregated data for non-final + * stage of aggregation) * @param skipAggPassReductionRatio skip if the ratio of rows after a pass is bigger than this value */ case class GpuHashAggregateExec( From 35834cbd6df6995caf5212d7d929e3060418f32a Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Mon, 3 Jun 2024 11:31:22 +0800 Subject: [PATCH 06/16] fix Signed-off-by: Hongbin Ma (Mahone) --- .../src/main/python/hash_aggregate_test.py | 3 +- .../spark/rapids/GpuAggregateExec.scala | 5 +- .../spark/rapids/GpuTransitionOverrides.scala | 48 +++++++++++++++++++ tests/src/test/resources/log4j2.properties | 2 +- .../RapidsDataFrameAggregateSuite.scala | 14 ++++++ .../utils/RapidsSQLTestsBaseTrait.scala | 4 ++ 6 files changed, 73 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index e24a34ef3d5..0727abebd27 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -29,7 +29,8 @@ pytestmark = pytest.mark.nightly_resource_consuming_test _float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true', - 'spark.rapids.sql.castStringToFloat.enabled': 'true' + 'spark.rapids.sql.castStringToFloat.enabled': 'true', + 'spark.rapids.sql.agg.skipAggPassReductionRatio': '0', } _float_smallbatch_conf = copy_and_update(_float_conf, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index f2f28e29e41..2b8bdb3ee40 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -1241,7 +1241,10 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( mode == Partial || mode == PartialMerge } && agg.groupingExpressions.nonEmpty // Don't do this for a reduce... 
- val allowNonFullyAggregatedOutput = canUsePartialSortAgg + lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode => + mode == Partial || mode == PartialMerge + } && agg.aggregateExpressions.nonEmpty + // for a reduce case, we can distinguish between final and non-final, so don't allow lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr => orderable.isSupportedByPlugin(expr.dataType) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 48f9de5a61a..8da87c1c583 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashedRelationBroadcastMode} import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuShuffleEnv, GpuTaskMetrics} +import org.apache.spark.sql.rapids.aggregate.GpuPivotFirst import org.apache.spark.sql.rapids.execution.{ExchangeMappingCache, GpuBroadcastExchangeExec, GpuBroadcastExchangeExecBase, GpuBroadcastToRowExec, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase} import org.apache.spark.sql.types.StructType @@ -375,6 +376,46 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { InputFileBlockRule.hasInputFileExpression(plan) } + // This walks from the output to the input to look for any uses of GpuPivotFirst, + // which requires fully aggregated input, and thus we need to disable non fully aggregate + // optimization for Aggregate (check spark.rapids.sql.agg.skipAggPassReductionRatio). + private def updateAggsForFullyAggregatedRequired(plan: SparkPlan, + fullyAggregatedRequired: Boolean = false): SparkPlan = { + + if (plan.isInstanceOf[AdaptiveSparkPlanExec]) { + return plan + } + + if (plan.isInstanceOf[LeafExecNode]) { + return plan + } + + var required = fullyAggregatedRequired + plan match { + case exec: GpuHashAggregateExec => + // pivot don't allow non fully aggregated input + if (!required && exec.aggregateExpressions.exists(e => + e.origAggregateFunction.isInstanceOf[GpuPivotFirst])) { + required = true + } + case _ => + } + + val newPlan = plan.withNewChildren(plan.children.map(c => { + updateAggsForFullyAggregatedRequired(c, required) + })) + + newPlan match { + case exec: GpuHashAggregateExec => + if (required && exec.allowNonFullyAggregatedOutput) { + println ("xxxxxxxxxxxxxxx") + return exec.copy(allowNonFullyAggregatedOutput = false) + } + case _ => + } + newPlan + } + // This walks from the output to the input to look for any uses of InputFileName, // InputFileBlockStart, or InputFileBlockLength when we use a Parquet read because // we can't support the coalesce file reader optimization when this is used. 
@@ -781,6 +822,13 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { GpuOverrides.logDuration(rapidsConf.shouldExplain, t => f"GPU plan transition optimization took $t%.2f ms") { var updatedPlan = insertHashOptimizeSorts(plan) + if (updatedPlan.children.size > 1000) { + println(s"before a run of updateAggsForFullyAggregatedRequired," + + s" output: ${updatedPlan.toString()} ") + updatedPlan = updateAggsForFullyAggregatedRequired(updatedPlan) + println(s"a run of updateAggsForFullyAggregatedRequired," + + s" output: ${updatedPlan.toString()} ") + } updatedPlan = updateScansForInputAndOrder(updatedPlan) if (rapidsConf.isFileScanPrunePartitionEnabled) { updatedPlan = prunePartitionForFileSourceScan(updatedPlan) diff --git a/tests/src/test/resources/log4j2.properties b/tests/src/test/resources/log4j2.properties index 90d7dd3d469..a20dde8b4cd 100644 --- a/tests/src/test/resources/log4j2.properties +++ b/tests/src/test/resources/log4j2.properties @@ -32,7 +32,7 @@ appender.console.type=Console appender.console.name=consoleAppender appender.console.filter.threshold.type=ThresholdFilter # print error log and system.err to console -appender.console.filter.threshold.level=error +appender.console.filter.threshold.level=info appender.console.target=SYSTEM_ERR appender.console.layout.type=PatternLayout appender.console.layout.pattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala index 5a394a5b0e8..0c55658ca5f 100644 --- a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala @@ -26,5 +26,19 @@ class RapidsDataFrameAggregateSuite extends DataFrameAggregateSuite with RapidsS // example to show how to replace the logic of an excluded test case in Vanilla Spark testRapids("collect functions" ) { // "collect functions" was excluded at RapidsTestSettings // println("...") + + } + + test("233") { +// { +// val df = spark.sql("select count(y), x from values ('a', 2), ('a', 4), ('b',5) as t(x, y)" + +// " group by x") +// df.show() +// } +// val df = spark.sql("select * from values ('a', 2), ('a', 4), ('a' , 4),('b',5) as t(x, y)") +// df.show() + spark.read.parquet("/tmp/t2"). 
+ groupBy("x").pivot("y").agg(org.apache.spark.sql.functions.count("*")).show() + println } } diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala index f8b9d21d169..8326273d5d6 100644 --- a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala @@ -166,7 +166,11 @@ object RapidsSQLTestsBaseTrait extends Logging { // TODO: remove hard coded UTC https://github.com/NVIDIA/spark-rapids/issues/10874 .set("spark.sql.session.timeZone", "UTC") .set("spark.rapids.sql.explain", "ALL") + .set("spark.rapids.sql.batchSizeBytes","200") +// .set("spark.rapids.sql.variableFloatAgg.enabled","true") +// .set("spark.rapids.sql.agg.skipAggPassReductionRatio","1") // uncomment below config to run `strict mode`, where fallback to CPU is treated as fail + // .set("spark.rapids.sql.test.enabled", "true") // .set("spark.rapids.sql.test.allowedNonGpu", // "SerializeFromObjectExec,DeserializeToObjectExec,ExternalRDDScanExec") From 97230b5e71cfe78ad42b2b820f07b4765d19ff07 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Mon, 3 Jun 2024 11:34:40 +0800 Subject: [PATCH 07/16] clean Signed-off-by: Hongbin Ma (Mahone) --- .../src/main/python/hash_aggregate_test.py | 12 +++-- .../spark/rapids/GpuAggregateExec.scala | 5 +- .../spark/rapids/GpuTransitionOverrides.scala | 48 ------------------- .../com/nvidia/spark/rapids/RapidsConf.scala | 2 +- tests/src/test/resources/log4j2.properties | 2 +- .../RapidsDataFrameAggregateSuite.scala | 14 ------ .../utils/RapidsSQLTestsBaseTrait.scala | 4 -- 7 files changed, 13 insertions(+), 74 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 0727abebd27..fc3bae58a4b 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -29,9 +29,13 @@ pytestmark = pytest.mark.nightly_resource_consuming_test _float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true', - 'spark.rapids.sql.castStringToFloat.enabled': 'true', + 'spark.rapids.sql.castStringToFloat.enabled': 'true', + } + +_float_conf_skipagg = {'spark.rapids.sql.variableFloatAgg.enabled': 'true', + 'spark.rapids.sql.castStringToFloat.enabled': 'true', 'spark.rapids.sql.agg.skipAggPassReductionRatio': '0', - } + } _float_smallbatch_conf = copy_and_update(_float_conf, {'spark.rapids.sql.batchSizeBytes' : '250'}) @@ -222,8 +226,8 @@ def get_params(init_list, marked_params=[]): return list -# Run these tests with in 4 modes, all on the GPU -_confs = [_float_conf, _float_smallbatch_conf, _float_conf_final, _float_conf_partial] +# Run these tests with in 5 modes, all on the GPU +_confs = [_float_conf, _float_conf_skipagg, _float_smallbatch_conf, _float_conf_final, _float_conf_partial] # Pytest marker for list of operators allowed to run on the CPU, # esp. useful in partial and final only modes. 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index 2b8bdb3ee40..7e6a1056d01 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1241,10 +1241,11 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( mode == Partial || mode == PartialMerge } && agg.groupingExpressions.nonEmpty // Don't do this for a reduce... + // for a aggregateExpressions.isEmpty case, we cannot distinguish between final and non-final, + // so don't allow it. lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode => mode == Partial || mode == PartialMerge } && agg.aggregateExpressions.nonEmpty - // for a reduce case, we can distinguish between final and non-final, so don't allow lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr => orderable.isSupportedByPlugin(expr.dataType) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 8da87c1c583..48f9de5a61a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashedRelationBroadcastMode} import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuShuffleEnv, GpuTaskMetrics} -import org.apache.spark.sql.rapids.aggregate.GpuPivotFirst import org.apache.spark.sql.rapids.execution.{ExchangeMappingCache, GpuBroadcastExchangeExec, GpuBroadcastExchangeExecBase, GpuBroadcastToRowExec, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase} import org.apache.spark.sql.types.StructType @@ -376,46 +375,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { InputFileBlockRule.hasInputFileExpression(plan) } - // This walks from the output to the input to look for any uses of GpuPivotFirst, - // which requires fully aggregated input, and thus we need to disable non fully aggregate - // optimization for Aggregate (check spark.rapids.sql.agg.skipAggPassReductionRatio). 
- private def updateAggsForFullyAggregatedRequired(plan: SparkPlan, - fullyAggregatedRequired: Boolean = false): SparkPlan = { - - if (plan.isInstanceOf[AdaptiveSparkPlanExec]) { - return plan - } - - if (plan.isInstanceOf[LeafExecNode]) { - return plan - } - - var required = fullyAggregatedRequired - plan match { - case exec: GpuHashAggregateExec => - // pivot don't allow non fully aggregated input - if (!required && exec.aggregateExpressions.exists(e => - e.origAggregateFunction.isInstanceOf[GpuPivotFirst])) { - required = true - } - case _ => - } - - val newPlan = plan.withNewChildren(plan.children.map(c => { - updateAggsForFullyAggregatedRequired(c, required) - })) - - newPlan match { - case exec: GpuHashAggregateExec => - if (required && exec.allowNonFullyAggregatedOutput) { - println ("xxxxxxxxxxxxxxx") - return exec.copy(allowNonFullyAggregatedOutput = false) - } - case _ => - } - newPlan - } - // This walks from the output to the input to look for any uses of InputFileName, // InputFileBlockStart, or InputFileBlockLength when we use a Parquet read because // we can't support the coalesce file reader optimization when this is used. @@ -822,13 +781,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { GpuOverrides.logDuration(rapidsConf.shouldExplain, t => f"GPU plan transition optimization took $t%.2f ms") { var updatedPlan = insertHashOptimizeSorts(plan) - if (updatedPlan.children.size > 1000) { - println(s"before a run of updateAggsForFullyAggregatedRequired," + - s" output: ${updatedPlan.toString()} ") - updatedPlan = updateAggsForFullyAggregatedRequired(updatedPlan) - println(s"a run of updateAggsForFullyAggregatedRequired," + - s" output: ${updatedPlan.toString()} ") - } updatedPlan = updateScansForInputAndOrder(updatedPlan) if (rapidsConf.isFileScanPrunePartitionEnabled) { updatedPlan = prunePartitionForFileSourceScan(updatedPlan) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index b9c90808e11..fbaf076cf12 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1510,7 +1510,7 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .createWithDefault(true) val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio") - .doc("In non-final aggregation stages, if the previous agg pass has a reduction ratio " + + .doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " + "greater than this value, the next aggregation pass will be skipped." 
+ "Setting this to 1 essentially disables this feature.") .doubleConf diff --git a/tests/src/test/resources/log4j2.properties b/tests/src/test/resources/log4j2.properties index a20dde8b4cd..90d7dd3d469 100644 --- a/tests/src/test/resources/log4j2.properties +++ b/tests/src/test/resources/log4j2.properties @@ -32,7 +32,7 @@ appender.console.type=Console appender.console.name=consoleAppender appender.console.filter.threshold.type=ThresholdFilter # print error log and system.err to console -appender.console.filter.threshold.level=info +appender.console.filter.threshold.level=error appender.console.target=SYSTEM_ERR appender.console.layout.type=PatternLayout appender.console.layout.pattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala index 0c55658ca5f..5a394a5b0e8 100644 --- a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/suites/RapidsDataFrameAggregateSuite.scala @@ -26,19 +26,5 @@ class RapidsDataFrameAggregateSuite extends DataFrameAggregateSuite with RapidsS // example to show how to replace the logic of an excluded test case in Vanilla Spark testRapids("collect functions" ) { // "collect functions" was excluded at RapidsTestSettings // println("...") - - } - - test("233") { -// { -// val df = spark.sql("select count(y), x from values ('a', 2), ('a', 4), ('b',5) as t(x, y)" + -// " group by x") -// df.show() -// } -// val df = spark.sql("select * from values ('a', 2), ('a', 4), ('a' , 4),('b',5) as t(x, y)") -// df.show() - spark.read.parquet("/tmp/t2"). 
- groupBy("x").pivot("y").agg(org.apache.spark.sql.functions.count("*")).show() - println } } diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala index 8326273d5d6..f8b9d21d169 100644 --- a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsSQLTestsBaseTrait.scala @@ -166,11 +166,7 @@ object RapidsSQLTestsBaseTrait extends Logging { // TODO: remove hard coded UTC https://github.com/NVIDIA/spark-rapids/issues/10874 .set("spark.sql.session.timeZone", "UTC") .set("spark.rapids.sql.explain", "ALL") - .set("spark.rapids.sql.batchSizeBytes","200") -// .set("spark.rapids.sql.variableFloatAgg.enabled","true") -// .set("spark.rapids.sql.agg.skipAggPassReductionRatio","1") // uncomment below config to run `strict mode`, where fallback to CPU is treated as fail - // .set("spark.rapids.sql.test.enabled", "true") // .set("spark.rapids.sql.test.allowedNonGpu", // "SerializeFromObjectExec,DeserializeToObjectExec,ExternalRDDScanExec") From f0f47bd163a14fa7051807d0b441365f4964f972 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Mon, 3 Jun 2024 13:10:40 +0800 Subject: [PATCH 08/16] fix doc Signed-off-by: Hongbin Ma (Mahone) --- docs/additional-functionality/advanced_configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 935444a3cca..01dd4a350e0 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -60,7 +60,7 @@ Name | Description | Default Value | Applicable at spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup -spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous agg pass has a reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|0.9|Runtime +spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|0.9|Runtime spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. 
Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime From 91b877c48384b7d8c1386c829a910b4bcefff175 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Tue, 4 Jun 2024 10:03:35 +0800 Subject: [PATCH 09/16] reduce premergeci2 Signed-off-by: Hongbin Ma (Mahone) --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 883b3f3acfc..f6cdc984971 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -166,7 +166,7 @@ ci_2() { # Download a Scala 2.12 build of spark prepare_spark $SPARK_VER 2.12 - ./integration_tests/run_pyspark_from_build.sh + ./integration_tests/run_pyspark_from_build.sh -k 'array_test.py and test_array_element_at_ansi_fail_invalid_index' # enable avro test separately INCLUDE_SPARK_AVRO_JAR=true TEST='avro_test.py' ./integration_tests/run_pyspark_from_build.sh From c9925c3137e5828b3420ab15139d834178ace3cf Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Tue, 4 Jun 2024 10:41:40 +0800 Subject: [PATCH 10/16] reduce premergeci2, 2 Signed-off-by: Hongbin Ma (Mahone) --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index f6cdc984971..8c871f4aaef 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -166,7 +166,7 @@ ci_2() { # Download a Scala 2.12 build of spark prepare_spark $SPARK_VER 2.12 - ./integration_tests/run_pyspark_from_build.sh -k 'array_test.py and test_array_element_at_ansi_fail_invalid_index' + ./integration_tests/run_pyspark_from_build.sh -k 'array_test.py' # enable avro test separately INCLUDE_SPARK_AVRO_JAR=true TEST='avro_test.py' ./integration_tests/run_pyspark_from_build.sh From c62838ad8019d990f7dd58928aa9d7f599fe8922 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Tue, 4 Jun 2024 11:56:32 +0800 Subject: [PATCH 11/16] use test_parallel to workaround flaky array test Signed-off-by: Hongbin Ma (Mahone) --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 8c871f4aaef..e497cbe5b33 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -162,7 +162,7 @@ ci_2() { $MVN_CMD -U -B $MVN_URM_MIRROR clean package $MVN_BUILD_ARGS -DskipTests=true export TEST_TAGS="not premerge_ci_1" export TEST_TYPE="pre-commit" - export TEST_PARALLEL=5 + export TEST_PARALLEL=4 # Download a Scala 2.12 build of spark prepare_spark $SPARK_VER 2.12 From c4c50539a442063b74b2213839fc4784d73da30e Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Tue, 4 Jun 2024 13:10:04 +0800 Subject: [PATCH 12/16] address review comment Signed-off-by: Hongbin Ma (Mahone) --- .../src/main/python/hash_aggregate_test.py | 10 ++++------ jenkins/spark-premerge-build.sh | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git 
a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index fc3bae58a4b..0aca9b2c8c6 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -32,14 +32,12 @@ 'spark.rapids.sql.castStringToFloat.enabled': 'true', } -_float_conf_skipagg = {'spark.rapids.sql.variableFloatAgg.enabled': 'true', - 'spark.rapids.sql.castStringToFloat.enabled': 'true', - 'spark.rapids.sql.agg.skipAggPassReductionRatio': '0', - } - _float_smallbatch_conf = copy_and_update(_float_conf, {'spark.rapids.sql.batchSizeBytes' : '250'}) +_float_conf_skipagg = copy_and_update(_float_smallbatch_conf, + {'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'}) + _float_conf_partial = copy_and_update(_float_conf, {'spark.rapids.sql.hashAgg.replaceMode': 'partial'}) @@ -227,7 +225,7 @@ def get_params(init_list, marked_params=[]): # Run these tests with in 5 modes, all on the GPU -_confs = [_float_conf, _float_conf_skipagg, _float_smallbatch_conf, _float_conf_final, _float_conf_partial] +_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial] # Pytest marker for list of operators allowed to run on the CPU, # esp. useful in partial and final only modes. diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index e497cbe5b33..56615258926 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -166,7 +166,7 @@ ci_2() { # Download a Scala 2.12 build of spark prepare_spark $SPARK_VER 2.12 - ./integration_tests/run_pyspark_from_build.sh -k 'array_test.py' + ./integration_tests/run_pyspark_from_build.sh # enable avro test separately INCLUDE_SPARK_AVRO_JAR=true TEST='avro_test.py' ./integration_tests/run_pyspark_from_build.sh From 9e797737ca8966c2c395b7a66d3a9af1617c6fee Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Tue, 4 Jun 2024 13:20:35 +0800 Subject: [PATCH 13/16] remove comma Signed-off-by: Hongbin Ma (Mahone) --- integration_tests/src/main/python/hash_aggregate_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 0aca9b2c8c6..d1cd70aa43c 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -29,7 +29,7 @@ pytestmark = pytest.mark.nightly_resource_consuming_test _float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true', - 'spark.rapids.sql.castStringToFloat.enabled': 'true', + 'spark.rapids.sql.castStringToFloat.enabled': 'true' } _float_smallbatch_conf = copy_and_update(_float_conf, From ac4801bc617b8e81463335efb33770a754e7ee82 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Tue, 4 Jun 2024 16:47:46 +0800 Subject: [PATCH 14/16] workaround for ci_scala213 Signed-off-by: Hongbin Ma (Mahone) --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 56615258926..d124f754b9c 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -206,7 +206,7 @@ ci_scala213() { cd .. 
# Run integration tests in the project root dir to leverage test cases and resource files export TEST_TAGS="not premerge_ci_1" export TEST_TYPE="pre-commit" - export TEST_PARALLEL=5 + export TEST_PARALLEL=4 # SPARK_HOME (and related) must be set to a Spark built with Scala 2.13 SPARK_HOME=$SPARK_HOME PYTHONPATH=$PYTHONPATH \ ./integration_tests/run_pyspark_from_build.sh From 283a4a504361c04a070d63ceb9aec21d14c86d79 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Wed, 26 Jun 2024 14:56:44 +0800 Subject: [PATCH 15/16] disable agg ratio heruistic by default Signed-off-by: Hongbin Ma (Mahone) --- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 0214aad16a7..aad4f05b334 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1515,7 +1515,7 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") "Setting this to 1 essentially disables this feature.") .doubleConf .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].") - .createWithDefault(0.9) + .createWithDefault(1.0) val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] = conf("spark.rapids.sql.agg.forceSinglePassPartialSort") From 3e51b42dfdedac3e98a01d90d074cedbcd19188f Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Wed, 26 Jun 2024 16:22:32 +0800 Subject: [PATCH 16/16] fix doc Signed-off-by: Hongbin Ma (Mahone) --- docs/additional-functionality/advanced_configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 0a58858143f..033e332b99c 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -60,7 +60,7 @@ Name | Description | Default Value | Applicable at spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup -spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|0.9|Runtime +spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. 
Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
 spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
 spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
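
For illustration, below is a minimal standalone sketch of the row-reduction heuristic this series introduces. The ratio semantics and the names `skipAggPassReductionRatio` and `allowNonFullyAggregatedOutput` come from the patches above; the `shouldSkipNextPass` helper, the object name, and the row counts are hypothetical and exist only for this example.

```scala
// A pass is "not reducing enough" when its output row count stays above
// skipAggPassReductionRatio * its input row count. In that case the next,
// more expensive pass (batch merge, then sort-based re-aggregation) is
// skipped, which is only legal in non-final stages where partially
// aggregated rows may still be emitted (allowNonFullyAggregatedOutput).
object SkipAggPassHeuristicSketch {

  // Hypothetical helper mirroring the checks in GpuMergeAggregateIterator.next().
  def shouldSkipNextPass(
      rowsBeforePass: Long,
      rowsAfterPass: Long,
      skipAggPassReductionRatio: Double): Boolean =
    rowsAfterPass > skipAggPassReductionRatio * rowsBeforePass

  def main(args: Array[String]): Unit = {
    val ratio = 0.9 // default in PATCH 01/16; PATCH 15/16 changes it to 1.0 (disabled)

    // High-cardinality grouping keys: the first pass barely shrinks the data,
    // so the second (merge) pass would be skipped for this task.
    println(shouldSkipNextPass(1000000L, 950000L, ratio)) // true

    // Low-cardinality grouping keys: the first pass reduces well, so merging
    // proceeds, and the same check is then applied between the second pass and
    // the sort-based third pass.
    println(shouldSkipNextPass(1000000L, 200000L, ratio)) // false
  }
}
```

After PATCH 15/16 flips the default to 1.0, a job would have to opt in explicitly, for example with `spark.conf.set("spark.rapids.sql.agg.skipAggPassReductionRatio", "0.9")`; setting it to 1 keeps the heuristic disabled, and the integration tests above exercise the most aggressive setting by passing `'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'`.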