Skip to content

Commit

Permalink
Refactor AutoTuner and move it to tuning package
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Dec 19, 2024
1 parent 162f61d commit b4c84c0
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool
import scala.annotation.tailrec

import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.ClusterProperties
import com.nvidia.spark.rapids.tool.tuning.ClusterProperties

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.tuning.ProfilingAutoTunerConfigsProvider
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase}
import com.nvidia.spark.rapids.tool.tuning.{AutoTuner, ProfilingAutoTunerConfigsProvider}
import com.nvidia.spark.rapids.tool.views._
import org.apache.hadoop.conf.Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.tool.{EventLogInfo, FailedEventLog, PlatformFactory, ToolBase}
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import com.nvidia.spark.rapids.tool.tuning.TunerContext
import com.nvidia.spark.rapids.tool.tuning.{QualificationAutoTunerConfigsProvider, TunerContext}
import com.nvidia.spark.rapids.tool.views.QualRawReportGenerator
import org.apache.hadoop.conf.Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.tuning.QualificationAutoTunerConfigsProvider
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.qualification
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
import com.nvidia.spark.rapids.tool.tuning.TunerContext
import com.nvidia.spark.rapids.tool.tuning.{QualificationAutoTunerConfigsProvider, TunerContext}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling
package com.nvidia.spark.rapids.tool.tuning

import java.io.{BufferedReader, InputStreamReader, IOException}
import java.util
Expand All @@ -28,6 +28,7 @@ import scala.util.matching.Regex

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory}
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
import org.yaml.snakeyaml.{DumperOptions, LoaderOptions, Yaml}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.qualification
package com.nvidia.spark.rapids.tool.tuning

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, Platform}
import com.nvidia.spark.rapids.tool.profiling.{AutoTuner, AutoTunerConfigsProvider, ClusterProperties, DriverLogInfoProvider}

import com.nvidia.spark.rapids.tool.profiling.DriverLogInfoProvider

/**
* Implementation of the `AutoTuner` designed the Qualification Tool. This class can be used to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import scala.util.{Failure, Success, Try}

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, Platform, ToolTextFileWriter}
import com.nvidia.spark.rapids.tool.analysis.AggRawMetricsResult
import com.nvidia.spark.rapids.tool.profiling.{AutoTuner, DataSourceProfileResult, Profiler}
import com.nvidia.spark.rapids.tool.qualification.QualificationAutoTunerConfigsProvider
import com.nvidia.spark.rapids.tool.profiling.{DataSourceProfileResult, Profiler}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.tuning

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import com.nvidia.spark.rapids.tool.AppSummaryInfoBaseProvider
import com.nvidia.spark.rapids.tool.profiling._
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import org.yaml.snakeyaml.{DumperOptions, Yaml}

import org.apache.spark.internal.Logging


case class DriverInfoProviderMockTest(unsupportedOps: Seq[DriverLogUnsupportedOperators])
extends BaseDriverLogInfoProvider(None) {
override def getUnsupportedOperators: Seq[DriverLogUnsupportedOperators] = unsupportedOps
}

class AppInfoProviderMockTest(val maxInput: Double,
val spilledMetrics: Seq[Long],
val jvmGCFractions: Seq[Double],
val propsFromLog: mutable.Map[String, String],
val sparkVersion: Option[String],
val rapidsJars: Seq[String],
val distinctLocationPct: Double,
val redundantReadSize: Long,
val meanInput: Double,
val meanShuffleRead: Double,
val shuffleStagesWithPosSpilling: Set[Long],
val shuffleSkewStages: Set[Long]) extends AppSummaryInfoBaseProvider {
override def isAppInfoAvailable = true
override def getMaxInput: Double = maxInput
override def getMeanInput: Double = meanInput
override def getMeanShuffleRead: Double = meanShuffleRead
override def getSpilledMetrics: Seq[Long] = spilledMetrics
override def getJvmGCFractions: Seq[Double] = jvmGCFractions
override def getRapidsProperty(propKey: String): Option[String] = propsFromLog.get(propKey)
override def getSparkProperty(propKey: String): Option[String] = propsFromLog.get(propKey)
override def getSystemProperty(propKey: String): Option[String] = propsFromLog.get(propKey)
override def getSparkVersion: Option[String] = sparkVersion
override def getRapidsJars: Seq[String] = rapidsJars
override def getDistinctLocationPct: Double = distinctLocationPct
override def getRedundantReadSize: Long = redundantReadSize
override def getShuffleStagesWithPosSpilling: Set[Long] = shuffleStagesWithPosSpilling
override def getShuffleSkewStages: Set[Long] = shuffleSkewStages
}

/**
* Base class for AutoTuner test suites
*/
abstract class BaseAutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {

val defaultSparkVersion = "3.1.1"

val defaultDataprocProps: mutable.Map[String, String] = {
mutable.LinkedHashMap[String, String](
"spark.dynamicAllocation.enabled" -> "true",
"spark.driver.maxResultSize" -> "7680m",
"spark.driver.memory" -> "15360m",
"spark.executor.cores" -> "16",
"spark.executor.instances" -> "2",
"spark.executor.resource.gpu.amount" -> "1",
"spark.executor.memory" -> "26742m",
"spark.executor.memoryOverhead" -> "7372m",
"spark.executorEnv.OPENBLAS_NUM_THREADS" -> "1",
"spark.extraListeners" -> "com.google.cloud.spark.performance.DataprocMetricsListener",
"spark.rapids.memory.pinnedPool.size" -> "2048m",
"spark.scheduler.mode" -> "FAIR",
"spark.sql.cbo.enabled" -> "true",
"spark.sql.adaptive.enabled" -> "true",
"spark.ui.port" -> "0",
"spark.yarn.am.memory" -> "640m"
)
}

protected final def buildWorkerInfoAsString(
customProps: Option[mutable.Map[String, String]] = None,
numCores: Option[Int] = Some(32),
systemMemory: Option[String] = Some("122880MiB"),
numWorkers: Option[Int] = Some(4),
gpuCount: Option[Int] = None,
gpuMemory: Option[String] = None,
gpuDevice: Option[String] = None): String = {
val gpuWorkerProps = new GpuWorkerProps(
gpuMemory.getOrElse(""), gpuCount.getOrElse(0), gpuDevice.getOrElse(""))
val cpuSystem = new SystemClusterProps(
numCores.getOrElse(0), systemMemory.getOrElse(""), numWorkers.getOrElse(0))
val systemProperties = customProps match {
case None => mutable.Map[String, String]()
case Some(newProps) => newProps
}
val convertedMap = new util.LinkedHashMap[String, String](systemProperties.asJava)
val clusterProps = new ClusterProperties(cpuSystem, gpuWorkerProps, convertedMap)
// set the options to convert the object into formatted yaml content
val options = new DumperOptions()
options.setIndent(2)
options.setPrettyFlow(true)
options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
val yaml = new Yaml(options)
val rawString = yaml.dump(clusterProps)
// Skip the first line as it contains "the name of the class"
rawString.split("\n").drop(1).mkString("\n")
}

protected def getMockInfoProvider(maxInput: Double,
spilledMetrics: Seq[Long],
jvmGCFractions: Seq[Double],
propsFromLog: mutable.Map[String, String],
sparkVersion: Option[String],
rapidsJars: Seq[String] = Seq(),
distinctLocationPct: Double = 0.0,
redundantReadSize: Long = 0,
meanInput: Double = 0.0,
meanShuffleRead: Double = 0.0,
shuffleStagesWithPosSpilling: Set[Long] = Set(),
shuffleSkewStages: Set[Long] = Set()): AppSummaryInfoBaseProvider = {
new AppInfoProviderMockTest(maxInput, spilledMetrics, jvmGCFractions, propsFromLog,
sparkVersion, rapidsJars, distinctLocationPct, redundantReadSize, meanInput, meanShuffleRead,
shuffleStagesWithPosSpilling, shuffleSkewStages)
}
}
Loading

0 comments on commit b4c84c0

Please sign in to comment.