Skip to content

Commit

Permalink
Split AutoTuner for Profiling and Qualification and override batch size
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Dec 18, 2024
1 parent 4143ccc commit 162f61d
Show file tree
Hide file tree
Showing 11 changed files with 574 additions and 314 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "File path containing the system information of a worker node. It is assumed " +
"that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
"./worker_info.yaml",
default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))
default = Some(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH))

validate(filterCriteria) {
case crit if (crit.endsWith("-newest-filesystem") ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
import com.nvidia.spark.rapids.tool.views._
import org.apache.hadoop.conf.Configuration

Expand Down Expand Up @@ -419,9 +418,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
// assumptions made in the code
if (appInfo.isDefined && appInfo.get.appInfo.head.pluginEnabled) {
val appInfoProvider = AppSummaryInfoBaseProvider.fromAppInfo(appInfo)
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val clusterPropsOpt = loadClusterProps(workerInfoPath)
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider,
val workerInfoPath = appArgs.workerInfo
.getOrElse(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH)
val clusterPropsOpt = ProfilingAutoTunerConfigsProvider.loadClusterProps(workerInfoPath)
val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider.buildAutoTuner(appInfoProvider,
PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt), driverInfoProvider)

// The autotuner allows skipping some properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.tool.{EventLogInfo, FailedEventLog, PlatformFactory, ToolBase}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import com.nvidia.spark.rapids.tool.tuning.TunerContext
import com.nvidia.spark.rapids.tool.views.QualRawReportGenerator
Expand Down Expand Up @@ -147,7 +146,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
// we need a platform per application because it's storing cluster information which could
// vary between applications, especially when using dynamic allocation
val platform = {
val clusterPropsOpt = AutoTuner.loadClusterProps(workerInfoPath)
val clusterPropsOpt = QualificationAutoTunerConfigsProvider.loadClusterProps(workerInfoPath)
PlatformFactory.createInstance(platformArg, clusterPropsOpt)
}
val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.profiling.AutoTuner
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down Expand Up @@ -195,7 +194,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "File path containing the system information of a worker node. It is assumed " +
"that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
"./worker_info.yaml",
default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))
default = Some(QualificationAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH))
val clusterReport: ScallopOption[Boolean] =
toggle("cluster-report",
default = Some(true),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, Platform}
import com.nvidia.spark.rapids.tool.profiling.{AutoTuner, AutoTunerConfigsProvider, ClusterProperties, DriverLogInfoProvider}


/**
* Implementation of the `AutoTuner` designed the Qualification Tool. This class can be used to
* implement the logic to recommend AutoTuner configurations by the Qualification Tool.
*/
class QualificationAutoTuner(
clusterProps: ClusterProperties,
appInfoProvider: AppSummaryInfoBaseProvider,
platform: Platform,
driverInfoProvider: DriverLogInfoProvider)
extends AutoTuner(clusterProps, appInfoProvider, platform, driverInfoProvider,
QualificationAutoTunerConfigsProvider)

/**
* Provides configuration settings for the Qualification Tool's AutoTuner
*/
object QualificationAutoTunerConfigsProvider extends AutoTunerConfigsProvider {

// For qualification tool's auto-tuner, the batch size to be recommended is 1GB
// See https://github.com/NVIDIA/spark-rapids-tools/issues/1399
override val BATCH_SIZE_BYTES = 1073741824

override def createAutoTunerInstance(
clusterProps: ClusterProperties,
appInfoProvider: AppSummaryInfoBaseProvider,
platform: Platform,
driverInfoProvider: DriverLogInfoProvider): AutoTuner = {
new QualificationAutoTuner(clusterProps, appInfoProvider, platform, driverInfoProvider)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package com.nvidia.spark.rapids.tool.qualification
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
import com.nvidia.spark.rapids.tool.tuning.TunerContext

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -75,7 +74,8 @@ object QualificationMain extends Logging {
// This platform instance should not be used for anything other then referencing the
// files for this particular Platform.
val referencePlatform = try {
val clusterPropsOpt = loadClusterProps(appArgs.workerInfo())
val clusterPropsOpt =
QualificationAutoTunerConfigsProvider.loadClusterProps(appArgs.workerInfo())
PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt)
} catch {
case NonFatal(e) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ import scala.util.{Failure, Success, Try}
import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, Platform, ToolTextFileWriter}
import com.nvidia.spark.rapids.tool.analysis.AggRawMetricsResult
import com.nvidia.spark.rapids.tool.profiling.{AutoTuner, DataSourceProfileResult, Profiler}
import com.nvidia.spark.rapids.tool.qualification.QualificationAutoTunerConfigsProvider
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}

/**
* Implementation of the AutoTuner for Qualification.
* A wrapper class to run the AutoTuner for Qualification Tool.
* @param appInfoProvider Provider of the qualification analysis data
* @param tunerContext Container which holds the arguments passed to the AutoTuner execution
*/
class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider,
class QualificationAutoTunerRunner(val appInfoProvider: QualAppSummaryInfoProvider,
val tunerContext: TunerContext) {

// When enabled, the profiler recommendations should only include updated settings.
Expand Down Expand Up @@ -65,7 +66,8 @@ class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider,
}

def runAutoTuner(platform: Platform): TuningResult = {
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider, platform)
val autoTuner: AutoTuner =
QualificationAutoTunerConfigsProvider.buildAutoTuner(appInfoProvider, platform)
val (recommendations, comments) =
autoTuner.getRecommendedProperties(showOnlyUpdatedProps = filterByUpdatedPropsEnabled)
// Combine the GPU recommendations with all others.
Expand All @@ -82,17 +84,17 @@ class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider,
}
}

object QualificationAutoTuner extends Logging {
object QualificationAutoTunerRunner extends Logging {
def apply(appInfo: QualificationAppInfo,
appAggStats: Option[QualificationSummaryInfo],
tunerContext: TunerContext,
rawAggMetrics: AggRawMetricsResult,
dsInfo: Seq[DataSourceProfileResult]): Option[QualificationAutoTuner] = {
dsInfo: Seq[DataSourceProfileResult]): Option[QualificationAutoTunerRunner] = {
Try {
val qualInfoProvider: QualAppSummaryInfoProvider =
AppSummaryInfoBaseProvider.fromQualAppInfo(appInfo, appAggStats, rawAggMetrics, dsInfo)
.asInstanceOf[QualAppSummaryInfoProvider]
new QualificationAutoTuner(qualInfoProvider, tunerContext)
new QualificationAutoTunerRunner(qualInfoProvider, tunerContext)
} match {
case Success(q) => Some(q)
case Failure(e) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ case class TunerContext (
val sqlAnalyzer = AppSQLPlanAnalyzer(appInfo, appIndex)
val rawAggMetrics =
QualSparkMetricsAnalyzer.getAggRawMetrics(appInfo, appIndex, Some(sqlAnalyzer))
QualificationAutoTuner(appInfo, appAggStats, this, rawAggMetrics, dsInfo).collect {
QualificationAutoTunerRunner(appInfo, appAggStats, this, rawAggMetrics, dsInfo).collect {
case qualTuner =>
Try {
qualTuner.runAutoTuner(platform)
Expand Down
Loading

0 comments on commit 162f61d

Please sign in to comment.