Skip to content

Commit

Permalink
Split AutoTuner for Profiling and Qualification and Override BATCH_SIZE_BYTES (#1471)

Browse files Browse the repository at this point in the history

* Split AutoTuner for Profiling and Qualification and override batch size

Signed-off-by: Partho Sarthi <[email protected]>

* Refactor AutoTuner and move it to tuning package

Signed-off-by: Partho Sarthi <[email protected]>

* Address feedback

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa authored Dec 31, 2024
1 parent 8a78f5d commit 5380342
Show file tree
Hide file tree
Showing 13 changed files with 813 additions and 523 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool
import scala.annotation.tailrec

import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.ClusterProperties
import com.nvidia.spark.rapids.tool.tuning.ClusterProperties

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.tuning.ProfilingAutoTunerConfigsProvider
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down Expand Up @@ -117,7 +118,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "File path containing the system information of a worker node. It is assumed " +
"that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
"./worker_info.yaml",
default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))
default = Some(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH))

validate(filterCriteria) {
case crit if (crit.endsWith("-newest-filesystem") ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, Platform, PlatformFactory, ToolBase}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
import com.nvidia.spark.rapids.tool.tuning.{AutoTuner, ProfilingAutoTunerConfigsProvider}
import com.nvidia.spark.rapids.tool.views._
import org.apache.hadoop.conf.Configuration

Expand Down Expand Up @@ -421,9 +421,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
// assumptions made in the code
if (appInfo.isDefined && appInfo.get.appInfo.head.pluginEnabled) {
val appInfoProvider = AppSummaryInfoBaseProvider.fromAppInfo(appInfo)
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val clusterPropsOpt = loadClusterProps(workerInfoPath)
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider,
val workerInfoPath = appArgs.workerInfo
.getOrElse(ProfilingAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH)
val clusterPropsOpt = ProfilingAutoTunerConfigsProvider.loadClusterProps(workerInfoPath)
val autoTuner: AutoTuner = ProfilingAutoTunerConfigsProvider.buildAutoTuner(appInfoProvider,
PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt), driverInfoProvider)

// The autotuner allows skipping some properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.tool.{EventLogInfo, FailedEventLog, PlatformFactory, ToolBase}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import com.nvidia.spark.rapids.tool.tuning.TunerContext
import com.nvidia.spark.rapids.tool.tuning.{QualificationAutoTunerConfigsProvider, TunerContext}
import com.nvidia.spark.rapids.tool.views.QualRawReportGenerator
import org.apache.hadoop.conf.Configuration

Expand Down Expand Up @@ -147,7 +146,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
// we need a platform per application because it's storing cluster information which could
// vary between applications, especially when using dynamic allocation
val platform = {
val clusterPropsOpt = AutoTuner.loadClusterProps(workerInfoPath)
val clusterPropsOpt = QualificationAutoTunerConfigsProvider.loadClusterProps(workerInfoPath)
PlatformFactory.createInstance(platformArg, clusterPropsOpt)
}
val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.profiling.AutoTuner
import com.nvidia.spark.rapids.tool.tuning.QualificationAutoTunerConfigsProvider
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down Expand Up @@ -195,7 +195,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "File path containing the system information of a worker node. It is assumed " +
"that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
"./worker_info.yaml",
default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))
default = Some(QualificationAutoTunerConfigsProvider.DEFAULT_WORKER_INFO_PATH))
val clusterReport: ScallopOption[Boolean] =
toggle("cluster-report",
default = Some(true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package com.nvidia.spark.rapids.tool.qualification
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
import com.nvidia.spark.rapids.tool.tuning.TunerContext
import com.nvidia.spark.rapids.tool.tuning.{QualificationAutoTunerConfigsProvider, TunerContext}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl
Expand Down Expand Up @@ -75,7 +74,8 @@ object QualificationMain extends Logging {
// This platform instance should not be used for anything other than referencing the
// files for this particular Platform.
val referencePlatform = try {
val clusterPropsOpt = loadClusterProps(appArgs.workerInfo())
val clusterPropsOpt =
QualificationAutoTunerConfigsProvider.loadClusterProps(appArgs.workerInfo())
PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt)
} catch {
case NonFatal(e) =>
Expand Down
Loading

0 comments on commit 5380342

Please sign in to comment.