[FEA] Add IO diagnostic output for GPU slowness in Profiler tool #1451
base: dev
Conversation
Signed-off-by: cindyyuanjiang <[email protected]>
Thanks @cindyyuanjiang ! Overall looks good. Were you able to test this PR with large eventlogs? Any impact on the runtime?
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AnalysisUtils.scala
val stageDiagnosticInfo = HashMap.empty[String, StatisticsMetrics]

sqlAccums.foreach { sqlAccum =>
  val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet
Nit: We can move this before sqlAccums.foreach, i.e. we can cache stageTaskIds once per stage rather than recomputing it inside the loop for each accumulator.
thanks @nartal1, updated this.
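The hoisting suggested above can be sketched as follows. This is a minimal, self-contained illustration with stand-in types (`Task`, `SqlAccum`, `processStage` are hypothetical names, not the PR's actual classes):

```scala
// Hypothetical simplified types standing in for the tool's real classes.
case class Task(taskId: Long)
case class SqlAccum(accumulatorId: Long)

def processStage(stageTasks: Seq[Task], sqlAccums: Seq[SqlAccum]): Seq[(Long, Int)] = {
  // Computed once per stage, not once per accumulator: the task-id set does not
  // depend on the loop variable, so rebuilding it inside the loop is wasted work.
  val stageTaskIds: Set[Long] = stageTasks.map(_.taskId).toSet
  sqlAccums.map { sqlAccum =>
    (sqlAccum.accumulatorId, stageTaskIds.size)
  }
}
```

The cost saving is proportional to the number of accumulators per stage, since the `map(_.taskId).toSet` pass over the stage's tasks now runs once instead of once per accumulator.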
Thanks @cindyyuanjiang. Made some minor comments.
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
Signed-off-by: cindyyuanjiang <[email protected]>
  new AccumInfo(AccumMetaRef(0L, AccumNameRef("")))
)
// Compute the metric's statistics (min, median, max, sum) for the given stage
val accumInfoStatistics = getAccumInfoStatisticsInStage(accumInfo, stageTaskIds)
This block seems to be deconstructing the tuple returned from getAccumInfoStatisticsInStage() and creating an instance of StatisticsMetrics from the individual values (min, med, max, sum). Can getAccumInfoStatisticsInStage() directly return an Option[StatisticsMetrics]?
getAccumInfoStatisticsInStage is also used in generateStageLevelAccums. I didn't want to create an extra object in this function. WDYT?
A tuple (Long, Long, Long, Long) is also an object.
refactored this part, please see new changes.
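The reviewer's point above can be sketched as follows: since the tuple allocates anyway, the helper may as well return the named type directly. `StatisticsMetrics` and `statsInStage` here are simplified stand-ins for the PR's actual definitions:

```scala
// Simplified stand-in for the tool's StatisticsMetrics case class.
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

// Returning Option[StatisticsMetrics] directly, instead of an
// Option[(Long, Long, Long, Long)] that every caller must deconstruct.
def statsInStage(taskUpdates: Seq[Long]): Option[StatisticsMetrics] = {
  if (taskUpdates.isEmpty) None
  else {
    val sorted = taskUpdates.sorted
    Some(StatisticsMetrics(
      min = sorted.head,
      med = sorted(sorted.size / 2),
      max = sorted.last,
      total = sorted.sum))
  }
}
```

Both shapes allocate one object per call, so the named case class costs nothing extra while making every call site self-documenting.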
 * @return A sequence of `IODiagnosticResult` objects containing diagnostic metrics.
 */
def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = {
  val zeroRecord = StatisticsMetrics.ZERO_RECORD
nit: Can we move zeroRecord to the else block where it is being used?
thanks, updated.
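The nit above is the usual narrow-scoping convention: declare a value only in the branch that needs it. A minimal sketch, with `resolve` as a hypothetical wrapper around the fallback logic:

```scala
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)
object StatisticsMetrics {
  val ZERO_RECORD = StatisticsMetrics(0L, 0L, 0L, 0L)
}

def resolve(stats: Option[StatisticsMetrics]): StatisticsMetrics = {
  stats match {
    case Some(s) => s
    case None =>
      // Declared in the only branch that uses it, per the review nit:
      // readers see immediately that the zero record is the fallback case.
      val zeroRecord = StatisticsMetrics.ZERO_RECORD
      zeroRecord
  }
}
```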
}

/**
 * Normalize a metric name to its IO diagnostic metric constant
This comment seems generic. I believe we are performing normalization because we want to support variations of output rows, such as "join output rows" and "number of output rows". Can we state the reason for this normalization in the comment?
thanks, updated.
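The normalization being discussed can be sketched as follows. The constant and the exact set of variant names are illustrative assumptions, not the PR's actual values; the point is that different execs emit differently-worded names for the same logical metric, and they must fold into one key before aggregation:

```scala
// Hypothetical canonical name; the real PR defines its own constants.
val OUTPUT_ROWS_METRIC = "output rows"

// Fold variant spellings emitted by different execs into one canonical
// IO diagnostic metric name so their statistics aggregate together.
def normalizeToIODiagnosticMetric(name: String): Option[String] = name match {
  case "output rows" | "join output rows" | "number of output rows" =>
    Some(OUTPUT_ROWS_METRIC)
  case _ => None
}
```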
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
outputBatchesMed: Long,
outputBatchesMax: Long,
outputBatchesSum: Long,
buffeTimeMin: Long,
bufferTime
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AnalysisUtils.scala
Signed-off-by: cindyyuanjiang <[email protected]>
Seq(appIndex.toString, sqlID.toString, nodeID.toString,
  StringUtils.reformatCSVString(nodeName), accumulatorId.toString,
  StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
  total.toString, StringUtils.reformatCSVString(metricType),
Updated format only for better readability.
case class AccumProfileResults(appIndex: Int, stageId: Int, accMetaRef: AccumMetaRef,
    min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
  override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
    "median", "max", "total")
Updated format only for better readability.
override def convertToSeq: Seq[String] = {
  Seq(appIndex.toString, stageId.toString, accMetaRef.id.toString, accMetaRef.getName(),
    min.toString, median.toString, max.toString, total.toString)
Updated format only for better readability.
}

override def convertToCSVSeq: Seq[String] = {
  Seq(appIndex.toString, stageId.toString, accMetaRef.id.toString,
    accMetaRef.name.csvValue, min.toString,
    median.toString, max.toString, total.toString)
Updated format only for better readability.
Thanks @cindyyuanjiang for running performance evaluation.
Thanks @cindyyuanjiang. While I was working on #1461, I found some potential performance optimizations in AggregateStage which will conflict with the current PR.
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
sqlAccums.foreach { sqlAccum =>
  val accumInfo = app.accumManager.accumInfoMap.getOrElse(
    sqlAccum.accumulatorId,
    new AccumInfo(AccumMetaRef(0L, AccumNameRef("")))
This could cause potential problems; I made a comment about that in a different PR: #1468 (comment).
thanks @amahussein, updated this. Also refactored usage of filterKeys in this PR.
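The concern above is that `getOrElse(..., new AccumInfo(...))` fabricates a dummy accumulator (id 0, empty name) that downstream code cannot distinguish from a real one. A sketch of the Option-preserving alternative, with simplified stand-in types:

```scala
// Simplified stand-in for the tool's AccumInfo / accumInfoMap.
case class AccumInfo(id: Long, taskUpdates: Map[Long, Long])

val accumInfoMap: Map[Long, AccumInfo] =
  Map(7L -> AccumInfo(7L, Map(1L -> 100L)))

// Instead of getOrElse(..., new AccumInfo(dummy)), keep the Option so a
// missing accumulator is skipped explicitly rather than silently aggregated
// as a fake zero-valued accumulator.
def lookup(accumId: Long): Option[AccumInfo] = accumInfoMap.get(accumId)

val total: Long = lookup(7L).map(_.taskUpdates.values.sum).getOrElse(0L)
val missing: Long = lookup(99L).map(_.taskUpdates.values.sum).getOrElse(0L)
```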
Thanks @amahussein! This is the eventlog you shared earlier with me for performance testing. The Profiler tool takes about 1.2 hours to run on this eventlog on my local desktop. I will follow up with the aggregation methods' CPU time offline.
…when unnecessary
Signed-off-by: cindyyuanjiang <[email protected]>
The eventlog should not take that long even before we merged #1468. We can re-iterate further offline.
Signed-off-by: cindyyuanjiang <[email protected]>
nanoToMilliSec(result.srFetchWaitTime.max), nanoToMilliSec(result.srFetchWaitTime.total),
nanoToMilliSec(result.swWriteTime.min), nanoToMilliSec(result.swWriteTime.median),
nanoToMilliSec(result.swWriteTime.max), nanoToMilliSec(result.swWriteTime.total),
result.gpuSemaphoreWait.total, result.nodeNames)
reformat for better readability.
@@ -74,16 +76,33 @@ case class JobInfoProfileResult(
    sqlID: Option[Long],
    startTime: Long,
    endTime: Option[Long]) extends ProfileResult {
  override val outputHeaders = Seq("appIndex", "jobID", "stageIds", "sqlID", "startTime", "endTime")
updated format for better readability.
override def convertToSeq: Seq[String] = {
  val stageIdStr = s"[${stageIds.mkString(",")}]"
  Seq(appIndex.toString, jobID.toString, stageIdStr, sqlID.map(_.toString).getOrElse(null),
    startTime.toString, endTime.map(_.toString).getOrElse(null))
updated format for better readability.
override def convertToCSVSeq: Seq[String] = {
  val stageIdStr = s"[${stageIds.mkString(",")}]"
  Seq(appIndex.toString, jobID.toString, StringUtils.reformatCSVString(stageIdStr),
    sqlID.map(_.toString).getOrElse(null), startTime.toString,
updated format for better readability.
Signed-off-by: cindyyuanjiang <[email protected]>
New iteration changes:
Ideas that need further discussion:
cc: @amahussein
Signed-off-by: cindyyuanjiang <[email protected]>
Thanks @cindyyuanjiang. A few comments.
@@ -36,7 +36,7 @@ case class AccumNameRef(value: String) {
   // create a new CSV string even though they represent the same AccumulatorName.
   val csvValue: String = StringUtils.reformatCSVString(value)

-  def isDiagnosticMetrics(): Boolean = getAllDiagnosticMetrics.contains(value)
+  def isDiagnosticMetrics(): Boolean = allDiagnosticMetrics.contains(value)
nit: () can be removed for an accessor-like method.
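The convention behind this nit: in Scala, a side-effect-free, parameterless method is conventionally declared without parentheses, so call sites read like field access. A minimal sketch with a hypothetical class name:

```scala
// Hypothetical simplified version of the accessor under review.
case class AccumNameRefSketch(value: String, allDiagnosticMetrics: Set[String]) {
  // No (): signals to callers that this is a pure accessor with no side effects.
  def isDiagnosticMetric: Boolean = allDiagnosticMetrics.contains(value)
}
```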
@@ -245,7 +262,7 @@ case class SQLAccumProfileResults(
     max.toString,
     total.toString,
     metricType,
-    stageIds)
+    stageIds.mkString(","))
We are computing stageIds.mkString(",") twice in this case class. Can we avoid this redundant computation by storing it in this case class?
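One way to store the joined string in the case class, per the suggestion above, is a `lazy val` computed once per instance. A minimal sketch with a hypothetical class name:

```scala
// Hypothetical simplified version of SQLAccumProfileResults.
case class SQLAccumSketch(stageIds: Seq[Int]) {
  // Computed at most once per instance instead of once per convertTo* call.
  lazy val stageIdsStr: String = stageIds.mkString(",")

  def convertToSeq: Seq[String] = Seq(stageIdsStr)
  def convertToCSVSeq: Seq[String] = Seq(stageIdsStr)
}
```

`lazy val` also avoids paying the join cost for instances whose conversion methods are never called; a plain `val` would work too if conversion is the common path.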
/**
 * Check if a metric name belongs to IO diagnostic metrics
 */
def isIODiagnosticMetricName(metric: String): Boolean = {
I saw a similar method isDiagnosticMetrics() that has been included in class AccumNameRef, whereas isIODiagnosticMetricName() is part of object IOAccumDiagnosticMetrics. Can we improve the consistency (maybe include isDiagnosticMetrics in its object StageAccumDiagnosticMetrics)?
spark-rapids-tools/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala, lines 37 to 39 in cc80b48:
val csvValue: String = StringUtils.reformatCSVString(value)
def isDiagnosticMetrics(): Boolean = allDiagnosticMetrics.contains(value)
}

// Compute the metric's statistics and store the results if available
metricStats match {
nit: Can we use .map() instead of match {} for a more functional approach?
// TODO: check if accumulator ID is in driverAccumMap, currently skipped
val accumInfo = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId)

val metricStats: Option[StatisticsMetrics] =
nit: Can we use accumInfoOpt.flatMap() for a functional approach?
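The two functional-style nits above (`.map()` over `match {}`, and `flatMap` on the accumulator lookup) can be sketched together. Types and the `stats` helper are simplified stand-ins for the PR's code:

```scala
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

// Stand-in for the stats helper: returns None when there are no task updates.
def stats(updates: Seq[Long]): Option[StatisticsMetrics] =
  if (updates.isEmpty) None
  else {
    val sorted = updates.sorted
    Some(StatisticsMetrics(sorted.head, sorted(sorted.size / 2), sorted.last, sorted.sum))
  }

val accumInfoOpt: Option[Seq[Long]] = Some(Seq(1L, 2L, 3L))

// flatMap collapses the Option[Option[...]] from the optional lookup plus the
// optional stats computation; a trailing .map replaces the match {} block.
val metricStats: Option[StatisticsMetrics] = accumInfoOpt.flatMap(stats)
val totals: Option[Long] = metricStats.map(_.total)
```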
stageTaskIds.collect {
  case taskId if accumInfo.taskUpdatesMap.contains(taskId) =>
    accumInfo.taskUpdatesMap(taskId)
}(breakOut)
Good use of breakOut here.
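For context on the praise above: `scala.collection.breakOut` (Scala 2.12 only; removed in 2.13) lets `collect` build the target collection type directly, skipping the intermediate collection that would otherwise be materialized and then converted. A sketch under the assumption of a Scala 2.12 toolchain, with illustrative data:

```scala
import scala.collection.breakOut  // Scala 2.12; gone in 2.13+

val stageTaskIds: Set[Long] = Set(1L, 2L, 3L)
val taskUpdatesMap: Map[Long, Long] = Map(1L -> 10L, 3L -> 30L)

// Without breakOut, collect over a Set yields a Set[Long] that .toSeq would
// then copy; breakOut supplies a Seq builder so the result is built once.
val updates: Seq[Long] = stageTaskIds.collect {
  case taskId if taskUpdatesMap.contains(taskId) => taskUpdatesMap(taskId)
}(breakOut)
```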
Contributes to #1374
Changes
- New output file io_diagnostic_metrics.csv
- New object IOAccumDiagnosticMetrics to store selected IO related metric names and methods
- New case class IODiagnosticResult to represent each IO diagnostic result
- In core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala, cache results from generateSQLAccums and use them to compute IO diagnostic metrics in function generateIODiagnosticAccums
- Added IODiagnostics in class DiagnosticSummaryInfo
- Updated AccumProfileResults and SQLAccumProfileResults presentation for better readability
Testing
- Unit tests in core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala
Example Output
Follow-up Issue
#1454
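The shape of the new per-node diagnostic record can be sketched roughly as follows. Field names are inferred from the diff fragments quoted in this thread (outputBatches, bufferTime, srFetchWaitTime, swWriteTime, gpuSemaphoreWait) and are not authoritative; the real IODiagnosticResult in the PR may differ:

```scala
case class StatisticsMetrics(min: Long, median: Long, max: Long, total: Long)

// Rough, non-authoritative sketch of an IO diagnostic row: one
// StatisticsMetrics per IO-related metric, keyed by SQL/stage/node.
case class IODiagnosticResultSketch(
    appIndex: Int,
    sqlID: Long,
    stageId: Int,
    nodeID: Long,
    nodeName: String,
    outputRows: StatisticsMetrics,
    outputBatches: StatisticsMetrics,
    bufferTime: StatisticsMetrics,
    srFetchWaitTime: StatisticsMetrics,
    swWriteTime: StatisticsMetrics,
    gpuSemaphoreWait: StatisticsMetrics)
```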