Skip to content

Commit

Permalink
[FEA] Qualification tool triggers the AutoTuner module (NVIDIA#739)
Browse files Browse the repository at this point in the history
* [FEA] Qualification tool triggers the AtutoTuner module

Fixes NVIDIA#700

This is an incremental step toward the full automation of App migration to GPU.

- Add Qual arg `--auto-tuner` to toggle the AutoTuner module. Default is Off.
- Add Qual arg `--worker-info` to pass the GPU worker info to the Qual's AutoTuner.
- When AutoTuner is enabled, the Qual tool will launch the AutoTuner module to make some basic recommendations/comments based on the Spark/Env properties.
- A new folder `rapids_4_spark_qualification_output/tuning` is created which contains a text formatted file for each app. Each file is named after the AppID.
- No unit-tests is added for now because: 1- the recommendations are based on the Profiler's implementation; and the feature is disabled by default.
- There will be followup to incrementally split the logic of the AutoTuner into two classes that aim to tailor the rules/policies of the recommendations to the CPU applications.

---------

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
  • Loading branch information
amahussein authored Jan 25, 2024
1 parent d69dab8 commit a154c0b
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.tuning.QualAppSummaryInfoProvider

import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}

case class ApplicationSummaryInfo(
appInfo: Seq[AppInfoProfileResults],
Expand Down Expand Up @@ -201,4 +204,16 @@ object AppSummaryInfoBaseProvider {
case _ => new AppSummaryInfoBaseProvider()
}
}

/**
* Constructs an application information provider based on the results of Qualification
* tool.
* @param appInfo
* @param appAggStats optional aggregate of application stats
* @return object that can be used by the AutoTuner to calculate the recommendations
*/
def fromQualAppInfo(appInfo: QualificationAppInfo,
appAggStats: Option[QualificationSummaryInfo] = None): AppSummaryInfoBaseProvider = {
new QualAppSummaryInfoProvider(appInfo, appAggStats)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import com.nvidia.spark.rapids.tool.tuning.TunerContext
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.rapids.tool.qualification._
Expand All @@ -34,7 +35,8 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
penalizeTransitions: Boolean) extends RuntimeReporter {
penalizeTransitions: Boolean,
tunerContext: Option[TunerContext]) extends RuntimeReporter {

private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

Expand Down Expand Up @@ -181,6 +183,19 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
case Right(app: QualificationAppInfo) =>
// Case with successful creation of QualificationAppInfo
val qualSumInfo = app.aggregateStats()
tunerContext.collect {
// Run the autotuner if it is enabled.
// Note that we call the autotuner anyway without checking the aggregate results
// because the Autotuner can still make some recommendations based on the information
// enclosed by the QualificationInfo object
case tuner =>
// autoTuner is enabled for Qualification
tuner.tuneApplication(app, qualSumInfo).collect {
case res =>
logInfo(s"RecommendedProps ${app.appId} = ${res.recommendations.mkString("\n")}")
logInfo(s"Comments ${app.appId} = ${res.comments.mkString("\n")}")
}
}
if (qualSumInfo.isDefined) {
allApps.add(qualSumInfo.get)
progressBar.foreach(_.reportSuccessfulProcess())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.profiling.AutoTuner
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down Expand Up @@ -164,6 +165,16 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "Custom speedup factor file used to get estimated GPU speedup that is specific " +
"to the user's environment. If the file is not provided, it defaults to use the " +
"speedup files included in the jar.")
val autoTuner: ScallopOption[Boolean] =
opt[Boolean](required = false,
descr = "Toggle AutoTuner module.",
default = Some(false))
val workerInfo: ScallopOption[String] =
opt[String](required = false,
descr = "File path containing the system information of a worker node. It is assumed " +
"that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
"./worker_info.yaml",
default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))

validate(order) {
case o if (QualificationArgs.isOrderAsc(o) || QualificationArgs.isOrderDesc(o)) => Right(Unit)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
import com.nvidia.spark.rapids.tool.tuning.TunerContext

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl
Expand Down Expand Up @@ -58,15 +59,21 @@ object QualificationMain extends Logging {
val order = appArgs.order.getOrElse("desc")
val uiEnabled = appArgs.htmlReport.getOrElse(false)
val reportSqlLevel = appArgs.perSql.getOrElse(false)
val platform = appArgs.platform()
val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false)
val penalizeTransitions = appArgs.penalizeTransitions.getOrElse(true)

val hadoopConf = RapidsToolsConfUtil.newHadoopConf
val platform = try {
PlatformFactory.createInstance(appArgs.platform())
} catch {
case ie: IllegalStateException =>
logError("Error creating the platform", ie)
return (1, Seq[QualificationSummaryInfo]())
}

val pluginTypeChecker = try {
new PluginTypeChecker(
PlatformFactory.createInstance(platform),
platform,
appArgs.speedupFactorFile.toOption)
} catch {
case ie: IllegalStateException =>
Expand All @@ -93,10 +100,16 @@ object QualificationMain extends Logging {
logWarning("No event logs to process after checking paths, exiting!")
return (0, Seq[QualificationSummaryInfo]())
}

// create the AutoTuner context object
val tunerContext = if (appArgs.autoTuner()) {
TunerContext(platform, appArgs.workerInfo(), outputDirectory, Option(hadoopConf))
} else {
None
}
val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout,
nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled,
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions)
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions,
tunerContext)
val res = qual.qualifyApps(filteredLogs)
(0, res)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.tuning

import com.nvidia.spark.rapids.tool.profiling.AppSummaryInfoBaseProvider

import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}

/**
* Implementation of AppInfoPropertyGetter to wrap the output of the Qualification analysis.
* @param appInfo the main QualificationAppInfo object representing the CPU application.
* @param appAggStats optional stats aggregate is included here for future improvement as we may
* need to feed the autotuner with values from the aggregates.
*/
class QualAppSummaryInfoProvider(
val appInfo: QualificationAppInfo,
val appAggStats: Option[QualificationSummaryInfo]) extends AppSummaryInfoBaseProvider {
override def isAppInfoAvailable = true
private def findPropertyInternal(
key: String, props: collection.Map[String, String]): Option[String] = {
props.get(key)
}

override def getSparkProperty(propKey: String): Option[String] = {
findPropertyInternal(propKey, appInfo.sparkProperties)
}

override def getRapidsProperty(propKey: String): Option[String] = {
getSparkProperty(propKey)
}

override def getSystemProperty(propKey: String): Option[String] = {
findPropertyInternal(propKey, appInfo.systemProperties)
}

override def getSparkVersion: Option[String] = {
Option(appInfo.sparkVersion)
}

def getAppID: String = appInfo.appId
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.tuning

import scala.util.{Failure, Success, Try}

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import com.nvidia.spark.rapids.tool.profiling.{AppSummaryInfoBaseProvider, AutoTuner, Profiler}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}

/**
* Implementation of the AutoTuner for Qualification.
* @param appInfoProvider Provider of the qualification analysis data
* @param tunerContext Container which holds the arguments passed to the AutoTuner execution
*/
class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider,
val tunerContext: TunerContext) {

private def writeTuningReport(tuningResult: TuningResult,
outputDir: String, hadoopConf: Configuration): Unit = {
val textFileWriter = new ToolTextFileWriter(outputDir,
s"${tuningResult.appID}.log", s"Tuning Qual app - ${tuningResult.appID}", Option(hadoopConf))
try {
textFileWriter.write(s"### Recommended Configuration for App: ${tuningResult.appID} ###\n")
textFileWriter.write(Profiler.getAutoTunerResultsAsString(
tuningResult.recommendations, tuningResult.comments))
} finally {
textFileWriter.close()
}
}
def runAutoTuner(): TuningResult = {
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(
tunerContext.workerInfoPath, appInfoProvider, tunerContext.platform)
val (recommendations, comments) = autoTuner.getRecommendedProperties()
val resultRecord = TuningResult(appInfoProvider.getAppID, recommendations, comments)
writeTuningReport(resultRecord, tunerContext.getOutputPath, tunerContext.hadoopConf)
resultRecord
}
}

object QualificationAutoTuner extends Logging {
def apply(appInfo: QualificationAppInfo,
appAggStats: Option[QualificationSummaryInfo],
tunerContext: TunerContext): Option[QualificationAutoTuner] = {
Try {
val qualInfoProvider: QualAppSummaryInfoProvider =
AppSummaryInfoBaseProvider.fromQualAppInfo(appInfo, appAggStats)
.asInstanceOf[QualAppSummaryInfoProvider]
new QualificationAutoTuner(qualInfoProvider, tunerContext)
} match {
case Success(q) => Some(q)
case Failure(e) =>
logError(
s"Failed to create Qualification tuning object for application ${appInfo.appId}", e)
None
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.tuning

import scala.util.{Failure, Success, Try}

import com.nvidia.spark.rapids.tool.Platform
import com.nvidia.spark.rapids.tool.profiling.{RecommendedCommentResult, RecommendedPropertyResult}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}
import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil

case class TuningResult(
appID: String,
recommendations: Seq[RecommendedPropertyResult],
comments: Seq[RecommendedCommentResult])

/**
* Container which holds metadata and arguments specific to the execution of the AutoTuner.
* TODO: we need to use the same class in constructing the AutoTuner in the Profiling tools.
* @param platform object representing the host platform on which the application was executed.
* @param workerInfoPath the path of the GPU workers
* @param outputRootDir the output directory to dump the recommendation/comments.
* @param hadoopConf optional configuration to access the remote storage.
*/
case class TunerContext (
platform: Platform,
workerInfoPath: String,
outputRootDir: String,
hadoopConf: Configuration) extends Logging {

def getOutputPath: String = {
s"$outputRootDir/rapids_4_spark_qualification_output/tuning"
}

def tuneApplication(
appInfo: QualificationAppInfo,
appAggStats: Option[QualificationSummaryInfo]): Option[TuningResult] = {
QualificationAutoTuner(appInfo, appAggStats, this).collect {
case qualTuner =>
Try {
qualTuner.runAutoTuner()
} match {
case Success(r) => r
case Failure(e) =>
logError(s"Failed to generate tuning recommendations for app: ${appInfo.appId}", e)
null
}
}
}
}

object TunerContext extends Logging {
def apply(platform: Platform,
workerInfoPath: String,
outputRootDir: String,
hadoopConf: Option[Configuration] = None): Option[TunerContext] = {
Try {
val hConf = hadoopConf.getOrElse(RapidsToolsConfUtil.newHadoopConf())
TunerContext(platform, workerInfoPath, outputRootDir, hConf)
} match {
case Success(c) => Some(c)
case Failure(e) =>
logError("Could not create Tuner Context", e)
None
}
}
}

0 comments on commit a154c0b

Please sign in to comment.