Updating autotuner to generate recommendations always, even without cluster info (#650)

Signed-off-by: mattahrens <[email protected]>
mattahrens authored Nov 7, 2023
1 parent a5683c3 commit 876c407
Showing 5 changed files with 92 additions and 50 deletions.
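At a glance: job-level recommendations (shuffle manager, max partition bytes, shuffle partitions, GC hints) are now computed unconditionally from the event log, while cluster-level recommendations (executor instances, cores, memory, AQE settings) stay gated on valid worker information. Below is a minimal, self-contained sketch of the new gating; the method names mirror the diff, but the stub bodies and the validClusterInfo flag are illustrative assumptions, not the real implementation.

```scala
import scala.collection.mutable.ListBuffer

// Sketch only: mirrors the control flow introduced by this commit.
// The real logic lives in AutoTuner; stub bodies are placeholders.
object AutoTunerFlowSketch {
  private val recommendations = ListBuffer.empty[String]
  private val comments = ListBuffer.empty[String]

  private def initRecommendations(): Unit = recommendations.clear()

  // Derivable from the event log alone, so always emitted.
  private def calculateJobLevelRecommendations(): Unit =
    recommendations += "spark.shuffle.manager=..."

  // Needs valid worker info (cores, memory, GPUs), so still gated.
  private def calculateClusterLevelRecommendations(): Unit =
    recommendations += "spark.executor.instances=..."

  private def addDefaultComments(): Unit =
    comments += "'spark.executor.instances' should be set to (gpuCount * numWorkers)."

  def getRecommendedProperties(validClusterInfo: Boolean): (List[String], List[String]) = {
    initRecommendations()
    calculateJobLevelRecommendations() // new: runs unconditionally
    if (validClusterInfo) {
      calculateClusterLevelRecommendations()
    } else {
      addDefaultComments() // cluster info missing or invalid
    }
    (recommendations.toList, comments.toList)
  }
}
```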
@@ -568,7 +568,7 @@ class AutoTuner(
}
}

-def calculateRecommendations(): Unit = {
+def calculateClusterLevelRecommendations(): Unit = {
recommendExecutorInstances()
val numExecutorCores = calcNumExecutorCores
val execCoresExpr = () => numExecutorCores
@@ -593,6 +593,10 @@
appendRecommendation("spark.rapids.sql.multiThreadedRead.numThreads",
Math.max(20, numExecutorCores))

+recommendAQEProperties()
+}
+
+def calculateJobLevelRecommendations(): Unit = {
val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
appendRecommendation("spark.shuffle.manager",
"com.nvidia.spark.rapids.spark" + shuffleManagerVersion + ".RapidsShuffleManager")
Expand All @@ -601,7 +605,7 @@ class AutoTuner(
recommendFileCache()
recommendMaxPartitionBytes()
recommendShufflePartitions()
-recommendGeneralProperties()
+recommendGCProperty()
recommendClassPathEntries()
}
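A job-level detail from the hunk above: the RapidsShuffleManager class name embeds the Spark version with separators stripped. A standalone rendering of the diff's expression, where the sample version string is hypothetical:

```scala
// filterNot("().".toSet) drops '.', '(' and ')' from the version string,
// so "3.1.1" becomes "311" (sample value; the real one comes from
// appInfoProvider.getSparkVersion).
val sparkVersion = "3.1.1"
val shuffleManagerVersion = sparkVersion.filterNot("().".toSet) // "311"
val shuffleManagerClass =
  "com.nvidia.spark.rapids.spark" + shuffleManagerVersion + ".RapidsShuffleManager"
// => com.nvidia.spark.rapids.spark311.RapidsShuffleManager,
//    matching the expected test output below.
```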

@@ -631,7 +635,17 @@
}
}

-private def recommendGeneralProperties(): Unit = {
+private def recommendGCProperty(): Unit = {
+val jvmGCFraction = appInfoProvider.getJvmGCFractions
+if (jvmGCFraction.nonEmpty) { // avoid zero division
+if ((jvmGCFraction.sum / jvmGCFraction.size) > MAX_JVM_GCTIME_FRACTION) {
+appendComment("Average JVM GC time is very high. " +
+"Other Garbage Collectors can be used for better performance.")
+}
+}
+}
+
+private def recommendAQEProperties(): Unit = {
val aqeEnabled = getPropertyValue("spark.sql.adaptive.enabled")
.getOrElse("false").toLowerCase
if (aqeEnabled == "false") {
@@ -665,13 +679,6 @@
// problematic because this is the compressed shuffle size
appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
}
-val jvmGCFraction = appInfoProvider.getJvmGCFractions
-if (jvmGCFraction.nonEmpty) { // avoid zero division
-if ((jvmGCFraction.sum / jvmGCFraction.size) > MAX_JVM_GCTIME_FRACTION) {
-appendComment("Average JVM GC time is very high. " +
-"Other Garbage Collectors can be used for better performance.")
-}
-}
}
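The GC-time warning moves out of the deleted recommendGeneralProperties into its own recommendGCProperty so that it runs in the job-level pass. A standalone rendering of the check with hypothetical values; the real fractions come from appInfoProvider.getJvmGCFractions and the real threshold is defined in AutoTuner:

```scala
// Hypothetical values for illustration only.
val MAX_JVM_GCTIME_FRACTION = 0.3
val jvmGCFraction = Seq(0.40, 0.20, 0.45) // per-executor GC-time fractions

if (jvmGCFraction.nonEmpty) { // avoid zero division
  val avgGCFraction = jvmGCFraction.sum / jvmGCFraction.size // 0.35 here
  if (avgGCFraction > MAX_JVM_GCTIME_FRACTION) {
    println("Average JVM GC time is very high. " +
      "Other Garbage Collectors can be used for better performance.")
  }
}
```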

/**
@@ -905,9 +912,10 @@
}
skipList.foreach(skipSeq => skipSeq.foreach(_ => skippedRecommendations.add(_)))
skippedRecommendations ++= selectedPlatform.recommendationsToExclude
+initRecommendations()
+calculateJobLevelRecommendations()
if (processPropsAndCheck) {
-initRecommendations()
-calculateRecommendations()
+calculateClusterLevelRecommendations()
} else {
// add all default comments
addDefaultComments()
@@ -191,15 +191,31 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
val expectedResults =
s"""|Cannot recommend properties. See Comments.
s"""|
|Spark Properties:
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.executor.instances' should be set to (gpuCount * numWorkers).
|- 'spark.executor.memory' should be set to at least 2GB/core.
|- 'spark.rapids.memory.pinnedPool.size' should be set to 2048m.
|- 'spark.rapids.sql.concurrentGpuTasks' should be set to Max(4, (gpuMemory / 8G)).
+|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
+|- 'spark.sql.files.maxPartitionBytes' was not set.
+|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
+|- RAPIDS Accelerator for Apache Spark plugin jar is missing
+| from the classpath entries.
+| If the Spark RAPIDS jar is being bundled with your
+| Spark distribution, this step is not needed.
+|- The RAPIDS Shuffle Manager requires spark.driver.extraClassPath
+| and spark.executor.extraClassPath settings to include the
+| path to the Spark RAPIDS plugin jar.
+| If the Spark RAPIDS jar is being bundled with your Spark
+| distribution, this step is not needed.
|- java.io.FileNotFoundException: File non-existing.yaml does not exist
|""".stripMargin
assert(expectedResults == autoTunerOutput)
@@ -212,16 +228,32 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
s"""|
|Spark Properties:
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.executor.instances' should be set to (gpuCount * numWorkers).
|- 'spark.executor.memory' should be set to at least 2GB/core.
|- 'spark.rapids.memory.pinnedPool.size' should be set to 2048m.
|- 'spark.rapids.sql.concurrentGpuTasks' should be set to Max(4, (gpuMemory / 8G)).
+|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
+|- 'spark.sql.files.maxPartitionBytes' was not set.
+|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
|- Incorrect values in worker system information: {numCores: 0, memory: 122880MiB, numWorkers: 4}.
+|- RAPIDS Accelerator for Apache Spark plugin jar is missing
+| from the classpath entries.
+| If the Spark RAPIDS jar is being bundled with your
+| Spark distribution, this step is not needed.
+|- The RAPIDS Shuffle Manager requires spark.driver.extraClassPath
+| and spark.executor.extraClassPath settings to include the
+| path to the Spark RAPIDS plugin jar.
+| If the Spark RAPIDS jar is being bundled with your Spark
+| distribution, this step is not needed.
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
@@ -234,16 +266,32 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
s"""|
|Spark Properties:
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.executor.instances' should be set to (gpuCount * numWorkers).
|- 'spark.executor.memory' should be set to at least 2GB/core.
|- 'spark.rapids.memory.pinnedPool.size' should be set to 2048m.
|- 'spark.rapids.sql.concurrentGpuTasks' should be set to Max(4, (gpuMemory / 8G)).
+|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
+|- 'spark.sql.files.maxPartitionBytes' was not set.
+|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
|- Incorrect values in worker system information: {numCores: 32, memory: , numWorkers: 4}.
+|- RAPIDS Accelerator for Apache Spark plugin jar is missing
+| from the classpath entries.
+| If the Spark RAPIDS jar is being bundled with your
+| Spark distribution, this step is not needed.
+|- The RAPIDS Shuffle Manager requires spark.driver.extraClassPath
+| and spark.executor.extraClassPath settings to include the
+| path to the Spark RAPIDS plugin jar.
+| If the Spark RAPIDS jar is being bundled with your Spark
+| distribution, this step is not needed.
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
@@ -256,16 +304,32 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
s"""|
|Spark Properties:
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.executor.instances' should be set to (gpuCount * numWorkers).
|- 'spark.executor.memory' should be set to at least 2GB/core.
|- 'spark.rapids.memory.pinnedPool.size' should be set to 2048m.
|- 'spark.rapids.sql.concurrentGpuTasks' should be set to Max(4, (gpuMemory / 8G)).
+|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
+|- 'spark.sql.files.maxPartitionBytes' was not set.
+|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
|- Incorrect values in worker system information: {numCores: 32, memory: 0m, numWorkers: 4}.
+|- RAPIDS Accelerator for Apache Spark plugin jar is missing
+| from the classpath entries.
+| If the Spark RAPIDS jar is being bundled with your
+| Spark distribution, this step is not needed.
+|- The RAPIDS Shuffle Manager requires spark.driver.extraClassPath
+| and spark.executor.extraClassPath settings to include the
+| path to the Spark RAPIDS plugin jar.
+| If the Spark RAPIDS jar is being bundled with your Spark
+| distribution, this step is not needed.
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
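All four updated expectations encode the same invariant: the job-level properties appear even when worker info is missing, zeroed, or malformed. A hypothetical focused test in the suite's FunSuite style (the autoTuner setup is elided; the names and values mirror the surrounding tests):

```scala
// Hypothetical assertion: whatever is wrong with the cluster properties,
// the job-level recommendations must still be present in the output.
test("job-level recommendations are emitted without valid cluster info") {
  val (properties, comments) = autoTuner.getRecommendedProperties()
  val output = Profiler.getAutoTunerResultsAsString(properties, comments)
  assert(output.contains(
    "spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager"))
  assert(output.contains("spark.sql.files.maxPartitionBytes=512m"))
  assert(output.contains("spark.sql.shuffle.partitions=200"))
}
```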
30 changes: 4 additions & 26 deletions user_tools/src/spark_rapids_pytools/rapids/profiling.py
@@ -25,7 +25,7 @@

from spark_rapids_pytools.cloud_api.sp_types import ClusterBase
from spark_rapids_pytools.common.sys_storage import FSUtil
-from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
+from spark_rapids_pytools.common.utilities import Utils
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool


@@ -70,22 +70,13 @@ def _process_offline_cluster_args(self):
# only if we succeed to get the GPU cluster, we can generate auto-tuner-input
self._generate_autotuner_input()

-def __load_disabled_recommendation_report(self) -> str:
-template_file_name = self.ctxt.get_value('toolOutput', 'recommendations', 'disabledInfoMsgTemplate')
-template_path = Utils.resource_path(f'templates/{template_file_name}')
-return TemplateGenerator.render_template_file(template_path, {'CLUSTER_ARG': 'cluster'})

def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None):
gpu_cluster_arg = offline_cluster_opts.get('gpuCluster')
if gpu_cluster_arg:
gpu_cluster_obj = self._create_migration_cluster('GPU', gpu_cluster_arg)
self.ctxt.set_ctxt('gpuClusterProxy', gpu_cluster_obj)
return True
-# If we are here, we know that the workerInfoPath was not set as well.
-# Then we can remind the user that recommendations won't be calculated
-disabled_recommendations_msg = self.__load_disabled_recommendation_report()
-self.ctxt.set_ctxt('disabledRecommendationsMsg', disabled_recommendations_msg)
-self.logger.info(disabled_recommendations_msg)
return False

def _generate_autotuner_file_for_cluster(self, file_path: str, cluster_ob: ClusterBase):
@@ -135,10 +126,10 @@ def _generate_autotuner_input(self):
self.ctxt.set_ctxt('autoTunerFilePath', autotuner_input_path)

def _create_autotuner_rapids_args(self) -> list:
-# Add the autotuner argument if the autotunerPath exists
+# Add the autotuner argument, also add worker-info if the autotunerPath exists
autotuner_path = self.ctxt.get_ctxt('autoTunerFilePath')
if autotuner_path is None:
-return []
+return ['--auto-tuner']
return ['--auto-tuner', '--worker-info', autotuner_path]

def __read_single_app_output(self, file_path: str) -> (str, List[str], List[str]):
@@ -209,14 +200,6 @@ def _write_summary(self):
print(Utils.gen_multiline_str(self._report_tool_full_location(),
self.ctxt.get_ctxt('wrapperOutputContent')))

-def __generate_report_no_recommendations(self):
-prof_app_dirs = FSUtil.get_subdirectories(self.ctxt.get_rapids_output_folder())
-wrapper_content = [Utils.gen_report_sec_header('Recommendations'),
-self.ctxt.get_ctxt('disabledRecommendationsMsg'),
-Utils.gen_report_sec_header('Profiling status'),
-f'Total application profiled: {len(prof_app_dirs)}']
-self.ctxt.set_ctxt('wrapperOutputContent', wrapper_content)

def __generate_report_with_recommendations(self):
prof_app_dirs = FSUtil.get_subdirectories(self.ctxt.get_rapids_output_folder())
profiling_log = self.ctxt.get_value('toolOutput', 'recommendations', 'fileName')
@@ -261,12 +244,7 @@ def _process_output(self):
if not self._evaluate_rapids_jar_tool_output_exist():
return

-if self.ctxt.get_ctxt('autoTunerFilePath'):
-# if autotuner is enabled, generate full recommendations summary
-self.__generate_report_with_recommendations()
-else:
-# generate a brief summary
-self.__generate_report_no_recommendations()
+self.__generate_report_with_recommendations()

def _init_rapids_arg_list(self) -> List[str]:
return self._create_autotuner_rapids_args()
@@ -2,7 +2,6 @@ toolOutput:
subFolder: rapids_4_spark_profile
recommendations:
fileName: profile.log
-disabledInfoMsgTemplate: 'info_recommendations_disabled.ms'
headers:
section: '### D. Recommended Configuration ###'
sparkProperties: 'Spark Properties:'

This file was deleted.
