From c0b4ddf0f230d8be6c26d9cfc725b2707e0eed96 Mon Sep 17 00:00:00 2001
From: Cindy Jiang <47068112+cindyyuanjiang@users.noreply.github.com>
Date: Thu, 11 Jan 2024 11:24:19 -0800
Subject: [PATCH] [FEA] Enable AQE related recommendations in Profiler Auto-tuner (#688)

* Add recommendations in AutoTuner for AQE configs and unit tests

---------

Signed-off-by: Partho Sarthi
Signed-off-by: cindyyuanjiang
Co-authored-by: Partho Sarthi
---
 .../profiling/ApplicationSummaryInfo.scala |  24 +-
 .../rapids/tool/profiling/AutoTuner.scala  |  85 ++++++-
 .../spark/sql/rapids/tool/ToolUtils.scala  |  27 +++
 .../tool/profiling/AutoTunerSuite.scala    | 215 +++++++++++++++++-
 4 files changed, 332 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
index 8b4b411d4..6189667da 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
@@ -54,8 +54,10 @@ trait AppInfoSqlTaskAggMetricsVisitor {
   def getSpilledMetrics: Seq[Long]
 }
 
-trait AppInfoSQLMaxTaskInputSizes {
+trait AppInfoSQLTaskInputSizes {
   def getMaxInput: Double
+  def getMeanInput: Double
+  def getMeanShuffleRead: Double
 }
 
 trait AppInfoReadMetrics {
@@ -69,13 +71,15 @@
  */
 class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter
   with AppInfoSqlTaskAggMetricsVisitor
-  with AppInfoSQLMaxTaskInputSizes
+  with AppInfoSQLTaskInputSizes
   with AppInfoReadMetrics {
   override def getSparkProperty(propKey: String): Option[String] = None
   override def getRapidsProperty(propKey: String): Option[String] = None
   override def getProperty(propKey: String): Option[String] = None
   override def getSparkVersion: Option[String] = None
   override def getMaxInput: Double = 0.0
+  override def getMeanInput: Double = 0.0
+  override def getMeanShuffleRead: Double = 0.0
   override def getJvmGCFractions: Seq[Double] = Seq()
   override def getSpilledMetrics: Seq[Long] = Seq()
   override def getRapidsJars: Seq[String] = Seq()
@@ -161,4 +165,20 @@ class SingleAppSummaryInfoProvider(val app: ApplicationSummaryInfo)
       .values
       .sum
   }
+
+  override def getMeanInput: Double = {
+    if (app.ioMetrics.nonEmpty) {
+      app.ioMetrics.map(_.inputBytesReadSum).sum * 1.0 / app.ioMetrics.size
+    } else {
+      0.0
+    }
+  }
+
+  override def getMeanShuffleRead: Double = {
+    if (app.ioMetrics.nonEmpty) {
+      app.ioMetrics.map(_.srTotalBytesReadSum).sum * 1.0 / app.ioMetrics.size
+    } else {
+      0.0
+    }
+  }
 }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
index 1ecd13a9b..723229b4d 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
@@ -33,7 +33,7 @@ import org.yaml.snakeyaml.constructor.{Constructor, ConstructorException}
 import org.yaml.snakeyaml.representer.Representer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.rapids.tool.ToolUtils
+import org.apache.spark.sql.rapids.tool.{GpuTypes, ToolUtils}
 import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil
 
 /**
@@ -79,15 +79,18 @@ class GpuWorkerProps(
   }
 
   /**
-   * If the GPU memory is missing, it will sets a default valued based on the GPU device and the
-   * static HashMap [[AutoTuner.DEF_WORKER_GPU_MEMORY_MB]].
+   * If the GPU memory is missing, it sets a default value based on the GPU device type.
    * If it is still missing, it sets a default to 15109m.
    *
    * @return true if the value has been updated.
    */
   def setDefaultGpuMemIfMissing(): Boolean = {
     if (memory == null || memory.isEmpty || memory.startsWith("0")) {
-      memory = AutoTuner.DEF_WORKER_GPU_MEMORY_MB.getOrElse(getName, "15109m")
+      memory = try {
+        GpuTypes.getGpuMem(getName)
+      } catch {
+        case _: IllegalArgumentException => "15109m"
+      }
       true
     } else {
       false
@@ -586,6 +589,7 @@ class AutoTuner(
     appendRecommendation("spark.rapids.shuffle.multiThreaded.writer.threads", numExecutorCores)
     appendRecommendation("spark.rapids.sql.multiThreadedRead.numThreads",
       Math.max(20, numExecutorCores))
+    appendRecommendation("spark.rapids.sql.batchSizeBytes", BATCH_SIZE_BYTES)
     recommendAQEProperties()
   }
@@ -656,7 +660,7 @@ class AutoTuner(
 
   private def recommendAQEProperties(): Unit = {
     val aqeEnabled = getPropertyValue("spark.sql.adaptive.enabled")
-        .getOrElse("false").toLowerCase
+      .getOrElse("false").toLowerCase
     if (aqeEnabled == "false") {
       appendComment(commentsForMissingProps("spark.sql.adaptive.enabled"))
     }
@@ -682,11 +686,56 @@ class AutoTuner(
       }
       case None =>
     }
-    if (getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes").isEmpty) {
-      // The default is 64m, but 128m is slightly better for the GPU as the GPU has sub-linear
-      // scaling until it is full and 128m makes the GPU more full, but too large can be slightly
-      // problematic because this is the compressed shuffle size
-      appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
+    val advisoryPartitionSizeProperty =
+      getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes")
+    if (appInfoProvider.getMeanInput < AQE_INPUT_SIZE_BYTES_THRESHOLD) {
+      if (advisoryPartitionSizeProperty.isEmpty) {
+        // The default is 64m, but 128m is slightly better for the GPU: the GPU scales
+        // sub-linearly until it is full, and 128m keeps it fuller. Too large a value can be
+        // slightly problematic because this is the compressed shuffle size.
+        appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
+      }
+    }
+    if (appInfoProvider.getMeanInput > AQE_INPUT_SIZE_BYTES_THRESHOLD &&
+      appInfoProvider.getMeanShuffleRead > AQE_SHUFFLE_READ_BYTES_THRESHOLD) {
+      // AQE Recommendations for large input and large shuffle reads
+      val advisoryPartitionSizeInBytes = clusterProps.gpu.name match {
+        case GpuTypes.A100 => "64m"
+        case GpuTypes.T4 => "32m"
+        case _ => AQE_DEF_ADVISORY_PARTITION_SIZE
+      }
+      appendRecommendation(
+        "spark.sql.adaptive.advisoryPartitionSizeInBytes",
+        advisoryPartitionSizeInBytes
+      )
+      val initialPartitionNumProperty =
+        getPropertyValue("spark.sql.adaptive.coalescePartitions.initialPartitionNum").map(_.toInt)
+      if (initialPartitionNumProperty.getOrElse(0) <= AQE_MIN_INITIAL_PARTITION_NUM) {
+        val initialPartitionNum = clusterProps.gpu.name match {
+          case GpuTypes.A100 => 400
+          case GpuTypes.T4 => 800
+          case _ => AQE_DEF_INITIAL_PARTITION_NUM
+        }
+        appendRecommendation(
+          "spark.sql.adaptive.coalescePartitions.initialPartitionNum",
+          initialPartitionNum
+        )
+      }
+      // We need to set this to false, else Spark ignores the target size specified by
+      // spark.sql.adaptive.advisoryPartitionSizeInBytes.
+      // Reference: https://spark.apache.org/docs/latest/sql-performance-tuning.html
+      appendRecommendation("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
+    }
+
+    val autoBroadcastJoinThresholdProperty =
+      getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(convertToMB)
+    if (autoBroadcastJoinThresholdProperty.isEmpty) {
+      appendComment("'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.")
+    } else if (autoBroadcastJoinThresholdProperty.get >
+        convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) {
+      appendComment("Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > " +
+        s"$AQE_AUTOBROADCAST_JOIN_THRESHOLD could lead to performance\n" +
+        " regression. Should be set to a lower number.")
+    }
   }
@@ -995,10 +1044,14 @@ object AutoTuner extends Logging {
   // GPU count defaults to 1 if it is missing.
   val DEF_WORKER_GPU_COUNT = 1
   // GPU default device is T4
-  val DEF_WORKER_GPU_NAME = "T4"
+  val DEF_WORKER_GPU_NAME = GpuTypes.T4
   // T4 default memory is 16G
   // A100 set default to 40GB
-  val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map("T4"-> "15109m", "A100" -> "40960m")
+  val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map(
+    GpuTypes.T4 -> "15109m", GpuTypes.A100 -> "40960m", GpuTypes.V100 -> "16384m",
+    GpuTypes.K80 -> "12288m", GpuTypes.P100 -> "16384m",
+    GpuTypes.P4 -> "8192m", GpuTypes.L4 -> "24576m", GpuTypes.A10 -> "24576m",
+    GpuTypes.A10G -> "24576m")
   // Default Number of Workers 1
   val DEF_NUM_WORKERS = 1
   // Default distinct read location thresholds is 50%
@@ -1009,6 +1062,14 @@
   val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
   private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" +
     "additional-functionality/advanced_configs.html#advanced-configuration"
+  // Value of batchSizeBytes that performs best overall
+  private val BATCH_SIZE_BYTES = 2147483647
+  private val AQE_INPUT_SIZE_BYTES_THRESHOLD = 35000
+  private val AQE_SHUFFLE_READ_BYTES_THRESHOLD = 50000
+  private val AQE_DEF_ADVISORY_PARTITION_SIZE = "32m"
+  private val AQE_DEF_INITIAL_PARTITION_NUM = 800
+  private val AQE_MIN_INITIAL_PARTITION_NUM = 200
+  private val AQE_AUTOBROADCAST_JOIN_THRESHOLD = "100m"
 
   val commentsForMissingProps: Map[String, String] = Map(
     "spark.executor.memory" ->
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
index faeaf78c2..b4198fb9a 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
@@ -380,3 +380,30 @@ object SupportedMLFuncsName {
 }
 
 case class GpuEventLogException(message: String) extends Exception(message)
+
+object GpuTypes {
+  val A100 = "A100"
+  val T4 = "T4"
+  val V100 = "V100"
+  val K80 = "K80"
+  val P100 = "P100"
+  val P4 = "P4"
+  val L4 = "L4"
+  val A10 = "A10"
+  val A10G = "A10G"
+
+  def getGpuMem(gpu: String): String = {
+    gpu match {
+      case A100 => "40960m" // A100 set default to 40GB
+      case T4 => "15109m" // T4 default memory is 16G
+      case V100 => "16384m"
+      case K80 => "12288m"
+      case P100 => "16384m"
+      case P4 => "8192m"
+      case L4 => "24576m"
+      case A10 => "24576m"
+      case A10G => "24576m"
+      case _ => throw new IllegalArgumentException(s"Invalid input gpu type: $gpu")
+    }
+  }
+}
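For orientation, the defaulting behavior that `setDefaultGpuMemIfMissing` and `GpuTypes.getGpuMem` implement above amounts to a table lookup with a T4-sized fallback. The short sketch below restates it as runnable Scala; the `GpuMemSketch` object and `defaultGpuMem` method are hypothetical names used only for illustration (the device table and the 15109m fallback are taken from the patch), and it collapses the try/catch in `setDefaultGpuMemIfMissing` into a single `getOrElse`:

object GpuMemSketch {
  // Mirrors the GpuTypes.getGpuMem table added in ToolUtils.scala above.
  private val gpuMem = Map(
    "A100" -> "40960m", "T4" -> "15109m", "V100" -> "16384m",
    "K80" -> "12288m", "P100" -> "16384m", "P4" -> "8192m",
    "L4" -> "24576m", "A10" -> "24576m", "A10G" -> "24576m")

  // Equivalent of setDefaultGpuMemIfMissing's try/catch: getGpuMem only throws
  // for names missing from the table, so getOrElse gives the same result.
  def defaultGpuMem(gpuName: String): String = gpuMem.getOrElse(gpuName, "15109m")

  def main(args: Array[String]): Unit = {
    println(defaultGpuMem("A100")) // 40960m
    println(defaultGpuMem("H100")) // 15109m: unrecognized devices fall back to the T4 size
  }
}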
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
index 0924990b4..68999059a 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
@@ -24,6 +24,8 @@ import scala.collection.mutable
 import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}
 import org.scalatest.{BeforeAndAfterEach, FunSuite}
 import org.scalatest.Matchers.convertToAnyShouldWrapper
+import org.scalatest.prop.TableDrivenPropertyChecks._
+import org.scalatest.prop.TableFor4
 import org.yaml.snakeyaml.{DumperOptions, Yaml}
 
 import org.apache.spark.internal.Logging
@@ -37,8 +39,12 @@ class AppInfoProviderMockTest(val maxInput: Double,
     val sparkVersion: Option[String],
     val rapidsJars: Seq[String],
     val distinctLocationPct: Double,
-    val redundantReadSize: Long) extends AppSummaryInfoBaseProvider {
+    val redundantReadSize: Long,
+    val meanInput: Double,
+    val meanShuffleRead: Double) extends AppSummaryInfoBaseProvider {
   override def getMaxInput: Double = maxInput
+  override def getMeanInput: Double = meanInput
+  override def getMeanShuffleRead: Double = meanShuffleRead
   override def getSpilledMetrics: Seq[Long] = spilledMetrics
   override def getJvmGCFractions: Seq[Double] = jvmGCFractions
   override def getProperty(propKey: String): Option[String] = propsFromLog.get(propKey)
@@ -123,9 +129,11 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
       sparkVersion: Option[String],
       rapidsJars: Seq[String] = Seq(),
       distinctLocationPct: Double = 0.0,
-      redundantReadSize: Long = 0): AppSummaryInfoBaseProvider = {
+      redundantReadSize: Long = 0,
+      meanInput: Double = 0.0,
+      meanShuffleRead: Double = 0.0): AppSummaryInfoBaseProvider = {
     new AppInfoProviderMockTest(maxInput, spilledMetrics, jvmGCFractions, propsFromLog,
-      sparkVersion, rapidsJars, distinctLocationPct, redundantReadSize)
+      sparkVersion, rapidsJars, distinctLocationPct, redundantReadSize, meanInput, meanShuffleRead)
   }
 
   test("verify 3.2.0+ auto conf setting") {
@@ -147,6 +155,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.rapids.memory.pinnedPool.size=4096m
           |--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
           |--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.rapids.sql.multiThreadedRead.numThreads=20
           |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark320.RapidsShuffleManager
@@ -164,10 +173,12 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |- 'spark.rapids.memory.pinnedPool.size' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.rapids.sql.concurrentGpuTasks' was not set.
           |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
           |- 'spark.shuffle.manager' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionSize' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- 'spark.sql.files.maxPartitionBytes' was not set.
@@ -351,6 +362,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.rapids.memory.pinnedPool.size=4096m
           |--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
           |--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.rapids.sql.multiThreadedRead.numThreads=20
           |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
@@ -368,10 +380,12 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |- 'spark.rapids.memory.pinnedPool.size' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.rapids.sql.concurrentGpuTasks' was not set.
           |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
           |- 'spark.shuffle.manager' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- 'spark.sql.files.maxPartitionBytes' was not set.
@@ -408,6 +422,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memoryOverhead=11673m
           |--conf spark.rapids.shuffle.multiThreaded.reader.threads=32
           |--conf spark.rapids.shuffle.multiThreaded.writer.threads=32
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.multiThreadedRead.numThreads=32
           |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
@@ -418,9 +433,11 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |Comments:
           |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
           |- 'spark.shuffle.manager' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- GPU count is missing. Setting default to 1.
@@ -453,12 +470,15 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     val expectedResults =
       s"""|
           |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- GPU memory is missing. Setting default to 15109m.
@@ -493,9 +513,12 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     val expectedResults =
       s"""|
           |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- GPU memory is missing. Setting default to 15109m.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -526,12 +549,15 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     val expectedResults =
       s"""|
           |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- GPU device is missing. Setting default to T4.
@@ -565,12 +591,15 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     val expectedResults =
       s"""|
           |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- GPU memory is missing. Setting default to 15109m.
@@ -598,12 +627,15 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     val expectedResults =
       s"""|
           |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -637,12 +669,15 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
       s"""|
           |Spark Properties:
           |--conf spark.executor.instances=8
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -666,6 +701,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.rapids.memory.pinnedPool.size=4096m
           |--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
           |--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.rapids.sql.multiThreadedRead.numThreads=20
           |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
@@ -683,10 +719,12 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |- 'spark.rapids.memory.pinnedPool.size' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.rapids.sql.concurrentGpuTasks' was not set.
           |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
           |- 'spark.shuffle.manager' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- 'spark.sql.files.maxPartitionBytes' was not set.
@@ -746,6 +784,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memory=16384m
           |--conf spark.executor.memoryOverhead=6758m
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=160
@@ -755,7 +794,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |Comments:
           |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size
           |- 'spark.executor.memoryOverhead' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -926,6 +967,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memory=16384m
           |--conf spark.executor.memoryOverhead=6758m
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.files.maxPartitionBytes=3669m
@@ -934,7 +976,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |Comments:
           |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size
           |- 'spark.executor.memoryOverhead' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
           |- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
@@ -989,6 +1033,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memory=16384m
           |--conf spark.executor.memoryOverhead=6758m
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.files.maxPartitionBytes=3669m
@@ -997,7 +1042,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |Comments:
           |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size
           |- 'spark.executor.memoryOverhead' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -1047,6 +1094,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memory=16384m
           |--conf spark.executor.memoryOverhead=6758m
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.files.maxPartitionBytes=3669m
@@ -1055,7 +1103,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |Comments:
           |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size
           |- 'spark.executor.memoryOverhead' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -1092,12 +1142,15 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     val expectedResults =
       s"""|
           |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- ${AutoTuner.classPathComments("rapids.jars.multiple")} [23.06.0, 23.02.1]
@@ -1122,13 +1175,16 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
       s"$latestRelease/rapids-4-spark_2.12-$latestRelease.jar"
     val expectedResults =
       s"""|
-          |Spark Properties: 
+          |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- A newer RAPIDS Accelerator for Apache Spark plugin is available:
@@ -1151,13 +1207,16 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     }
     val expectedResults =
       s"""|
-          |Spark Properties: 
+          |Spark Properties:
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
           |--conf spark.sql.shuffle.partitions=200
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.shuffle.partitions' was not set.
           |- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
@@ -1207,6 +1266,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memoryOverhead=6758m
           |--conf spark.rapids.filecache.enabled=true
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.files.maxPartitionBytes=3669m
@@ -1216,7 +1276,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size
           |- 'spark.executor.memoryOverhead' was not set.
           |- 'spark.rapids.filecache.enabled' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance.
           |- Enable file cache only if Spark local disks bandwidth is > 1 GB/s
@@ -1266,6 +1328,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memory=16384m
           |--conf spark.executor.memoryOverhead=6758m
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.files.maxPartitionBytes=3669m
@@ -1274,7 +1337,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |Comments:
           |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size
           |- 'spark.executor.memoryOverhead' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -1343,6 +1408,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.instances=20
           |--conf spark.executor.memory=16384m
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=160
@@ -1350,7 +1416,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.sql.shuffle.partitions=200
           |--conf spark.task.resource.gpu.amount=0.125
           |
           |Comments:
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
@@ -1425,6 +1493,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.rapids.memory.pinnedPool.size=4096m
           |--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
           |--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
           |--conf spark.rapids.sql.multiThreadedRead.numThreads=20
           |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
@@ -1439,10 +1508,12 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |- 'spark.rapids.memory.pinnedPool.size' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
           |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
           |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
           |- 'spark.shuffle.manager' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- 'spark.sql.files.maxPartitionBytes' was not set.
@@ -1507,6 +1578,137 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     assert(expectedResults == autoTunerOutput)
   }
 
+  test("AQE configuration autoBroadcastJoinThreshold should not be GTE 100mb") {
+    val customProps = mutable.LinkedHashMap(
+      "spark.executor.cores" -> "8",
+      "spark.executor.memory" -> "47222m",
+      "spark.rapids.sql.concurrentGpuTasks" -> "2",
+      "spark.task.resource.gpu.amount" -> "0.0625")
+    // mock the properties loaded from eventLog
+    val logEventsProps: mutable.Map[String, String] =
+      mutable.LinkedHashMap[String, String](
+        "spark.executor.cores" -> "16",
+        "spark.executor.instances" -> "1",
+        "spark.executor.memory" -> "80g",
+        "spark.executor.resource.gpu.amount" -> "1",
+        "spark.executor.instances" -> "1",
+        "spark.sql.shuffle.partitions" -> "200",
+        "spark.sql.files.maxPartitionBytes" -> "1g",
+        "spark.task.resource.gpu.amount" -> "0.0625",
+        "spark.executor.memoryOverhead" -> "7372m",
+        "spark.rapids.memory.pinnedPool.size" -> "5g",
+        "spark.rapids.sql.enabled" -> "true",
+        "spark.plugins" -> "com.nvidia.spark.SQLPlugin",
+        "spark.rapids.sql.concurrentGpuTasks" -> "4",
+        "spark.sql.adaptive.enabled" -> "true",
+        "spark.sql.adaptive.autoBroadcastJoinThreshold" -> "500mb")
+    val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32),
+      Some("212992MiB"), Some(5), Some(4), Some("15109MiB"), Some("Tesla T4"))
+    val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps,
+      Some(defaultSparkVersion))
+    val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider)
+    val (properties, comments) = autoTuner.getRecommendedProperties()
+    val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
+    // scalastyle:off line.size.limit
+    val expectedResults =
+      s"""|
+          |Spark Properties:
+          |--conf spark.executor.cores=8
+          |--conf spark.executor.instances=20
+          |--conf spark.executor.memory=16384m
+          |--conf spark.executor.memoryOverhead=6758m
+          |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.shuffle.multiThreaded.reader.threads=8
+          |--conf spark.rapids.shuffle.multiThreaded.writer.threads=8
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
+          |--conf spark.rapids.sql.concurrentGpuTasks=2
+          |--conf spark.rapids.sql.multiThreadedRead.numThreads=20
+          |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
+          |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
+          |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=160
+          |--conf spark.sql.files.maxPartitionBytes=4096m
+          |--conf spark.task.resource.gpu.amount=0.125
+          |
+          |Comments:
+          |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
+          |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
+          |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
+          |- 'spark.shuffle.manager' was not set.
+          |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
+          |- ${AutoTuner.classPathComments("rapids.jars.missing")}
+          |- Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > 100m could lead to performance\n regression. Should be set to a lower number.
+          |- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
+          |""".stripMargin
+    // scalastyle:on line.size.limit
+    assert(expectedResults == autoTunerOutput)
+  }
+
+  private def testPartitionConfigurations(
+      inputSize: Double,
+      shuffleRead: Double,
+      gpuDevice: String,
+      expectedLines: Seq[String]): Unit = {
+    val customProps = mutable.LinkedHashMap(
+      "spark.executor.cores" -> "8",
+      "spark.executor.memory" -> "47222m",
+      "spark.rapids.sql.concurrentGpuTasks" -> "2",
+      "spark.task.resource.gpu.amount" -> "0.0625")
+    // mock the properties loaded from eventLog
+    val logEventsProps: mutable.Map[String, String] =
+      mutable.LinkedHashMap[String, String](
+        "spark.executor.cores" -> "16",
+        "spark.executor.instances" -> "1",
+        "spark.executor.memory" -> "80g",
+        "spark.executor.resource.gpu.amount" -> "1",
+        "spark.executor.instances" -> "1",
+        "spark.sql.shuffle.partitions" -> "200",
+        "spark.sql.files.maxPartitionBytes" -> "1g",
+        "spark.task.resource.gpu.amount" -> "0.0625",
+        "spark.rapids.memory.pinnedPool.size" -> "5g",
+        "spark.rapids.sql.enabled" -> "true",
+        "spark.plugins" -> "com.nvidia.spark.SQLPlugin",
+        "spark.rapids.sql.concurrentGpuTasks" -> "4")
+    val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32),
+      Some("212992MiB"), Some(5), Some(4), Some("15109MiB"), Some(gpuDevice))
+    val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps,
+      Some(defaultSparkVersion), meanInput = inputSize, meanShuffleRead = shuffleRead)
+    val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider)
+    val (properties, comments) = autoTuner.getRecommendedProperties()
+    val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
+    assert(expectedLines.forall(line => autoTunerOutput.contains(line)),
+      s"Expected lines not found in AutoTuner output")
+  }
+
+  val testCases: TableFor4[Int, Int, String, Seq[String]] = Table(
+    ("inputSize", "shuffleRead", "gpuDevice", "expectedLines"),
+    // small input, small shuffle read
+    (1000, 1000, "T4", Seq(
+      "--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m")),
+    // large input, small shuffle read
+    (40000, 1000, "T4", Seq()),
+    // large input, large shuffle read
+    (40000, 80000, "T4", Seq(
+      "--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=32m",
+      "--conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=800",
+      "--conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false"
+    )),
+    // large input, large shuffle read, faster GPU
+    (40000, 80000, "A100", Seq(
+      "--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=64m",
+      "--conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=400",
+      "--conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false"
+    ))
+  )
+
+  forAll(testCases) { (inputSize, shuffleRead, gpuDevice, expectedLines) =>
+    test(s"AQE partition configs - input size: $inputSize," +
+        s" shuffle read: $shuffleRead, gpu device: $gpuDevice") {
+      testPartitionConfigurations(inputSize, shuffleRead, gpuDevice, expectedLines)
+    }
+  }
+
   test("Handle adaptive auto shuffle configuration setting properly") {
     val customProps = mutable.LinkedHashMap(
       "spark.databricks.adaptive.autoOptimizeShuffle.enabled" -> "true")
@@ -1547,6 +1749,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |--conf spark.executor.memory=16384m
           |--conf spark.executor.memoryOverhead=6758m
           |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.sql.batchSizeBytes=2147483647
           |--conf spark.rapids.sql.concurrentGpuTasks=2
           |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
           |--conf spark.sql.files.maxPartitionBytes=3669m
@@ -1555,7 +1758,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
           |Comments:
           |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size
           |- 'spark.executor.memoryOverhead' was not set.
+          |- 'spark.rapids.sql.batchSizeBytes' was not set.
           |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
+          |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
           |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
           |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance.
           |- ${AutoTuner.classPathComments("rapids.jars.missing")}
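Taken together, the AQE changes in this patch reduce to a small decision table over the application's mean task input size and mean shuffle-read size, with GPU-specific values in the large-input/large-shuffle case. The sketch below restates those rules as self-contained Scala so the test table above is easy to cross-check. It is illustrative only: the `AqeRulesSketch` object and `recommend` method are hypothetical names, the thresholds and values are the constants from AutoTuner.scala above, and it deliberately omits two details of the real code — an existing user-set advisory size suppresses the 128m recommendation, and an existing initialPartitionNum above 200 is left untouched.

object AqeRulesSketch {
  private val InputThreshold = 35000.0   // AQE_INPUT_SIZE_BYTES_THRESHOLD
  private val ShuffleThreshold = 50000.0 // AQE_SHUFFLE_READ_BYTES_THRESHOLD

  def recommend(meanInput: Double, meanShuffleRead: Double, gpuName: String): Map[String, String] = {
    if (meanInput < InputThreshold) {
      // Small mean input: a larger advisory size keeps the GPU fuller.
      Map("spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "128m")
    } else if (meanShuffleRead > ShuffleThreshold) {
      // Large input and large shuffle reads: smaller advisory size, more initial
      // partitions, and parallelismFirst=false so Spark honors the advisory size.
      val (advisorySize, initialPartitionNum) = gpuName match {
        case "A100" => ("64m", 400)
        case _ => ("32m", 800) // T4 and the catch-all defaults share these values
      }
      Map(
        "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> advisorySize,
        "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> initialPartitionNum.toString,
        "spark.sql.adaptive.coalescePartitions.parallelismFirst" -> "false")
    } else {
      Map.empty // large input but small shuffle reads: no AQE partition changes
    }
  }

  def main(args: Array[String]): Unit = {
    // These mirror the rows of the TableFor4 test cases above.
    println(recommend(1000, 1000, "T4"))     // advisory size 128m
    println(recommend(40000, 1000, "T4"))    // empty map
    println(recommend(40000, 80000, "A100")) // 64m, 400 partitions, parallelismFirst=false
  }
}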