Enable features via config that are off by default in the profiler AutoTuner (#668)
parthosa authored Dec 5, 2023
1 parent 3fa1a28 commit 6e8f1f5
Showing 6 changed files with 233 additions and 33 deletions.
@@ -330,7 +330,8 @@ class RecommendationEntry(val name: String,
class AutoTuner(
val clusterProps: ClusterProperties,
val appInfoProvider: AppSummaryInfoBaseProvider,
-val platform: Platform) extends Logging {
+val platform: Platform,
+unsupportedOperators: Seq[DriverLogUnsupportedOperators]) extends Logging {

import AutoTuner._

@@ -350,11 +351,9 @@ class AutoTuner(
}

def getPropertyValue(key: String): Option[String] = {
-val fromProfile = appInfoProvider.getProperty(key)
-fromProfile match {
-case None => Option(clusterProps.softwareProperties.get(key))
-case Some(_) => fromProfile
-}
+val fromProfile = Option(appInfoProvider).flatMap(_.getProperty(key))
+// If the value is not found above, fall back to cluster properties
+fromProfile.orElse(Option(clusterProps.softwareProperties.get(key)))
}
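Note: the rewritten lookup also tolerates a null appInfoProvider (the tests below pass null). A minimal sketch of the resulting precedence, with hypothetical values:

// Hypothetical values illustrating the Option.orElse fallback order:
val fromProfile: Option[String] = Some("8")   // key found in the application profile
val fromCluster: Option[String] = Some("16")  // same key in clusterProps.softwareProperties
fromProfile.orElse(fromCluster)               // Some("8"): the profile value wins
(None: Option[String]).orElse(fromCluster)    // Some("16"): falls back to the cluster value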

def initRecommendations(): Unit = {
@@ -819,6 +818,20 @@ class AutoTuner(
appendRecommendation("spark.sql.shuffle.partitions", s"$shufflePartitions")
}

+/**
+ * Analyzes the reasons for unsupported operators in the driver log and generates
+ * recommendations for configuration properties.
+ */
+private def recommendFromDriverLogs(): Unit = {
+// Iterate through unsupported operators' reasons and check for matching properties
+unsupportedOperators.map(_.reason).foreach { operatorReason =>
+recommendationsFromDriverLogs.collect {
+case (config, recommendedValue) if operatorReason.contains(config) =>
+appendRecommendation(config, recommendedValue)
+appendComment(commentForExperimentalConfig(config))
+}
+}
+}
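Note: a worked example of the matching step, using the reason string exercised by the tests below; the inline map stands in for recommendationsFromDriverLogs, defined in the companion object:

// Hypothetical driver-log reason that names a config key:
val reason = "Set spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU."
val recommendations = Map("spark.rapids.sql.incompatibleDateFormats.enabled" -> "true")
recommendations.collect {
  case (config, value) if reason.contains(config) => s"--conf $config=$value"
} // yields "--conf spark.rapids.sql.incompatibleDateFormats.enabled=true"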

def appendOptionalComment(lookup: String, comment: String): Unit = {
if (!skippedRecommendations.contains(lookup)) {
appendComment(comment)
@@ -921,6 +934,9 @@ class AutoTuner(
case (property, value) => appendRecommendation(property, value)
}
}
+if (unsupportedOperators.nonEmpty) {
+recommendFromDriverLogs()
+}
(toRecommendationsProfileResult, toCommentProfileResult)
}
}
@@ -970,6 +986,8 @@ object AutoTuner extends Logging {
val DEF_READ_SIZE_THRESHOLD = 100 * 1024L * 1024L * 1024L
val DEFAULT_WORKER_INFO_PATH = "./worker_info.yaml"
val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
+private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" +
+"additional-functionality/advanced_configs.html#advanced-configuration"

val commentsForMissingProps: Map[String, String] = Map(
"spark.executor.memory" ->
@@ -1017,15 +1035,27 @@ object AutoTuner extends Logging {
" If the Spark RAPIDS jar is being bundled with your Spark\n" +
" distribution, this step is not needed.")
)

+// Recommended values for specific unsupported configurations
+private val recommendationsFromDriverLogs: Map[String, String] = Map(
+"spark.rapids.sql.incompatibleDateFormats.enabled" -> "true"
+)
+
+def commentForExperimentalConfig(config: String): String = {
+s"Using $config does not guarantee to produce the same results as CPU. " +
+s"Please refer to $DOC_URL."
+}

// the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r
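Note: the capture group extracts the plugin version; for an illustrative file name:

val jar = "rapids-4-spark_2.12-23.10.0.jar"          // hypothetical jar name
pluginJarRegEx.findFirstMatchIn(jar).map(_.group(1)) // Some("23.10.0")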

private def handleException(
ex: Exception,
appInfo: AppSummaryInfoBaseProvider,
-platform: Platform): AutoTuner = {
+platform: Platform,
+unsupportedOperators: Seq[DriverLogUnsupportedOperators]): AutoTuner = {
logError("Exception: " + ex.getStackTrace.mkString("Array(", ", ", ")"))
-val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform)
+val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform, unsupportedOperators)
val msg = ex match {
case cEx: ConstructorException => cEx.getContext
case _ => if (ex.getCause != null) ex.getCause.toString else ex.toString
@@ -1075,26 +1105,30 @@ object AutoTuner extends Logging {
def buildAutoTunerFromProps(
clusterProps: String,
singleAppProvider: AppSummaryInfoBaseProvider,
-platform: Platform = PlatformFactory.createInstance()): AutoTuner = {
+platform: Platform = PlatformFactory.createInstance(),
+unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
try {
val clusterPropsOpt = loadClusterPropertiesFromContent(clusterProps)
-new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
+new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
+unsupportedOperators)
} catch {
case e: Exception =>
-handleException(e, singleAppProvider, platform)
+handleException(e, singleAppProvider, platform, unsupportedOperators)
}
}

def buildAutoTuner(
filePath: String,
singleAppProvider: AppSummaryInfoBaseProvider,
-platform: Platform = PlatformFactory.createInstance()): AutoTuner = {
+platform: Platform = PlatformFactory.createInstance(),
+unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
try {
val clusterPropsOpt = loadClusterProps(filePath)
-new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
+new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
+unsupportedOperators)
} catch {
case e: Exception =>
-handleException(e, singleAppProvider, platform)
+handleException(e, singleAppProvider, platform, unsupportedOperators)
}
}
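Note: both builders default unsupportedOperators to Seq.empty, so existing call sites compile unchanged. An illustrative call site (the provider and sequence names below are hypothetical):

val tuner = AutoTuner.buildAutoTuner(
  DEFAULT_WORKER_INFO_PATH,         // "./worker_info.yaml"
  singleAppProvider,                // an AppSummaryInfoBaseProvider
  PlatformFactory.createInstance(),
  driverLogOps)                     // Seq[DriverLogUnsupportedOperators]; Seq.empty when absent
val (props, comments) = tuner.getRecommendedProperties()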

@@ -15,7 +15,7 @@
*/
package com.nvidia.spark.rapids.tool.profiling

-import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}
+import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

@@ -77,10 +77,10 @@ object ProfileMain extends Logging {
}

val profiler = new Profiler(hadoopConf, appArgs, enablePB)
-profiler.profile(eventLogFsFiltered)
-if (driverLog.nonEmpty){
-profiler.profileDriver(driverLog)
+if (driverLog.nonEmpty) {
+profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
}
+profiler.profile(eventLogFsFiltered)
(0, filteredLogs.size)
}

@@ -124,15 +124,21 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
progressBar.foreach(_.finishAll())
}

-def profileDriver(driverLogInfos: String): Unit = {
+def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = {
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
Profiler.DRIVER_LOG_NAME, numOutputRows, true)

try {
val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
-val unsupportedDrivers = driverLogProcessor.processDriverLog()
+val unsupportedDriverOperators = driverLogProcessor.processDriverLog()
profileOutputWriter.write(s"Unsupported operators in driver log",
-unsupportedDrivers)
+unsupportedDriverOperators)
+if (eventLogsEmpty && useAutoTuner) {
+// Since no event logs are provided, the AutoTuner will not run while processing
+// event logs, so run it here explicitly.
+val (properties, comments) = runAutoTuner(None, unsupportedDriverOperators)
+profileOutputWriter.writeText("\n### A. Recommended Configuration ###\n")
+profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
+}
} finally {
profileOutputWriter.close()
}
@@ -403,6 +409,26 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
appLogPath, ioAnalysisMetrics), compareRes)
}

+/**
+ * A wrapper method to run the AutoTuner.
+ * @param appInfo Summary of the application for tuning.
+ * @param unsupportedDriverOperators List of unsupported operators from the driver log
+ */
+private def runAutoTuner(appInfo: Option[ApplicationSummaryInfo],
+unsupportedDriverOperators: Seq[DriverLogUnsupportedOperators])
+: (Seq[RecommendedPropertyResult], Seq[RecommendedCommentResult]) = {
+val appInfoProvider = appInfo.map(new SingleAppSummaryInfoProvider(_)).orNull
+val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
+val platform = appArgs.platform()
+val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath, appInfoProvider,
+PlatformFactory.createInstance(platform), unsupportedDriverOperators)
+
+// The AutoTuner allows skipping some properties, e.g.,
+// getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
+// recommendation related to executor instances.
+autoTuner.getRecommendedProperties()
+}
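Note: per the comment above, callers can suppress individual recommendations, e.g. (illustrative):

// Skip only the executor-instances recommendation while keeping the rest:
val (properties, comments) =
  autoTuner.getRecommendedProperties(Some(Seq("spark.executor.instances")))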

def writeOutput(profileOutputWriter: ProfileOutputWriter,
appsSum: Seq[ApplicationSummaryInfo], outputCombined: Boolean,
comparedRes: Option[CompareSummaryInfo] = None): Unit = {
@@ -464,7 +490,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
} else {
appsSum
}
-sums.foreach { app =>
+sums.foreach { app: ApplicationSummaryInfo =>
profileOutputWriter.writeText("### A. Information Collected ###")
profileOutputWriter.write("Application Information", app.appInfo)
profileOutputWriter.write("Application Log Path Mapping", app.appLogPath)
@@ -510,15 +536,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
Some("Unsupported SQL Ops"))

if (useAutoTuner) {
-val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
-val platform = appArgs.platform()
-val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath,
-new SingleAppSummaryInfoProvider(app),
-PlatformFactory.createInstance(platform))
-// the autotuner allows skipping some properties
-// e.g. getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
-// recommendation related to executor instances.
-val (properties, comments) = autoTuner.getRecommendedProperties()
+val (properties, comments) = runAutoTuner(Some(app), Seq.empty)
profileOutputWriter.writeText("\n### D. Recommended Configuration ###\n")
profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
}
@@ -15,7 +15,7 @@
*/
package com.nvidia.spark.rapids.tool.qualification

-import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}
+import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

@@ -1359,4 +1359,152 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations generated for unsupported operators from driver logs only") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"FromUnixTime", 1,
"Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
"CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
"results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
"to force onto GPU.")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
|
|Comments:
|- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations generated for unsupported operators from driver and event logs") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"FromUnixTime", 1,
"Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
"CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
"results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
"to force onto GPU.")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo,
getGpuAppMockInfoProvider, PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.executor.cores=16
|--conf spark.executor.instances=8
|--conf spark.executor.memory=32768m
|--conf spark.executor.memoryOverhead=8396m
|--conf spark.rapids.memory.pinnedPool.size=4096m
|--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
|--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
|--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
|--conf spark.rapids.sql.multiThreadedRead.numThreads=20
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.executor.instances' was not set.
|- 'spark.executor.memoryOverhead' was not set.
|- 'spark.rapids.memory.pinnedPool.size' was not set.
|- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
|- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
|- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
|- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- ${AutoTuner.classPathComments("rapids.jars.missing")}
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}


test("Recommendations generated for empty unsupported operators from driver logs only") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), Seq.empty)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
|
|Comments:
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations not generated for unsupported operators from driver logs") {
// This test does not generate any recommendations for the unsupported operator 'Literal'
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"Literal", 3,
"expression Literal 1700518632630000 produces an unsupported type TimestampType")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
|
|Comments:
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}
}
