Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Add IO diagnostic output for GPU slowness in Profiler tool #1451

Open
wants to merge 28 commits into
base: dev
Choose a base branch
from

Conversation

cindyyuanjiang
Copy link
Collaborator

@cindyyuanjiang cindyyuanjiang commented Dec 6, 2024

Contributes to #1374

Changes

  • Added an IO diagnostic view in Profiler output: io_diagnostic_metrics.csv
    • Added class IOAccumDiagnosticMetrics to store selected IO related metric names and methods
    • Added class IODiagnosticResult to represent each IO diagnostic result
    • In core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala, cache results from generateSQLAccums and use them to compute IO diagnostic metrics in function generateIODiagnosticAccums
    • Added IODiagnostics in class DiagnosticSummaryInfo
    • Reorganized AccumProfileResults and SQLAccumProfileResults presentation for better readability

Testing

  • Added unit test "test IO diagnostic metrics" in core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala

Example Output

appIndex,appName,appId,sqlId,stageId,stageDurationMs,nodeId,nodeName,outputRowsMin,outputRowsMedian,outputRowsMax,outputRowsTotal,scanTimeMin,scanTimeMedian,scanTimeMax,scanTimeTotal,outputBatchesMin,outputBatchesMedian,outputBatchesMax,outputBatchesTotal,bufferTimeMin,bufferTimeMedian,bufferTimeMax,bufferTimeTotal,shuffleWriteTimeMin,shuffleWriteTimeMedian,shuffleWriteTimeMax,shuffleWriteTimeTotal,fetchWaitTimeMin,fetchWaitTimeMedian,fetchWaitTimeMax,fetchWaitTimeTotal,gpuDecodeTimeMin,gpuDecodeTimeMedian,gpuDecodeTimeMax,gpuDecodeTimeTotal
1,Spark shell,local-1622814619968,0,0,1743,16,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,41434653,60830365,100858775,400284505,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,0,1743,21,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,8,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,37444140,92128351,108992798,508750471,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,13,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,3,"GpuColumnarExchange",1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,139875,230038,9747416,93193331,0,0,0,0,0,0,0,0

Follow-up Issue

#1454

@cindyyuanjiang cindyyuanjiang self-assigned this Dec 6, 2024
Signed-off-by: cindyyuanjiang <[email protected]>
@cindyyuanjiang cindyyuanjiang changed the title WIP: [FEA] Add IO diagnostic output for GPU slowness in Profiler tool [FEA] Add IO diagnostic output for GPU slowness in Profiler tool Dec 6, 2024
@cindyyuanjiang cindyyuanjiang marked this pull request as ready for review December 6, 2024 23:42
@cindyyuanjiang cindyyuanjiang requested review from amahussein, nartal1 and parthosa and removed request for amahussein December 6, 2024 23:42
@cindyyuanjiang cindyyuanjiang added feature request New feature or request core_tools Scope the core module (scala) affect-output A change that modifies the output (add/remove/rename files, add/remove/rename columns) labels Dec 6, 2024
Copy link
Collaborator

@nartal1 nartal1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cindyyuanjiang ! Overall looks good. Were you able to test this PR with large eventlogs? Any impact on the runtime?

val stageDiagnosticInfo = HashMap.empty[String, StatisticsMetrics]

sqlAccums.foreach { sqlAccum =>
val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We can move this before sqlAccums.foreach. i.e we can cache stageTaskIds once per stage rather than inside the loop for each accumulator.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @nartal1, updated this.

Copy link
Collaborator

@parthosa parthosa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cindyyuanjiang. Made some minor comments.

new AccumInfo(AccumMetaRef(0L, AccumNameRef("")))
)
// Compute the metric's statistics (min, median, max, sum) for the given stage
val accumInfoStatistics = getAccumInfoStatisticsInStage(accumInfo, stageTaskIds)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block seems to be deconstructing the tuple returned from getAccumInfoStatisticsInStage() and creates an instance of StatisticsMetrics using the individual values (min, med, max, sum).

Can getAccumInfoStatisticsInStage() directly return an Option[StatisticsMetrics] ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getAccumInfoStatisticsInStage is also used in generateStageLevelAccums. I didn't want to create an extra object in this function. WDYT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A tuple (Long, Long, Long, Long) is also an object.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored this part, please see new changes.

* @return A sequence of `IODiagnosticResult` objects containing diagnostic metrics.
*/
def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = {
val zeroRecord = StatisticsMetrics.ZERO_RECORD
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we move zeroRecord to the else block where it is being used?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, updated.

}

/**
* Normalize a metric name to its IO diagnostic metric constant
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems to be generic. I believe we are performing normalization because we want to support variations in output rows such as join output rows, number of output rows

Can we specify the reason for this normalization in the comment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, updated.

outputBatchesMed: Long,
outputBatchesMax: Long,
outputBatchesSum: Long,
buffeTimeMin: Long,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bufferTime

Seq(appIndex.toString, sqlID.toString, nodeID.toString,
StringUtils.reformatCSVString(nodeName), accumulatorId.toString,
StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
total.toString, StringUtils.reformatCSVString(metricType),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated format only for better readability.

case class AccumProfileResults(appIndex: Int, stageId: Int, accMetaRef: AccumMetaRef,
min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
"median", "max", "total")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated format only for better readability.


override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, stageId.toString, accMetaRef.id.toString, accMetaRef.getName(),
min.toString, median.toString, max.toString, total.toString)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated format only for better readability.

}

override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString, stageId.toString, accMetaRef.id.toString,
accMetaRef.name.csvValue, min.toString,
median.toString, max.toString, total.toString)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated format only for better readability.

@amahussein
Copy link
Collaborator

@nartal1 thanks! Yes, I ran the performance benchmark on a ~350MB zstd eventlog, the performance is less than 1% slower than the current dev branch.

Thanks @cindyyuanjiang for running performance evaluation.
A 350MB is a little bit small eventlog to start noticing an impact of the code change.
We need to look into the CPUTime of the aggregation methods before and after the code change.

Copy link
Collaborator

@amahussein amahussein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cindyyuanjiang
While I was working on #1461, I found some potential performance optimizations in AggregateStage which will conflict with the current PR.

new AccumInfo(AccumMetaRef(0L, AccumNameRef("")))
)
// Compute the metric's statistics (min, median, max, sum) for the given stage
val accumInfoStatistics = getAccumInfoStatisticsInStage(accumInfo, stageTaskIds)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A tuple (Long, Long, Long, Long) is also an object.

sqlAccums.foreach { sqlAccum =>
val accumInfo = app.accumManager.accumInfoMap.getOrElse(
sqlAccum.accumulatorId,
new AccumInfo(AccumMetaRef(0L, AccumNameRef("")))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could cause potential problems.
I made a comment about that in a different PR #1468 (comment)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @amahussein , updated this. Also refactored usage of filterKeys in this PR.

@cindyyuanjiang
Copy link
Collaborator Author

@nartal1 thanks! Yes, I ran the performance benchmark on a ~350MB zstd eventlog, the performance is less than 1% slower than the current dev branch.

Thanks @cindyyuanjiang for running performance evaluation. A 350MB is a little bit small eventlog to start noticing an impact of the code change. We need to look into the CPUTime of the aggregation methods before and after the code change.

Thanks @amahussein! This is the eventlog you shared earlier with me for performance testing. Profiler tool takes about 1.2 hours to run on this eventlog on my local desktop. I will follow up with the aggregation methods CPU time offline.

@amahussein
Copy link
Collaborator

@nartal1 thanks! Yes, I ran the performance benchmark on a ~350MB zstd eventlog, the performance is less than 1% slower than the current dev branch.

Thanks @cindyyuanjiang for running performance evaluation. A 350MB is a little bit small eventlog to start noticing an impact of the code change. We need to look into the CPUTime of the aggregation methods before and after the code change.

Thanks @amahussein! This is the eventlog you shared earlier with me for performance testing. Profiler tool takes about 1.2 hours to run on this eventlog on my local desktop. I will follow up with the aggregation methods CPU time offline.

The eventlog should not take that long even before we merged #1468

We can re-iterate further offline.

nanoToMilliSec(result.srFetchWaitTime.max), nanoToMilliSec(result.srFetchWaitTime.total),
nanoToMilliSec(result.swWriteTime.min), nanoToMilliSec(result.swWriteTime.median),
nanoToMilliSec(result.swWriteTime.max), nanoToMilliSec(result.swWriteTime.total),
result.gpuSemaphoreWait.total, result.nodeNames)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reformat for better readability.

@@ -74,16 +76,33 @@ case class JobInfoProfileResult(
sqlID: Option[Long],
startTime: Long,
endTime: Option[Long]) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "jobID", "stageIds", "sqlID", "startTime", "endTime")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated format for better readability.

override def convertToSeq: Seq[String] = {
val stageIdStr = s"[${stageIds.mkString(",")}]"
Seq(appIndex.toString, jobID.toString, stageIdStr, sqlID.map(_.toString).getOrElse(null),
startTime.toString, endTime.map(_.toString).getOrElse(null))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated format for better readability.

override def convertToCSVSeq: Seq[String] = {
val stageIdStr = s"[${stageIds.mkString(",")}]"
Seq(appIndex.toString, jobID.toString, StringUtils.reformatCSVString(stageIdStr),
sqlID.map(_.toString).getOrElse(null), startTime.toString,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated format for better readability.

Signed-off-by: cindyyuanjiang <[email protected]>
Signed-off-by: cindyyuanjiang <[email protected]>
@cindyyuanjiang
Copy link
Collaborator Author

cindyyuanjiang commented Dec 24, 2024

New iteration changes:

  1. Move AnalysisUtils.scala file under newly added util folder and rename to DiagnosticMetrics.scala
  2. Remove stageIds from IODiagnosticMetricsMap keys b/c they are unnecessary (same node ID will have same stage IDs)
  3. Change metricNamesToKeyMap schema from "set[string] to string" to "string to string"
  4. Change stageIds to Set[Int] in SQLAccumProfileResults to avoid unnecessary conversions

Ideas need further discussions:

  1. How to deal with accumulators from driverAccumMap in generateIODiagnosticAccums
  2. Now IODiagnosticMetricsMap key is a tuple (Long, Long), is this necessary to change this to a composite map or create a dummy key to save memory?

cc: @amahussein

Copy link
Collaborator

@parthosa parthosa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cindyyuanjiang. Few comments.

@@ -36,7 +36,7 @@ case class AccumNameRef(value: String) {
// create a new CSV string even though they represent the same AccumulatorName.
val csvValue: String = StringUtils.reformatCSVString(value)

def isDiagnosticMetrics(): Boolean = getAllDiagnosticMetrics.contains(value)
def isDiagnosticMetrics(): Boolean = allDiagnosticMetrics.contains(value)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: () can be removed for accessor like method

@@ -245,7 +262,7 @@ case class SQLAccumProfileResults(
max.toString,
total.toString,
metricType,
stageIds)
stageIds.mkString(","))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are computing stageIds.mkString(",") twice in this case class. Can we avoid this redundant computation by storing it in this case class?

/**
* Check if a metric name belongs to IO diagnostic metrics
*/
def isIODiagnosticMetricName(metric: String): Boolean = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw a similar method isDiagnosticMetrics() that has been included in class AccumNameRef where as
isIODiagnosticMetricName() is part of object IOAccumDiagnosticMetrics.

Can we improve the consistency (maybe include isDiagnosticMetrics in its object StageAccumDiagnosticMetrics)?

val csvValue: String = StringUtils.reformatCSVString(value)
def isDiagnosticMetrics(): Boolean = allDiagnosticMetrics.contains(value)

}

// Compute the metric's statistics and store the results if available
metricStats match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we use .map() instead of match{} for a more functional approach?

// TODO: check if accumulator ID is in driverAccumMap, currently skipped
val accumInfo = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId)

val metricStats: Option[StatisticsMetrics] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we use accumInfoOpt.flatMap() for a functional approach?

stageTaskIds.collect {
case taskId if accumInfo.taskUpdatesMap.contains(taskId) =>
accumInfo.taskUpdatesMap(taskId)
}(breakOut)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good use of breakOut here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
affect-output A change that modifies the output (add/remove/rename files, add/remove/rename columns) core_tools Scope the core module (scala) feature request New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants