From 6e8f1f5b7bd2dac77d1a2f9eca3eb7b224b28e7d Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 5 Dec 2023 09:37:59 -0800 Subject: [PATCH] Enable features via config that are off by default in the profiler AutoTuner (#668) --- .../rapids/tool/profiling/AutoTuner.scala | 62 ++++++-- .../rapids/tool/profiling/ProfileArgs.scala | 2 +- .../rapids/tool/profiling/ProfileMain.scala | 6 +- .../rapids/tool/profiling/Profiler.scala | 46 ++++-- .../qualification/QualificationArgs.scala | 2 +- .../tool/profiling/AutoTunerSuite.scala | 148 ++++++++++++++++++ 6 files changed, 233 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala index e0f923bfb..5419df9c6 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala @@ -330,7 +330,8 @@ class RecommendationEntry(val name: String, class AutoTuner( val clusterProps: ClusterProperties, val appInfoProvider: AppSummaryInfoBaseProvider, - val platform: Platform) extends Logging { + val platform: Platform, + unsupportedOperators: Seq[DriverLogUnsupportedOperators]) extends Logging { import AutoTuner._ @@ -350,11 +351,9 @@ class AutoTuner( } def getPropertyValue(key: String): Option[String] = { - val fromProfile = appInfoProvider.getProperty(key) - fromProfile match { - case None => Option(clusterProps.softwareProperties.get(key)) - case Some(_) => fromProfile - } + val fromProfile = Option(appInfoProvider).flatMap(_.getProperty(key)) + // If the value is not found above, fallback to cluster properties + fromProfile.orElse(Option(clusterProps.softwareProperties.get(key))) } def initRecommendations(): Unit = { @@ -819,6 +818,20 @@ class AutoTuner( appendRecommendation("spark.sql.shuffle.partitions", s"$shufflePartitions") } + /** + * Analyzes unsupported driver logs and generates recommendations for configuration properties. 
+   */
+  private def recommendFromDriverLogs(): Unit = {
+    // Iterate through unsupported operators' reasons and check for matching properties
+    unsupportedOperators.map(_.reason).foreach { operatorReason =>
+      recommendationsFromDriverLogs.collect {
+        case (config, recommendedValue) if operatorReason.contains(config) =>
+          appendRecommendation(config, recommendedValue)
+          appendComment(commentForExperimentalConfig(config))
+      }
+    }
+  }
+
   def appendOptionalComment(lookup: String, comment: String): Unit = {
     if (!skippedRecommendations.contains(lookup)) {
       appendComment(comment)
@@ -921,6 +934,9 @@ class AutoTuner(
         case (property, value) => appendRecommendation(property, value)
       }
     }
+    if (unsupportedOperators.nonEmpty) {
+      recommendFromDriverLogs()
+    }
     (toRecommendationsProfileResult, toCommentProfileResult)
   }
 }
@@ -970,6 +986,8 @@ object AutoTuner extends Logging {
   val DEF_READ_SIZE_THRESHOLD = 100 * 1024L * 1024L * 1024L
   val DEFAULT_WORKER_INFO_PATH = "./worker_info.yaml"
   val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
+  private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" +
+    "additional-functionality/advanced_configs.html#advanced-configuration"
 
   val commentsForMissingProps: Map[String, String] = Map(
     "spark.executor.memory" ->
@@ -1017,15 +1035,27 @@ object AutoTuner extends Logging {
       " If the Spark RAPIDS jar is being bundled with your Spark\n" +
       " distribution, this step is not needed.")
   )
+
+  // Recommended values for specific unsupported configurations
+  private val recommendationsFromDriverLogs: Map[String, String] = Map(
+    "spark.rapids.sql.incompatibleDateFormats.enabled" -> "true"
+  )
+
+  def commentForExperimentalConfig(config: String): String = {
+    s"Using $config is not guaranteed to produce the same results as the CPU. " +
+      s"Please refer to $DOC_URL."
+  }
+
   // the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
   val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r
 
   private def handleException(
       ex: Exception,
       appInfo: AppSummaryInfoBaseProvider,
-      platform: Platform): AutoTuner = {
+      platform: Platform,
+      unsupportedOperators: Seq[DriverLogUnsupportedOperators]): AutoTuner = {
     logError("Exception: " + ex.getStackTrace.mkString("Array(", ", ", ")"))
-    val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform)
+    val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform, unsupportedOperators)
     val msg = ex match {
       case cEx: ConstructorException => cEx.getContext
       case _ => if (ex.getCause != null) ex.getCause.toString else ex.toString
@@ -1075,26 +1105,30 @@ object AutoTuner extends Logging {
   def buildAutoTunerFromProps(
       clusterProps: String,
       singleAppProvider: AppSummaryInfoBaseProvider,
-      platform: Platform = PlatformFactory.createInstance()): AutoTuner = {
+      platform: Platform = PlatformFactory.createInstance(),
+      unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
     try {
       val clusterPropsOpt = loadClusterPropertiesFromContent(clusterProps)
-      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
+      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
+        unsupportedOperators)
     } catch {
       case e: Exception =>
-        handleException(e, singleAppProvider, platform)
+        handleException(e, singleAppProvider, platform, unsupportedOperators)
     }
   }
 
   def buildAutoTuner(
       filePath: String,
       singleAppProvider: AppSummaryInfoBaseProvider,
-      platform: Platform = PlatformFactory.createInstance()): AutoTuner = {
+      platform: Platform = PlatformFactory.createInstance(),
+      unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
     try {
       val clusterPropsOpt = loadClusterProps(filePath)
-      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
+      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
+        unsupportedOperators)
     } catch {
       case e: Exception =>
-        handleException(e, singleAppProvider, platform)
+        handleException(e, singleAppProvider, platform, unsupportedOperators)
     }
   }
 
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala
index 3723e4aea..839e3789e 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala
@@ -15,7 +15,7 @@
  */
 package com.nvidia.spark.rapids.tool.profiling
 
-import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}
+import com.nvidia.spark.rapids.tool.PlatformNames
 import org.rogach.scallop.{ScallopConf, ScallopOption}
 import org.rogach.scallop.exceptions.ScallopException
 
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala
index d839f76a8..71759a9ae 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala
@@ -77,10 +77,10 @@ object ProfileMain extends Logging {
     }
 
     val profiler = new Profiler(hadoopConf, appArgs, enablePB)
-    profiler.profile(eventLogFsFiltered)
-    if (driverLog.nonEmpty){
-      profiler.profileDriver(driverLog)
+    if (driverLog.nonEmpty) {
+      profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
     }
+    profiler.profile(eventLogFsFiltered)
     (0, filteredLogs.size)
   }
 
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index ae99e7343..a3fff6067 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -124,15 +124,21 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
     progressBar.foreach(_.finishAll())
   }
 
-  def profileDriver(driverLogInfos: String): Unit = {
+  def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = {
     val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
       Profiler.DRIVER_LOG_NAME, numOutputRows, true)
-
     try {
       val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
-      val unsupportedDrivers = driverLogProcessor.processDriverLog()
+      val unsupportedDriverOperators = driverLogProcessor.processDriverLog()
       profileOutputWriter.write(s"Unsupported operators in driver log",
-        unsupportedDrivers)
+        unsupportedDriverOperators)
+      if (eventLogsEmpty && useAutoTuner) {
+        // Since event logs are empty, AutoTuner will not run while processing event logs.
+        // We need to run it here explicitly.
+        val (properties, comments) = runAutoTuner(None, unsupportedDriverOperators)
+        profileOutputWriter.writeText("\n### A. Recommended Configuration ###\n")
+        profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
+      }
     } finally {
       profileOutputWriter.close()
     }
@@ -403,6 +409,26 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
         appLogPath, ioAnalysisMetrics), compareRes)
   }
 
+  /**
+   * A wrapper method to run the AutoTuner.
+   * @param appInfo Summary of the application for tuning.
+   * @param unsupportedDriverOperators List of unsupported operators from driver log
+   */
+  private def runAutoTuner(appInfo: Option[ApplicationSummaryInfo],
+      unsupportedDriverOperators: Seq[DriverLogUnsupportedOperators])
+    : (Seq[RecommendedPropertyResult], Seq[RecommendedCommentResult]) = {
+    val appInfoProvider = appInfo.map(new SingleAppSummaryInfoProvider(_)).orNull
+    val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
+    val platform = appArgs.platform()
+    val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath, appInfoProvider,
+      PlatformFactory.createInstance(platform), unsupportedDriverOperators)
+
+    // The autotuner allows skipping some properties,
+    // e.g., getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
+    // recommendation related to executor instances.
+    autoTuner.getRecommendedProperties()
+  }
+
   def writeOutput(profileOutputWriter: ProfileOutputWriter,
       appsSum: Seq[ApplicationSummaryInfo], outputCombined: Boolean,
       comparedRes: Option[CompareSummaryInfo] = None): Unit = {
@@ -464,7 +490,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
     } else {
       appsSum
     }
-    sums.foreach { app =>
+    sums.foreach { app: ApplicationSummaryInfo =>
       profileOutputWriter.writeText("### A. Information Collected ###")
       profileOutputWriter.write("Application Information", app.appInfo)
       profileOutputWriter.write("Application Log Path Mapping", app.appLogPath)
@@ -510,15 +536,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
           Some("Unsupported SQL Ops"))
 
       if (useAutoTuner) {
-        val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
-        val platform = appArgs.platform()
-        val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath,
-          new SingleAppSummaryInfoProvider(app),
-          PlatformFactory.createInstance(platform))
-        // the autotuner allows skipping some properties
-        // e.g. getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
-        // recommendation related to executor instances.
-        val (properties, comments) = autoTuner.getRecommendedProperties()
+        val (properties, comments) = runAutoTuner(Some(app), Seq.empty)
         profileOutputWriter.writeText("\n### D. Recommended Configuration ###\n")
         profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
       }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
index 24942b4b9..c72477034 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
@@ -15,7 +15,7 @@
  */
 package com.nvidia.spark.rapids.tool.qualification
 
-import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}
+import com.nvidia.spark.rapids.tool.PlatformNames
 import org.rogach.scallop.{ScallopConf, ScallopOption}
 import org.rogach.scallop.exceptions.ScallopException
 
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
index 670e60958..755c4f6c8 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
@@ -1359,4 +1359,152 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
     // scalastyle:on line.size.limit
     assert(expectedResults == autoTunerOutput)
   }
+
+  test("Recommendations generated for unsupported operators from driver logs only") {
+    val customProps = mutable.LinkedHashMap(
+      "spark.executor.cores" -> "8",
+      "spark.executor.memory" -> "47222m",
+      "spark.rapids.sql.concurrentGpuTasks" -> "2",
+      "spark.task.resource.gpu.amount" -> "0.0625")
+    val unsupportedDriverOperators = Seq(
+      DriverLogUnsupportedOperators(
+        "FromUnixTime", 1,
+        "Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
+          "CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
+          "results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
+          "to force onto GPU.")
+    )
+    val workerInfo = buildWorkerInfoAsString(Some(customProps))
+    val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
+      PlatformFactory.createInstance(), unsupportedDriverOperators)
+    val (properties, comments) = autoTuner.getRecommendedProperties()
+    val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
+    // scalastyle:off line.size.limit
+    val expectedResults =
+      s"""|
+          |Spark Properties:
+          |--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
+          |
+          |Comments:
+          |- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
+          |- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
+          |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
+          |- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")}
+          |""".stripMargin
+    // scalastyle:on line.size.limit
+    assert(expectedResults == autoTunerOutput)
+  }
+
+  test("Recommendations generated for unsupported operators from driver and event logs") {
+    val customProps = mutable.LinkedHashMap(
+      "spark.executor.cores" -> "8",
+      "spark.executor.memory" -> "47222m",
+      "spark.rapids.sql.concurrentGpuTasks" -> "2",
+      "spark.task.resource.gpu.amount" -> "0.0625")
+    val unsupportedDriverOperators = Seq(
+      DriverLogUnsupportedOperators(
+        "FromUnixTime", 1,
+        "Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
+          "CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
+          "results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
+          "to force onto GPU.")
+    )
+    val workerInfo = buildWorkerInfoAsString(Some(customProps))
+    val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo,
+      getGpuAppMockInfoProvider, PlatformFactory.createInstance(), unsupportedDriverOperators)
+    val (properties, comments) = autoTuner.getRecommendedProperties()
+    val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
+    // scalastyle:off line.size.limit
+    val expectedResults =
+      s"""|
+          |Spark Properties:
+          |--conf spark.executor.cores=16
+          |--conf spark.executor.instances=8
+          |--conf spark.executor.memory=32768m
+          |--conf spark.executor.memoryOverhead=8396m
+          |--conf spark.rapids.memory.pinnedPool.size=4096m
+          |--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
+          |--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
+          |--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
+          |--conf spark.rapids.sql.multiThreadedRead.numThreads=20
+          |--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
+          |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
+          |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
+          |--conf spark.sql.files.maxPartitionBytes=512m
+          |--conf spark.sql.shuffle.partitions=200
+          |
+          |Comments:
+          |- 'spark.executor.instances' was not set.
+          |- 'spark.executor.memoryOverhead' was not set.
+          |- 'spark.rapids.memory.pinnedPool.size' was not set.
+          |- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
+          |- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
+          |- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
+ |- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set. + |- 'spark.shuffle.manager' was not set. + |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set. + |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. + |- 'spark.sql.adaptive.enabled' should be enabled for better performance. + |- 'spark.sql.files.maxPartitionBytes' was not set. + |- 'spark.sql.shuffle.partitions' was not set. + |- ${AutoTuner.classPathComments("rapids.jars.missing")} + |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")} + |""".stripMargin + // scalastyle:on line.size.limit + assert(expectedResults == autoTunerOutput) + } + + + test("Recommendations generated for empty unsupported operators from driver logs only") { + val customProps = mutable.LinkedHashMap( + "spark.executor.cores" -> "8", + "spark.executor.memory" -> "47222m", + "spark.rapids.sql.concurrentGpuTasks" -> "2", + "spark.task.resource.gpu.amount" -> "0.0625") + val workerInfo = buildWorkerInfoAsString(Some(customProps)) + val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null, + PlatformFactory.createInstance(), Seq.empty) + val (properties, comments) = autoTuner.getRecommendedProperties() + val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) + // scalastyle:off line.size.limit + val expectedResults = + s"""|Cannot recommend properties. See Comments. + | + |Comments: + |- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark + |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html + |""".stripMargin + // scalastyle:on line.size.limit + assert(expectedResults == autoTunerOutput) + } + + test("Recommendations not generated for unsupported operators from driver logs") { + // This test does not generate any recommendations for the unsupported operator 'Literal' + val customProps = mutable.LinkedHashMap( + "spark.executor.cores" -> "8", + "spark.executor.memory" -> "47222m", + "spark.rapids.sql.concurrentGpuTasks" -> "2", + "spark.task.resource.gpu.amount" -> "0.0625") + val unsupportedDriverOperators = Seq( + DriverLogUnsupportedOperators( + "Literal", 3, + "expression Literal 1700518632630000 produces an unsupported type TimestampType") + ) + val workerInfo = buildWorkerInfoAsString(Some(customProps)) + val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null, + PlatformFactory.createInstance(), unsupportedDriverOperators) + val (properties, comments) = autoTuner.getRecommendedProperties() + val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) + // scalastyle:off line.size.limit + val expectedResults = + s"""|Cannot recommend properties. See Comments. + | + |Comments: + |- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark + |- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html + |""".stripMargin + // scalastyle:on line.size.limit + assert(expectedResults == autoTunerOutput) + } }
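
For reviewers who want to exercise the new path outside the test suite, below is a minimal, self-contained sketch (not part of the patch) of the driver-log-only flow that the first new test asserts. It relies only on entry points this patch touches (AutoTuner.buildAutoTunerFromProps, DriverLogUnsupportedOperators, Profiler.getAutoTunerResultsAsString); the object name and the worker-info YAML are hypothetical, the YAML following the same cluster-shape format the AutoTuner loads from worker_info.yaml.

import com.nvidia.spark.rapids.tool.PlatformFactory
import com.nvidia.spark.rapids.tool.profiling.{AutoTuner, DriverLogUnsupportedOperators, Profiler}

object DriverLogTuningSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical cluster shape, in the same YAML format as worker_info.yaml.
    val workerInfo =
      """|system:
         |  numCores: 32
         |  memory: 212992MiB
         |  numWorkers: 5
         |gpu:
         |  memory: 15109MiB
         |  count: 4
         |  name: T4
         |softwareProperties:
         |  spark.executor.cores: '8'
         |  spark.executor.memory: 47222m
         |""".stripMargin

    // One record in the shape DriverLogProcessor produces. The reason text names a
    // config key present in recommendationsFromDriverLogs, so recommendFromDriverLogs()
    // appends both the recommendation and the experimental-config comment.
    val unsupportedOps = Seq(
      DriverLogUnsupportedOperators("FromUnixTime", 1,
        "... Set spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU."))

    // No event logs were processed, so there is no app summary: pass a null provider,
    // which getPropertyValue now tolerates via Option(appInfoProvider).
    val autoTuner = AutoTuner.buildAutoTunerFromProps(
      workerInfo, null, PlatformFactory.createInstance(), unsupportedOps)
    val (properties, comments) = autoTuner.getRecommendedProperties()
    println(Profiler.getAutoTunerResultsAsString(properties, comments))
  }
}

Run against this patch, the sketch should print the same output the "driver logs only" test expects: the single spark.rapids.sql.incompatibleDateFormats.enabled=true recommendation plus the experimental-config comment, since recommendFromDriverLogs() simply string-matches config keys inside each operator's reason text.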