Skip to content

Commit

Permalink
Added autoBroadcastJoinThreshold, batchSizeBytes and tests
Browse files Browse the repository at this point in the history
Signed-off-by: cindyyuanjiang <[email protected]>

Add recommendations in AutoTuner for AQE configs for partitions

Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Dec 12, 2023
1 parent 9f60c9e commit 52e5039
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ trait AppInfoSqlTaskAggMetricsVisitor {
def getSpilledMetrics: Seq[Long]
}

trait AppInfoSQLMaxTaskInputSizes {
/**
 * Accessors for SQL task input and shuffle-read size metrics.
 * Implementations feed the AutoTuner's AQE partition-size recommendations
 * (large-input / large-shuffle applications get different advisory sizes).
 */
trait AppInfoSQLTaskInputSizes {
  /** Maximum task input size observed for the application. */
  def getMaxInput: Double
  /** Mean of `inputBytesReadSum` over the collected IO metric entries; 0.0 when none exist. */
  def getMeanInput: Double
  /** Mean of `srTotalBytesReadSum` over the collected IO metric entries; 0.0 when none exist. */
  def getMeanShuffleRead: Double
}

trait AppInfoReadMetrics {
Expand All @@ -69,13 +71,15 @@ trait AppInfoReadMetrics {
*/
class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter
with AppInfoSqlTaskAggMetricsVisitor
with AppInfoSQLMaxTaskInputSizes
with AppInfoSQLTaskInputSizes
with AppInfoReadMetrics {
override def getSparkProperty(propKey: String): Option[String] = None
override def getRapidsProperty(propKey: String): Option[String] = None
override def getProperty(propKey: String): Option[String] = None
override def getSparkVersion: Option[String] = None
override def getMaxInput: Double = 0.0
override def getMeanInput: Double = 0.0
override def getMeanShuffleRead: Double = 0.0
override def getJvmGCFractions: Seq[Double] = Seq()
override def getSpilledMetrics: Seq[Long] = Seq()
override def getRapidsJars: Seq[String] = Seq()
Expand Down Expand Up @@ -161,4 +165,20 @@ class SingleAppSummaryInfoProvider(val app: ApplicationSummaryInfo)
.values
.sum
}

/**
 * Mean input bytes read per collected IO metric entry.
 * Returns 0.0 when the application has no IO metrics, avoiding a
 * division by zero.
 */
override def getMeanInput: Double = {
  val metrics = app.ioMetrics
  if (metrics.isEmpty) {
    0.0
  } else {
    metrics.map(_.inputBytesReadSum).sum.toDouble / metrics.size
  }
}

/**
 * Mean shuffle-read bytes per collected IO metric entry.
 * Returns 0.0 when the application has no IO metrics, avoiding a
 * division by zero.
 */
override def getMeanShuffleRead: Double = {
  val metrics = app.ioMetrics
  if (metrics.isEmpty) {
    0.0
  } else {
    metrics.map(_.srTotalBytesReadSum).sum.toDouble / metrics.size
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.yaml.snakeyaml.constructor.{Constructor, ConstructorException}
import org.yaml.snakeyaml.representer.Representer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.{GpuTypes, ToolUtils}
import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil

/**
Expand Down Expand Up @@ -586,6 +586,7 @@ class AutoTuner(
appendRecommendation("spark.rapids.shuffle.multiThreaded.writer.threads", numExecutorCores)
appendRecommendation("spark.rapids.sql.multiThreadedRead.numThreads",
Math.max(20, numExecutorCores))
appendRecommendation("spark.rapids.sql.batchSizeBytes", 2147483647)

recommendAQEProperties()
}
Expand Down Expand Up @@ -641,7 +642,7 @@ class AutoTuner(

private def recommendAQEProperties(): Unit = {
val aqeEnabled = getPropertyValue("spark.sql.adaptive.enabled")
.getOrElse("false").toLowerCase
.getOrElse("false").toLowerCase
if (aqeEnabled == "false") {
appendComment(commentsForMissingProps("spark.sql.adaptive.enabled"))
}
Expand All @@ -667,11 +668,50 @@ class AutoTuner(
}
case None =>
}
if (getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes").isEmpty) {
// The default is 64m, but 128m is slightly better for the GPU as the GPU has sub-linear
// scaling until it is full and 128m makes the GPU more full, but too large can be slightly
// problematic because this is the compressed shuffle size
appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
val advisoryPartitionSizeProperty =
getPropertyValue("spark.sql.adaptive.advisoryPartitionSizeInBytes")
val initialPartitionNumProperty =
getPropertyValue("spark.sql.adaptive.coalescePartitions.initialPartitionNum").map(_.toInt)
if (appInfoProvider.getMeanInput < AQE_INPUT_SIZE_BYTES_THRESHOLD) {
if(advisoryPartitionSizeProperty.isEmpty) {
// The default is 64m, but 128m is slightly better for the GPU as the GPU has sub-linear
// scaling until it is full and 128m makes the GPU more full, but too large can be slightly
// problematic because this is the compressed shuffle size
appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
}
} else if (appInfoProvider.getMeanShuffleRead > AQE_SHUFFLE_READ_BYTES_THRESHOLD) {
// AQE Recommendations for large input and large shuffle reads
val advisoryPartitionSizeInBytes = clusterProps.gpu.name match {
case GpuTypes.A100 => "64m"
case GpuTypes.T4 => "32m"
case _ => AQE_DEF_ADVISORY_PARTITION_SIZE
}
appendRecommendation(
"spark.sql.adaptive.advisoryPartitionSizeInBytes",
advisoryPartitionSizeInBytes
)
val initialPartitionNum = clusterProps.gpu.name match {
case GpuTypes.A100 => 400
case GpuTypes.T4 => 800
case _ => AQE_DEF_INITIAL_PARTITION_NUM
}
if (initialPartitionNumProperty.getOrElse(0) <= AQE_MIN_INITIAL_PARTITION_NUM) {
appendRecommendation(
"spark.sql.adaptive.coalescePartitions.initialPartitionNum",
initialPartitionNum
)
}
}

val autoBroadcastJoinThresholdProperty =
getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(convertToMB)
if (autoBroadcastJoinThresholdProperty.isEmpty) {
appendComment("'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.")
} else if (autoBroadcastJoinThresholdProperty.get >
convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) {
appendComment("Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > " +
s"$AQE_AUTOBROADCAST_JOIN_THRESHOLD could lead to performance regression. Should " +
"be set to a lower number.")
}
}

Expand Down Expand Up @@ -977,7 +1017,8 @@ object AutoTuner extends Logging {
val DEF_WORKER_GPU_NAME = "T4"
// T4 default memory is 16G
// A100 set default to 40GB
val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map("T4"-> "15109m", "A100" -> "40960m")
val DEF_WORKER_GPU_MEMORY_MB: Map[String, String] = Map(
GpuTypes.T4-> "15109m", GpuTypes.A100 -> "40960m")
// Default Number of Workers 1
val DEF_NUM_WORKERS = 1
// Default distinct read location thresholds is 50%
Expand All @@ -988,6 +1029,12 @@ object AutoTuner extends Logging {
val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" +
"additional-functionality/advanced_configs.html#advanced-configuration"
private val AQE_INPUT_SIZE_BYTES_THRESHOLD = 35000
private val AQE_SHUFFLE_READ_BYTES_THRESHOLD = 50000
private val AQE_DEF_ADVISORY_PARTITION_SIZE = "32m"
private val AQE_DEF_INITIAL_PARTITION_NUM = 800
private val AQE_MIN_INITIAL_PARTITION_NUM = 200
private val AQE_AUTOBROADCAST_JOIN_THRESHOLD = "100m"

val commentsForMissingProps: Map[String, String] = Map(
"spark.executor.memory" ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,8 @@ object SupportedMLFuncsName {
}

// Exception carrying a message for GPU event log error cases.
// NOTE(review): throw sites are not visible in this chunk — presumably raised
// when a GPU-generated event log is encountered; confirm at the callers.
case class GpuEventLogException(message: String) extends Exception(message)

/**
 * Canonical GPU model name constants, used to avoid scattering raw
 * string literals (e.g. in AutoTuner GPU-specific recommendations).
 */
object GpuTypes {
  /** NVIDIA A100 model identifier. */
  val A100: String = "A100"
  /** NVIDIA T4 model identifier. */
  val T4: String = "T4"
}
Loading

0 comments on commit 52e5039

Please sign in to comment.