Latest dev pull + changes
Signed-off-by: Sayed Bilal Bari <[email protected]>
bilalbari committed Aug 2, 2024
2 parents 5339dd9 + 31b82d1 commit 22c6db4
Showing 33 changed files with 4,771 additions and 887 deletions.
@@ -20,7 +20,7 @@ import java.io.FileNotFoundException
import java.time.LocalDateTime
import java.util.zip.ZipOutputStream

import scala.collection.mutable.{LinkedHashMap, ListBuffer}
import scala.collection.mutable.LinkedHashMap
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
@@ -102,35 +102,38 @@ object EventLogPathProcessor extends Logging {
(dbLogFiles.size > 1)
}

// If the user provides wildcard in the eventlogs, then the path processor needs
// to process the path as a pattern. Otherwise, HDFS throws an exception mistakenly that
// no such file exists.
// Once the glob path is checked, the list of eventlogs is the result of the flattenMap
// of all the processed files.
/**
* If the user provides a wildcard in the event log paths, then the path processor needs
* to process the path as a glob pattern. Otherwise, HDFS mistakenly throws an exception
* that no such file exists.
* Once the glob path is expanded, the list of event logs is the result of flat-mapping
* all the processed files.
*
* @return List of (rawPath, List[processedPaths])
*/
private def processWildcardsLogs(eventLogsPaths: List[String],
hadoopConf: Configuration): List[String] = {
val processedLogs = ListBuffer[String]()
eventLogsPaths.foreach { rawPath =>
hadoopConf: Configuration): List[(String, List[String])] = {
eventLogsPaths.map { rawPath =>
if (!rawPath.contains("*")) {
processedLogs += rawPath
(rawPath, List(rawPath))
} else {
try {
val globPath = new Path(rawPath)
val fileContext = FileContext.getFileContext(globPath.toUri(), hadoopConf)
val fileStatuses = fileContext.util().globStatus(globPath)
processedLogs ++= fileStatuses.map(_.getPath.toString)
val paths = fileStatuses.map(_.getPath.toString).toList
(rawPath, paths)
} catch {
case _ : Throwable =>
// Do not fail in this block.
// Instead, ignore the error and return an empty list; the caller then reports the
// raw path as a failed event log.
// This makes error handling more consistent during the processing of the analysis.
logWarning(s"Processing event log path with wildcard failed: $rawPath")
processedLogs += rawPath
(rawPath, List.empty)
}
}
}
processedLogs.toList
}

def getEventLogInfo(pathString: String,
@@ -226,7 +229,13 @@ object EventLogPathProcessor extends Logging {
eventLogsPaths: List[String],
hadoopConf: Configuration): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
val logsPathNoWildCards = processWildcardsLogs(eventLogsPaths, hadoopConf)
val logsWithTimestamp = logsPathNoWildCards.flatMap(getEventLogInfo(_, hadoopConf)).toMap
val logsWithTimestamp = logsPathNoWildCards.flatMap {
case (rawPath, processedPaths) if processedPaths.isEmpty =>
// If no event logs are found in the path after wildcard expansion, return a FailedEventLog
Map(FailedEventLog(new Path(rawPath), s"No event logs found in $rawPath") -> None)
case (_, processedPaths) =>
processedPaths.flatMap(getEventLogInfo(_, hadoopConf))
}.toMap

logDebug("Paths after stringToPath: " + logsWithTimestamp)
// Filter the event logs to be processed based on the criteria. If it is not provided in the
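To illustrate the wildcard handling above, here is a minimal, self-contained sketch (assuming only a Hadoop client dependency; the object name and example paths are hypothetical) of expanding a glob with FileContext.globStatus and surfacing an empty expansion so the caller can report it as a failed event log:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}

import scala.util.control.NonFatal

object GlobExpansionSketch {
  // Expand each raw path: non-wildcard paths pass through unchanged, while wildcard paths
  // are resolved through the Hadoop glob API. An empty expansion yields an empty list so
  // the caller can report the raw path as a failed event log instead of dropping it.
  def expand(rawPaths: List[String], hadoopConf: Configuration): List[(String, List[String])] = {
    rawPaths.map { rawPath =>
      if (!rawPath.contains("*")) {
        (rawPath, List(rawPath))
      } else {
        try {
          val globPath = new Path(rawPath)
          val fc = FileContext.getFileContext(globPath.toUri, hadoopConf)
          val statuses = Option(fc.util().globStatus(globPath)).map(_.toList).getOrElse(Nil)
          (rawPath, statuses.map(_.getPath.toString))
        } catch {
          case NonFatal(_) => (rawPath, List.empty)
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical paths; a wildcard that matches nothing ends up as (rawPath, Nil).
    expand(List("/tmp/eventlogs/app-*", "/tmp/eventlogs/app-1234"), new Configuration()).foreach {
      case (raw, Nil)     => println(s"No event logs found in $raw")
      case (_, processed) => processed.foreach(println)
    }
  }
}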
@@ -66,6 +66,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
/**
* Connects Operators to Stages using AccumulatorIDs.
* TODO: This function can be fused in the visitNode function to avoid the extra iteration.
*
* @param cb function that creates a SparkPlanGraph. This can be used as a cacheHolder for the
* object created to be used later.
*/
@@ -87,7 +88,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* Both Qual/Prof analysis.
* For Qual apps, the app.sqlIDtoProblematic won't be set because it is done later during the
* aggregation phase.
* @param sqlId the SQL ID being analyzed
*
* @param sqlId the SQL ID being analyzed
* @param potentialProblems a set of strings that represent the potential problems found in the
* SQL plan.
*/
@@ -119,26 +121,26 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* 1- allSQLMetrics: a list of SQLMetricInfoCase
* 2- wholeStage: a list of WholeStageCodeGenResults
* 3- unsupportedSQLPlan: a list of UnsupportedSQLPlan that contains the SQL ID, node ID,
* node name.
* TODO: Consider handling the construction of this list in a different way for the
* Qualification
* node name.
* TODO: Consider handling the construction of this list in a different way for the
* Qualification
* 4- sqlPlanNodeIdToStageIds: A map between (SQL ID, Node ID) and the set of stage IDs
*
* It has the following effect on the visitor object:
* 1- It updates the sqlIsDsOrRDD argument to True when the visited node is an RDD or Dataset.
* 2- If the SQLID is an RDD, the potentialProblems is cleared because once SQL is marked as RDD,
* all the other problems are ignored. Note that we need to set that flag only once to True
* for the given SQLID.
* all the other problems are ignored. Note that we need to set that flag only once to True
* for the given SQLID.
* 3- It appends the current node's potential problems to the SQLID problems only if the SQL is
* visitor.sqlIsDsOrRDD is False. Otherwise, it is kind of redundant to keep checking for
* potential problems for every node when they get to be ignored.
* visitor.sqlIsDsOrRDD is False. Otherwise, it is kind of redundant to keep checking for
* potential problems for every node when they get to be ignored.
*
* It has the following effect on the app object:
* 1- it updates dataSourceInfo with V2 and V1 data sources
* 2- it updates sqlIDtoProblematic the map between SQL ID and potential problems
*
*
* @param visitor the visitor context defined per SQLPlan
* @param node the node being currently visited.
* @param node the node being currently visited.
*/
protected def visitNode(visitor: SQLPlanVisitorContext,
node: SparkPlanGraphNode): Unit = {
@@ -161,7 +163,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
if (nodeIsDsOrRDD) {
// we want to report every node that is an RDD
val thisPlan = UnsupportedSQLPlan(visitor.sqlPIGEntry.sqlID, node.id, node.name, node.desc,
"Contains Dataset or RDD")
"Contains Dataset or RDD")
unsupportedSQLPlan += thisPlan
// If one node is RDD, the Sql should be set too
if (!visitor.sqlIsDsOrRDD) { // We need to set the flag only once for the given sqlID
@@ -324,23 +326,46 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* @return a sequence of AccumProfileResults
*/
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
app.accumManager.accumInfoMap.map( accumMapEntry => {
def computeStatistics(taskUpdates: Seq[Long]): Option[StatisticsMetrics] = {
if (taskUpdates.isEmpty) {
None
} else {
val min = taskUpdates.min
val max = taskUpdates.max
val sum = taskUpdates.sum
val median = if (taskUpdates.size % 2 == 0) {
val mid = taskUpdates.size / 2
(taskUpdates(mid) + taskUpdates(mid - 1)) / 2
} else {
taskUpdates(taskUpdates.size / 2)
}
Some(StatisticsMetrics(min, median, max, sum))
}
}

app.accumManager.accumInfoMap.flatMap{ accumMapEntry =>
val accumId = accumMapEntry._1
val accumInfo = accumMapEntry._2
val accumStats = app.accumManager.calculateAccStats(accumId)
.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
AccumProfileResults(
appIndex = appIndex,
stageId = accumInfo.stageValuesMap.keySet.head.toString,
accumulatorId = accumId,
name = accumInfo.infoRef.name.value,
min = accumStats.min,
median = accumStats.med,
max = accumStats.max,
total = accumStats.total
)
}).toSeq
}
accumInfo.stageValuesMap.keySet.flatMap( stageId => {
val tasks = app.taskManager.getAllTasksStageAttempt(stageId)
val taskUpdatesSorted = tasks.map( task => {
accumInfo.taskUpdatesMap.getOrElse(task.taskId, 0L)
}).toSeq.sorted
computeStatistics(taskUpdatesSorted).map { stats =>
AccumProfileResults(
appIndex,
stageId,
accumId,
accumInfo.infoRef.name.value,
min = stats.min,
median = stats.med,
max = stats.max,
total = stats.total
)
}
})
}
}.toSeq
}

object AppSQLPlanAnalyzer {
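For reference, a small standalone sketch (names and data are hypothetical, not the tool's classes) of the per-stage statistics computed above: the per-task accumulator updates are sorted and reduced to min/median/max/sum, and a stage with no updates produces no row:

// Minimal sketch of the statistics computed per (accumulator, stage) pair: the per-task
// updates are sorted and then reduced to min / median / max / sum, returning None when a
// stage has no task updates so that no result row is emitted for it.
object StageAccumStatsSketch {
  final case class Stats(min: Long, median: Long, max: Long, total: Long)

  def computeStatistics(taskUpdates: Seq[Long]): Option[Stats] = {
    if (taskUpdates.isEmpty) {
      None
    } else {
      val sorted = taskUpdates.sorted
      val median = if (sorted.size % 2 == 0) {
        val mid = sorted.size / 2
        (sorted(mid) + sorted(mid - 1)) / 2
      } else {
        sorted(sorted.size / 2)
      }
      Some(Stats(sorted.min, median, sorted.max, sorted.sum))
    }
  }

  def main(args: Array[String]): Unit = {
    // Even number of updates: the median is the mean of the two middle values.
    println(computeStatistics(Seq(4L, 1L, 9L, 2L))) // Some(Stats(1,3,9,16))
    // A stage with no updates produces no statistics (and hence no row).
    println(computeStatistics(Seq.empty))           // None
  }
}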
@@ -721,11 +721,31 @@ class AutoTuner(
recommendFileCache()
recommendMaxPartitionBytes()
recommendShufflePartitions()
recommendKryoSerializerSetting()
recommendGCProperty()
recommendClassPathEntries()
recommendSystemProperties()
}

// if the user set the serializer to use Kryo, make sure we recommend using the GPU version
// of it.
def recommendKryoSerializerSetting(): Unit = {
getPropertyValue("spark.serializer") match {
case Some(f) if f.contains("org.apache.spark.serializer.KryoSerializer") =>
val existingRegistrars = getPropertyValue("spark.kryo.registrator")
val regToUse = if (existingRegistrars.isDefined && !existingRegistrars.get.isEmpty) {
// spark.kryo.registrator is a comma separated list. If the user set some then
// we need to append our GpuKryoRegistrator to ones they specified.
existingRegistrars.get + ",com.nvidia.spark.rapids.GpuKryoRegistrator"
} else {
"com.nvidia.spark.rapids.GpuKryoRegistrator"
}
appendRecommendation("spark.kryo.registrator", regToUse)
case _ =>
// do nothing; the serializer is either not set or it is not Kryo
}
}

def getShuffleManagerClassName() : Option[String] = {
appInfoProvider.getSparkVersion.map { sparkVersion =>
val shuffleManagerVersion = sparkVersion.filterNot("().".toSet)
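As an illustration of the recommendation logic above, a minimal sketch (the config map and the example registrator com.example.MyRegistrator are hypothetical) of how a spark.kryo.registrator value could be derived, appending GpuKryoRegistrator to any registrators the user already configured:

// Sketch of the registrator recommendation: it only applies when spark.serializer is Kryo;
// GpuKryoRegistrator is appended to a user-provided, comma-separated registrator list, or
// used alone when none is configured.
object KryoRegistratorSketch {
  private val GpuRegistrator = "com.nvidia.spark.rapids.GpuKryoRegistrator"

  def recommendRegistrator(sparkConf: Map[String, String]): Option[String] = {
    sparkConf.get("spark.serializer") match {
      case Some(s) if s.contains("org.apache.spark.serializer.KryoSerializer") =>
        val existing = sparkConf.get("spark.kryo.registrator").filter(_.nonEmpty)
        Some(existing.map(r => s"$r,$GpuRegistrator").getOrElse(GpuRegistrator))
      case _ =>
        None // serializer not set, or not Kryo: nothing to recommend
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = Map(
      "spark.serializer"       -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryo.registrator" -> "com.example.MyRegistrator")
    // Prints: Some(com.example.MyRegistrator,com.nvidia.spark.rapids.GpuKryoRegistrator)
    println(recommendRegistrator(conf))
  }
}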
@@ -219,18 +219,18 @@ case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
}
}

case class AccumProfileResults(appIndex: Int, stageId: String, accumulatorId: Long, name: String,
case class AccumProfileResults(appIndex: Int, stageId: Int, accumulatorId: Long, name: String,
min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
"median", "max", "total")

override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, stageId, accumulatorId.toString, name, min.toString, median.toString,
max.toString, total.toString)
Seq(appIndex.toString, stageId.toString, accumulatorId.toString, name, min.toString,
median.toString, max.toString, total.toString)
}

override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString, StringUtils.reformatCSVString(stageId), accumulatorId.toString,
Seq(appIndex.toString, stageId.toString, accumulatorId.toString,
StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
total.toString)
}
@@ -29,7 +29,7 @@ trait AppStageMetricsViewTrait extends ViewableTrait[AccumProfileResults] {

override def sortView(
rows: Seq[AccumProfileResults]): Seq[AccumProfileResults] = {
rows.sortBy(cols => (cols.stageId, cols.appIndex, cols.accumulatorId))
rows.sortBy(cols => (cols.appIndex, cols.stageId, cols.accumulatorId))
}
}

@@ -53,7 +53,7 @@ class AccumInfo(val infoRef: AccMetaRef) {
def addAccToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
val parsedUpdateValue = accumulableInfo.update.flatMap(parseAccumFieldToLong)
// we have to update the stageMap if the stageId does not exist in the map
var updateStageFlag = !stageValuesMap.contains(stageId)
val updateStageFlag = !stageValuesMap.contains(stageId)
// This is for cases where same task updates the same accum multiple times
val existingUpdateValue = taskUpdatesMap.getOrElse(taskId, 0L)
parsedUpdateValue match {
@@ -211,7 +211,9 @@ class ApplicationInfoSuite extends FunSuite with Logging {
val eventLogContent =
"""{"Event":"SparkListenerLogStart","Spark Version":"3.2.1"}
|{"Event":"SparkListenerApplicationStart","App Name":"GPUMetrics", "App ID":"local-16261043003", "Timestamp":123456, "User":"User1"}
|{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5073,"Index":5054,"Attempt":0,"Partition ID":5054,"Launch Time":1712248533994,"Executor ID":"100","Host":"10.154.65.143","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253284920,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.492","Value":"03:13:31.359","Internal":false,"Count Failed Values":true},{"ID":1018,"Name":"gpuSpillToHostTime","Update":"00:00:00.845","Value":"00:29:39.521","Internal":false,"Count Failed Values":true},{"ID":1016,"Name":"gpuSplitAndRetryCount","Update":"1","Value":"2","Internal":false,"Count Failed Values":true}]}}""".stripMargin
|{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5073,"Index":5054,"Attempt":0,"Partition ID":5054,"Launch Time":1712248533994,"Executor ID":"100","Host":"10.154.65.143","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253284920,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.492","Value":"03:13:31.359","Internal":false,"Count Failed Values":true},{"ID":1018,"Name":"gpuSpillToHostTime","Update":"00:00:00.845","Value":"00:29:39.521","Internal":false,"Count Failed Values":true},{"ID":1016,"Name":"gpuSplitAndRetryCount","Update":"1","Value":"2","Internal":false,"Count Failed Values":true}]}}
|{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2913,"Index":2894,"Attempt":0,"Partition ID":2894,"Launch Time":1712248532696,"Executor ID":"24","Host":"10.154.65.135","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253285639,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.758","Value":"03:13:32.117","Internal":false,"Count Failed Values":true},{"ID":1015,"Name":"gpuReadSpillFromDiskTime","Update":"00:00:02.599","Value":"00:33:37.153","Internal":false,"Count Failed Values":true},{"ID":1018,"Name":"gpuSpillToHostTime","Update":"00:00:00.845","Value":"00:29:39.521","Internal":false,"Count Failed Values":true}]}}
|{"Event":"SparkListenerTaskEnd","Stage ID":11,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2045,"Index":2026,"Attempt":0,"Partition ID":2026,"Launch Time":1712248530708,"Executor ID":"84","Host":"10.154.65.233","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253285667,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.003","Value":"03:13:32.120","Internal":false,"Count Failed Values":true},{"ID":1015,"Name":"gpuReadSpillFromDiskTime","Update":"00:00:00.955","Value":"00:33:38.108","Internal":false,"Count Failed Values":true}]}}""".stripMargin
// scalastyle:on line.size.limit
Files.write(eventLogFilePath, eventLogContent.getBytes(StandardCharsets.UTF_8))

@@ -236,12 +238,15 @@ class ApplicationInfoSuite extends FunSuite with Logging {
assert(apps.size == 1)

val collect = new CollectInformation(apps)
val gpuMetrics = collect.getStageLevelMetrics

// Sample eventlog has 3 gpu metrics, gpuSemaphoreWait,
// gpuSpillToHostTime, gpuSplitAndRetryCount
assert(gpuMetrics.size == 3)
val gpuSemaphoreWait = gpuMetrics.find(_.name == "gpuSemaphoreWait")
val stageLevelResults = collect.getStageLevelMetrics

// Sample eventlog has 4 gpu metrics - gpuSemaphoreWait, gpuReadSpillFromDiskTime,
// gpuSpillToHostTime and gpuSplitAndRetryCount. But the gpuSemaphoreWait and
// gpuReadSpillFromDiskTime metrics are updated in 2 stages (stages 10 and 11).
// So the result will have 6 rows in total since we are reporting stage-level metrics.
assert(stageLevelResults.size == 6)
// gpu metrics
val gpuSemaphoreWait = stageLevelResults.find(_.name == "gpuSemaphoreWait")
assert(gpuSemaphoreWait.isDefined)
}
}
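To make the expected count concrete, a tiny sketch (the pairs below are transcribed from the three TaskEnd events in the test log; the object name is hypothetical) showing why the stage-level view yields 6 rows, one per distinct (stage, metric) pair:

// Each (stage, accumulator name) pair becomes one stage-level row. From the three TaskEnd
// events above: stage 10 carries gpuSemaphoreWait, gpuSpillToHostTime, gpuSplitAndRetryCount
// and gpuReadSpillFromDiskTime; stage 11 carries gpuSemaphoreWait and gpuReadSpillFromDiskTime.
object StageLevelRowCountSketch {
  def main(args: Array[String]): Unit = {
    val updates = Seq(          // (stageId, metric name) per accumulable update
      (10, "gpuSemaphoreWait"), (10, "gpuSpillToHostTime"), (10, "gpuSplitAndRetryCount"),
      (10, "gpuSemaphoreWait"), (10, "gpuReadSpillFromDiskTime"), (10, "gpuSpillToHostTime"),
      (11, "gpuSemaphoreWait"), (11, "gpuReadSpillFromDiskTime"))
    println(updates.distinct.size) // 6 distinct (stage, metric) rows
  }
}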