Adding EMR-specific tunings for shuffle manager and ignoring jar #1419

Merged · 3 commits · Nov 14, 2024
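Summary of the change: a requirePathRecommendations flag is added to Platform (defaulting to true and overridden to false for EmrPlatform), recommendClassPathEntries() in the AutoTuner is gated on that flag, Amazon EMR Spark versions (e.g. 3.4.1-amzn-1) are mapped to the matching RapidsShuffleManager build, and a new AutoTuner test covers the EMR behavior.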
@@ -365,6 +365,11 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
*/
def isPlatformCSP: Boolean = false

/**
* Indicate if the platform requires path recommendations
*/
def requirePathRecommendations: Boolean = true

/**
* The maximum number of Gpus any instance in this platform supports.
*/
@@ -641,6 +646,7 @@ class EmrPlatform(gpuDevice: Option[GpuDevice],
override val defaultGpuDevice: GpuDevice = A10GGpu

override def isPlatformCSP: Boolean = true
override def requirePathRecommendations: Boolean = false

override def getRetainedSystemProps: Set[String] = Set("EMR_CLUSTER_ID")

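A minimal sketch (not part of this diff) of how the new flag is intended to be read. PlatformFactory and PlatformNames come from this repo; constructing the platform without cluster properties is an assumption made only for illustration:

    // EMR bundles the Spark RAPIDS plugin with the cluster, so it opts out of
    // classpath/jar recommendations; the base Platform default remains true.
    val emrPlatform = PlatformFactory.createInstance(PlatformNames.EMR, None)
    assert(emrPlatform.isPlatformCSP)               // EMR is a CSP platform
    assert(!emrPlatform.requirePathRecommendations) // skip jar path tuning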
@@ -723,7 +723,9 @@ class AutoTuner(
recommendShufflePartitions()
recommendKryoSerializerSetting()
recommendGCProperty()
recommendClassPathEntries()
if (platform.requirePathRecommendations) {
recommendClassPathEntries()
}
recommendSystemProperties()
}

@@ -757,6 +759,14 @@
case ver if ver.contains("11.3") => "330db"
case _ => "332db"
}
} else if (sparkVersion.contains("amzn")) {
sparkVersion match {
case ver if ver.contains("3.5.1") => "351"
case ver if ver.contains("3.5.0") => "350"
case ver if ver.contains("3.4.1") => "341"
case ver if ver.contains("3.4.0") => "340"
case _ => "332"
}
} else {
shuffleManagerVersion
}
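For reference, a self-contained sketch of the version mapping added above; emrShuffleManagerClass is a hypothetical helper name rather than the repo's exact code, but the cases mirror the new amzn branch:

    // EMR Spark builds carry an "amzn" suffix; the release number selects the
    // matching RapidsShuffleManager package shipped with the RAPIDS plugin.
    def emrShuffleManagerClass(sparkVersion: String): Option[String] = {
      if (!sparkVersion.contains("amzn")) {
        None
      } else {
        val smVersion = sparkVersion match {
          case v if v.contains("3.5.1") => "351"
          case v if v.contains("3.5.0") => "350"
          case v if v.contains("3.4.1") => "341"
          case v if v.contains("3.4.0") => "340"
          case _ => "332"
        }
        Some(s"com.nvidia.spark.rapids.spark$smVersion.RapidsShuffleManager")
      }
    }

    // emrShuffleManagerClass("3.4.1-amzn-1") returns
    // Some("com.nvidia.spark.rapids.spark341.RapidsShuffleManager"), the value
    // asserted for spark.shuffle.manager in the test below.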
@@ -32,7 +32,6 @@ import org.yaml.snakeyaml.{DumperOptions, Yaml}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil


case class DriverInfoProviderMockTest(unsupportedOps: Seq[DriverLogUnsupportedOperators])
extends BaseDriverLogInfoProvider(None) {
override def getUnsupportedOperators: Seq[DriverLogUnsupportedOperators] = unsupportedOps
@@ -2551,4 +2550,71 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Test EMR sets shuffle manager properly and doesn't need Spark RAPIDS jar") {
// mock the properties loaded from eventLog
val logEventsProps: mutable.Map[String, String] =
mutable.LinkedHashMap[String, String](
"spark.executor.cores" -> "32",
"spark.executor.instances" -> "1",
"spark.executor.memory" -> "80g",
"spark.executor.resource.gpu.amount" -> "1",
"spark.executor.instances" -> "1"
)
val emrWorkerInfo = buildWorkerInfoAsString(None, Some(32),
Some("212992MiB"), Some(5), Some(4), Some(T4Gpu.getMemory), Some(T4Gpu.toString))
val infoProvider = getMockInfoProvider(0, Seq(0), Seq(0.0),
logEventsProps, Some("3.4.1-amzn-1"))
val clusterPropsOpt = loadClusterPropertiesFromContent(emrWorkerInfo)
val platform = PlatformFactory.createInstance(PlatformNames.EMR, clusterPropsOpt)
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(emrWorkerInfo, infoProvider,
platform)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.executor.instances=20
|--conf spark.executor.memory=24000m
|--conf spark.executor.memoryOverhead=15564m
|--conf spark.rapids.memory.pinnedPool.size=4096m
|--conf spark.rapids.shuffle.multiThreaded.reader.threads=48
|--conf spark.rapids.shuffle.multiThreaded.writer.threads=48
|--conf spark.rapids.sql.batchSizeBytes=2147483647
|--conf spark.rapids.sql.concurrentGpuTasks=2
|--conf spark.rapids.sql.format.parquet.multithreaded.combine.waitTime=1000
|--conf spark.rapids.sql.multiThreadedRead.numThreads=64
|--conf spark.rapids.sql.reader.multithreaded.combine.sizeBytes=10485760
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark341.RapidsShuffleManager
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionSize=4m
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|--conf spark.task.resource.gpu.amount=0.03125
|
|Comments:
|- 'spark.executor.memoryOverhead' was not set.
|- 'spark.rapids.memory.pinnedPool.size' was not set.
|- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
|- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
|- 'spark.rapids.sql.batchSizeBytes' was not set.
|- 'spark.rapids.sql.concurrentGpuTasks' was not set.
|- 'spark.rapids.sql.format.parquet.multithreaded.combine.waitTime' was not set.
|- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
|- 'spark.rapids.sql.reader.multithreaded.combine.sizeBytes' was not set.
|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionSize' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' was not set.
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}
}
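The expected output above exercises both EMR-specific behaviors at once: the "3.4.1-amzn-1" Spark version resolves spark.shuffle.manager to the spark341 RapidsShuffleManager build, and because requirePathRecommendations is false for EMR, recommendClassPathEntries() is skipped, so the classpath recommendation it would otherwise add does not appear in the comments.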