[FEA] Enable AQE related recommendations in Profiler Auto-tuner (#688)
* Add recommendations in AutoTuner for AQE configs and unit tests

---------

Signed-off-by: Partho Sarthi <[email protected]>
Signed-off-by: cindyyuanjiang <[email protected]>
Co-authored-by: Partho Sarthi <[email protected]>
cindyyuanjiang and parthosa authored Jan 11, 2024
1 parent 63b3671 commit c0b4ddf
Showing 4 changed files with 332 additions and 19 deletions.
@@ -54,8 +54,10 @@ trait AppInfoSqlTaskAggMetricsVisitor {
def getSpilledMetrics: Seq[Long]
}

trait AppInfoSQLMaxTaskInputSizes {
trait AppInfoSQLTaskInputSizes {
def getMaxInput: Double
def getMeanInput: Double
def getMeanShuffleRead: Double
}

trait AppInfoReadMetrics {
@@ -69,13 +71,15 @@
*/
class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter
with AppInfoSqlTaskAggMetricsVisitor
with AppInfoSQLMaxTaskInputSizes
with AppInfoSQLTaskInputSizes
with AppInfoReadMetrics {
override def getSparkProperty(propKey: String): Option[String] = None
override def getRapidsProperty(propKey: String): Option[String] = None
override def getProperty(propKey: String): Option[String] = None
override def getSparkVersion: Option[String] = None
override def getMaxInput: Double = 0.0
override def getMeanInput: Double = 0.0
override def getMeanShuffleRead: Double = 0.0
override def getJvmGCFractions: Seq[Double] = Seq()
override def getSpilledMetrics: Seq[Long] = Seq()
override def getRapidsJars: Seq[String] = Seq()
@@ -161,4 +165,20 @@ class SingleAppSummaryInfoProvider(val app: ApplicationSummaryInfo)
.values
.sum
}

override def getMeanInput: Double = {
if (app.ioMetrics.nonEmpty) {
app.ioMetrics.map(_.inputBytesReadSum).sum * 1.0 / app.ioMetrics.size
} else {
0.0
}
}

override def getMeanShuffleRead: Double = {
if (app.ioMetrics.nonEmpty) {
app.ioMetrics.map(_.srTotalBytesReadSum).sum * 1.0 / app.ioMetrics.size
} else {
0.0
}
}
}
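
For context, both new overrides follow the same guarded-mean pattern. A minimal standalone sketch, where the IoMetrics case class is a stand-in for the tool's own record type (only the two summed fields are assumed):

// Hypothetical stand-in for the tool's per-SQL I/O metrics record.
case class IoMetrics(inputBytesReadSum: Long, srTotalBytesReadSum: Long)

// Mean of one field across records, returning 0.0 when the sequence is
// empty — the same guard used by getMeanInput and getMeanShuffleRead.
def meanOf(metrics: Seq[IoMetrics])(field: IoMetrics => Long): Double =
  if (metrics.nonEmpty) metrics.map(field).sum * 1.0 / metrics.size else 0.0

val sample = Seq(IoMetrics(100L, 40L), IoMetrics(300L, 60L))
meanOf(sample)(_.inputBytesReadSum)   // 200.0
meanOf(sample)(_.srTotalBytesReadSum) // 50.0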
@@ -33,7 +33,7 @@ import org.yaml.snakeyaml.constructor.{Constructor, ConstructorException}
import org.yaml.snakeyaml.representer.Representer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.{GpuTypes, ToolUtils}
import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil

/**
@@ -79,15 +79,18 @@ class GpuWorkerProps(
}

/**
* If the GPU memory is missing, it will sets a default valued based on the GPU device and the
* static HashMap [[AutoTuner.DEF_WORKER_GPU_MEMORY_MB]].
* If the GPU memory is missing, it sets a default value based on the GPU device type.
* If the device type is unrecognized, it falls back to a default of 15109m.
*
* @return true if the value has been updated.
*/
def setDefaultGpuMemIfMissing(): Boolean = {
if (memory == null || memory.isEmpty || memory.startsWith("0")) {
memory = AutoTuner.DEF_WORKER_GPU_MEMORY_MB.getOrElse(getName, "15109m")
memory = try {
GpuTypes.getGpuMem(getName)
} catch {
case _: IllegalArgumentException => "15109m"
}
true
} else {
false
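
The lookup-with-fallback shape above can be exercised on its own; this sketch assumes only the GpuTypes.getGpuMem behavior added later in this diff:

// Resolve GPU memory for a device name, falling back to the T4-class
// default of 15109m when the device is unrecognized, mirroring
// setDefaultGpuMemIfMissing above.
def resolveGpuMem(name: String): String =
  try GpuTypes.getGpuMem(name)
  catch { case _: IllegalArgumentException => "15109m" }

resolveGpuMem("A100")    // "40960m"
resolveGpuMem("unknown") // "15109m"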
@@ -586,6 +589,7 @@ class AutoTuner(
appendRecommendation("spark.rapids.shuffle.multiThreaded.writer.threads", numExecutorCores)
appendRecommendation("spark.rapids.sql.multiThreadedRead.numThreads",
Math.max(20, numExecutorCores))
appendRecommendation("spark.rapids.sql.batchSizeBytes", BATCH_SIZE_BYTES)

recommendAQEProperties()
}
@@ -656,7 +660,7 @@ class AutoTuner(

private def recommendAQEProperties(): Unit = {
val aqeEnabled = getPropertyValue("spark.sql.adaptive.enabled")
.getOrElse("false").toLowerCase
.getOrElse("false").toLowerCase
if (aqeEnabled == "false") {
appendComment(commentsForMissingProps("spark.sql.adaptive.enabled"))
}
@@ -682,11 +686,56 @@
}
case None =>
}
if (getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes").isEmpty) {
// The default is 64m, but 128m is slightly better for the GPU as the GPU has sub-linear
// scaling until it is full and 128m makes the GPU more full, but too large can be slightly
// problematic because this is the compressed shuffle size
appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
val advisoryPartitionSizeProperty =
getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes")
if (appInfoProvider.getMeanInput < AQE_INPUT_SIZE_BYTES_THRESHOLD) {
if (advisoryPartitionSizeProperty.isEmpty) {
// The default is 64m, but 128m is slightly better for the GPU: the GPU has
// sub-linear scaling until it is full, and 128m keeps it fuller. Going too
// large can be slightly problematic because this is the compressed shuffle size.
appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
}
}
if (appInfoProvider.getMeanInput > AQE_INPUT_SIZE_BYTES_THRESHOLD &&
appInfoProvider.getMeanShuffleRead > AQE_SHUFFLE_READ_BYTES_THRESHOLD) {
// AQE Recommendations for large input and large shuffle reads
val advisoryPartitionSizeInBytes = clusterProps.gpu.name match {
case GpuTypes.A100 => "64m"
case GpuTypes.T4 => "32m"
case _ => AQE_DEF_ADVISORY_PARTITION_SIZE
}
appendRecommendation(
"spark.sql.adaptive.advisoryPartitionSizeInBytes",
advisoryPartitionSizeInBytes
)
val initialPartitionNumProperty =
getPropertyValue("spark.sql.adaptive.coalescePartitions.initialPartitionNum").map(_.toInt)
if (initialPartitionNumProperty.getOrElse(0) <= AQE_MIN_INITIAL_PARTITION_NUM) {
val initialPartitionNum = clusterProps.gpu.name match {
case GpuTypes.A100 => 400
case GpuTypes.T4 => 800
case _ => AQE_DEF_INITIAL_PARTITION_NUM
}
appendRecommendation(
"spark.sql.adaptive.coalescePartitions.initialPartitionNum",
initialPartitionNum
)
}
// We need to set this to false, else Spark ignores the target size specified by
// spark.sql.adaptive.advisoryPartitionSizeInBytes.
// Reference: https://spark.apache.org/docs/latest/sql-performance-tuning.html
appendRecommendation("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
}

val autoBroadcastJoinThresholdProperty =
getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(convertToMB)
if (autoBroadcastJoinThresholdProperty.isEmpty) {
appendComment("'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.")
} else if (autoBroadcastJoinThresholdProperty.get >
convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) {
appendComment("Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > " +
s"$AQE_AUTOBROADCAST_JOIN_THRESHOLD could lead to performance\n" +
" regression. Should be set to a lower number.")
}
}
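
Read as a whole, the new branch is a small decision table over the two workload means. A self-contained sketch of that logic, with thresholds and per-GPU values inlined from the companion object below (the real method also skips initialPartitionNum when an existing setting exceeds AQE_MIN_INITIAL_PARTITION_NUM, and routes everything through appendRecommendation):

// Sketch of the AQE sizing decision: small mean input favors fuller GPU
// batches; large input plus large shuffle reads favors smaller, more
// numerous partitions.
def aqeAdvice(meanInput: Double, meanShuffleRead: Double,
    gpuName: String): Map[String, String] = {
  val recs = scala.collection.mutable.Map.empty[String, String]
  if (meanInput < 35000) {
    recs("spark.sql.adaptive.advisoryPartitionSizeInBytes") = "128m"
  }
  if (meanInput > 35000 && meanShuffleRead > 50000) {
    recs("spark.sql.adaptive.advisoryPartitionSizeInBytes") =
      gpuName match { case "A100" => "64m"; case _ => "32m" }
    recs("spark.sql.adaptive.coalescePartitions.initialPartitionNum") =
      (gpuName match { case "A100" => 400; case _ => 800 }).toString
    // Required, or Spark prefers parallelism over the advisory size.
    recs("spark.sql.adaptive.coalescePartitions.parallelismFirst") = "false"
  }
  recs.toMap
}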

@@ -995,10 +1044,14 @@ object AutoTuner extends Logging {
// GPU count defaults to 1 if it is missing.
val DEF_WORKER_GPU_COUNT = 1
// GPU default device is T4
val DEF_WORKER_GPU_NAME = "T4"
val DEF_WORKER_GPU_NAME = GpuTypes.T4
// T4 default memory is 16G
// A100 set default to 40GB
val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map("T4"-> "15109m", "A100" -> "40960m")
val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map(
GpuTypes.T4 -> "15109m", GpuTypes.A100 -> "40960m", GpuTypes.V100 -> "16384m",
GpuTypes.K80 -> "12288m", GpuTypes.P100 -> "16384m", GpuTypes.P4 -> "8192m",
GpuTypes.L4 -> "24576m", GpuTypes.A10 -> "24576m", GpuTypes.A10G -> "24576m")
// Default Number of Workers 1
val DEF_NUM_WORKERS = 1
// Default distinct read location thresholds is 50%
@@ -1009,6 +1062,14 @@
val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" +
"additional-functionality/advanced_configs.html#advanced-configuration"
// Value of batchSizeBytes that performs best overall (2147483647 = Int.MaxValue)
private val BATCH_SIZE_BYTES = 2147483647
private val AQE_INPUT_SIZE_BYTES_THRESHOLD = 35000
private val AQE_SHUFFLE_READ_BYTES_THRESHOLD = 50000
private val AQE_DEF_ADVISORY_PARTITION_SIZE = "32m"
private val AQE_DEF_INITIAL_PARTITION_NUM = 800
private val AQE_MIN_INITIAL_PARTITION_NUM = 200
private val AQE_AUTOBROADCAST_JOIN_THRESHOLD = "100m"
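
The autoBroadcastJoinThreshold guard above compares sizes only after normalizing to MB. A minimal sketch of such a converter over SUPPORTED_SIZE_UNITS — the tool's own convertToMB may differ in signature and rounding, and a bare number is assumed here to mean bytes:

// Convert size strings like "100m" or "2g" to megabytes.
def toMB(size: String): Double = {
  val units = Seq("b", "k", "m", "g", "t", "p")
  val (num, unit) = size.span(c => c.isDigit || c == '.')
  val exp = if (unit.isEmpty) 0 else units.indexOf(unit.toLowerCase)
  num.toDouble * math.pow(1024, exp) / (1024 * 1024)
}

toMB("100m") // 100.0
toMB("2g")   // 2048.0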

val commentsForMissingProps: Map[String, String] = Map(
"spark.executor.memory" ->
@@ -380,3 +380,30 @@ object SupportedMLFuncsName {
}

case class GpuEventLogException(message: String) extends Exception(message)

object GpuTypes {
val A100 = "A100"
val T4 = "T4"
val V100 = "V100"
val K80 = "K80"
val P100 = "P100"
val P4 = "P4"
val L4 = "L4"
val A10 = "A10"
val A10G = "A10G"

def getGpuMem(gpu: String): String = {
gpu match {
case A100 => "40960m" // A100 set default to 40GB
case T4 => "15109m" // T4 default memory is 16G
case V100 => "16384m"
case K80 => "12288m"
case P100 => "16384m"
case P4 => "8192m"
case L4 => "24576m"
case A10 => "24576m"
case A10G => "24576m"
case _ => throw new IllegalArgumentException(s"Invalid input gpu type: $gpu")
}
}
}
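
With the defaults now encoded both in AutoTuner.DEF_WORKER_GPU_MEMORY_MB and in this match, one hypothetical way to keep the two in sync would be to derive the map from getGpuMem:

// Hypothetical refactor (not part of this commit): derive the per-device
// default-memory map from GpuTypes.getGpuMem so the values live in one place.
val defWorkerGpuMemoryMb: Map[String, String] =
  Seq(GpuTypes.A100, GpuTypes.T4, GpuTypes.V100, GpuTypes.K80, GpuTypes.P100,
    GpuTypes.P4, GpuTypes.L4, GpuTypes.A10, GpuTypes.A10G)
    .map(g => g -> GpuTypes.getGpuMem(g)).toMap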
