
Commit

Merge dev into main
Signed-off-by: spark-rapids automation <[email protected]>
nvauto committed Nov 16, 2023
2 parents 5bfd44f + b61e307 commit 93dc275
Showing 59 changed files with 733 additions and 1,905 deletions.
4 changes: 2 additions & 2 deletions core/README.md
@@ -7,8 +7,8 @@ The Profiling tool generates information which can be used for debugging and pro
Information such as Spark version, executor information, properties and so on. This runs on either CPU or
GPU generated event logs.

Please refer to [Qualification tool documentation](docs/spark-qualification-tool.md)
and [Profiling tool documentation](docs/spark-profiling-tool.md)
Please refer to [Qualification tool documentation](https://docs.nvidia.com/spark-rapids/user-guide/latest/spark-qualification-tool.html)
and [Profiling tool documentation](https://docs.nvidia.com/spark-rapids/user-guide/latest/spark-profiling-tool.html)
for more details on how to use the tools.

## Build
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
729 changes: 0 additions & 729 deletions core/docs/spark-profiling-tool.md

This file was deleted.

1,027 changes: 0 additions & 1,027 deletions core/docs/spark-qualification-tool.md

This file was deleted.

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -23,7 +23,7 @@
<artifactId>rapids-4-spark-tools_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark tools</name>
<description>RAPIDS Accelerator for Apache Spark tools</description>
<version>23.10.0</version>
<version>23.10.1-SNAPSHOT</version>
<packaging>jar</packaging>
<url>http://github.com/NVIDIA/spark-rapids-tools</url>

Original file line number Diff line number Diff line change
@@ -568,7 +568,7 @@ class AutoTuner(
}
}

def calculateRecommendations(): Unit = {
def calculateClusterLevelRecommendations(): Unit = {
recommendExecutorInstances()
val numExecutorCores = calcNumExecutorCores
val execCoresExpr = () => numExecutorCores
@@ -593,6 +593,10 @@
appendRecommendation("spark.rapids.sql.multiThreadedRead.numThreads",
Math.max(20, numExecutorCores))

recommendAQEProperties()
}

def calculateJobLevelRecommendations(): Unit = {
val shuffleManagerVersion = appInfoProvider.getSparkVersion.get.filterNot("().".toSet)
appendRecommendation("spark.shuffle.manager",
"com.nvidia.spark.rapids.spark" + shuffleManagerVersion + ".RapidsShuffleManager")
@@ -601,7 +605,7 @@
recommendFileCache()
recommendMaxPartitionBytes()
recommendShufflePartitions()
recommendGeneralProperties()
recommendGCProperty()
recommendClassPathEntries()
}

@@ -631,7 +635,17 @@
}
}

private def recommendGeneralProperties(): Unit = {
private def recommendGCProperty(): Unit = {
val jvmGCFraction = appInfoProvider.getJvmGCFractions
if (jvmGCFraction.nonEmpty) { // avoid zero division
if ((jvmGCFraction.sum / jvmGCFraction.size) > MAX_JVM_GCTIME_FRACTION) {
appendComment("Average JVM GC time is very high. " +
"Other Garbage Collectors can be used for better performance.")
}
}
}

private def recommendAQEProperties(): Unit = {
val aqeEnabled = getPropertyValue("spark.sql.adaptive.enabled")
.getOrElse("false").toLowerCase
if (aqeEnabled == "false") {
@@ -665,13 +679,6 @@
// problematic because this is the compressed shuffle size
appendRecommendation("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
}
val jvmGCFraction = appInfoProvider.getJvmGCFractions
if (jvmGCFraction.nonEmpty) { // avoid zero division
if ((jvmGCFraction.sum / jvmGCFraction.size) > MAX_JVM_GCTIME_FRACTION) {
appendComment("Average JVM GC time is very high. " +
"Other Garbage Collectors can be used for better performance.")
}
}
}

/**
@@ -892,7 +899,8 @@
"Spark applications utilizing RAPIDS Accelerator for Apache Spark")
if (!isPluginLoaded) {
appendComment("RAPIDS Accelerator for Apache Spark jar is missing in \"spark.plugins\". " +
"Please refer to https://nvidia.github.io/spark-rapids/Getting-Started")
"Please refer to " +
"https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html")
}
if (!rapidsEnabled) {
appendComment("Please enable Spark RAPIDS Accelerator for Apache Spark by setting " +
@@ -905,9 +913,10 @@
}
skipList.foreach(skipSeq => skipSeq.foreach(_ => skippedRecommendations.add(_)))
skippedRecommendations ++= selectedPlatform.recommendationsToExclude
initRecommendations()
calculateJobLevelRecommendations()
if (processPropsAndCheck) {
initRecommendations()
calculateRecommendations()
calculateClusterLevelRecommendations()
} else {
// add all default comments
addDefaultComments()
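The net effect of the refactor above is that job-level recommendations (shuffle manager, file cache, partition sizing, GC comment, classpath entries) are now produced even when cluster properties cannot be validated, while cluster-level recommendations (executor sizing, AQE settings) still require valid cluster properties. As a minimal standalone sketch of the extracted GC check, with the threshold value assumed for illustration (the tool uses its own MAX_JVM_GCTIME_FRACTION constant):

// Standalone sketch of the averaged GC-time check; the 0.3 threshold is an
// assumption for illustration only.
object GcCheckSketch {
  val MaxJvmGcTimeFraction = 0.3

  def gcComment(jvmGCFractions: Seq[Double]): Option[String] = {
    // Guard against division by zero when no GC metrics were collected.
    if (jvmGCFractions.nonEmpty &&
        (jvmGCFractions.sum / jvmGCFractions.size) > MaxJvmGcTimeFraction) {
      Some("Average JVM GC time is very high. " +
        "Other Garbage Collectors can be used for better performance.")
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    // Two executors spending 40% and 50% of their time in GC trigger the comment.
    assert(gcComment(Seq(0.4, 0.5)).isDefined)
    // No metrics collected: no comment and no division by zero.
    assert(gcComment(Seq.empty).isEmpty)
  }
}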
Original file line number Diff line number Diff line change
@@ -168,6 +168,20 @@ extends ProfileResult {
}
}

case class DriverLogUnsupportedOperators(
operatorName: String, count: Int, reason: String) extends ProfileResult {
override val outputHeaders = Seq("operatorName", "count", "reason")

override def convertToSeq: Seq[String] = {
Seq(operatorName, count.toString, reason)
}

override def convertToCSVSeq: Seq[String] = {
Seq(StringUtils.reformatCSVString(operatorName), count.toString,
StringUtils.reformatCSVString(reason))
}
}

class StageInfoClass(val info: StageInfo) {
var completionTime: Option[Long] = None
var failureReason: Option[String] = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

import scala.io.Source

import org.apache.spark.internal.Logging

class DriverLogProcessor(driverlogPath: String) extends Logging {
def processDriverLog(): Seq[DriverLogUnsupportedOperators] = {
val source = Source.fromFile(driverlogPath)
// Create a map to store the counts for each operator and reason
var countsMap = Map[(String, String), Int]().withDefaultValue(0)
try {
// Process each line in the file
for (line <- source.getLines()) {
// condition to check if the line contains unsupported operators
if (line.contains("cannot run on GPU") &&
!line.contains("not all expressions can be replaced")) {
val operatorName = line.split("<")(1).split(">")(0)
val reason = line.split("because")(1).trim()
val key = (operatorName, reason)
countsMap += key -> (countsMap(key) + 1)
}
}
} catch {
case e: Exception =>
logError(s"Unexpected exception processing driver log: $driverlogPath", e)
} finally {
source.close()
}
countsMap.map(x => DriverLogUnsupportedOperators(x._1._1, x._2, x._1._2)).toSeq
}
}
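A hypothetical usage sketch of the new processor (the driver log path is a placeholder): it scans the log for "cannot run on GPU" messages, groups them by operator and reason, and returns the DriverLogUnsupportedOperators rows defined earlier in this commit.

// Hypothetical usage; the path below is a placeholder.
val processor = new DriverLogProcessor("/path/to/driverlog")
val unsupportedOps = processor.processDriverLog()
unsupportedOps.foreach { op =>
  // Each row carries the operator name, how often it was reported, and the reason text.
  println(s"${op.operatorName}: ${op.count} (${op.reason})")
}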
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ Profiling Tool for the RAPIDS Accelerator and Apache Spark
Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
com.nvidia.spark.rapids.tool.profiling.ProfileMain [options]
<eventlogs | eventlog directories ...>
[eventlogs | eventlog directories ...]
""")

val outputDirectory: ScallopOption[String] =
@@ -38,8 +38,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
" rapids_4_spark_profile. It will overwrite any existing files" +
" with the same name.",
default = Some("."))
val driverlog: ScallopOption[String] =
opt[String](required = false,
descr = "Driver log filename - eg: /path/to/driverlog. Default is empty.")
val eventlog: ScallopOption[List[String]] =
trailArg[List[String]](required = true,
trailArg[List[String]](required = false,
descr = "Event log filenames(space separated) or directories containing event logs." +
" eg: s3a://<BUCKET>/eventlog1 /path/to/eventlog2")
val filterCriteria: ScallopOption[String] =
@@ -143,6 +146,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
Right(Unit)
}

// verify that either driverlog or eventlog is specified
validateOpt(driverlog, eventlog) {
case (None, None) => Left("Error, one of driverlog or eventlog must be specified")
case _ => Right(Unit)
}
verify()

override def onError(e: Throwable) = e match {
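With the new option, argument validation now accepts a driver log on its own, an event log on its own, or both together; passing neither fails with the error above. A short sketch of the accepted combinations, assuming ProfileArgs can be constructed directly from a raw argument array (paths are placeholders):

// Sketch only; paths are placeholders and direct construction of ProfileArgs
// from an argument array is an assumption.
val driverOnly   = new ProfileArgs(Array("--driverlog", "/path/to/driverlog"))
val eventlogOnly = new ProfileArgs(Array("/path/to/eventlog"))
val both         = new ProfileArgs(Array("--driverlog", "/path/to/driverlog", "/path/to/eventlog"))
// Supplying neither option fails validation with
// "Error, one of driverlog or eventlog must be specified".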
Original file line number Diff line number Diff line change
@@ -42,7 +42,8 @@ object ProfileMain extends Logging {
def mainInternal(appArgs: ProfileArgs, enablePB: Boolean = false): (Int, Int) = {

// Parsing args
val eventlogPaths = appArgs.eventlog()
val eventlogPaths = appArgs.eventlog.getOrElse(List.empty[String])
val driverLog = appArgs.driverlog.getOrElse("")
val filterN = appArgs.filterCriteria
val matchEventLogs = appArgs.matchEventLogs
val hadoopConf = RapidsToolsConfUtil.newHadoopConf
@@ -62,13 +63,24 @@
eventLogFsFiltered
}

if (filteredLogs.isEmpty) {
logWarning("No event logs to process after checking paths, exiting!")
if (filteredLogs.isEmpty && driverLog.isEmpty) {
logWarning("No event logs to process after checking paths and no driver log " +
"to process, exiting!")
return (0, filteredLogs.size)
}

// Check that only one eventlog is provided when driver log is passed
if (driverLog.nonEmpty && filteredLogs.size > 1) {
logWarning("Only a single eventlog should be provided for processing " +
"when a driver log is passed, exiting!")
return (0, filteredLogs.size)
}

val profiler = new Profiler(hadoopConf, appArgs, enablePB)
profiler.profile(eventLogFsFiltered)
if (driverLog.nonEmpty){
profiler.profileDriver(driverLog)
}
(0, filteredLogs.size)
}

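For completeness, a sketch of driving the new code path through mainInternal (placeholder paths; the method returns an exit code and the number of event logs considered):

// Sketch with placeholder paths; mainInternal returns (exitCode, numEventLogs).
val argsWithDriverLog = new ProfileArgs(
  Array("--driverlog", "/path/to/driverlog", "/path/to/eventlog"))
val (exitCode, numEventLogs) = ProfileMain.mainInternal(argsWithDriverLog)
// Supplying a driver log together with more than one event log logs a warning
// and returns early, as shown above.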
Original file line number Diff line number Diff line change
@@ -124,6 +124,20 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
progressBar.foreach(_.finishAll())
}

def profileDriver(driverLogInfos: String): Unit = {
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
Profiler.DRIVER_LOG_NAME, numOutputRows, true)

try {
val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
val unsupportedDrivers = driverLogProcessor.processDriverLog()
profileOutputWriter.write(s"Unsupported operators in driver log",
unsupportedDrivers)
} finally {
profileOutputWriter.close()
}
}

private def errorHandler(error: Throwable, path: EventLogInfo) = {
error match {
case oom: OutOfMemoryError =>
@@ -530,6 +544,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
object Profiler {
// This tool's output log file name
val PROFILE_LOG_NAME = "profile"
val DRIVER_LOG_NAME = "driver"
val COMPARE_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_compare"
val COMBINED_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_combined"
val SUBDIR = "rapids_4_spark_profile"
Original file line number Diff line number Diff line change
@@ -198,24 +198,82 @@ class QualificationAppInfo(
}
}

/**
* Checks the stage ID in the execution information.
* This function determines the associated stages for the given execution information by
* checking the stages in the current execution information, the previous execution information,
* and the next execution information. If there are associated stages, it returns a sequence of
* stage ID and execution information pairs. Otherwise, it returns an optional execution
information (not associated with any stage). If a stage ID is associated with both the
* previous and the next execution information, then the current execution information is
* associated with the stage ID of the previous execution information.
* @param prev The previous execution information.
* @param execInfo The current execution information.
* @param next The next execution information.
* @return A tuple containing a sequence of stage ID and execution information pairs,
* and an optional execution information.
*/
private def checkStageIdInExec(prev: Option[ExecInfo],
execInfo: ExecInfo, next: Option[ExecInfo]): (Seq[(Int, ExecInfo)], Option[ExecInfo]) = {
val associatedStages = {
if (execInfo.stages.nonEmpty) {
execInfo.stages.toSeq
} else {
if (prev.exists(_.stages.nonEmpty)) {
prev.flatMap(_.stages.headOption).toSeq
} else if (next.exists(_.stages.nonEmpty)) {
next.flatMap(_.stages.headOption).toSeq
} else {
// we don't know what stage its in or its duration
logDebug(s"No stage associated with ${execInfo.exec} " +
s"so speedup factor isn't applied anywhere.")
Seq.empty
}
}
}
if (associatedStages.nonEmpty) {
(associatedStages.map((_, execInfo)), None)
} else {
(Seq.empty, Some(execInfo))
}
}

private def getStageToExec(execInfos: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = {
val execsWithoutStages = new ArrayBuffer[ExecInfo]()
val perStageSum = execInfos.flatMap { execInfo =>
if (execInfo.stages.size > 1) {
execInfo.stages.map((_, execInfo))
} else if (execInfo.stages.size < 1) {
// we don't know what stage its in or its duration
logDebug(s"No stage associated with ${execInfo.exec} " +
s"so speedup factor isn't applied anywhere.")
execsWithoutStages += execInfo
Seq.empty

// This is to get the mapping between stageId and execs. This is primarily done based on
// accumulatorId. If an Exec has some metrics generated, then an accumulatorId will be
// generated for that Exec. This accumulatorId is used to get the stageId. If an Exec
// doesn't have any metrics, then we will try to get the stageId by looking at the
// neighbor Execs. If either of the neighbor Execs has a stageId, then we will use that
// to assign the same stageId to the current Exec as it's most likely that the current
// Exec is part of the same stage as the neighbor Exec.
val execInfosInOrder = execInfos.reverse
val execsToStageMap = execInfosInOrder.indices.map {
// corner case to handle first element
case 0 => if (execInfosInOrder.size > 1) {
// If there are more than one Execs, then check if the next Exec has a stageId.
checkStageIdInExec(None, execInfosInOrder(0), Some(execInfosInOrder(1)))
} else {
Seq((execInfo.stages.head, execInfo))
checkStageIdInExec(None, execInfosInOrder(0), None)
}
}.groupBy(_._1).map { case (stage, execInfos) =>
(stage, execInfos.map(_._2))
// corner case to handle last element
case i if i == execInfosInOrder.size - 1 && execInfosInOrder.size > 1 =>
// If there are more than one Execs, then check if the previous Exec has a stageId.
checkStageIdInExec(Some(execInfosInOrder(i - 1)), execInfosInOrder(i), None)
case i =>
checkStageIdInExec(Some(execInfosInOrder(i - 1)),
execInfosInOrder(i), Some(execInfosInOrder(i + 1)))
}
(perStageSum, execsWithoutStages.toSeq)
val perStageSum = execsToStageMap.map(_._1).toList.flatten
.groupBy(_._1).map { case (stage, execInfo) =>
(stage, execInfo.map(_._2))
}

// Add all the execs that don't have a stageId to execsWithoutStages.
execsWithoutStages ++= execsToStageMap.map(_._2).toList.flatten

(perStageSum, execsWithoutStages)
}

private def flattenedExecs(execs: Seq[ExecInfo]): Seq[ExecInfo] = {
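The neighbor-based assignment can be illustrated with toy data (this sketch uses a simplified stand-in for the real ExecInfo type): an exec with no stage of its own inherits a stage from its previous neighbor first, falling back to its next neighbor.

// Toy illustration of the neighbor-based stage assignment described above;
// Exec is a simplified stand-in for the real ExecInfo type.
case class Exec(name: String, stages: Set[Int])

def assignStages(prev: Option[Exec], cur: Exec, next: Option[Exec]): Seq[Int] = {
  if (cur.stages.nonEmpty) {
    cur.stages.toSeq
  } else if (prev.exists(_.stages.nonEmpty)) {
    // Prefer the previous neighbor when both neighbors carry a stage ID.
    prev.flatMap(_.stages.headOption).toSeq
  } else {
    next.flatMap(_.stages.headOption).toSeq
  }
}

val execs = Seq(Exec("Scan", Set(1)), Exec("Project", Set.empty), Exec("Exchange", Set(2)))
// "Project" produced no metrics of its own, so it is attributed to stage 1
// from its previous neighbor rather than stage 2 from its next neighbor.
assert(assignStages(Some(execs(0)), execs(1), Some(execs(2))) == Seq(1))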
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","local-1626104300434","Not Recommended",1.01,129484.66,1619.33,2429,1469,131104,2429,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,128847,0,1469,3.0,false,"CollectLimit","",30
"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"CollectLimit","",30
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569439.65,2527.34,3627,19894,571967,3470,28.41,"","JDBC[*]","","","","",1812,544575,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19120.15,7050.84,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.8,false,"Execute CreateViewCommand","",30
"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19230.84,6940.15,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30