Add recommendations in AutoTuner from driver logs
Signed-off-by: Partho Sarthi <[email protected]>
parthosa committed Nov 28, 2023
1 parent 40cd46b commit 8720dd5
Showing 3 changed files with 46 additions and 18 deletions.
@@ -330,7 +330,8 @@ class RecommendationEntry(val name: String,
 class AutoTuner(
     val clusterProps: ClusterProperties,
     val appInfoProvider: AppSummaryInfoBaseProvider,
-    val platform: Platform) extends Logging {
+    val platform: Platform,
+    unsupportedOperators: Seq[DriverLogUnsupportedOperators]) extends Logging {
 
   import AutoTuner._
 
@@ -350,11 +351,9 @@ class AutoTuner(
   }
 
   def getPropertyValue(key: String): Option[String] = {
-    val fromProfile = appInfoProvider.getProperty(key)
-    fromProfile match {
-      case None => Option(clusterProps.softwareProperties.get(key))
-      case Some(_) => fromProfile
-    }
+    val fromProfile = Option(appInfoProvider).flatMap(_.getProperty(key))
+    // If the value is not found above, fallback to cluster properties
+    fromProfile.orElse(Option(clusterProps.softwareProperties.get(key)))
   }
 
   def initRecommendations(): Unit = {
@@ -819,6 +818,23 @@ class AutoTuner(
       appendRecommendation("spark.sql.shuffle.partitions", s"$shufflePartitions")
   }
 
+  /**
+   * Analyzes unsupported driver logs and generates recommendations for configuration properties.
+   */
+  private def recommendFromDriverLogs(): Unit = {
+    val doc_url = "https://nvidia.github.io/spark-rapids/docs/additional-functionality/" +
+      "advanced_configs.html#advanced-configuration"
+    // Iterate through unsupported operators' reasons and check for matching properties
+    unsupportedOperators.map(_.reason).foreach { operatorReason =>
+      recommendationsFromDriverLogs.collect {
+        case (config, recommendedValue) if operatorReason.contains(config) =>
+          appendRecommendation(config, recommendedValue)
+          appendComment(s"Using $config does not guarantee to produce the same results as CPU. " +
+            s"Please refer to $doc_url")
+      }
+    }
+  }
+
   def appendOptionalComment(lookup: String, comment: String): Unit = {
     if (!skippedRecommendations.contains(lookup)) {
       appendComment(comment)
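The added recommendFromDriverLogs matches by substring: a recommendation fires whenever an unsupported-operator reason from the driver log mentions one of the config keys in recommendationsFromDriverLogs. A standalone sketch of that matching logic; the Reason case class and the sample log messages are illustrative stand-ins, not taken from the plugin:

object DriverLogMatchSketch {
  // Illustrative stand-in for DriverLogUnsupportedOperators; only the reason text is used here.
  final case class Reason(reason: String)

  // Mirrors the shape of recommendationsFromDriverLogs above.
  private val recommendationsFromDriverLogs: Map[String, String] = Map(
    "spark.rapids.sql.incompatibleDateFormats.enabled" -> "true")

  def main(args: Array[String]): Unit = {
    val unsupportedOperators = Seq(
      Reason("Only a limited set of date formats is supported; set " +
        "spark.rapids.sql.incompatibleDateFormats.enabled to true to force them"),
      Reason("expression SomeOtherExpr is not supported"))

    unsupportedOperators.map(_.reason).foreach { operatorReason =>
      recommendationsFromDriverLogs.collect {
        case (config, recommendedValue) if operatorReason.contains(config) =>
          println(s"recommend $config=$recommendedValue")
      }
    }
  }
}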
@@ -921,6 +937,9 @@ class AutoTuner(
         case (property, value) => appendRecommendation(property, value)
       }
     }
+    if(unsupportedOperators.nonEmpty) {
+      recommendFromDriverLogs()
+    }
     (toRecommendationsProfileResult, toCommentProfileResult)
   }
 }
@@ -1017,15 +1036,22 @@ object AutoTuner extends Logging {
       " If the Spark RAPIDS jar is being bundled with your Spark\n" +
       " distribution, this step is not needed.")
   )
+
+  // Recommended values for specific unsupported configurations
+  private val recommendationsFromDriverLogs: Map[String, String] = Map(
+    "spark.rapids.sql.incompatibleDateFormats.enabled" -> "true"
+  )
+
   // the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
   val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r
 
   private def handleException(
       ex: Exception,
       appInfo: AppSummaryInfoBaseProvider,
-      platform: Platform): AutoTuner = {
+      platform: Platform,
+      unsupportedOperators: Seq[DriverLogUnsupportedOperators]): AutoTuner = {
     logError("Exception: " + ex.getStackTrace.mkString("Array(", ", ", ")"))
-    val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform)
+    val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform, unsupportedOperators)
     val msg = ex match {
       case cEx: ConstructorException => cEx.getContext
       case _ => if (ex.getCause != null) ex.getCause.toString else ex.toString
@@ -1079,10 +1105,11 @@ object AutoTuner extends Logging {
       unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
     try {
       val clusterPropsOpt = loadClusterPropertiesFromContent(clusterProps)
-      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
+      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
+        unsupportedOperators)
     } catch {
       case e: Exception =>
-        handleException(e, singleAppProvider, platform)
+        handleException(e, singleAppProvider, platform, unsupportedOperators)
     }
   }
 
@@ -1093,10 +1120,11 @@ object AutoTuner extends Logging {
       unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
     try {
       val clusterPropsOpt = loadClusterProps(filePath)
-      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
+      new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
+        unsupportedOperators)
     } catch {
       case e: Exception =>
-        handleException(e, singleAppProvider, platform)
+        handleException(e, singleAppProvider, platform, unsupportedOperators)
     }
   }
@@ -77,10 +77,10 @@ object ProfileMain extends Logging {
     }
 
     val profiler = new Profiler(hadoopConf, appArgs, enablePB)
-    profiler.profile(eventLogFsFiltered)
-    if (driverLog.nonEmpty){
-      profiler.profileDriver(driverLog)
+    if (driverLog.nonEmpty) {
+      profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
     }
+    profiler.profile(eventLogFsFiltered)
     (0, filteredLogs.size)
 
@@ -50,6 +50,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
 
   private val useAutoTuner: Boolean = appArgs.autoTuner()
   private var progressBar: Option[ConsoleProgressBar] = None
+  private var unsupportedDriverOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty
 
   logInfo(s"Threadpool size is $nThreads")
 
@@ -124,10 +125,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
     progressBar.foreach(_.finishAll())
   }
 
-  def profileDriver(driverLogInfos: String): Unit = {
+  def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = {
     val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
       Profiler.DRIVER_LOG_NAME, numOutputRows, true)
-
     try {
       val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
       unsupportedDriverOperators = driverLogProcessor.processDriverLog()
@@ -489,7 +489,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
       } else {
         appsSum
       }
-      sums.foreach { app =>
+      sums.foreach { app: ApplicationSummaryInfo =>
         profileOutputWriter.writeText("### A. Information Collected ###")
         profileOutputWriter.write("Application Information", app.appInfo)
         profileOutputWriter.write("Application Log Path Mapping", app.appLogPath)
