Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding EMR-specific tunings for shuffle manager and ignoring jar #1419

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import scala.util.matching.Regex

import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory}
import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, GpuDevice, Platform, PlatformFactory, PlatformNames}
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
Expand Down Expand Up @@ -757,6 +757,14 @@ class AutoTuner(
case ver if ver.contains("11.3") => "330db"
case _ => "332db"
}
} else if (sparkVersion.contains("amzn")) {
sparkVersion match {
case ver if ver.contains("3.5.1") => "351"
case ver if ver.contains("3.5.0") => "350"
case ver if ver.contains("3.4.1") => "341"
case ver if ver.contains("3.4.0") => "340"
case _ => "332"
}
} else {
shuffleManagerVersion
}
Expand Down Expand Up @@ -889,6 +897,10 @@ class AutoTuner(
val missingRapidsJarsEntry = classPathComments("rapids.jars.missing")
val multipleRapidsJarsEntry = classPathComments("rapids.jars.multiple")

if (platform.platformName == PlatformNames.EMR) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment before that block to explain why we skip EMR?
we can also consider making this as a function in the platform. By default it returns True and EMR returns False.

if (platfrom.requirePathRecommendations()) {
 recommendClassPathEntries()
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the input. added bool in Platforms.scala in latest commit.

return
}

appInfoProvider.getRapidsJars match {
case Seq() =>
// No rapids jars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.yaml.snakeyaml.{DumperOptions, Yaml}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil


case class DriverInfoProviderMockTest(unsupportedOps: Seq[DriverLogUnsupportedOperators])
extends BaseDriverLogInfoProvider(None) {
override def getUnsupportedOperators: Seq[DriverLogUnsupportedOperators] = unsupportedOps
Expand Down Expand Up @@ -2551,4 +2550,71 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Test EMR sets shuffle manager properly and doesn't need Spark RAPIDS jar") {
// mock the properties loaded from eventLog
val logEventsProps: mutable.Map[String, String] =
mutable.LinkedHashMap[String, String](
"spark.executor.cores" -> "32",
"spark.executor.instances" -> "1",
"spark.executor.memory" -> "80g",
"spark.executor.resource.gpu.amount" -> "1",
"spark.executor.instances" -> "1"
)
val emrWorkerInfo = buildWorkerInfoAsString(None, Some(32),
Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString))
val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
logEventsProps, Some("3.4.1-amzn-1"))
val clusterPropsOpt = loadClusterPropertiesFromContent(emrWorkerInfo)
val platform = PlatformFactory.createInstance(PlatformNames.EMR, clusterPropsOpt)
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(emrWorkerInfo, infoProvider,
platform)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.executor.instances=20
|--conf spark.executor.memory=24000m
|--conf spark.executor.memoryOverhead=15564m
|--conf spark.rapids.memory.pinnedPool.size=4096m
|--conf spark.rapids.shuffle.multiThreaded.reader.threads=48
|--conf spark.rapids.shuffle.multiThreaded.writer.threads=48
|--conf spark.rapids.sql.batchSizeBytes=2147483647
|--conf spark.rapids.sql.concurrentGpuTasks=2
|--conf spark.rapids.sql.format.parquet.multithreaded.combine.waitTime=1000
|--conf spark.rapids.sql.multiThreadedRead.numThreads=64
|--conf spark.rapids.sql.reader.multithreaded.combine.sizeBytes=10485760
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark341.RapidsShuffleManager
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionSize=4m
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|--conf spark.task.resource.gpu.amount=0.03125
|
|Comments:
|- 'spark.executor.memoryOverhead' was not set.
|- 'spark.rapids.memory.pinnedPool.size' was not set.
|- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
|- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
|- 'spark.rapids.sql.batchSizeBytes' was not set.
|- 'spark.rapids.sql.concurrentGpuTasks' was not set.
|- 'spark.rapids.sql.format.parquet.multithreaded.combine.waitTime' was not set.
|- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
|- 'spark.rapids.sql.reader.multithreaded.combine.sizeBytes' was not set.
|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionSize' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' was not set.
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}
}