[FEA] Enable AQE related recommendations in Profiler Auto-tuner #688

Merged: 8 commits, Jan 11, 2024
@@ -54,8 +54,10 @@ trait AppInfoSqlTaskAggMetricsVisitor {
   def getSpilledMetrics: Seq[Long]
 }
 
-trait AppInfoSQLMaxTaskInputSizes {
+trait AppInfoSQLTaskInputSizes {
   def getMaxInput: Double
+  def getMeanInput: Double
+  def getMeanShuffleRead: Double
 }
 
 trait AppInfoReadMetrics {
@@ -69,13 +71,15 @@ trait AppInfoReadMetrics {
  */
 class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter
   with AppInfoSqlTaskAggMetricsVisitor
-  with AppInfoSQLMaxTaskInputSizes
+  with AppInfoSQLTaskInputSizes
   with AppInfoReadMetrics {
   override def getSparkProperty(propKey: String): Option[String] = None
   override def getRapidsProperty(propKey: String): Option[String] = None
   override def getProperty(propKey: String): Option[String] = None
   override def getSparkVersion: Option[String] = None
   override def getMaxInput: Double = 0.0
+  override def getMeanInput: Double = 0.0
+  override def getMeanShuffleRead: Double = 0.0
   override def getJvmGCFractions: Seq[Double] = Seq()
   override def getSpilledMetrics: Seq[Long] = Seq()
   override def getRapidsJars: Seq[String] = Seq()
@@ -161,4 +165,20 @@ class SingleAppSummaryInfoProvider(val app: ApplicationSummaryInfo)
       .values
       .sum
   }
+
+  override def getMeanInput: Double = {
+    if (app.ioMetrics.nonEmpty) {
+      app.ioMetrics.map(_.inputBytesReadSum).sum * 1.0 / app.ioMetrics.size
+    } else {
+      0.0
+    }
+  }
+
+  override def getMeanShuffleRead: Double = {
+    if (app.ioMetrics.nonEmpty) {
+      app.ioMetrics.map(_.srTotalBytesReadSum).sum * 1.0 / app.ioMetrics.size
+    } else {
+      0.0
+    }
+  }
 }
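Note on the two new aggregates: they simply average I/O bytes over the collected per-SQL rows. Below is a minimal sketch of the same arithmetic, using a hypothetical IOMetricsRow case class in place of the tool's actual ioMetrics element type (only the two field names are taken from the diff):

object MeanMetricsSketch extends App {
  // Hypothetical stand-in for the profiler's per-SQL I/O result row.
  case class IOMetricsRow(inputBytesReadSum: Long, srTotalBytesReadSum: Long)

  // Same logic as getMeanInput/getMeanShuffleRead above: average a byte
  // counter across all rows, falling back to 0.0 for an empty profile.
  def meanOf(rows: Seq[IOMetricsRow], field: IOMetricsRow => Long): Double =
    if (rows.nonEmpty) rows.map(field).sum * 1.0 / rows.size else 0.0

  val rows = Seq(IOMetricsRow(100L, 40L), IOMetricsRow(300L, 60L))
  assert(meanOf(rows, _.inputBytesReadSum) == 200.0)   // mean input bytes
  assert(meanOf(rows, _.srTotalBytesReadSum) == 50.0)  // mean shuffle-read bytes
}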
@@ -33,7 +33,7 @@ import org.yaml.snakeyaml.constructor.{Constructor, ConstructorException}
 import org.yaml.snakeyaml.representer.Representer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.rapids.tool.ToolUtils
+import org.apache.spark.sql.rapids.tool.{GpuTypes, ToolUtils}
 import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil
 
 /**
@@ -586,6 +586,7 @@ class AutoTuner(
appendRecommendation("spark.rapids.shuffle.multiThreaded.writer.threads", numExecutorCores)
appendRecommendation("spark.rapids.sql.multiThreadedRead.numThreads",
Math.max(20, numExecutorCores))
appendRecommendation("spark.rapids.sql.batchSizeBytes", 2147483647)
amahussein marked this conversation as resolved.
Show resolved Hide resolved

recommendAQEProperties()
}
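The batch-size literal recommended above is Int.MaxValue, i.e. just under 2 GiB; presumably the value is pinned there because the config is held in a 32-bit signed int (an assumption of this note, not stated in the diff). A quick arithmetic check:

object BatchSizeSketch extends App {
  assert(2147483647 == Int.MaxValue)                      // 2^31 - 1
  println(f"${Int.MaxValue / math.pow(1024, 3)}%.2f GiB") // ~2.00 GiB
}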
@@ -641,7 +642,7 @@

   private def recommendAQEProperties(): Unit = {
     val aqeEnabled = getPropertyValue("spark.sql.adaptive.enabled")
-    .getOrElse("false").toLowerCase
+      .getOrElse("false").toLowerCase
     if (aqeEnabled == "false") {
       appendComment(commentsForMissingProps("spark.sql.adaptive.enabled"))
     }
@@ -667,11 +668,54 @@
         }
       case None =>
     }
-    if (getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes").isEmpty) {
-      // The default is 64m, but 128m is slightly better for the GPU as the GPU has sub-linear
-      // scaling until it is full and 128m makes the GPU more full, but too large can be slightly
-      // problematic because this is the compressed shuffle size
-      appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
-    }
+    val advisoryPartitionSizeProperty =
+      getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes")
+    val initialPartitionNumProperty =
+      getPropertyValue("spark.sql.adaptive.coalescePartitions.initialPartitionNum").map(_.toInt)
+    if (appInfoProvider.getMeanInput < AQE_INPUT_SIZE_BYTES_THRESHOLD) {
+      if (advisoryPartitionSizeProperty.isEmpty) {
+        // The default is 64m, but 128m is slightly better for the GPU as the GPU has sub-linear
+        // scaling until it is full and 128m makes the GPU more full, but too large can be slightly
+        // problematic because this is the compressed shuffle size
+        appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
+      }
+    } else if (appInfoProvider.getMeanShuffleRead > AQE_SHUFFLE_READ_BYTES_THRESHOLD) {
+      // AQE Recommendations for large input and large shuffle reads
+      val advisoryPartitionSizeInBytes = clusterProps.gpu.name match {
+        case GpuTypes.A100 => "64m"
+        case GpuTypes.T4 => "32m"
+        case _ => AQE_DEF_ADVISORY_PARTITION_SIZE
+      }
+      appendRecommendation(
+        "spark.sql.adaptive.advisoryPartitionSizeInBytes",
+        advisoryPartitionSizeInBytes
+      )
+      val initialPartitionNum = clusterProps.gpu.name match {
+        case GpuTypes.A100 => 400
+        case GpuTypes.T4 => 800
+        case _ => AQE_DEF_INITIAL_PARTITION_NUM
+      }
+      if (initialPartitionNumProperty.getOrElse(0) <= AQE_MIN_INITIAL_PARTITION_NUM) {
+        appendRecommendation(
+          "spark.sql.adaptive.coalescePartitions.initialPartitionNum",
+          initialPartitionNum
+        )
+      }
+      // We need to set this to false, else Spark ignores the target size specified by
+      // spark.sql.adaptive.advisoryPartitionSizeInBytes.
+      // Reference: https://spark.apache.org/docs/latest/sql-performance-tuning.html
+      appendRecommendation("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
+    }
+
+    val autoBroadcastJoinThresholdProperty =
+      getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(convertToMB)
+    if (autoBroadcastJoinThresholdProperty.isEmpty) {
+      appendComment("'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.")
+    } else if (autoBroadcastJoinThresholdProperty.get >
+        convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) {
+      appendComment("Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > " +
+        s"$AQE_AUTOBROADCAST_JOIN_THRESHOLD could lead to performance regression. " +
+        "Should be set to a lower number.")
+    }
   }
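Condensed, the new branch derives AQE targets from the app's mean I/O profile and the GPU model: small mean input keeps the 128m advisory size (and the real code only recommends it when the user has not already set the property), while large input combined with heavy shuffle reads picks a GPU-specific size, bumps initialPartitionNum (400 on A100, 800 on T4) when the user's value is at or below the 200 floor, and disables parallelismFirst so Spark honors the advisory size. A hedged sketch of just the advisory-size decision (threshold and size literals copied from the diff; the function shape is illustrative, not the tool's API):

object AqeDecisionSketch extends App {
  // Returns the advisory partition size to recommend, or None to leave
  // Spark's default untouched.
  def advisorySize(meanInput: Double, meanShuffleRead: Double, gpu: String): Option[String] =
    if (meanInput < 35000) {               // AQE_INPUT_SIZE_BYTES_THRESHOLD
      Some("128m")                         // small input: keep the GPU full
    } else if (meanShuffleRead > 50000) {  // AQE_SHUFFLE_READ_BYTES_THRESHOLD
      Some(gpu match {
        case "A100" => "64m"               // GpuTypes.A100
        case "T4" => "32m"                 // GpuTypes.T4
        case _ => "32m"                    // AQE_DEF_ADVISORY_PARTITION_SIZE
      })
    } else None                            // large input, modest shuffle: no change

  assert(advisorySize(1000, 0, "T4").contains("128m"))
  assert(advisorySize(100000, 60000, "A100").contains("64m"))
  assert(advisorySize(100000, 100, "T4").isEmpty)
}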

@@ -977,7 +1021,8 @@ object AutoTuner extends Logging {
   val DEF_WORKER_GPU_NAME = "T4"
   // T4 default memory is 16G
   // A100 set default to 40GB
-  val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map("T4"-> "15109m", "A100" -> "40960m")
+  val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map(
+    GpuTypes.T4 -> "15109m", GpuTypes.A100 -> "40960m")
   // Default Number of Workers 1
   val DEF_NUM_WORKERS = 1
   // Default distinct read location thresholds is 50%
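The two memory defaults line up with the comments above: '40960m' is exactly 40 GiB for the 40 GB A100, and '15109m' (about 14.75 GiB) roughly matches the usable memory a 16 GB T4 exposes after reserved overhead; that reading of the figure is this note's assumption, not something the diff states:

object GpuMemorySketch extends App {
  println(f"${15109 / 1024.0}%.2f GiB") // 14.75 GiB usable on a 16 GB T4
  println(f"${40960 / 1024.0}%.2f GiB") // 40.00 GiB on a 40 GB A100
}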
@@ -988,6 +1033,12 @@
   val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
   private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" +
     "additional-functionality/advanced_configs.html#advanced-configuration"
+  private val AQE_INPUT_SIZE_BYTES_THRESHOLD = 35000
+  private val AQE_SHUFFLE_READ_BYTES_THRESHOLD = 50000
+  private val AQE_DEF_ADVISORY_PARTITION_SIZE = "32m"
+  private val AQE_DEF_INITIAL_PARTITION_NUM = 800
+  private val AQE_MIN_INITIAL_PARTITION_NUM = 200
+  private val AQE_AUTOBROADCAST_JOIN_THRESHOLD = "100m"
 
   val commentsForMissingProps: Map[String, String] = Map(
     "spark.executor.memory" ->
@@ -354,3 +354,8 @@ object SupportedMLFuncsName {
 }
 
 case class GpuEventLogException(message: String) extends Exception(message)
+
+object GpuTypes {
+  val A100 = "A100"
+  val T4 = "T4"
+}
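Because these vals are stable identifiers with uppercase names, they work directly as match patterns, which is exactly how recommendAQEProperties consumes them. A small usage sketch (defaultGpuMemory is a hypothetical helper, not part of the tools code):

object GpuTypesUsageSketch extends App {
  // A stable identifier such as GpuTypes.A100 in pattern position compares with ==.
  def defaultGpuMemory(name: String): String = name match {
    case GpuTypes.A100 => "40960m"
    case GpuTypes.T4 => "15109m"
    case other => s"unknown GPU: $other"
  }
  assert(defaultGpuMemory(GpuTypes.T4) == "15109m")
}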