From 162f61d32f8e767ec32d231751da44446b2930ec Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 17 Dec 2024 21:13:51 -0800 Subject: [PATCH] Split AutoTuner for Profiling and Qualification and override batch size Signed-off-by: Partho Sarthi --- .../rapids/tool/profiling/AutoTuner.scala | 228 ++++---- .../rapids/tool/profiling/ProfileArgs.scala | 2 +- .../rapids/tool/profiling/Profiler.scala | 8 +- .../tool/qualification/Qualification.scala | 3 +- .../qualification/QualificationArgs.scala | 3 +- .../QualificationAutoTuner.scala | 51 ++ .../qualification/QualificationMain.scala | 4 +- ...ala => QualificationAutoTunerRunner.scala} | 14 +- .../rapids/tool/tuning/TunerContext.scala | 2 +- .../tool/profiling/AutoTunerSuite.scala | 507 +++++++++++------- .../QualificationAutoTunerSuite.scala | 66 +++ 11 files changed, 574 insertions(+), 314 deletions(-) create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTuner.scala rename core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/{QualificationAutoTuner.scala => QualificationAutoTunerRunner.scala} (89%) create mode 100644 core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTunerSuite.scala diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala index 44ff839ba..dcf9e0f3a 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala @@ -63,10 +63,10 @@ class GpuWorkerProps( * * @return true if the value has been updated. */ - def setDefaultGpuCountIfMissing(): Boolean = { + def setDefaultGpuCountIfMissing(autoTunerConfigsProvider: AutoTunerConfigsProvider): Boolean = { // TODO - do we want to recommend 1 or base it on core count? 32 cores to 1 gpu may be to much. 
if (count == 0) { - count = AutoTuner.DEF_WORKER_GPU_COUNT + count = autoTunerConfigsProvider.DEF_WORKER_GPU_COUNT true } else { false @@ -105,9 +105,10 @@ class GpuWorkerProps( * @return a list containing information of what was missing and the default value that has been * used to initialize the field. */ - def setMissingFields(platform: Platform): Seq[String] = { + def setMissingFields(platform: Platform, + autoTunerConfigsProvider: AutoTunerConfigsProvider): Seq[String] = { val res = new ListBuffer[String]() - if (setDefaultGpuCountIfMissing()) { + if (setDefaultGpuCountIfMissing(autoTunerConfigsProvider)) { res += s"GPU count is missing. Setting default to $getCount." } if (setDefaultGpuNameIfMissing(platform)) { @@ -143,9 +144,9 @@ class SystemClusterProps( // consider the object incorrect if either numCores or memory are not set. memory == null || memory.isEmpty || numCores <= 0 || memory.startsWith("0") } - def setDefaultNumWorkersIfMissing(): Boolean = { + def setDefaultNumWorkersIfMissing(autoTunerConfigsProvider: AutoTunerConfigsProvider): Boolean = { if (numWorkers <= 0) { - numWorkers = AutoTuner.DEF_NUM_WORKERS + numWorkers = autoTunerConfigsProvider.DEF_NUM_WORKERS true } else { false @@ -156,9 +157,9 @@ class SystemClusterProps( * @return a list containing information of what was missing and the default value that has been * used to initialize the field. */ - def setMissingFields(): Seq[String] = { + def setMissingFields(autoTunerConfigsProvider: AutoTunerConfigsProvider): Seq[String] = { val res = new ListBuffer[String]() - if (setDefaultNumWorkersIfMissing()) { + if (setDefaultNumWorkersIfMissing(autoTunerConfigsProvider)) { res += s"Number of workers is missing. Setting default to $getNumWorkers." 
} res @@ -184,17 +185,12 @@ class ClusterProperties( @BeanProperty var gpu: GpuWorkerProps, @BeanProperty var softwareProperties: util.LinkedHashMap[String, String]) { - import AutoTuner._ - def this() { this(new SystemClusterProps(), new GpuWorkerProps(), new util.LinkedHashMap[String, String]()) } def isEmpty: Boolean = { system.isEmpty && gpu.isEmpty } - def getTargetProperties: mutable.Map[String, String] = { - softwareProperties.asScala.filter(entry => recommendationsTarget.contains(entry._1)) - } override def toString: String = s"{${system.toString}, ${gpu.toString}, $softwareProperties}" } @@ -337,11 +333,10 @@ class AutoTuner( val clusterProps: ClusterProperties, val appInfoProvider: AppSummaryInfoBaseProvider, val platform: Platform, - val driverInfoProvider: DriverLogInfoProvider) + val driverInfoProvider: DriverLogInfoProvider, + val autoTunerConfigsProvider: AutoTunerConfigsProvider) extends Logging { - import AutoTuner._ - var comments = new ListBuffer[String]() var recommendations: mutable.LinkedHashMap[String, RecommendationEntry] = mutable.LinkedHashMap[String, RecommendationEntry]() @@ -369,7 +364,7 @@ class AutoTuner( } def initRecommendations(): Unit = { - recommendationsTarget.foreach { key => + autoTunerConfigsProvider.recommendationsTarget.foreach { key => // no need to add new records if they are missing from props getPropertyValue(key).foreach { propVal => val recommendationVal = new RecommendationEntry(key, Option(propVal), None) @@ -455,7 +450,7 @@ class AutoTuner( * Assumption - cluster properties were updated to have a default values if missing. 
*/ def calcGpuConcTasks(): Long = { - Math.min(MAX_CONC_GPU_TASKS, platform.getGpuOrDefault.getGpuConcTasks) + Math.min(autoTunerConfigsProvider.MAX_CONC_GPU_TASKS, platform.getGpuOrDefault.getGpuConcTasks) } /** @@ -478,7 +473,7 @@ class AutoTuner( val maxExecutorHeap = Math.max(0, executorContainerMemCalculator()).toInt // give up to 2GB of heap to each executor core // TODO - revisit this in future as we could let heap be bigger - Math.min(maxExecutorHeap, DEF_HEAP_PER_CORE_MB * numExecCores) + Math.min(maxExecutorHeap, autoTunerConfigsProvider.DEF_HEAP_PER_CORE_MB * numExecCores) } /** @@ -497,10 +492,14 @@ class AutoTuner( val containerMem = containerMemCalculator.apply() var setMaxBytesInFlight = false // reserve 10% of heap as memory overhead - var executorMemOverhead = (executorHeap * DEF_HEAP_OVERHEAD_FRACTION).toLong - executorMemOverhead += DEF_PAGEABLE_POOL_MB + var executorMemOverhead = ( + executorHeap * autoTunerConfigsProvider.DEF_HEAP_OVERHEAD_FRACTION + ).toLong + executorMemOverhead += autoTunerConfigsProvider.DEF_PAGEABLE_POOL_MB val containerMemLeftOverOffHeap = containerMem - executorHeap - val minOverhead = executorMemOverhead + (MIN_PINNED_MEMORY_MB + MIN_SPILL_MEMORY_MB) + val minOverhead = executorMemOverhead + ( + autoTunerConfigsProvider.MIN_PINNED_MEMORY_MB + autoTunerConfigsProvider.MIN_SPILL_MEMORY_MB + ) logDebug("containerMem " + containerMem + " executorHeap: " + executorHeap + " executorMemOverhead: " + executorMemOverhead + " minOverhead " + minOverhead) if (containerMemLeftOverOffHeap >= minOverhead) { @@ -508,14 +507,15 @@ class AutoTuner( // memory to core ratio if (numExecutorCores >= 16 && platform.isPlatformCSP && containerMemLeftOverOffHeap > - executorMemOverhead + 4096L + MIN_PINNED_MEMORY_MB + MIN_SPILL_MEMORY_MB) { + executorMemOverhead + 4096L + autoTunerConfigsProvider.MIN_PINNED_MEMORY_MB + + autoTunerConfigsProvider.MIN_SPILL_MEMORY_MB) { // Account for the setting of: // 
appendRecommendation("spark.rapids.shuffle.multiThreaded.maxBytesInFlight", "4g") executorMemOverhead += 4096L setMaxBytesInFlight = true } // Pinned memory uses any unused space up to 4GB. Spill memory is same size as pinned. - val pinnedMem = Math.min(MAX_PINNED_MEMORY_MB, + val pinnedMem = Math.min(autoTunerConfigsProvider.MAX_PINNED_MEMORY_MB, (containerMemLeftOverOffHeap - executorMemOverhead) / 2).toLong // Spill storage is set to the pinned size by default. Its not guaranteed to use just pinned // memory though so the size worst case would be doesn't use any pinned memory and uses @@ -525,7 +525,8 @@ class AutoTuner( executorMemOverhead += pinnedMem + spillMem } else { // use min pinned and spill mem - executorMemOverhead += MIN_PINNED_MEMORY_MB + MIN_SPILL_MEMORY_MB + executorMemOverhead += autoTunerConfigsProvider.MIN_PINNED_MEMORY_MB + + autoTunerConfigsProvider.MIN_SPILL_MEMORY_MB } (pinnedMem, executorMemOverhead, executorHeap, setMaxBytesInFlight) } else { @@ -534,7 +535,7 @@ class AutoTuner( // first calculate what we think min overhead is and make sure we have enough // for that // calculate minimum heap size - val minExecHeapMem = MIN_HEAP_PER_CORE_MB * numExecutorCores + val minExecHeapMem = autoTunerConfigsProvider.MIN_HEAP_PER_CORE_MB * numExecutorCores if ((containerMem - minOverhead) < minExecHeapMem) { // For now just throw so we don't get any tunings and its obvious to user this isn't a good // setup. In the future we may just recommend them to use larger nodes. This would be more @@ -547,7 +548,8 @@ class AutoTuner( warnNotEnoughMem(minExecHeapMem + minOverhead) } // Pinned memory uses any unused space up to 4GB. Spill memory is same size as pinned. 
- val pinnedMem = Math.min(MAX_PINNED_MEMORY_MB, (leftOverMemUsingMinHeap / 2)).toLong + val pinnedMem = Math.min(autoTunerConfigsProvider.MAX_PINNED_MEMORY_MB, + leftOverMemUsingMinHeap / 2).toLong val spillMem = pinnedMem // spill memory is by default same size as pinned memory executorMemOverhead += pinnedMem + spillMem @@ -604,7 +606,8 @@ class AutoTuner( * else recommend "spark.kubernetes.memoryOverheadFactor" and add comment if missing */ def addRecommendationForMemoryOverhead(recomValue: String): Unit = { - if (enableMemoryOverheadRecommendation(getPropertyValue("spark.master"))) { + if (autoTunerConfigsProvider + .enableMemoryOverheadRecommendation(getPropertyValue("spark.master"))) { val memOverheadLookup = memoryOverheadLabel appendRecommendationForMemoryMB(memOverheadLookup, recomValue) getPropertyValue("spark.rapids.memory.pinnedPool.size").foreach { lookup => @@ -705,7 +708,8 @@ class AutoTuner( } else { addDefaultComments() } - appendRecommendation("spark.rapids.sql.batchSizeBytes", BATCH_SIZE_BYTES) + appendRecommendation("spark.rapids.sql.batchSizeBytes", + autoTunerConfigsProvider.BATCH_SIZE_BYTES) appendRecommendation("spark.locality.wait", 0) } @@ -716,7 +720,7 @@ class AutoTuner( case Some(smClassName) => appendRecommendation("spark.shuffle.manager", smClassName) case None => appendComment("Could not define the Spark Version") } - appendComment(classPathComments("rapids.shuffle.jars")) + appendComment(autoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")) recommendFileCache() recommendMaxPartitionBytes() recommendShufflePartitions() @@ -781,10 +785,12 @@ class AutoTuner( def configureClusterPropDefaults: Unit = { if (!clusterProps.system.isEmpty) { if (clusterProps.system.isMissingInfo) { - clusterProps.system.setMissingFields().foreach(m => appendComment(m)) + clusterProps.system.setMissingFields(autoTunerConfigsProvider) + .foreach(m => appendComment(m)) } if (clusterProps.gpu.isMissingInfo) { - 
clusterProps.gpu.setMissingFields(platform).foreach(m => appendComment(m)) + clusterProps.gpu.setMissingFields(platform, autoTunerConfigsProvider) + .foreach(m => appendComment(m)) } } } @@ -792,7 +798,8 @@ class AutoTuner( private def recommendGCProperty(): Unit = { val jvmGCFraction = appInfoProvider.getJvmGCFractions if (jvmGCFraction.nonEmpty) { // avoid zero division - if ((jvmGCFraction.sum / jvmGCFraction.size) > MAX_JVM_GCTIME_FRACTION) { + if ((jvmGCFraction.sum / jvmGCFraction.size) > + autoTunerConfigsProvider.MAX_JVM_GCTIME_FRACTION) { // TODO - or other cores/memory ratio appendComment("Average JVM GC time is very high. " + "Other Garbage Collectors can be used for better performance.") @@ -804,7 +811,7 @@ class AutoTuner( val aqeEnabled = getPropertyValue("spark.sql.adaptive.enabled") .getOrElse("false").toLowerCase if (aqeEnabled == "false") { - appendComment(commentsForMissingProps("spark.sql.adaptive.enabled")) + appendComment(autoTunerConfigsProvider.commentsForMissingProps("spark.sql.adaptive.enabled")) } appInfoProvider.getSparkVersion match { case Some(version) => @@ -833,7 +840,8 @@ class AutoTuner( val advisoryPartitionSizeProperty = getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes") - if (appInfoProvider.getMeanInput < AQE_INPUT_SIZE_BYTES_THRESHOLD) { + if (appInfoProvider.getMeanInput < + autoTunerConfigsProvider.AQE_INPUT_SIZE_BYTES_THRESHOLD) { if(advisoryPartitionSizeProperty.isEmpty) { // The default is 64m, but 128m is slightly better for the GPU as the GPU has sub-linear // scaling until it is full and 128m makes the GPU more full, but too large can be @@ -841,15 +849,17 @@ class AutoTuner( appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") } } - if (appInfoProvider.getMeanInput > AQE_INPUT_SIZE_BYTES_THRESHOLD && - appInfoProvider.getMeanShuffleRead > AQE_SHUFFLE_READ_BYTES_THRESHOLD) { + if (appInfoProvider.getMeanInput > autoTunerConfigsProvider.AQE_INPUT_SIZE_BYTES_THRESHOLD && + 
appInfoProvider.getMeanShuffleRead > + autoTunerConfigsProvider.AQE_SHUFFLE_READ_BYTES_THRESHOLD) { // AQE Recommendations for large input and large shuffle reads platform.getGpuOrDefault.getAdvisoryPartitionSizeInBytes.foreach { size => appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", size) } val initialPartitionNumProperty = getPropertyValue("spark.sql.adaptive.coalescePartitions.initialPartitionNum").map(_.toInt) - if (initialPartitionNumProperty.getOrElse(0) <= AQE_MIN_INITIAL_PARTITION_NUM) { + if (initialPartitionNumProperty.getOrElse(0) <= + autoTunerConfigsProvider.AQE_MIN_INITIAL_PARTITION_NUM) { platform.getGpuOrDefault.getInitialPartitionNum.foreach { initialPartitionNum => appendRecommendation( "spark.sql.adaptive.coalescePartitions.initialPartitionNum", initialPartitionNum) @@ -867,9 +877,10 @@ class AutoTuner( if (autoBroadcastJoinThresholdProperty.isEmpty) { appendComment("'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.") } else if (autoBroadcastJoinThresholdProperty.get > - StringUtils.convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) { + StringUtils.convertToMB(autoTunerConfigsProvider.AQE_AUTOBROADCAST_JOIN_THRESHOLD)) { appendComment("Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > " + - s"$AQE_AUTOBROADCAST_JOIN_THRESHOLD could lead to performance\n" + + s"${autoTunerConfigsProvider.AQE_AUTOBROADCAST_JOIN_THRESHOLD} could " + + s"lead to performance\n" + " regression. 
Should be set to a lower number.") } } @@ -896,36 +907,37 @@ class AutoTuner( * 4- If there is a new release recommend that to the user */ private def recommendClassPathEntries(): Unit = { - val missingRapidsJarsEntry = classPathComments("rapids.jars.missing") - val multipleRapidsJarsEntry = classPathComments("rapids.jars.multiple") + val missingRapidsJarsEntry = autoTunerConfigsProvider.classPathComments("rapids.jars.missing") + val multipleRapidsJarsEntry = autoTunerConfigsProvider.classPathComments("rapids.jars.multiple") appInfoProvider.getRapidsJars match { case Seq() => // No rapids jars appendComment(missingRapidsJarsEntry) case s: Seq[String] => - s.flatMap(e => pluginJarRegEx.findAllMatchIn(e).map(_.group(1))) match { - case Seq() => appendComment(missingRapidsJarsEntry) - case v: Seq[String] if v.length > 1 => - val comment = s"$multipleRapidsJarsEntry [${v.mkString(", ")}]" - appendComment(comment) - case Seq(jarVer) => - // compare jarVersion to the latest release - val latestPluginVersion = WebCrawlerUtil.getLatestPluginRelease - latestPluginVersion match { - case Some(ver) => - if (ToolUtils.compareVersions(jarVer, ver) < 0) { - val jarURL = WebCrawlerUtil.getPluginMvnDownloadLink(ver) + s.flatMap(e => + autoTunerConfigsProvider.pluginJarRegEx.findAllMatchIn(e).map(_.group(1))) match { + case Seq() => appendComment(missingRapidsJarsEntry) + case v: Seq[String] if v.length > 1 => + val comment = s"$multipleRapidsJarsEntry [${v.mkString(", ")}]" + appendComment(comment) + case Seq(jarVer) => + // compare jarVersion to the latest release + val latestPluginVersion = WebCrawlerUtil.getLatestPluginRelease + latestPluginVersion match { + case Some(ver) => + if (ToolUtils.compareVersions(jarVer, ver) < 0) { + val jarURL = WebCrawlerUtil.getPluginMvnDownloadLink(ver) + appendComment( + "A newer RAPIDS Accelerator for Apache Spark plugin is available:\n" + + s" $jarURL\n" + + s" Version used in application is $jarVer.") + } + case None => + logError("Could not 
pull the latest release of RAPIDS-plugin jar.") + val pluginRepoUrl = WebCrawlerUtil.getMVNArtifactURL("rapids.plugin") appendComment( - "A newer RAPIDS Accelerator for Apache Spark plugin is available:\n" + - s" $jarURL\n" + - s" Version used in application is $jarVer.") - } - case None => - logError("Could not pull the latest release of RAPIDS-plugin jar.") - val pluginRepoUrl = WebCrawlerUtil.getMVNArtifactURL("rapids.plugin") - appendComment( - "Failed to validate the latest release of Apache Spark plugin.\n" + + "Failed to validate the latest release of Apache Spark plugin.\n" + s" Verify that the version used in application ($jarVer) is the latest on:\n" + s" $pluginRepoUrl") @@ -956,19 +968,19 @@ class AutoTuner( maxPartitionBytesNum.toString } else { if (inputBytesMax > 0 && - inputBytesMax < MIN_PARTITION_BYTES_RANGE_MB) { + inputBytesMax < autoTunerConfigsProvider.MIN_PARTITION_BYTES_RANGE_MB) { // Increase partition size val calculatedMaxPartitionBytes = Math.min( maxPartitionBytesNum * - (MIN_PARTITION_BYTES_RANGE_MB / inputBytesMax), - MAX_PARTITION_BYTES_BOUND_MB) + (autoTunerConfigsProvider.MIN_PARTITION_BYTES_RANGE_MB / inputBytesMax), + autoTunerConfigsProvider.MAX_PARTITION_BYTES_BOUND_MB) calculatedMaxPartitionBytes.toLong.toString - } else if (inputBytesMax > MAX_PARTITION_BYTES_RANGE_MB) { + } else if (inputBytesMax > autoTunerConfigsProvider.MAX_PARTITION_BYTES_RANGE_MB) { // Decrease partition size val calculatedMaxPartitionBytes = Math.min( maxPartitionBytesNum / - (inputBytesMax / MAX_PARTITION_BYTES_RANGE_MB), - MAX_PARTITION_BYTES_BOUND_MB) + (inputBytesMax / autoTunerConfigsProvider.MAX_PARTITION_BYTES_RANGE_MB), + autoTunerConfigsProvider.MAX_PARTITION_BYTES_BOUND_MB) calculatedMaxPartitionBytes.toLong.toString } else { // Do not recommend maxPartitionBytes @@ -981,8 +993,10 @@ class AutoTuner( * Recommendation for 'spark.rapids.file.cache' based on read characteristics of job. 
*/ private def recommendFileCache() { - if (appInfoProvider.getDistinctLocationPct < DEF_DISTINCT_READ_THRESHOLD - && appInfoProvider.getRedundantReadSize > DEF_READ_SIZE_THRESHOLD) { + if (appInfoProvider.getDistinctLocationPct < + autoTunerConfigsProvider.DEF_DISTINCT_READ_THRESHOLD && + appInfoProvider.getRedundantReadSize > + autoTunerConfigsProvider.DEF_READ_SIZE_THRESHOLD) { appendRecommendation("spark.rapids.filecache.enabled", "true") appendComment("Enable file cache only if Spark local disks bandwidth is > 1 GB/s" + " and you have sufficient disk space available to fit both cache and normal Spark" + @@ -997,7 +1011,8 @@ class AutoTuner( */ private def recommendMaxPartitionBytes(): Unit = { val maxPartitionProp = - getPropertyValue("spark.sql.files.maxPartitionBytes").getOrElse(MAX_PARTITION_BYTES) + getPropertyValue("spark.sql.files.maxPartitionBytes") + .getOrElse(autoTunerConfigsProvider.MAX_PARTITION_BYTES) val recommended = if (isCalculationEnabled("spark.sql.files.maxPartitionBytes")) { calculateMaxPartitionBytes(maxPartitionProp) @@ -1015,7 +1030,7 @@ class AutoTuner( def recommendShufflePartitions(): Unit = { val lookup = "spark.sql.shuffle.partitions" var shufflePartitions = - getPropertyValue(lookup).getOrElse(DEF_SHUFFLE_PARTITIONS).toInt + getPropertyValue(lookup).getOrElse(autoTunerConfigsProvider.DEF_SHUFFLE_PARTITIONS).toInt val shuffleStagesWithPosSpilling = appInfoProvider.getShuffleStagesWithPosSpilling // TODO: Need to look at other metrics for GPU spills (DEBUG mode), and batch sizes metric @@ -1028,7 +1043,7 @@ class AutoTuner( s" stages with spilling. 
Increasing shuffle partitions is not recommended in this\n" + s" case since keys will still hash to the same task.") } else { - shufflePartitions *= DEF_SHUFFLE_PARTITION_MULTIPLIER + shufflePartitions *= autoTunerConfigsProvider.DEF_SHUFFLE_PARTITION_MULTIPLIER // Could be memory instead of partitions appendOptionalComment(lookup, s"'$lookup' should be increased since spilling occurred in shuffle stages.") @@ -1050,10 +1065,10 @@ class AutoTuner( private def recommendFromDriverLogs(): Unit = { // Iterate through unsupported operators' reasons and check for matching properties driverInfoProvider.getUnsupportedOperators.map(_.reason).foreach { operatorReason => - recommendationsFromDriverLogs.collect { + autoTunerConfigsProvider.recommendationsFromDriverLogs.collect { case (config, recommendedValue) if operatorReason.contains(config) => appendRecommendation(config, recommendedValue) - appendComment(commentForExperimentalConfig(config)) + appendComment(autoTunerConfigsProvider.commentForExperimentalConfig(config)) } } } @@ -1098,7 +1113,7 @@ class AutoTuner( private def addDefaultComments(): Unit = { appendComment("Could not infer the cluster configuration, recommendations " + "are generated using default values!") - commentsForMissingProps.foreach { + autoTunerConfigsProvider.commentsForMissingProps.foreach { case (key, value) => if (!skippedRecommendations.contains(key)) { appendComment(value) @@ -1107,7 +1122,7 @@ class AutoTuner( } private def addMissingMemoryComments(): Unit = { - commentsForMissingMemoryProps.foreach { + autoTunerConfigsProvider.commentsForMissingMemoryProps.foreach { case (key, value) => if (!skippedRecommendations.contains(key)) { appendComment(value) @@ -1188,7 +1203,7 @@ class AutoTuner( // - make sure that we exclude the skipped list private def processPropKeys( srcMap: collection.Map[String, String]): collection.Map[String, String] = { - (srcMap -- skippedRecommendations) -- filteredPropKeys + (srcMap -- skippedRecommendations) -- 
autoTunerConfigsProvider.filteredPropKeys } // Combines the original Spark properties with the recommended ones. @@ -1206,7 +1221,10 @@ class AutoTuner( } } -object AutoTuner extends Logging { +/** + * Trait defining configuration defaults and parameters for the AutoTuner. + */ +trait AutoTunerConfigsProvider extends Logging { // Maximum number of concurrent tasks to run on the GPU val MAX_CONC_GPU_TASKS = 4L // Amount of CPU memory to reserve for system overhead (kernel, buffers, etc.) in megabytes @@ -1251,13 +1269,13 @@ object AutoTuner extends Logging { private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" + "additional-functionality/advanced_configs.html#advanced-configuration" // Value of batchSizeBytes that performs best overall - private val BATCH_SIZE_BYTES = 2147483647 - private val AQE_INPUT_SIZE_BYTES_THRESHOLD = 35000 - private val AQE_SHUFFLE_READ_BYTES_THRESHOLD = 50000 - private val AQE_MIN_INITIAL_PARTITION_NUM = 200 - private val AQE_AUTOBROADCAST_JOIN_THRESHOLD = "100m" + val BATCH_SIZE_BYTES = 2147483647 + val AQE_INPUT_SIZE_BYTES_THRESHOLD = 35000 + val AQE_SHUFFLE_READ_BYTES_THRESHOLD = 50000 + val AQE_MIN_INITIAL_PARTITION_NUM = 200 + val AQE_AUTOBROADCAST_JOIN_THRESHOLD = "100m" // Set of spark properties to be filtered out from the combined Spark properties. 
- private val filteredPropKeys: Set[String] = Set( + val filteredPropKeys: Set[String] = Set( "spark.app.id" ) @@ -1313,7 +1331,7 @@ object AutoTuner extends Logging { ) // Recommended values for specific unsupported configurations - private val recommendationsFromDriverLogs: Map[String, String] = Map( + val recommendationsFromDriverLogs: Map[String, String] = Map( "spark.rapids.sql.incompatibleDateFormats.enabled" -> "true" ) @@ -1325,13 +1343,23 @@ object AutoTuner extends Logging { // the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r - private def handleException( + /** + * Abstract method to create an instance of the AutoTuner. + */ + def createAutoTunerInstance( + clusterProps: ClusterProperties, + appInfoProvider: AppSummaryInfoBaseProvider, + platform: Platform, + driverInfoProvider: DriverLogInfoProvider): AutoTuner + + def handleException( ex: Throwable, appInfo: AppSummaryInfoBaseProvider, platform: Platform, driverInfoProvider: DriverLogInfoProvider): AutoTuner = { logError("Exception: " + ex.getStackTrace.mkString("Array(", ", ", ")")) - val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform, driverInfoProvider) + val tuning = createAutoTunerInstance(new ClusterProperties(), appInfo, + platform, driverInfoProvider) val msg = ex match { case cEx: ConstructorException => cEx.getContext case _ => if (ex.getCause != null) ex.getCause.toString else ex.toString @@ -1394,8 +1422,8 @@ object AutoTuner extends Logging { ): AutoTuner = { try { val clusterPropsOpt = loadClusterPropertiesFromContent(clusterProps) - new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform, - driverInfoProvider) + createAutoTunerInstance(clusterPropsOpt.getOrElse(new ClusterProperties()), + singleAppProvider, platform, driverInfoProvider) } catch { case NonFatal(e) => handleException(e, singleAppProvider, platform, 
driverInfoProvider) @@ -1408,7 +1436,8 @@ object AutoTuner extends Logging { driverInfoProvider: DriverLogInfoProvider = BaseDriverLogInfoProvider.noneDriverLog ): AutoTuner = { try { - val autoT = new AutoTuner(platform.clusterProperties.getOrElse(new ClusterProperties()), + val autoT = createAutoTunerInstance( + platform.clusterProperties.getOrElse(new ClusterProperties()), singleAppProvider, platform, driverInfoProvider) autoT } catch { @@ -1431,3 +1460,18 @@ object AutoTuner extends Logging { } } } + +/** + * Provides configuration settings for the Profiling Tool's AutoTuner. This object is a concrete + * implementation of the `AutoTunerConfigsProvider` interface. + */ +object ProfilingAutoTunerConfigsProvider extends AutoTunerConfigsProvider { + def createAutoTunerInstance( + clusterProps: ClusterProperties, + appInfoProvider: AppSummaryInfoBaseProvider, + platform: Platform, + driverInfoProvider: DriverLogInfoProvider): AutoTuner = { + new AutoTuner(clusterProps, appInfoProvider, platform, driverInfoProvider, + ProfilingAutoTunerConfigsProvider) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala index ed4fbb27d..1bd873b49 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala @@ -117,7 +117,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* descr = "File path containing the system information of a worker node. It is assumed " + "that all workers are homogenous. It requires the AutoTuner to be enabled. 
Default is " + "./worker_info.yaml", - default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH)) + default = Some(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH)) validate(filterCriteria) { case crit if (crit.endsWith("-newest-filesystem") || diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 75b4c4590..206c49613 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -23,7 +23,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase} -import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps import com.nvidia.spark.rapids.tool.views._ import org.apache.hadoop.conf.Configuration @@ -419,9 +418,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea // assumptions made in the code if (appInfo.isDefined && appInfo.get.appInfo.head.pluginEnabled) { val appInfoProvider = AppSummaryInfoBaseProvider.fromAppInfo(appInfo) - val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH) - val clusterPropsOpt = loadClusterProps(workerInfoPath) - val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider, + val workerInfoPath = appArgs.workerInfo + .getOrElse(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider.loadClusterProps(workerInfoPath) + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider.buildAutoTuner(appInfoProvider, PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt), driverInfoProvider) // The autotuner allows skipping some properties, diff --git 
a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index fa59e8b3e..dec4dab23 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConverters._ import com.nvidia.spark.rapids.tool.{EventLogInfo, FailedEventLog, PlatformFactory, ToolBase} -import com.nvidia.spark.rapids.tool.profiling.AutoTuner import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY import com.nvidia.spark.rapids.tool.tuning.TunerContext import com.nvidia.spark.rapids.tool.views.QualRawReportGenerator @@ -147,7 +146,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration, // we need a platform per application because it's storing cluster information which could // vary between applications, especially when using dynamic allocation val platform = { - val clusterPropsOpt = AutoTuner.loadClusterProps(workerInfoPath) + val clusterPropsOpt = QualificationAutoTunerConfigsProvider.loadClusterProps(workerInfoPath) PlatformFactory.createInstance(platformArg, clusterPropsOpt) } val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker, diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index ce322a0dc..2329ffa23 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -16,7 +16,6 @@ package com.nvidia.spark.rapids.tool.qualification import com.nvidia.spark.rapids.tool.PlatformNames -import 
com.nvidia.spark.rapids.tool.profiling.AutoTuner import org.rogach.scallop.{ScallopConf, ScallopOption} import org.rogach.scallop.exceptions.ScallopException @@ -195,7 +194,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* descr = "File path containing the system information of a worker node. It is assumed " + "that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " + "./worker_info.yaml", - default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH)) + default = Some(QualificationAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH)) val clusterReport: ScallopOption[Boolean] = toggle("cluster-report", default = Some(true), diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTuner.scala new file mode 100644 index 000000000..7c9a118a1 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTuner.scala @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.tool.qualification + +import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, Platform} +import com.nvidia.spark.rapids.tool.profiling.{AutoTuner, AutoTunerConfigsProvider, ClusterProperties, DriverLogInfoProvider} + + +/** + * Implementation of the `AutoTuner` designed for the Qualification Tool. This class can be used to + * implement the logic to recommend AutoTuner configurations by the Qualification Tool. + */ +class QualificationAutoTuner( + clusterProps: ClusterProperties, + appInfoProvider: AppSummaryInfoBaseProvider, + platform: Platform, + driverInfoProvider: DriverLogInfoProvider) + extends AutoTuner(clusterProps, appInfoProvider, platform, driverInfoProvider, + QualificationAutoTunerConfigsProvider) + +/** + * Provides configuration settings for the Qualification Tool's AutoTuner + */ +object QualificationAutoTunerConfigsProvider extends AutoTunerConfigsProvider { + + // For qualification tool's auto-tuner, the batch size to be recommended is 1GB + // See https://github.com/NVIDIA/spark-rapids-tools/issues/1399 + override val BATCH_SIZE_BYTES = 1073741824 + + override def createAutoTunerInstance( + clusterProps: ClusterProperties, + appInfoProvider: AppSummaryInfoBaseProvider, + platform: Platform, + driverInfoProvider: DriverLogInfoProvider): AutoTuner = { + new QualificationAutoTuner(clusterProps, appInfoProvider, platform, driverInfoProvider) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 743f95de8..48bd26d22 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -19,7 +19,6 @@ package com.nvidia.spark.rapids.tool.qualification import scala.util.control.NonFatal import
com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory} -import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps import com.nvidia.spark.rapids.tool.tuning.TunerContext import org.apache.spark.internal.Logging @@ -75,7 +74,8 @@ object QualificationMain extends Logging { // This platform instance should not be used for anything other then referencing the // files for this particular Platform. val referencePlatform = try { - val clusterPropsOpt = loadClusterProps(appArgs.workerInfo()) + val clusterPropsOpt = + QualificationAutoTunerConfigsProvider.loadClusterProps(appArgs.workerInfo()) PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt) } catch { case NonFatal(e) => diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/QualificationAutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/QualificationAutoTunerRunner.scala similarity index 89% rename from core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/QualificationAutoTuner.scala rename to core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/QualificationAutoTunerRunner.scala index 4344f0a5d..b39a4c216 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/QualificationAutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/QualificationAutoTunerRunner.scala @@ -21,17 +21,18 @@ import scala.util.{Failure, Success, Try} import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, Platform, ToolTextFileWriter} import com.nvidia.spark.rapids.tool.analysis.AggRawMetricsResult import com.nvidia.spark.rapids.tool.profiling.{AutoTuner, DataSourceProfileResult, Profiler} +import com.nvidia.spark.rapids.tool.qualification.QualificationAutoTunerConfigsProvider import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo} /** - * Implementation of the AutoTuner for 
Qualification. + * A wrapper class to run the AutoTuner for Qualification Tool. * @param appInfoProvider Provider of the qualification analysis data * @param tunerContext Container which holds the arguments passed to the AutoTuner execution */ -class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider, +class QualificationAutoTunerRunner(val appInfoProvider: QualAppSummaryInfoProvider, val tunerContext: TunerContext) { // When enabled, the profiler recommendations should only include updated settings. @@ -65,7 +66,8 @@ class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider, } def runAutoTuner(platform: Platform): TuningResult = { - val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider, platform) + val autoTuner: AutoTuner = + QualificationAutoTunerConfigsProvider.buildAutoTuner(appInfoProvider, platform) val (recommendations, comments) = autoTuner.getRecommendedProperties(showOnlyUpdatedProps = filterByUpdatedPropsEnabled) // Combine the GPU recommendations with all others. 
@@ -82,17 +84,17 @@ class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider, } } -object QualificationAutoTuner extends Logging { +object QualificationAutoTunerRunner extends Logging { def apply(appInfo: QualificationAppInfo, appAggStats: Option[QualificationSummaryInfo], tunerContext: TunerContext, rawAggMetrics: AggRawMetricsResult, - dsInfo: Seq[DataSourceProfileResult]): Option[QualificationAutoTuner] = { + dsInfo: Seq[DataSourceProfileResult]): Option[QualificationAutoTunerRunner] = { Try { val qualInfoProvider: QualAppSummaryInfoProvider = AppSummaryInfoBaseProvider.fromQualAppInfo(appInfo, appAggStats, rawAggMetrics, dsInfo) .asInstanceOf[QualAppSummaryInfoProvider] - new QualificationAutoTuner(qualInfoProvider, tunerContext) + new QualificationAutoTunerRunner(qualInfoProvider, tunerContext) } match { case Success(q) => Some(q) case Failure(e) => diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/TunerContext.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/TunerContext.scala index b9b23ec07..bbf7fb9b6 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/TunerContext.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/TunerContext.scala @@ -56,7 +56,7 @@ case class TunerContext ( val sqlAnalyzer = AppSQLPlanAnalyzer(appInfo, appIndex) val rawAggMetrics = QualSparkMetricsAnalyzer.getAggRawMetrics(appInfo, appIndex, Some(sqlAnalyzer)) - QualificationAutoTuner(appInfo, appAggStats, this, rawAggMetrics, dsInfo).collect { + QualificationAutoTunerRunner(appInfo, appAggStats, this, rawAggMetrics, dsInfo).collect { case qualTuner => Try { qualTuner.runAutoTuner(platform) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala index d61440195..191a670c9 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala +++ 
b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala @@ -23,7 +23,6 @@ import scala.collection.mutable import com.nvidia.spark.rapids.tool.{A100Gpu, AppSummaryInfoBaseProvider, GpuDevice, PlatformFactory, PlatformNames, T4Gpu} import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper -import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterPropertiesFromContent import org.scalatest.{BeforeAndAfterEach, FunSuite} import org.scalatest.prop.TableDrivenPropertyChecks._ import org.scalatest.prop.TableFor4 @@ -66,7 +65,7 @@ class AppInfoProviderMockTest(val maxInput: Double, override def getShuffleSkewStages: Set[Long] = shuffleSkewStages } -class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { +abstract class BaseAutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { val defaultSparkVersion = "3.1.1" @@ -91,14 +90,14 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { ) } - private def buildWorkerInfoAsString( + protected final def buildWorkerInfoAsString( customProps: Option[mutable.Map[String, String]] = None, numCores: Option[Int] = Some(32), systemMemory: Option[String] = Some("122880MiB"), numWorkers: Option[Int] = Some(4), - gpuCount: Option[Int] = Some(2), - gpuMemory: Option[String] = Some(GpuDevice.DEFAULT.getMemory), - gpuDevice: Option[String] = Some(GpuDevice.DEFAULT.toString)): String = { + gpuCount: Option[Int] = None, + gpuMemory: Option[String] = None, + gpuDevice: Option[String] = None): String = { val gpuWorkerProps = new GpuWorkerProps( gpuMemory.getOrElse(""), gpuCount.getOrElse(0), gpuDevice.getOrElse("")) val cpuSystem = new SystemClusterProps( @@ -120,21 +119,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { rawString.split("\n").drop(1).mkString("\n") } - private def getGpuAppMockInfoProvider: AppSummaryInfoBaseProvider = { - getMockInfoProvider(0, Seq(0), Seq(0.0), - 
mutable.Map("spark.rapids.sql.enabled" -> "true", - "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), - Some(defaultSparkVersion), Seq()) - } - - private def getGpuAppMockInfoWithJars(rapidsJars: Seq[String]): AppSummaryInfoBaseProvider = { - getMockInfoProvider(0, Seq(0), Seq(0.0), - mutable.Map("spark.rapids.sql.enabled" -> "true", - "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), - Some(defaultSparkVersion), rapidsJars) - } - - private def getMockInfoProvider(maxInput: Double, + protected def getMockInfoProvider(maxInput: Double, spilledMetrics: Seq[Long], jvmGCFractions: Seq[Double], propsFromLog: mutable.Map[String, String], @@ -150,16 +135,53 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { sparkVersion, rapidsJars, distinctLocationPct, redundantReadSize, meanInput, meanShuffleRead, shuffleStagesWithPosSpilling, shuffleSkewStages) } +} + +/** + * Suite to test the Profiling Tool's AutoTuner + */ +class ProfilerAutoTunerSuite extends BaseAutoTunerSuite { + + /** + * Helper method to build a worker info string with GPU properties + */ + protected def buildGpuWorkerInfoAsString( + customProps: Option[mutable.Map[String, String]] = None, + numCores: Option[Int] = Some(32), + systemMemory: Option[String] = Some("122880MiB"), + numWorkers: Option[Int] = Some(4), + gpuCount: Option[Int] = Some(2), + gpuMemory: Option[String] = Some(GpuDevice.DEFAULT.getMemory), + gpuDevice: Option[String] = Some(GpuDevice.DEFAULT.toString)): String = { + buildWorkerInfoAsString(customProps, numCores, systemMemory, numWorkers, + gpuCount, gpuMemory,gpuDevice) + } + + private def getGpuAppMockInfoProvider: AppSummaryInfoBaseProvider = { + getMockInfoProvider(0, Seq(0), Seq(0.0), + mutable.Map("spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), + Some(defaultSparkVersion), Seq()) + } + + private def
getGpuAppMockInfoWithJars(rapidsJars: Seq[String]): AppSummaryInfoBaseProvider = { + getMockInfoProvider(0, Seq(0), Seq(0.0), + mutable.Map("spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), + Some(defaultSparkVersion), rapidsJars) + } test("verify 3.2.0+ auto conf setting") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(32), Some("122880MiB"), Some(0)) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), Some("122880MiB"), Some(0)) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), Some("3.2.0"), Seq()) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) val expectedResults = @@ -223,7 +245,8 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { test("Load non-existing cluster properties") { val platform = PlatformFactory.createInstance(clusterProperties = None) - val autoTuner = AutoTuner.buildAutoTuner(getGpuAppMockInfoProvider, platform) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTuner(getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) // scalastyle:off 
line.size.limit @@ -263,8 +286,9 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { } test("Load cluster properties with CPU cores 0") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(0)) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(0)) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) // scalastyle:off line.size.limit @@ -304,10 +328,12 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { } test("Load cluster properties with memory to cores ratio to small") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(8), Some("14000MiB")) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(8), Some("14000MiB")) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -321,8 +347,9 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
} test("Load cluster properties with CPU memory missing") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(32), None) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), None) + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) // scalastyle:off line.size.limit @@ -362,10 +389,12 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." } test("Load cluster properties with CPU memory 0") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(32), Some("0m")) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), Some("0m")) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -424,10 +453,12 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
} test("Load cluster properties with number of workers 0") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(32), Some("122880MiB"), Some(0)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), Some("122880MiB"), Some(0)) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -477,8 +508,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. |- 'spark.task.resource.gpu.amount' was not set. |- Number of workers is missing. Setting default to 1. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin assert(expectedResults == autoTunerOutput) } @@ -494,11 +525,13 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.sql.files.maxPartitionBytes" -> "512m", "spark.task.resource.gpu.amount" -> "0.0625") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(sparkProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps), Some(32), Some("122880MiB"), Some(4), Some(0)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -536,8 +569,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.shuffle.partitions' was not set. |- GPU count is missing. Setting default to 1. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin assert(expectedResults == autoTunerOutput) } @@ -557,11 +590,13 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.sql.files.maxPartitionBytes" -> "512m", "spark.task.resource.gpu.amount" -> "0.0625") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = - buildWorkerInfoAsString(Some(sparkProps), Some(32), Some("122880MiB"), Some(4), Some(2), None) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps), Some(32), + Some("122880MiB"), Some(4), Some(2), None) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -591,8 +626,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.shuffle.partitions' was not set. |- GPU memory is missing. Setting default to 15109m. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin assert(expectedResults == autoTunerOutput) } @@ -614,11 +649,13 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "64m", "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(sparkProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps), Some(32), Some("122880MiB"), Some(4), Some(2), Some("0M")) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -644,8 +681,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.shuffle.partitions' was not set. |- GPU memory is missing. Setting default to 15109m. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin assert(expectedResults == autoTunerOutput) } @@ -664,11 +701,13 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.sql.files.maxPartitionBytes" -> "512m", "spark.task.resource.gpu.amount" -> "0.0625") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(sparkProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps), Some(32), Some("122880MiB"), Some(4), Some(2), Some("0MiB"), None) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -699,8 +738,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. |- GPU device is missing. Setting default to $T4Gpu. |- GPU memory is missing. Setting default to ${T4Gpu.getMemory}. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin assert(expectedResults == autoTunerOutput) } @@ -720,11 +759,13 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.sql.files.maxPartitionBytes" -> "512m", "spark.task.resource.gpu.amount" -> "0.0625") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(sparkProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps), Some(32), Some("122880MiB"), Some(4), Some(2), Some("0MiB"), Some("GPU-X")) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -755,18 +796,20 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. |- GPU device is missing. Setting default to $T4Gpu. |- GPU memory is missing. Setting default to ${T4Gpu.getMemory}. 
- |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin assert(expectedResults == autoTunerOutput) } test("Test executor memory on CSP where executor/cpu ratio is small") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(8), Some("15360MiB"), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(8), Some("15360MiB"), Some(4), Some(1)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -809,8 +852,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.files.maxPartitionBytes' was not set. |- 'spark.sql.shuffle.partitions' was not set. |- 'spark.task.resource.gpu.amount' was not set. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin assert(expectedResults == autoTunerOutput) } @@ -829,7 +872,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.sql.files.maxPartitionBytes" -> "512m", "spark.task.resource.gpu.amount" -> "0.0625") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(sparkProps)) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps)) val expectedResults = s"""| |Spark Properties: @@ -855,12 +898,14 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.shuffle.partitions' was not set. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -884,7 +929,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.sql.files.maxPartitionBytes" -> "512m", "spark.task.resource.gpu.amount" -> "0.0625") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(sparkProps)) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps)) val expectedResults = s"""| |Spark Properties: @@ -910,12 +955,14 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.shuffle.partitions' was not set. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -923,7 +970,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." } test("test AutoTuner with empty sparkProperties") { - val dataprocWorkerInfo = buildWorkerInfoAsString(None) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None) val expectedResults = s"""| |Spark Properties: @@ -969,12 +1016,14 @@ We recommend using nodes/workers with more memory. 
Need at least 7796MB memory." |- 'spark.sql.files.maxPartitionBytes' was not set. |- 'spark.sql.shuffle.partitions' was not set. |- 'spark.task.resource.gpu.amount' was not set. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1011,13 +1060,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "4") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1048,8 +1099,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |- file.encoding should be [UTF-8] because GPU only supports the charset when using some expressions. |""".stripMargin // scalastyle:on line.size.limit @@ -1084,13 +1135,15 @@ We recommend using nodes/workers with more memory. 
Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "4") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1121,8 +1174,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1148,13 +1201,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.plugins" -> "com.nvidia.spark.WrongPlugin0, com.nvidia.spark.WrongPlugin1", "spark.rapids.memory.pinnedPool.size" -> "5g", "spark.rapids.sql.concurrentGpuTasks" -> "4") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, Some(defaultSparkVersion)) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1191,8 +1246,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". 
Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1221,13 +1276,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "false", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "4") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1264,8 +1321,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. 
|- 'spark.sql.adaptive.enabled' should be enabled for better performance. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1299,11 +1356,13 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "1") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getMockInfoProvider(3.7449728E7, Seq(0, 0), Seq(0.01, 0.0), logEventsProps, Some(defaultSparkVersion)), platform) val (properties, comments) = autoTuner.getRecommendedProperties() @@ -1333,8 +1392,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set. |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. 
- |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(autoTunerOutput == expectedResults) @@ -1370,13 +1429,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "1") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(3.7449728E7, Seq(0, 0), Seq(0.4, 0.4), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1406,8 +1467,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance. 
- |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1437,13 +1498,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "1") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(3.7449728E7, Seq(0, 0), Seq(0.4, 0.4), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1473,8 +1536,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance. 
- |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1494,10 +1557,12 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.sql.files.maxPartitionBytes" -> "512m", "spark.task.resource.gpu.amount" -> "0.0625") val sparkProps = defaultDataprocProps.++(customProps) - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(sparkProps)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(sparkProps)) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, getGpuAppMockInfoWithJars(rapidsJars), platform) val (properties, comments) = autoTuner.getRecommendedProperties() Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1506,6 +1571,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." test("Multiple RAPIDS jars trigger a comment") { // 1. The Autotuner should warn the users that they have multiple jars defined in the classPath // 2. Compare the output + // scalastyle:off line.size.limit val expectedResults = s"""| |Spark Properties: @@ -1531,9 +1597,10 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. 
|- 'spark.sql.shuffle.partitions' was not set. - |- ${AutoTuner.classPathComments("rapids.jars.multiple")} [23.06.0, 23.02.1] - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.multiple")} [23.06.0, 23.02.1] + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin + // scalastyle:on line.size.limit val rapidsJarsArr = Seq("rapids-4-spark_2.12-23.06.0-SNAPSHOT.jar", "rapids-4-spark_2.12-23.02.1.jar") val autoTunerOutput = generateRecommendationsForRapidsJars(rapidsJarsArr) @@ -1579,7 +1646,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- A newer RAPIDS Accelerator for Apache Spark plugin is available: | $pluginJarMvnURl | Version used in application is $jarVer. - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin val rapidsJarsArr = Seq(s"rapids-4-spark_2.12-$jarVer.jar") val autoTunerOutput = generateRecommendationsForRapidsJars(rapidsJarsArr) @@ -1619,7 +1686,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.shuffle.partitions' was not set. - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin val rapidsJarsArr = Seq(s"rapids-4-spark_2.12-$latestRelease.jar") val autoTunerOutput = generateRecommendationsForRapidsJars(rapidsJarsArr) @@ -1649,13 +1716,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "1") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(3.7449728E7, Seq(0, 0), Seq(0.4, 0.4), logEventsProps, Some(defaultSparkVersion), Seq(), 40.0, 200000000000L) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1688,8 +1757,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.enabled' should be enabled for better performance. |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance. |- Enable file cache only if Spark local disks bandwidth is > 1 GB/s and you have sufficient disk space available to fit both cache and normal Spark temporary data. 
- |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1718,13 +1787,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "1") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(3.7449728E7, Seq(0, 0), Seq(0.4, 0.4), logEventsProps, Some(defaultSparkVersion), Seq(), 40.0, 2000000L) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1754,18 +1825,20 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. |- Average JVM GC time is very high. 
Other Garbage Collectors can be used for better performance. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) } test("test recommendations for databricks-aws platform argument") { - val databricksWorkerInfo = buildWorkerInfoAsString() - val clusterPropsOpt = loadClusterPropertiesFromContent(databricksWorkerInfo) + val databricksWorkerInfo = buildGpuWorkerInfoAsString() + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(databricksWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATABRICKS_AWS, clusterPropsOpt) - val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, getGpuAppMockInfoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() @@ -1806,13 +1879,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "4") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1840,8 +1915,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1862,8 +1937,9 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"to force onto GPU.") ) val driverInfoProvider = DriverInfoProviderMockTest(unsupportedDriverOperators) - val workerInfo = buildWorkerInfoAsString(Some(customProps)) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, + val workerInfo = buildGpuWorkerInfoAsString(Some(customProps)) + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(workerInfo, AppSummaryInfoBaseProvider.fromAppInfo(None), PlatformFactory.createInstance(), driverInfoProvider) val (properties, comments) = autoTuner.getRecommendedProperties() @@ -1876,7 +1952,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." | |Comments: |- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set. - |- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")} + |- ${ProfilingAutoTunerConfigsProvider.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1897,8 +1973,9 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "to force onto GPU.") ) val driverInfoProvider = DriverInfoProviderMockTest(unsupportedDriverOperators) - val workerInfo = buildWorkerInfoAsString(Some(customProps)) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, + val workerInfo = buildGpuWorkerInfoAsString(Some(customProps)) + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(workerInfo, getGpuAppMockInfoProvider, PlatformFactory.createInstance(), driverInfoProvider) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -1926,9 +2003,9 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. 
|- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)). |- Could not infer the cluster configuration, recommendations are generated using default values! - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} - |- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -1940,8 +2017,9 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.executor.memory" -> "47222m", "spark.rapids.sql.concurrentGpuTasks" -> "2", "spark.task.resource.gpu.amount" -> "0.0625") - val workerInfo = buildWorkerInfoAsString(Some(customProps)) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, + val workerInfo = buildGpuWorkerInfoAsString(Some(customProps)) + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(workerInfo, AppSummaryInfoBaseProvider.fromAppInfo(None), PlatformFactory.createInstance()) val (properties, comments) = autoTuner.getRecommendedProperties() @@ -1967,8 +2045,9 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"expression Literal 1700518632630000 produces an unsupported type TimestampType") ) val driverInfoProvider = DriverInfoProviderMockTest(unsupportedDriverOperators) - val workerInfo = buildWorkerInfoAsString(Some(customProps)) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, + val workerInfo = buildGpuWorkerInfoAsString(Some(customProps)) + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(workerInfo, AppSummaryInfoBaseProvider.fromAppInfo(None), PlatformFactory.createInstance(), driverInfoProvider) val (properties, comments) = autoTuner.getRecommendedProperties() @@ -2005,13 +2084,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.concurrentGpuTasks" -> "4", "spark.sql.adaptive.enabled" -> "true", "spark.sql.adaptive.autoBroadcastJoinThreshold" -> "500mb") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2043,9 +2124,9 @@ We recommend using nodes/workers with more memory. 
Need at least 7796MB memory." |- 'spark.shuffle.manager' was not set. |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set. |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} |- Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > 100m could lead to performance\n regression. Should be set to a lower number. - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -2076,13 +2157,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "4") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(gpuDevice.getMemory), Some(gpuDevice.toString)) val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, Some(defaultSparkVersion), meanInput = inputSize, meanShuffleRead = shuffleRead) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = 
Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2141,13 +2224,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.SQLPlugin", "spark.rapids.sql.concurrentGpuTasks" -> "1") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(3.7449728E7, Seq(0, 0), Seq(0.4, 0.4), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2178,22 +2263,23 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. |- 'spark.sql.adaptive.enabled' should be enabled for better performance. |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance. 
- |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) } test("test shuffle manager version for databricks") { - val databricksWorkerInfo = buildWorkerInfoAsString(None) + val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin", DatabricksParseHelper.PROP_TAG_CLUSTER_SPARK_VERSION_KEY -> "11.3.x-gpu-ml-scala2.12"), Some("3.3.0"), Seq()) // Do not set the platform as DB to see if it can work correctly irrespective - val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, infoProvider, PlatformFactory.createInstance()) val smVersion = autoTuner.getShuffleManagerClassName() // Assert shuffle manager string for DB 11.3 tag @@ -2201,12 +2287,13 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
} test("test shuffle manager version for non-databricks") { - val databricksWorkerInfo = buildWorkerInfoAsString(None) + val databricksWorkerInfo = buildGpuWorkerInfoAsString(None) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), mutable.Map("spark.rapids.sql.enabled" -> "true", "spark.plugins" -> "com.nvidia.spark.AnotherPlugin, com.nvidia.spark.SQLPlugin"), Some("3.3.0"), Seq()) - val autoTuner = AutoTuner.buildAutoTunerFromProps(databricksWorkerInfo, + val autoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(databricksWorkerInfo, infoProvider, PlatformFactory.createInstance()) val smVersion = autoTuner.getShuffleManagerClassName() assert(smVersion.get == "com.nvidia.spark.rapids.spark330.RapidsShuffleManager") @@ -2235,13 +2322,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.shuffle.multiThreaded.writer.threads" -> "8", "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1", "spark.shuffle.manager" -> "com.nvidia.spark.rapids.spark311.RapidsShuffleManager") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(3.7449728E7, Seq(1000L, 1000L), Seq(0.4, 0.4), logEventsProps, Some(defaultSparkVersion), shuffleStagesWithPosSpilling = Set(1)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val 
(properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2275,8 +2364,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' should be increased since spilling occurred in shuffle stages. |- 'spark.sql.shuffle.partitions' was not set. |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -2305,14 +2394,16 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." "spark.rapids.shuffle.multiThreaded.writer.threads" -> "8", "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1", "spark.shuffle.manager" -> "com.nvidia.spark.rapids.spark311.RapidsShuffleManager") - val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(Some(customProps), Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(3.7449728E7, Seq(1000L, 1000L), Seq(0.4, 0.4), logEventsProps, Some(defaultSparkVersion), shuffleStagesWithPosSpilling = Set(1, 5), shuffleSkewStages = Set(1)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + 
val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2345,11 +2436,11 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.adaptive.enabled' should be enabled for better performance. |- 'spark.sql.shuffle.partitions' was not set. |- Average JVM GC time is very high. Other Garbage Collectors can be used for better performance. - |- ${AutoTuner.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} |- Shuffle skew exists (when task's Shuffle Read Size > 3 * Avg Stage-level size) in | stages with spilling. Increasing shuffle partitions is not recommended in this | case since keys will still hash to the same task. - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -2366,13 +2457,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.executor.instances" -> "1", "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer" ) - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2416,8 +2509,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. |- 'spark.task.resource.gpu.amount' was not set. |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -2435,13 +2528,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer", "spark.kryo.registrator" -> "org.apache.SomeRegistrator,org.apache.SomeOtherRegistrator" ) - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2484,8 +2579,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. |- 'spark.task.resource.gpu.amount' was not set. |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -2503,13 +2598,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer", "spark.kryo.registrator" -> "" ) - val dataprocWorkerInfo = buildWorkerInfoAsString(None, Some(32), + val dataprocWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), logEventsProps, Some(defaultSparkVersion)) - val clusterPropsOpt = loadClusterPropertiesFromContent(dataprocWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(dataprocWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.DATAPROC, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2552,8 +2649,8 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. |- 'spark.task.resource.gpu.amount' was not set. |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html - |- ${AutoTuner.classPathComments("rapids.jars.missing")} - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) @@ -2569,13 +2666,15 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." 
"spark.executor.resource.gpu.amount" -> "1", "spark.executor.instances" -> "1" ) - val emrWorkerInfo = buildWorkerInfoAsString(None, Some(32), + val emrWorkerInfo = buildGpuWorkerInfoAsString(None, Some(32), Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString)) val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), logEventsProps, Some("3.4.1-amzn-1")) - val clusterPropsOpt = loadClusterPropertiesFromContent(emrWorkerInfo) + val clusterPropsOpt = ProfilingAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(emrWorkerInfo) val platform = PlatformFactory.createInstance(PlatformNames.EMR, clusterPropsOpt) - val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(emrWorkerInfo, infoProvider, + val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider + .buildAutoTunerFromProps(emrWorkerInfo, infoProvider, platform) val (properties, comments) = autoTuner.getRecommendedProperties() val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) @@ -2620,7 +2719,7 @@ We recommend using nodes/workers with more memory. Need at least 7796MB memory." |- 'spark.sql.shuffle.partitions' was not set. |- 'spark.task.resource.gpu.amount' was not set. |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". 
Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html - |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")} |""".stripMargin // scalastyle:on line.size.limit assert(expectedResults == autoTunerOutput) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTunerSuite.scala new file mode 100644 index 000000000..7ccab0c96 --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationAutoTunerSuite.scala @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.tool.qualification + +import scala.collection.mutable + +import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames} +import com.nvidia.spark.rapids.tool.profiling.{BaseAutoTunerSuite, Profiler} + +/** + * Suite to test the Qualification Tool's AutoTuner + */ +class QualificationAutoTunerSuite extends BaseAutoTunerSuite { + + /** + * Helper method to build a worker info string with CPU properties + */ + protected def buildCpuWorkerInfoAsString( + customProps: Option[mutable.Map[String, String]] = None, + numCores: Option[Int] = Some(32), + systemMemory: Option[String] = Some("122880MiB"), + numWorkers: Option[Int] = Some(4)): String = { + buildWorkerInfoAsString(customProps, numCores, systemMemory, numWorkers) + } + + test("test AutoTuner for Qualification sets batch size to 1GB") { + // mock the properties loaded from eventLog + val logEventsProps: mutable.Map[String, String] = + mutable.LinkedHashMap[String, String]( + "spark.executor.cores" -> "32", + "spark.executor.instances" -> "1", + "spark.executor.memory" -> "80g", + "spark.executor.instances" -> "1" + ) + val workerInfo = buildCpuWorkerInfoAsString(None, Some(32), + Some("212992MiB"), Some(5)) + val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0), + logEventsProps, Some(defaultSparkVersion)) + val clusterPropsOpt = QualificationAutoTunerConfigsProvider + .loadClusterPropertiesFromContent(workerInfo) + val platform = PlatformFactory.createInstance(PlatformNames.EMR, clusterPropsOpt) + val autoTuner = QualificationAutoTunerConfigsProvider.buildAutoTunerFromProps( + workerInfo, infoProvider, platform) + val (properties, comments) = autoTuner.getRecommendedProperties() + val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) + val expectedResults = Seq( + "--conf spark.rapids.sql.batchSizeBytes=1073741824", + "- 'spark.rapids.sql.batchSizeBytes' was not set." + ) + assert(expectedResults.forall(autoTunerOutput.contains)) + } +}