[FEA] Add stage/task level diagnostic output for GPU slowness in Profiler tool #1375

Merged
30 commits, merged Nov 22, 2024
Changes from 11 commits
Commits (30)
1265e47  initial implementation (cindyyuanjiang, Oct 2, 2024)
6aefee9  updated output schema based on offline discussion (cindyyuanjiang, Oct 4, 2024)
5a56d30  address feedback to merge two tables together (cindyyuanjiang, Oct 9, 2024)
be13fa8  update order of columns (cindyyuanjiang, Oct 12, 2024)
7b895c5  get gpu semaphore time (cindyyuanjiang, Oct 14, 2024)
68c34f6  add benchmark (cindyyuanjiang, Oct 21, 2024)
20dd4c8  clean up code (cindyyuanjiang, Oct 23, 2024)
41f6ff0  resolve merge conflict (cindyyuanjiang, Oct 23, 2024)
7cd25e9  add unit test (cindyyuanjiang, Oct 25, 2024)
f740192  new expectation file (cindyyuanjiang, Oct 25, 2024)
6931086  resolve merge conflict (cindyyuanjiang, Oct 25, 2024)
2dcdb9b  address review feedback (cindyyuanjiang, Oct 30, 2024)
ac162f0  address review feedback (cindyyuanjiang, Oct 30, 2024)
2d53682  add new file (cindyyuanjiang, Oct 30, 2024)
9d53668  remove unnecessary comment (cindyyuanjiang, Oct 30, 2024)
23946bc  address review feedback (cindyyuanjiang, Oct 31, 2024)
b583126  refactored for memory optimization (cindyyuanjiang, Nov 6, 2024)
058686d  addressed review feedback (cindyyuanjiang, Nov 6, 2024)
bdd2292  refactor stageDiagnosticResults (cindyyuanjiang, Nov 9, 2024)
3a8cf9e  change num attemps to tasks (cindyyuanjiang, Nov 12, 2024)
1361d53  remove diagnostic from applicationsummaryinfo (cindyyuanjiang, Nov 13, 2024)
c54a4b7  remove unused import (cindyyuanjiang, Nov 13, 2024)
882d403  new file (cindyyuanjiang, Nov 13, 2024)
f985b44  Merge branch 'dev' into profiler-diagnostic (cindyyuanjiang, Nov 14, 2024)
f3b78ff  add diagnostic view in qual tool output (cindyyuanjiang, Nov 15, 2024)
8b317e6  remove diagnostic vire from qual tool profile.log file (cindyyuanjiang, Nov 15, 2024)
ebfc6e3  address review feedback (cindyyuanjiang, Nov 15, 2024)
b88119a  Merge branch 'dev' into profiler-diagnostic (cindyyuanjiang, Nov 19, 2024)
056d4b2  add profile benchmark class (cindyyuanjiang, Nov 20, 2024)
de47ef4  fix profiler benchmark (cindyyuanjiang, Nov 22, 2024)
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}

/**
* The result of the aggregation of the raw metrics. It contains the aggregated metrics for an
@@ -32,6 +32,7 @@ import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTa
* @param ioAggs lists the SQLs along their IO metrics
* @param sqlDurAggs the aggregated duration and CPU time for SQLs
* @param maxTaskInputSizes a sequence of SQLMaxTaskInputSizes that contains the maximum input size
* @param stageDiagnostics the stage level Spark metrics for diagnostic purposes
*/
case class AggRawMetricsResult(
jobAggs: Seq[JobAggTaskMetricsProfileResult],
@@ -40,4 +41,5 @@ case class AggRawMetricsResult(
sqlAggs: Seq[SQLTaskAggMetricsProfileResult],
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes])
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])
@@ -44,6 +44,7 @@ import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
* @param app the Application info objects that contains the SQL plans to be processed
*/
class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(app) {
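  // Name of the accumulator used by the RAPIDS Accelerator to record time tasks spend waiting
  // on the GPU semaphore; matched by name below to populate stageToGpuSemaphoreWaitTime.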
val GPU_SEMAPHORE_WAIT_METRIC_NAME = "gpuSemaphoreWait"
// A map between (SQL ID, Node ID) and the set of stage IDs
// TODO: The Qualification should use this map instead of building a new set for each exec.
private val sqlPlanNodeIdToStageIds: HashMap[(Long, Long), Set[Int]] =
@@ -56,6 +57,10 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// SQLPlanParser.
var unsupportedSQLPlan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()
var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
// A map between stage ID and the sequence of node names
val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
// A map between stage ID and total GPU semaphore wait time
val stageToGpuSemaphoreWaitTime: HashMap[Long, Long] = HashMap.empty[Long, Long]

/**
* Connects Operators to Stages using AccumulatorIDs.
@@ -261,6 +266,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}
validNodes.map(n => s"${n.name}(${n.id.toString})")
}.getOrElse(Seq.empty)
stageToNodeNames(sModel.stageInfo.stageId) = nodeNames
SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
}
@@ -339,6 +345,9 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
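      // Record this stage's summed GPU semaphore wait time when the accumulator name matches.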
if (accumInfo.infoRef.getName.contains(GPU_SEMAPHORE_WAIT_METRIC_NAME)) {
stageToGpuSemaphoreWaitTime(stageId) = sum
}
Some(AccumProfileResults(
appIndex,
stageId,
@@ -29,7 +29,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
* @return a single record of AggRawMetricsResult containing all the raw aggregated Spark
* metrics
*/
def getAggRawMetrics(app: AppBase, index: Int): AggRawMetricsResult = {
def getAggRawMetrics(app: AppBase, index: Int, sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None):
AggRawMetricsResult = {
val analysisObj = new AppSparkMetricsAnalyzer(app)
AggRawMetricsResult(
analysisObj.aggregateSparkMetricsByJob(index),
@@ -38,7 +39,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
analysisObj.aggregateSparkMetricsBySql(index),
analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)))
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
analysisObj.aggregateDiagnosticSparkMetricsByStage(index, sqlAnalyzer))
}

/**
@@ -59,7 +61,8 @@
agg1.sqlAggs ++ agg2.sqlAggs,
agg1.ioAggs ++ agg2.ioAggs,
agg1.sqlDurAggs ++ agg2.sqlDurAggs,
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes)
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes,
agg1.stageDiagnostics ++ agg2.stageDiagnostics)
}
}
}
@@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable

import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, TaskModel}

/**
@@ -320,6 +321,89 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sqlRows.toSeq
}

/**
* Aggregates the diagnostic SparkMetrics by stage.
* @param index the App-index (used by the profiler tool)
* @param analyzerInput optional AppSQLPlanAnalyzer that supplies per-stage node names and
*                      GPU semaphore wait times; defaults to the app's planMetricProcessor
* @return sequence of StageDiagnosticMetricsProfileResult
*/
def aggregateDiagnosticSparkMetricsByStage(index: Int,
analyzerInput: Option[AppSQLPlanAnalyzer] = None):
Seq[StageDiagnosticMetricsProfileResult] = {
def bytesToMB(numBytes: Long): Long = numBytes / (1024 * 1024)
val sqlAnalyzer = analyzerInput match {
case Some(res) => res
case None => app.asInstanceOf[ApplicationInfo].planMetricProcessor
}
// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
val numAttempts = tasksInStage.size
val (diskSpilledMin, diskSpilledMed, diskSpilledMax, diskSpilledSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.diskBytesSpilled))
val (memSpilledMin, memSpilledMed, memSpilledMax, memSpilledSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.memoryBytesSpilled))
val (inputBytesMin, inputBytesMed, inputBytesMax, inputBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.input_bytesRead))
val (outputBytesMin, outputBytesMed, outputBytesMax, outputBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.output_bytesWritten))
val (srBytesMin, srBytesMed, srBytesMax, srBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))
val (swBytesMin, swBytesMed, swBytesMax, swBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sw_bytesWritten))
val (srFetchWaitTimeMin, srFetchWaitTimeMed, srFetchWaitTimeMax, srFetchWaitTimeSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_fetchWaitTime))
val (swWriteTimeMin, swWriteTimeMed, swWriteTimeMax, swWriteTimeSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sw_writeTime))
val nodeNames = sqlAnalyzer.stageToNodeNames.
getOrElse(sm.stageInfo.stageId, Seq.empty[String])
val gpuSemaphoreWaitSum = sqlAnalyzer.stageToGpuSemaphoreWaitTime.
getOrElse(sm.stageInfo.stageId, 0L)
StageDiagnosticMetricsProfileResult(index,
app.getAppName,
app.appId,
sm.stageInfo.stageId,
sm.duration,
numAttempts, // TODO: why is this numAttempts and not numTasks?
bytesToMB(memSpilledMin),
bytesToMB(memSpilledMed),
bytesToMB(memSpilledMax),
bytesToMB(memSpilledSum),
bytesToMB(diskSpilledMin),
bytesToMB(diskSpilledMed),
bytesToMB(diskSpilledMax),
bytesToMB(diskSpilledSum),
inputBytesMin,
inputBytesMed,
inputBytesMax,
inputBytesSum,
outputBytesMin,
outputBytesMed,
outputBytesMax,
outputBytesSum,
srBytesMin,
srBytesMed,
srBytesMax,
srBytesSum,
swBytesMin,
swBytesMed,
swBytesMax,
swBytesSum,
srFetchWaitTimeMin,
srFetchWaitTimeMed,
srFetchWaitTimeMax,
srFetchWaitTimeSum,
swWriteTimeMin,
swWriteTimeMed,
swWriteTimeMax,
swWriteTimeSum,
gpuSemaphoreWaitSum,
nodeNames)
kuhushukla (Collaborator) commented:
Can we make an encapsulating object for this so that we dont have large arg list as well as a single place to hold the metrics we care about -- easier to update it.

cindyyuanjiang (Collaborator, Author) replied on Oct 30, 2024:
Thanks @kuhushukla! Can you elaborate a bit more on this? I thought StageDiagnosticResult is the encapsulating object. It has similar structure as other profiler results, for example - https://github.com/NVIDIA/spark-rapids-tools/blob/dev/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala#L417.

kuhushukla (Collaborator) replied:
The arg list is very large and that on its own would be nice to abstract away in a case class etc.

cindyyuanjiang (Collaborator, Author) replied on Nov 6, 2024:
thanks @kuhushukla! I experimented with a few things like encapsulating part of the arg list into a separate case class, but overall I think this presentation has the best readability. It also aligns with other classes in this file and current unit tests. We can discuss more offline if there is something else we should try.
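
For context, a minimal sketch of what the suggested encapsulation could look like; this is hypothetical and is not part of the PR, which kept the flat argument list shown above:

// Hypothetical helper grouping the recurring min/med/max/sum pattern.
case class StatDistribution(min: Long, med: Long, max: Long, sum: Long)

object StatDistribution {
  def fromValues(values: Iterable[Long]): StatDistribution = {
    val (mn, md, mx, sm) = AppSparkMetricsAnalyzer.getStatistics(values)
    StatDistribution(mn, md, mx, sm)
  }
}

// StageDiagnosticMetricsProfileResult could then take one StatDistribution per metric
// (memory spilled, disk spilled, input bytes, and so on) instead of four Long fields each.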

}.toSeq
}

/**
* Aggregates the SparkMetrics by stage. This is an internal method to populate the cached metrics
* to be used by other aggregators.
@@ -434,6 +518,23 @@
}
}

/**
* Given an input iterable, returns its min, median, max and sum; returns zeros for an empty input.
*/
def getStatistics(arr: Iterable[Long]): (Long, Long, Long, Long) = {
A collaborator commented:
Do we need this? I thought this information is available and can be simply pulled? Please correct me if I am wrong -- for eg, in the existing profiler o/p where does the median value come from?

cindyyuanjiang (Collaborator, Author) replied:
I updated the implementation to reuse/pull existing metrics results from ProfStageMetricView. I cannot do this for shuffle read total bytes because in ProfStageMetricView there are 2 metrics associated with this: internal.metrics.shuffle.read.localBytesRead and internal.metrics.shuffle.read.remoteBytesRead. I cannot get the min/med/max of shuffle read total bytes by adding the min/med/max of the 2 metrics. I am keeping this function for now, but if it looks too unnecessary I can remove it.

if (arr.isEmpty) {
  (0L, 0L, 0L, 0L)
} else {
  val sortedArr = arr.toSeq.sorted
  val len = sortedArr.size
  val med = if (len % 2 == 0) {
    (sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
  } else {
    sortedArr(len / 2)
  }
  (sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
}
}
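
To make the shuffle-read point above concrete, a small illustrative sketch (the values are made up, not taken from any event log) of why per-task totals must be combined before computing statistics, rather than adding the statistics of the two underlying metrics:

// Hypothetical per-task shuffle read values for one stage (illustrative only).
val localBytesRead = Seq(10L, 0L, 50L)
val remoteBytesRead = Seq(0L, 40L, 5L)

// Combine per task first, then take statistics of the totals.
val totalBytesRead = localBytesRead.zip(remoteBytesRead).map { case (l, r) => l + r }
val (minBytes, medBytes, maxBytes, sumBytes) =
  AppSparkMetricsAnalyzer.getStatistics(totalBytesRead)
// totalBytesRead == Seq(10, 40, 55), so the result is (10, 40, 55, 105).

// Adding the two metrics' statistics instead would be wrong:
// min(localBytesRead) + min(remoteBytesRead) = 0 + 0 = 0, which is not the min of the totals (10).

This is why aggregateDiagnosticSparkMetricsByStage computes the shuffle read statistics from each task's sr_totalBytesRead rather than deriving them from the two ProfStageMetricView metrics.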

def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
if (arr.isEmpty) {
0L
@@ -47,7 +47,8 @@ case class ApplicationSummaryInfo(
ioMetrics: Seq[IOAnalysisProfileResult],
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent])
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)