[FEA] AutoTuner warns that non-UTF-8 file encodings may not be supported by some GPU expressions
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Fixes NVIDIA#713, Fixes NVIDIA#734

This PR changes the behavior of the AutoTuner to display a comment when
the file encoding of an application is set to a value other than "utf-8".

The changes also improve the extraction of the RAPIDS jar values.

*Changes for 713*:

- Added a new field in `ApplicationSummaryInfo` that represents the
  systemProperties
- Capture system properties in the app so that the file encoding can be
  checked
- Moved map properties to `CacheableProps` so that they can be used by the
  Qualification tool as well
- Moved string-conversion methods from `AutoTuner` to `StringUtils`
- Moved `getEventFromJsonMethod` to the `EventUtils` object
- Added a new unit test for the AutoTuner
- Updated `ApplicationInfoSuite` unit tests

*Changes for 734*:

- Fixed the implementation of `CollectInformation.getRapidsJARInfo`
amahussein committed Jan 22, 2024
1 parent 418c3e7 commit 446335f
Showing 18 changed files with 459 additions and 309 deletions.
@@ -16,34 +16,38 @@

package com.nvidia.spark.rapids.tool.profiling

+ import org.apache.spark.sql.rapids.tool.ToolUtils

case class ApplicationSummaryInfo(
- val appInfo: Seq[AppInfoProfileResults],
- val dsInfo: Seq[DataSourceProfileResult],
- val execInfo: Seq[ExecutorInfoProfileResult],
- val jobInfo: Seq[JobInfoProfileResult],
- val rapidsProps: Seq[RapidsPropertyProfileResult],
- val rapidsJar: Seq[RapidsJarProfileResult],
- val sqlMetrics: Seq[SQLAccumProfileResults],
- val jsMetAgg: Seq[JobStageAggTaskMetricsProfileResult],
- val sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult],
- val durAndCpuMet: Seq[SQLDurationExecutorTimeProfileResult],
- val skewInfo: Seq[ShuffleSkewProfileResult],
- val failedTasks: Seq[FailedTaskProfileResults],
- val failedStages: Seq[FailedStagesProfileResults],
- val failedJobs: Seq[FailedJobsProfileResults],
- val removedBMs: Seq[BlockManagerRemovedProfileResult],
- val removedExecutors: Seq[ExecutorsRemovedProfileResult],
- val unsupportedOps: Seq[UnsupportedOpsProfileResult],
- val sparkProps: Seq[RapidsPropertyProfileResult],
- val sqlStageInfo: Seq[SQLStageInfoProfileResult],
- val wholeStage: Seq[WholeStageCodeGenResults],
- val maxTaskInputBytesRead: Seq[SQLMaxTaskInputSizes],
- val appLogPath: Seq[AppLogPathProfileResults],
- val ioMetrics: Seq[IOAnalysisProfileResult])
+ appInfo: Seq[AppInfoProfileResults],
+ dsInfo: Seq[DataSourceProfileResult],
+ execInfo: Seq[ExecutorInfoProfileResult],
+ jobInfo: Seq[JobInfoProfileResult],
+ rapidsProps: Seq[RapidsPropertyProfileResult],
+ rapidsJar: Seq[RapidsJarProfileResult],
+ sqlMetrics: Seq[SQLAccumProfileResults],
+ jsMetAgg: Seq[JobStageAggTaskMetricsProfileResult],
+ sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult],
+ durAndCpuMet: Seq[SQLDurationExecutorTimeProfileResult],
+ skewInfo: Seq[ShuffleSkewProfileResult],
+ failedTasks: Seq[FailedTaskProfileResults],
+ failedStages: Seq[FailedStagesProfileResults],
+ failedJobs: Seq[FailedJobsProfileResults],
+ removedBMs: Seq[BlockManagerRemovedProfileResult],
+ removedExecutors: Seq[ExecutorsRemovedProfileResult],
+ unsupportedOps: Seq[UnsupportedOpsProfileResult],
+ sparkProps: Seq[RapidsPropertyProfileResult],
+ sqlStageInfo: Seq[SQLStageInfoProfileResult],
+ wholeStage: Seq[WholeStageCodeGenResults],
+ maxTaskInputBytesRead: Seq[SQLMaxTaskInputSizes],
+ appLogPath: Seq[AppLogPathProfileResults],
+ ioMetrics: Seq[IOAnalysisProfileResult],
+ sysProps: Seq[RapidsPropertyProfileResult])

trait AppInfoPropertyGetter {
def getSparkProperty(propKey: String): Option[String]
def getRapidsProperty(propKey: String): Option[String]
+ def getSystemProperty(propKey: String): Option[String]
def getProperty(propKey: String): Option[String]
def getSparkVersion: Option[String]
def getRapidsJars: Seq[String]
@@ -76,7 +80,16 @@ class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter
def isAppInfoAvailable = false
override def getSparkProperty(propKey: String): Option[String] = None
override def getRapidsProperty(propKey: String): Option[String] = None
- override def getProperty(propKey: String): Option[String] = None
+ override def getSystemProperty(propKey: String): Option[String] = None
+ override def getProperty(propKey: String): Option[String] = {
+   if (propKey.startsWith(ToolUtils.PROPS_RAPIDS_KEY_PREFIX)) {
+     getRapidsProperty(propKey)
+   } else if (propKey.startsWith("spark")) {
+     getSparkProperty(propKey)
+   } else {
+     getSystemProperty(propKey)
+   }
+ }
override def getSparkVersion: Option[String] = None
override def getMaxInput: Double = 0.0
override def getMeanInput: Double = 0.0
@@ -118,12 +131,8 @@ class SingleAppSummaryInfoProvider(val app: ApplicationSummaryInfo)
findPropertyInProfPropertyResults(propKey, app.rapidsProps)
}

- override def getProperty(propKey: String): Option[String] = {
-   if (propKey.startsWith("spark.rapids")) {
-     getRapidsProperty(propKey)
-   } else {
-     getSparkProperty(propKey)
-   }
+ override def getSystemProperty(propKey: String): Option[String] = {
+   findPropertyInProfPropertyResults(propKey, app.sysProps)
}

override def getSparkVersion: Option[String] = {
@@ -34,7 +34,7 @@ import org.yaml.snakeyaml.representer.Representer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.{GpuTypes, ToolUtils}
- import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil
+ import org.apache.spark.sql.rapids.tool.util.{StringUtils, WebCrawlerUtil}

/**
* A wrapper class that stores all the GPU properties.
@@ -222,9 +222,9 @@ class RecommendationEntry(val name: String,
propValue match {
case None => None
case Some(value) =>
- if (AutoTuner.containsMemoryUnits(value)) {
+ if (StringUtils.isMemorySize(value)) {
// if it is memory return the bytes unit
Some(s"${AutoTuner.convertFromHumanReadableSize(value)}")
Some(s"${StringUtils.convertMemorySizeToBytes(value)}")
} else {
propValue
}
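As a hypothetical walkthrough of the normalization above (the semantics are assumed; see the StringUtils sketch later in this diff):

// A value with a memory suffix is normalized to its byte count; plain numbers pass through.
StringUtils.isMemorySize("4g")              // true: "g" is a recognized size unit
StringUtils.convertMemorySizeToBytes("4g")  // 4294967296L (4 * 1024 * 1024 * 1024)
StringUtils.isMemorySize("200")             // false: no suffix, the raw value is kept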
@@ -466,7 +466,7 @@ class AutoTuner(
*/
def calcGpuConcTasks(): Long = {
Math.min(MAX_CONC_GPU_TASKS,
- convertToMB(clusterProps.gpu.memory) / DEF_GPU_MEM_PER_TASK_MB)
+ StringUtils.convertToMB(clusterProps.gpu.memory) / DEF_GPU_MEM_PER_TASK_MB)
}

/**
@@ -477,7 +477,7 @@
private def calcAvailableMemPerExec(): Double = {
// account for system overhead
val usableWorkerMem =
- Math.max(0, convertToMB(clusterProps.system.memory) - DEF_SYSTEM_RESERVE_MB)
+ Math.max(0, StringUtils.convertToMB(clusterProps.system.memory) - DEF_SYSTEM_RESERVE_MB)
// clusterProps.gpu.getCount can never be 0. This is verified in processPropsAndCheck()
(1.0 * usableWorkerMem) / clusterProps.gpu.getCount
}
@@ -608,6 +608,7 @@ class AutoTuner(
recommendShufflePartitions()
recommendGCProperty()
recommendClassPathEntries()
+ recommendSystemProperties()
}

def getShuffleManagerClassName() : Option[String] = {
@@ -735,22 +736,36 @@
}

val autoBroadcastJoinThresholdProperty =
- getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(convertToMB)
+ getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(StringUtils.convertToMB)
if (autoBroadcastJoinThresholdProperty.isEmpty) {
appendComment("'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.")
} else if (autoBroadcastJoinThresholdProperty.get >
- convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) {
+ StringUtils.convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) {
appendComment("Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > " +
s"$AQE_AUTOBROADCAST_JOIN_THRESHOLD could lead to performance\n" +
" regression. Should be set to a lower number.")
}
}

+ /**
+  * Checks the system properties and gives feedback to the user.
+  * For example, file.encoding=UTF-8 is required for some ops like GpuRegEX.
+  */
+ private def recommendSystemProperties(): Unit = {
+   appInfoProvider.getSystemProperty("file.encoding").collect {
+     case encoding =>
+       if (!ToolUtils.isFileEncodingRecommended(encoding)) {
+         appendComment(s"file.encoding should be [${ToolUtils.SUPPORTED_ENCODINGS.mkString}]" +
+           s" because GPU only supports the charset when using some expressions.")
+       }
+   }
+ }

/**
* Check the class path entries with the following rules:
* 1- If ".*rapids-4-spark.*jar" is missing then add a comment that the latest jar should be
* included in the classpath unless it is part of the spark
- * 2- If there are more than 1 entry for ".*rapids-4-spark.*jar", then add a comment that the
+ * 2- If there are more than 1 entry for ".*rapids-4-spark.*jar", then add a comment that
* there should be only 1 jar in the class path.
* 3- If there are cudf jars, ignore that for now.
* 4- If there is a new release recommend that to the user
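The recommendSystemProperties method above relies on two ToolUtils members that are outside this excerpt. A minimal sketch consistent with the call sites; the exact contents of SUPPORTED_ENCODINGS and the case-insensitive comparison are assumptions:

object ToolUtils {
  // Prefix identifying RAPIDS-specific Spark properties (also used by getProperty above).
  val PROPS_RAPIDS_KEY_PREFIX = "spark.rapids"
  // Charsets assumed safe for GPU string expressions; the AutoTuner warns on anything else.
  val SUPPORTED_ENCODINGS = Seq("UTF-8")

  // True when the application's file.encoding is one of the supported charsets.
  def isFileEncodingRecommended(encoding: String): Boolean =
    SUPPORTED_ENCODINGS.exists(_.equalsIgnoreCase(encoding))
}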
@@ -811,7 +826,7 @@
private def calculateMaxPartitionBytes(maxPartitionBytes: String): String = {
// AutoTuner only supports a single app right now, so we get whatever value is here
val inputBytesMax = appInfoProvider.getMaxInput / 1024 / 1024
- val maxPartitionBytesNum = convertToMB(maxPartitionBytes)
+ val maxPartitionBytesNum = StringUtils.convertToMB(maxPartitionBytes)
if (inputBytesMax == 0.0) {
maxPartitionBytesNum.toString
} else {
@@ -860,7 +875,7 @@
if (isCalculationEnabled("spark.sql.files.maxPartitionBytes")) {
calculateMaxPartitionBytes(maxPartitionProp)
} else {
s"${convertToMB(maxPartitionProp)}"
s"${StringUtils.convertToMB(maxPartitionProp)}"
}
appendRecommendationForMemoryMB("spark.sql.files.maxPartitionBytes", recommended)
}
@@ -1221,59 +1236,6 @@ object AutoTuner extends Logging {
}
}

- /**
-  * Converts size from human readable to bytes.
-  * Eg, "4m" -> 4194304.
-  */
- def convertFromHumanReadableSize(size: String): Long = {
-   val sizesArr = size.toLowerCase.split("(?=[a-z])")
-   val sizeNum = sizesArr(0).toDouble
-   if (sizesArr.length > 1) {
-     val sizeUnit = sizesArr(1)
-     assert(SUPPORTED_SIZE_UNITS.contains(sizeUnit), s"$size is not a valid human readable size")
-     (sizeNum * Math.pow(1024, SUPPORTED_SIZE_UNITS.indexOf(sizeUnit))).toLong
-   } else {
-     sizeNum.toLong
-   }
- }
-
- def containsMemoryUnits(size: String): Boolean = {
-   val sizesArr = size.toLowerCase.split("(?=[a-z])")
-   if (sizesArr.length > 1) {
-     SUPPORTED_SIZE_UNITS.contains(sizesArr(1))
-   } else {
-     false
-   }
- }
-
- def convertToMB(size: String): Long = {
-   convertFromHumanReadableSize(size) / (1024 * 1024)
- }
-
- /**
-  * Converts size from bytes to human readable.
-  * Eg, 4194304 -> "4m", 633554 -> "618.70k".
-  */
- def convertToHumanReadableSize(size: Long): String = {
-   if (size < 0L) {
-     return "0b"
-   }
-
-   val unitIndex = (Math.log10(size) / Math.log10(1024)).toInt
-   assert(unitIndex < SUPPORTED_SIZE_UNITS.size,
-     s"$size is too large to convert to human readable size")
-
-   val sizeNum = size * 1.0 / Math.pow(1024, unitIndex)
-   val sizeUnit = SUPPORTED_SIZE_UNITS(unitIndex)
-
-   // If sizeNum is an integer omit fraction part
-   if ((sizeNum % 1) == 0) {
-     f"${sizeNum.toLong}$sizeUnit"
-   } else {
-     f"$sizeNum%.2f$sizeUnit"
-   }
- }
-
/**
* Given the spark property "spark.master", it checks whether memoryOverhead should be
* enabled/disabled. For Spark Standalone Mode, memoryOverhead property is skipped.
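The body of this check is elided from the hunk. A minimal sketch of the documented behavior, with an assumed method name and master-URL prefixes:

// Standalone masters use "spark://" URLs; memoryOverhead is meaningful on YARN/Kubernetes.
def shouldIncludeMemoryOverhead(sparkMaster: Option[String]): Boolean = sparkMaster match {
  case Some(master) if master.startsWith("spark://") => false // Standalone: property is skipped
  case Some(master) if master.startsWith("yarn") || master.startsWith("k8s://") => true
  case _ => false
}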
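The size-conversion helpers removed in the hunk above were relocated to StringUtils, which is not part of this excerpt. A minimal sketch of the relocated API, inferred from the call sites in this diff; the unit list and method bodies are assumptions based on the removed AutoTuner code:

object StringUtils {
  // Binary size suffixes in increasing order of magnitude (assumed to match the removed list).
  private val SUPPORTED_SIZE_UNITS = Seq("b", "k", "m", "g", "t", "p")

  // True when the string carries a recognized memory suffix, e.g. "512m"
  // (replaces AutoTuner.containsMemoryUnits).
  def isMemorySize(size: String): Boolean = {
    val parts = size.toLowerCase.split("(?=[a-z])")
    parts.length > 1 && SUPPORTED_SIZE_UNITS.contains(parts(1))
  }

  // Converts a human-readable size to bytes, e.g. "4m" -> 4194304
  // (replaces AutoTuner.convertFromHumanReadableSize).
  def convertMemorySizeToBytes(size: String): Long = {
    val parts = size.toLowerCase.split("(?=[a-z])")
    val num = parts(0).toDouble
    if (parts.length > 1) {
      (num * Math.pow(1024, SUPPORTED_SIZE_UNITS.indexOf(parts(1)))).toLong
    } else {
      num.toLong
    }
  }

  // Converts a human-readable size to whole megabytes, e.g. "2g" -> 2048.
  def convertToMB(size: String): Long = convertMemorySizeToBytes(size) / (1024 * 1024)
}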
@@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.tool.ToolTextFileWriter

import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
- import org.apache.spark.sql.rapids.tool.SQLMetricsStats
+ import org.apache.spark.sql.rapids.tool.{SQLMetricsStats, ToolUtils}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

case class StageMetrics(numTasks: Int, duration: String)
@@ -53,7 +53,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
AppLogPathProfileResults(app.index, a.appName, a.appId, app.eventLogPath)
}
if (allRows.nonEmpty) {
- allRows.sortBy(cols => (cols.appIndex))
+ allRows.sortBy(cols => cols.appIndex)
} else {
Seq.empty
}
@@ -63,12 +63,18 @@
def getRapidsJARInfo: Seq[RapidsJarProfileResult] = {
val allRows = apps.flatMap { app =>
if (app.gpuMode) {
- // Look for rapids-4-spark and cuDF jar
- val rapidsJar = app.classpathEntries.filterKeys(_ matches ".*rapids-4-spark.*jar")
- val cuDFJar = app.classpathEntries.filterKeys(_ matches ".*cudf.*jar")
- val cols = (rapidsJar.keys ++ cuDFJar.keys).toSeq
- val rowsWithAppindex = cols.map(jar => RapidsJarProfileResult(app.index, jar))
- rowsWithAppindex
+ // Look for rapids-4-spark and cuDF jar in classPathEntries
+ val rapidsJars = app.classpathEntries.filterKeys(_ matches ToolUtils.RAPIDS_JAR_REGEX.regex)
+ if (rapidsJars.nonEmpty) {
+   val cols = rapidsJars.keys.toSeq
+   val rowsWithAppindex = cols.map(jar => RapidsJarProfileResult(app.index, jar))
+   rowsWithAppindex
+ } else {
+   // Look for the rapids-4-spark and cuDF jars in Spark Properties
+   ToolUtils.extractRAPIDSJarsFromProps(app.sparkProperties).map {
+     jar => RapidsJarProfileResult(app.index, jar)
+   }
+ }
} else {
Seq.empty
}
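The new getRapidsJARInfo delegates to ToolUtils members that this excerpt does not show. A sketch consistent with the call sites here and with the inline filter removed from getProperties in the next hunk; the combined regex and the extraClassPath heuristic are assumptions:

import java.io.File

object ToolUtils {
  val PROPS_RAPIDS_KEY_PREFIX = "spark.rapids"

  // One pattern covering both jars that the old code matched separately.
  val RAPIDS_JAR_REGEX = ".*(rapids-4-spark|cudf).*jar".r

  // Fallback: scan classpath-like Spark properties when classpathEntries has no match.
  def extractRAPIDSJarsFromProps(props: collection.Map[String, String]): Seq[String] = {
    props.collect {
      case (key, value) if key.endsWith(".extraClassPath") =>
        value.split(File.pathSeparator).filter(RAPIDS_JAR_REGEX.pattern.matcher(_).matches())
    }.flatten.toSeq
  }

  // Mirrors the inline RAPIDS-property filter that getProperties used to carry.
  def isRapidsPropKey(key: String): Boolean =
    key.startsWith(PROPS_RAPIDS_KEY_PREFIX) || key.startsWith("spark.executorEnv.UCX") ||
      key.startsWith("spark.shuffle.manager") || key.equals("spark.shuffle.service.enabled")
}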
@@ -203,25 +209,30 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
}
}

- // Print RAPIDS related or all Spark Properties
- // This table is inverse of the other tables where the row keys are
- // property keys and the columns are the application values. So
- // column1 would be all the key values for app index 1.
- def getProperties(rapidsOnly: Boolean): Seq[RapidsPropertyProfileResult] = {
+ /**
+  * Print RAPIDS-related, Spark, or system properties depending on the propSource argument.
+  * Note that RAPIDS-related properties do not necessarily start with the prefix 'spark.rapids'.
+  * This table is inverse of the other tables where the row keys are property keys and the columns
+  * are the application values. So column1 would be all the key values for app index 1.
+  * @param propSource defines which type of properties to retrieve.
+  *                   It can be: rapids, spark, or system
+  * @return List of properties relevant to the source.
+  */
+ private def getProperties(propSource: String): Seq[RapidsPropertyProfileResult] = {
val outputHeaders = ArrayBuffer("propertyName")
val props = HashMap[String, ArrayBuffer[String]]()
var numApps = 0
apps.foreach { app =>
numApps += 1
outputHeaders += s"appIndex_${app.index}"
- val propsToKeep = if (rapidsOnly) {
-   app.sparkProperties.filterKeys { key =>
-     key.startsWith("spark.rapids") || key.startsWith("spark.executorEnv.UCX") ||
-     key.startsWith("spark.shuffle.manager") || key.equals("spark.shuffle.service.enabled")
-   }
- } else {
+ val propsToKeep = if (propSource.equals("rapids")) {
+   app.sparkProperties.filterKeys { ToolUtils.isRapidsPropKey(_) }
+ } else if (propSource.equals("spark")) {
// remove the rapids related ones
- app.sparkProperties.filterKeys(key => !(key.contains("spark.rapids")))
+ app.sparkProperties.filterKeys(key => !key.contains(ToolUtils.PROPS_RAPIDS_KEY_PREFIX))
+ } else {
+   // get the system properties
+   app.systemProperties
+ }
CollectInformation.addNewProps(propsToKeep, props, numApps)
}
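As an illustration of the inverted layout described above (hypothetical keys and values):

propertyName                   appIndex_1  appIndex_2
spark.rapids.sql.enabled       true        false
spark.shuffle.service.enabled  false       true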
@@ -234,6 +245,16 @@
}
}

+ def getRapidsProperties: Seq[RapidsPropertyProfileResult] = {
+   getProperties("rapids")
+ }
+ def getSparkProperties: Seq[RapidsPropertyProfileResult] = {
+   getProperties("spark")
+ }
+ def getSystemProperties: Seq[RapidsPropertyProfileResult] = {
+   getProperties("system")
+ }

// Print SQL whole stage code gen mapping
def getWholeStageCodeGenMapping: Seq[WholeStageCodeGenResults] = {
val allWholeStages = apps.flatMap { app =>
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,13 +33,6 @@ object ProfileUtils {
.getOrCreate()
}

- // Convert a null-able String to Option[Long]
- def stringToLong(in: String): Option[Long] = try {
-   Some(in.toLong)
- } catch {
-   case _: NumberFormatException => None
- }
-
// Convert Option[Long] to String
def optionLongToString(in: Option[Long]): String = try {
in.get.toString
@@ -48,7 +41,7 @@
}

// Check if the job/stage is GPU mode is on
- def isPluginEnabled(properties: collection.mutable.Map[String, String]): Boolean = {
+ def isPluginEnabled(properties: collection.Map[String, String]): Boolean = {
ToolUtils.isPluginEnabled(properties.toMap)
}
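Widening the parameter from collection.mutable.Map to collection.Map lets call sites pass immutable maps as well. A hypothetical call; the plugin class name is the usual RAPIDS plugin and is not taken from this diff:

// An immutable Map now satisfies the signature.
val gpuMode = ProfileUtils.isPluginEnabled(
  Map("spark.plugins" -> "com.nvidia.spark.SQLPlugin"))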
