
Commit

Merge dev into main
Signed-off-by: spark-rapids automation <[email protected]>
nvauto committed Mar 14, 2024
2 parents 8fcef52 + a8ed8f3 commit 518d17a
Showing 12 changed files with 211 additions and 127 deletions.
@@ -29,7 +29,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {

def getDurations(tcs: ArrayBuffer[TaskCase]): (Long, Long, Long, Double) = {
val durations = tcs.map(_.duration)
if (durations.size > 0 ) {
if (durations.nonEmpty ) {
(durations.sum, durations.max, durations.min,
ToolUtils.calculateAverage(durations.sum, durations.size, 1))
} else {
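
The hunk above swaps a size comparison for the nonEmpty idiom. A self-contained sketch of the same helper follows; the TaskCaseSketch class, the zero-filled else branch (the original's is truncated here), and the plain average standing in for ToolUtils.calculateAverage are all assumptions for illustration, not the tool's code.

    import scala.collection.mutable.ArrayBuffer

    case class TaskCaseSketch(duration: Long)

    def getDurationsSketch(tcs: ArrayBuffer[TaskCaseSketch]): (Long, Long, Long, Double) = {
      val durations = tcs.map(_.duration)
      if (durations.nonEmpty) {  // reads as intent; durations.size > 0 said the same thing less directly
        (durations.sum, durations.max, durations.min, durations.sum.toDouble / durations.size)
      } else {
        (0L, 0L, 0L, 0.0)  // assumed fallback; the committed else branch is not shown in this hunk
      }
    }
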
@@ -49,22 +49,20 @@ class Analysis(apps: Seq[ApplicationInfo]) {
def jobAndStageMetricsAggregation(): Seq[JobStageAggTaskMetricsProfileResult] = {
val allJobRows = apps.flatMap { app =>
app.jobIdToInfo.map { case (id, jc) =>
val stageIdsInJob = jc.stageIds
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
stageIdsInJob.contains(sid)
}.keys.map(_._1).toSeq
jc.stageIds.contains(sid)
}.keys.map(_._1).toSet
if (stagesInJob.isEmpty) {
None
} else {
val tasksInJob = app.taskEnd.filter { tc =>
stagesInJob.contains(tc.stageId)
}
// count duplicate task attempts
val numTaskAttempt = tasksInJob.size
val (durSum, durMax, durMin, durAvg) = getDurations(tasksInJob)
Some(JobStageAggTaskMetricsProfileResult(app.index,
s"job_$id",
numTaskAttempt,
tasksInJob.size,
jc.duration,
tasksInJob.map(_.diskBytesSpilled).sum,
durSum,
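
The switch from toSeq to toSet above changes how stagesInJob.contains(tc.stageId) behaves when probed once per task. A tiny illustration of the cost difference; the sizes and values below are made up.

    // contains on a List walks elements one by one; on a Set it is a hash lookup.
    val stageIdsSeq: List[Int] = (1 to 100000).toList
    val stageIdsSet: Set[Int]  = stageIdsSeq.toSet

    val inSeq = stageIdsSeq.contains(99999)  // linear scan on every call
    val inSet = stageIdsSet.contains(99999)  // effectively constant time per call
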
@@ -100,9 +98,8 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
val allJobStageRows = apps.flatMap { app =>
app.jobIdToInfo.flatMap { case (_, jc) =>
val stageIdsInJob = jc.stageIds
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
stageIdsInJob.contains(sid)
jc.stageIds.contains(sid)
}
if (stagesInJob.isEmpty) {
None
@@ -111,12 +108,10 @@ class Analysis(apps: Seq[ApplicationInfo]) {
val tasksInStage = app.taskEnd.filter { tc =>
tc.stageId == id
}
// count duplicate task attempts
val numAttempts = tasksInStage.size
val (durSum, durMax, durMin, durAvg) = getDurations(tasksInStage)
Some(JobStageAggTaskMetricsProfileResult(app.index,
s"stage_$id",
numAttempts,
tasksInStage.size,
sc.duration,
tasksInStage.map(_.diskBytesSpilled).sum,
durSum,
@@ -153,17 +148,16 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
// stages that are missing from a job, perhaps dropped events
val stagesWithoutJobs = apps.flatMap { app =>
val allStageinJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
val stageIdsInJob = jc.stageIds
val allStageInJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
app.stageIdToInfo.filterKeys { case (sid, _) =>
stageIdsInJob.contains(sid)
jc.stageIds.contains(sid)
}
}
val missing = app.stageIdToInfo.keys.toSeq.diff(allStageinJobs.keys.toSeq)
val missing = app.stageIdToInfo.keys.toSet.diff(allStageInJobs.keys.toSet)
if (missing.isEmpty) {
Seq.empty
} else {
missing.map { case ((id, saId)) =>
missing.map { case (id, saId) =>
val scOpt = app.stageIdToInfo.get((id, saId))
scOpt match {
case None =>
@@ -214,11 +208,11 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}

val allRows = allJobRows ++ allJobStageRows ++ stagesWithoutJobs
val filteredRows = allRows.filter(_.isDefined).map(_.get)
if (filteredRows.size > 0) {
val filteredRows = allRows.flatMap(row => row)
if (filteredRows.nonEmpty) {
val sortedRows = filteredRows.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
(cols.appIndex, -(sortDur), cols.id)
(cols.appIndex, -sortDur, cols.id)
}
sortedRows
} else {
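
The filteredRows rewrite above collapses an Option-unwrapping pair into one pass. A minimal before/after sketch on made-up values:

    val rows: Seq[Option[Int]] = Seq(Some(1), None, Some(3))

    val before = rows.filter(_.isDefined).map(_.get)  // two passes; .get is safe only because of the filter
    val after  = rows.flatMap(row => row)             // one pass; each Option flattens to zero or one elements
    // Both yield Seq(1, 3); rows.flatten is an equivalent spelling.
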
@@ -231,12 +225,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
jc.sqlID.getOrElse(-1) == sqlId
jc.sqlID.isDefined && jc.sqlID.get == sqlId
}
if (jcs.isEmpty) {
None
} else {
val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSeq
val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSet
val tasksInSQL = app.taskEnd.filter { tc =>
stageIdsForSQL.contains(tc.stageId)
}
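
The sqlID check above drops getOrElse, in line with the commit's own note elsewhere about avoiding allocations for a throwaway default. A small sketch of three equivalent spellings on made-up values; the commit uses the middle one, and Option.contains is mentioned only as a standard-library alternative, not as part of this change.

    val sqlID: Option[Long] = Some(42L)
    val sqlId: Long = 42L

    val viaDefault  = sqlID.getOrElse(-1) == sqlId           // old form: materializes a default for the comparison
    val viaIsDefined = sqlID.isDefined && sqlID.get == sqlId // committed form: no default allocated
    val viaContains = sqlID.contains(sqlId)                  // equivalent library spelling, not used in the commit
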
@@ -298,7 +292,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
}
}
val allFiltered = allRows.filter(_.isDefined).map(_.get)
val allFiltered = allRows.flatMap(row => row)
if (allFiltered.size > 0) {
val sortedRows = allFiltered.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
@@ -314,12 +308,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, _) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
jc.sqlID.getOrElse(-1) == sqlId
jc.sqlID.isDefined && jc.sqlID.get == sqlId
}
if (jcs.isEmpty) {
None
} else {
val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSeq
val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSet

val tasksInSQL = app.taskEnd.filter { tc =>
stageIdsForSQL.contains(tc.stageId)
@@ -344,7 +338,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
}
}
val allFiltered = allRows.filter(_.isDefined).map(_.get)
val allFiltered = allRows.flatMap(row => row)
if (allFiltered.size > 0) {
val sortedRows = allFiltered.sortBy { cols =>
(cols.appIndex, cols.sqlId)
@@ -359,12 +353,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
apps.map { app =>
val maxOfSqls = app.sqlIdToInfo.map { case (sqlId, _) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
jc.sqlID.getOrElse(-1) == sqlId
jc.sqlID.isDefined && jc.sqlID.get == sqlId
}
if (jcs.isEmpty) {
0L
} else {
val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSeq
val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSet
val tasksInSQL = app.taskEnd.filter { tc =>
stageIdsForSQL.contains(tc.stageId)
}
@@ -394,7 +388,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
sqlCase.sqlCpuTimePercent)
}
}
if (allRows.size > 0) {
if (allRows.nonEmpty) {
val sortedRows = allRows.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
(cols.appIndex, cols.sqlID, sortDur)
@@ -443,8 +437,8 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
}

val allNonEmptyRows = allRows.filter(_.isDefined).map(_.get)
if (allNonEmptyRows.size > 0) {
val allNonEmptyRows = allRows.flatMap(row => row)
if (allNonEmptyRows.nonEmpty) {
val sortedRows = allNonEmptyRows.sortBy { cols =>
(cols.appIndex, cols.stageId, cols.stageAttemptId, cols.taskId, cols.taskAttemptId)
}
@@ -856,8 +856,8 @@ class AutoTuner(

/**
* Recommendation for 'spark.sql.files.maxPartitionBytes' based on input size for each task.
* Note that the logic can be disabled by adding the property to [[limitedLogicRecommendations]]
* which is one of the arguments of [[getRecommendedProperties()]].
* Note that the logic can be disabled by adding the property to "limitedLogicRecommendations"
* which is one of the arguments of [[getRecommendedProperties]].
*/
private def recommendMaxPartitionBytes(): Unit = {
val maxPartitionProp =
@@ -873,8 +873,8 @@ class AutoTuner(

/**
* Recommendations for 'spark.sql.shuffle.partitions' based on spills and skew in shuffle stages.
* Note that the logic can be disabled by adding the property to [[limitedLogicRecommendations]]
* which is one of the arguments of [[getRecommendedProperties()]].
* Note that the logic can be disabled by adding the property to "limitedLogicRecommendations"
* which is one of the arguments of [[getRecommendedProperties]].
*/
def recommendShufflePartitions(): Unit = {
val lookup = "spark.sql.shuffle.partitions"
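
The two doc fixes above quote limitedLogicRecommendations instead of linking it, presumably because a method argument is not a resolvable scaladoc target. As a deliberately hypothetical sketch of the disable-by-listing behavior the comments describe; every name, value, and signature below is an assumption, not the AutoTuner's real API.

    def getRecommendedPropertiesSketch(limitedLogicRecommendations: Seq[String]): Map[String, String] = {
      val candidates = Map(
        "spark.sql.files.maxPartitionBytes" -> "512m",  // placeholder recommendation
        "spark.sql.shuffle.partitions"      -> "200")   // placeholder recommendation
      // Properties listed in limitedLogicRecommendations skip the detailed logic entirely.
      candidates.filterNot { case (prop, _) => limitedLogicRecommendations.contains(prop) }
    }

    val recs = getRecommendedPropertiesSketch(Seq("spark.sql.shuffle.partitions"))
    // recs == Map("spark.sql.files.maxPartitionBytes" -> "512m")
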
@@ -236,7 +236,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
CollectInformation.addNewProps(propsToKeep, props, numApps)
}
val allRows = props.map { case (k, v) => Seq(k) ++ v }.toSeq
if (allRows.size > 0) {
if (allRows.nonEmpty) {
val resRows = allRows.map(r => RapidsPropertyProfileResult(r(0), outputHeaders, r))
resRows.sortBy(cols => cols.key)
} else {
@@ -259,7 +259,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
val allWholeStages = apps.flatMap { app =>
app.wholeStage
}
if (allWholeStages.size > 0) {
if (allWholeStages.nonEmpty) {
allWholeStages.sortBy(cols => (cols.appIndex, cols.sqlID, cols.nodeID))
} else {
Seq.empty
@@ -269,7 +269,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
// Print SQL Plan Metrics
def getSQLPlanMetrics: Seq[SQLAccumProfileResults] = {
val sqlAccums = CollectInformation.generateSQLAccums(apps)
if (sqlAccums.size > 0) {
if (sqlAccums.nonEmpty) {
sqlAccums.sortBy(cols => (cols.appIndex, cols.sqlID, cols.nodeID,
cols.nodeName, cols.accumulatorId, cols.metricType))
} else {
@@ -286,11 +286,11 @@ object CollectInformation extends Logging {
def generateSQLAccums(apps: Seq[ApplicationInfo]): Seq[SQLAccumProfileResults] = {
val allRows = apps.flatMap { app =>
app.allSQLMetrics.map { metric =>
val sqlId = metric.sqlID
val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
jc.sqlID.getOrElse(-1) == sqlId
// Avoid getOrElse to reduce memory allocations
jc.sqlID.isDefined && jc.sqlID.get == metric.sqlID
}
val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSeq
val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSet
val accumsOpt = app.taskStageAccumMap.get(metric.accumulatorId)
val taskMax = accumsOpt match {
case Some(accums) =>
@@ -326,7 +326,7 @@ object CollectInformation extends Logging {
val driverMax = driverAccumsOpt match {
case Some(accums) =>
val filtered = accums.filter { a =>
a.sqlID == sqlId
a.sqlID == metric.sqlID
}
val accumValues = filtered.map(_.value).sortWith(_ < _)
if (accumValues.isEmpty) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -144,7 +144,7 @@ case class QueryPlanWithMetrics(plan: SparkPlanInfoWithStage, metrics: Map[Long,
* Each graph is defined with a set of nodes and a set of edges. Each node represents a node in the
* SparkPlan tree, and each edge represents a parent-child relationship between two nodes.
*/
case class SparkPlanGraph(
case class SparkPlanGraphForDot(
nodes: Seq[SparkPlanGraphNode],
edges: Seq[SparkPlanGraphEdge],
appId: String,
@@ -187,14 +187,14 @@ object SparkPlanGraph {
appId: String,
sqlId: String,
physicalPlan: String,
stageIdToStageMetrics: Map[Int, StageMetrics]): SparkPlanGraph = {
stageIdToStageMetrics: Map[Int, StageMetrics]): SparkPlanGraphForDot = {
val nodeIdGenerator = new AtomicLong(0)
val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
val exchanges = mutable.HashMap[SparkPlanInfoWithStage, SparkPlanGraphNode]()
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null, null, exchanges,
stageIdToStageMetrics)
new SparkPlanGraph(nodes, edges, appId, sqlId, physicalPlan)
SparkPlanGraphForDot(nodes, edges, appId, sqlId, physicalPlan)
}
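
Alongside the SparkPlanGraph to SparkPlanGraphForDot rename, the construction above also drops new. A tiny sketch of why both forms work for a case class; the class and values below are illustrative only, not the real graph type.

    case class GraphSketch(nodes: Seq[String], appId: String)

    val g1 = GraphSketch(Seq("Scan", "Filter"), "app-1")      // companion apply, the committed style
    val g2 = new GraphSketch(Seq("Scan", "Filter"), "app-1")  // also valid, just noisier
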

@tailrec
@@ -176,7 +176,11 @@ class SQLExecutionInfoClass(
var duration: Option[Long],
var hasDatasetOrRDD: Boolean,
var problematic: String = "",
var sqlCpuTimePercent: Double = -1)
var sqlCpuTimePercent: Double = -1) {
def setDsOrRdd(value: Boolean): Unit = {
hasDatasetOrRDD = value
}
}
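
The new setDsOrRdd method wraps an existing var. A trimmed stand-in showing how it would be used; the shortened field list and the values below are assumptions for illustration, not real event data.

    class SqlExecutionInfoSketch(var hasDatasetOrRDD: Boolean, var sqlCpuTimePercent: Double = -1) {
      def setDsOrRdd(value: Boolean): Unit = {
        hasDatasetOrRDD = value
      }
    }

    val info = new SqlExecutionInfoSketch(hasDatasetOrRDD = false)
    info.setDsOrRdd(true)  // call sites read as intent rather than bare field mutation
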

case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
nodeName: String, accumulatorId: Long, name: String, min: Long, median:Long,
@@ -306,7 +306,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val app = new ApplicationInfo(hadoopConf, path, index)
EventLogPathProcessor.logApplicationInfo(app)
val endTime = System.currentTimeMillis()
logInfo(s"Took ${endTime - startTime}ms to process ${path.eventLog.toString}")
logInfo(s"Took ${endTime - startTime}ms to create App for ${path.eventLog.toString}")
Some(app)
} catch {
case _: com.fasterxml.jackson.core.JsonParseException =>
@@ -327,9 +327,12 @@
* and returns the summary information. The summary information is much smaller than
* the ApplicationInfo because it has processed and combined many of the raw events.
*/
private def processApps(apps: Seq[ApplicationInfo], printPlans: Boolean,
profileOutputWriter: ProfileOutputWriter): (ApplicationSummaryInfo,
Option[CompareSummaryInfo]) = {
private def processApps(
apps: Seq[ApplicationInfo],
printPlans: Boolean,
profileOutputWriter: ProfileOutputWriter)
: (ApplicationSummaryInfo, Option[CompareSummaryInfo]) = {
val startTime = System.currentTimeMillis()

val collect = new CollectInformation(apps)
val appInfo = collect.getAppInfo
@@ -403,7 +406,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
s"to $outputDir in $duration second(s)\n")
}
}
(ApplicationSummaryInfo(appInfo, dsInfo, execInfo, jobInfo, rapidsProps,
val endTime = System.currentTimeMillis()
logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
(ApplicationSummaryInfo(appInfo, dsInfo, execInfo, jobInfo, rapidsProps,
rapidsJar, sqlMetrics, jsMetAgg, sqlTaskAggMetrics, durAndCpuMet, skewInfo,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, sqlStageInfo, wholeStage, maxTaskInputInfo,
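
The Profiler changes above add wall-clock timing around app creation and processApps. A small hedged sketch of the same measurement pattern, factored into a reusable helper that is not part of the commit; println stands in for the Profiler's logInfo and the timed workload is a placeholder.

    def timed[T](label: String)(body: => T): T = {
      val startTime = System.currentTimeMillis()
      val result = body
      val endTime = System.currentTimeMillis()
      println(s"Took ${endTime - startTime}ms to $label")
      result
    }

    val answer = timed("compute a placeholder value") { (1 to 1000).sum }
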