[FEA] Add stage/task level diagnostic output for GPU slowness in Profiler tool #1375

Merged 30 commits from profiler-diagnostic into dev on Nov 22, 2024.
Changes shown are from 27 of the 30 commits.

Commits (30 total, all by cindyyuanjiang):
1265e47 initial implementation (Oct 2, 2024)
6aefee9 updated output schema based on offline discussion (Oct 4, 2024)
5a56d30 address feedback to merge two tables together (Oct 9, 2024)
be13fa8 update order of columns (Oct 12, 2024)
7b895c5 get gpu semaphore time (Oct 14, 2024)
68c34f6 add benchmark (Oct 21, 2024)
20dd4c8 clean up code (Oct 23, 2024)
41f6ff0 resolve merge conflict (Oct 23, 2024)
7cd25e9 add unit test (Oct 25, 2024)
f740192 new expectation file (Oct 25, 2024)
6931086 resolve merge conflict (Oct 25, 2024)
2dcdb9b address review feedback (Oct 30, 2024)
ac162f0 address review feedback (Oct 30, 2024)
2d53682 add new file (Oct 30, 2024)
9d53668 remove unnecessary comment (Oct 30, 2024)
23946bc address review feedback (Oct 31, 2024)
b583126 refactored for memory optimization (Nov 6, 2024)
058686d addressed review feedback (Nov 6, 2024)
bdd2292 refactor stageDiagnosticResults (Nov 9, 2024)
3a8cf9e change num attempts to tasks (Nov 12, 2024)
1361d53 remove diagnostic from applicationsummaryinfo (Nov 13, 2024)
c54a4b7 remove unused import (Nov 13, 2024)
882d403 new file (Nov 13, 2024)
f985b44 Merge branch 'dev' into profiler-diagnostic (Nov 14, 2024)
f3b78ff add diagnostic view in qual tool output (Nov 15, 2024)
8b317e6 remove diagnostic view from qual tool profile.log file (Nov 15, 2024)
ebfc6e3 address review feedback (Nov 15, 2024)
b88119a Merge branch 'dev' into profiler-diagnostic (Nov 19, 2024)
056d4b2 add profile benchmark class (Nov 20, 2024)
de47ef4 fix profiler benchmark (Nov 22, 2024)
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

/**
* The result of the aggregation of the raw metrics. It contains the aggregated metrics for an
@@ -32,6 +32,7 @@ import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTa
* @param ioAggs lists the SQLs along with their IO metrics
* @param sqlDurAggs the aggregated duration and CPU time for SQLs
* @param maxTaskInputSizes a sequence of SQLMaxTaskInputSizes that contains the maximum input size
* @param stageDiagnostics the stage level Spark metrics for diagnostic purposes
*/
case class AggRawMetricsResult(
jobAggs: Seq[JobAggTaskMetricsProfileResult],
@@ -40,4 +41,5 @@ case class AggRawMetricsResult(
sqlAggs: Seq[SQLTaskAggMetricsProfileResult],
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes])
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnostics: Seq[StageDiagnosticResult])
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis

object StageAccumDiagnosticMetrics {
val MEMORY_SPILLED_METRIC = "internal.metrics.memoryBytesSpilled"
val DISK_SPILLED_METRIC = "internal.metrics.diskBytesSpilled"
val INPUT_BYTES_READ_METRIC = "internal.metrics.input.bytesRead"
val OUTPUT_BYTES_WRITTEN_METRIC = "internal.metrics.output.bytesWritten"
val SW_TOTAL_BYTES_METRIC = "internal.metrics.shuffle.write.bytesWritten"
val SR_FETCH_WAIT_TIME_METRIC = "internal.metrics.shuffle.read.fetchWaitTime"
val SW_WRITE_TIME_METRIC = "internal.metrics.shuffle.write.writeTime"
val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait"

/**
* Get all diagnostic metrics
*/
def getAllDiagnosticMetrics: Set[String] = Set(MEMORY_SPILLED_METRIC,
DISK_SPILLED_METRIC, INPUT_BYTES_READ_METRIC, OUTPUT_BYTES_WRITTEN_METRIC,
SW_TOTAL_BYTES_METRIC, SR_FETCH_WAIT_TIME_METRIC, SW_WRITE_TIME_METRIC,
GPU_SEMAPHORE_WAIT_METRIC)
}
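As a quick illustration of how these constants are meant to be consumed (not part of the PR; the input map and object name below are invented for the example), the set returned by getAllDiagnosticMetrics can filter arbitrary accumulator names down to the diagnostic ones:

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics

object DiagnosticFilterExample {
  // Invented input: accumulator name -> accumulated value
  val accumValuesByName: Map[String, Long] = Map(
    "internal.metrics.memoryBytesSpilled" -> 1024L,
    "internal.metrics.resultSize" -> 2048L,
    "gpuSemaphoreWait" -> 37L)

  // Keep only the accumulators the diagnostic view cares about
  val diagnosticOnly: Map[String, Long] = accumValuesByName.filter {
    case (name, _) => StageAccumDiagnosticMetrics.getAllDiagnosticMetrics.contains(name)
  }
  // diagnosticOnly retains memoryBytesSpilled and gpuSemaphoreWait, drops resultSize
}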
@@ -56,6 +56,24 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// SQLPlanParser.
var unsupportedSQLPlan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()
var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
// A map between stage ID and a sequence of node names
val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
// A map between stage ID and diagnostic metrics results (stored as a map between metric name
// and AccumProfileResults)
val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, AccumProfileResults]] =
HashMap.empty[Long, HashMap[String, AccumProfileResults]]

/**
* Given an input diagnostic metric result, update stageToDiagnosticMetrics mapping
* @param accum AccumProfileResults to be analyzed
*/
private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
val stageId = accum.stageId
if (!stageToDiagnosticMetrics.contains(stageId)) {
stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults]
}
stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum
}
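// Illustration only (not part of this PR): once populated, one stage's diagnostic
// entries are reached with a two-level map lookup. Stage id 3 and the metric name
// below are example values:
//   val gpuWait: Option[AccumProfileResults] =
//     stageToDiagnosticMetrics.get(3L).flatMap(_.get("gpuSemaphoreWait"))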

/**
* Connects Operators to Stages using AccumulatorIDs.
@@ -261,6 +279,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}
validNodes.map(n => s"${n.name}(${n.id.toString})")
}.getOrElse(Seq.empty)
stageToNodeNames(sModel.stageInfo.stageId) = nodeNames
SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
}
@@ -339,14 +358,19 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
Some(AccumProfileResults(
// reuse AccumProfileResults to avoid the extra memory overhead of allocating new objects
val accumProfileResults = AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = min,
median = median,
max = max,
total = sum))
total = sum)
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Some(accumProfileResults)
}
})
}
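// Note: isDiagnosticMetrics() on the accumulator name reference is not shown in this
// diff. A plausible sketch (assuming AccumNameRef exposes the raw name as `value`) is a
// simple membership check against the constants introduced earlier in this PR:
//   def isDiagnosticMetrics(): Boolean =
//     StageAccumDiagnosticMetrics.getAllDiagnosticMetrics.contains(value)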
@@ -26,10 +26,14 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
* object to aggregate the Raw metrics and returns the result
* @param app the AppBase to be analyzed
* @param index the application index
* @param sqlAnalyzer optional AppSQLPlanAnalyzer used to aggregate diagnostic metrics;
* it is already present in ApplicationInfo for the Profiler, but the
* Qualification tool needs to provide this argument explicitly.
* @return a single record of AggRawMetricsResult containing all the raw aggregated Spark
* metrics
*/
def getAggRawMetrics(app: AppBase, index: Int): AggRawMetricsResult = {
def getAggRawMetrics(app: AppBase, index: Int, sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None):
AggRawMetricsResult = {
val analysisObj = new AppSparkMetricsAnalyzer(app)
AggRawMetricsResult(
analysisObj.aggregateSparkMetricsByJob(index),
@@ -38,7 +42,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
analysisObj.aggregateSparkMetricsBySql(index),
analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)))
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer))
}
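// Illustrative call sites for the new optional parameter (variable names invented):
//   getAggRawMetrics(profilerApp, appIndex)                        // Profiler: analyzer comes from ApplicationInfo
//   getAggRawMetrics(qualApp, appIndex, Some(qualSqlPlanAnalyzer)) // Qualification: analyzer passed explicitly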

/**
@@ -59,7 +64,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
agg1.sqlAggs ++ agg2.sqlAggs,
agg1.ioAggs ++ agg2.ioAggs,
agg1.sqlDurAggs ++ agg2.sqlDurAggs,
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes)
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes,
agg1.stageDiagnostics ++ agg2.stageDiagnostics)
}
}
}
@@ -18,13 +18,15 @@ package com.nvidia.spark.rapids.tool.analysis

import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, TaskModel}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef, TaskModel}

/**
* Does analysis on the DataFrames from object of AppBase.
@@ -50,14 +52,14 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// Hashmap to cache the stage level metrics. It is initialized to None just in case the caller
// does not call methods in order starting with stage level metrics.
private var stageLevelCache:
Option[mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult]] = None
Option[LinkedHashMap[Int, StageAggTaskMetricsProfileResult]] = None

// Getter method used to protect the cache from out-of-order calls.
// If the stage-level metrics are not generated yet, generate and add them to the cache
private def stageLevelSparkMetrics(
index: Int): mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult] = {
index: Int): LinkedHashMap[Int, StageAggTaskMetricsProfileResult] = {
if (stageLevelCache.isEmpty) {
stageLevelCache = Some(mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult]())
stageLevelCache = Some(LinkedHashMap[Int, StageAggTaskMetricsProfileResult]())
aggregateSparkMetricsByStageInternal(index)
}
stageLevelCache.get
@@ -320,6 +322,62 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sqlRows.toSeq
}

/**
* Aggregates the diagnostic SparkMetrics by stage.
* @param index the App-index (used by the profiler tool)
* @param analyzer optional AppSQLPlanAnalyzer which is used to pull stage level
* information like node names and diagnostic metrics results; only the
* Qualification tool needs to provide this argument.
* @return sequence of StageDiagnosticResult
*/
def aggregateDiagnosticMetricsByStage(index: Int, analyzer: Option[AppSQLPlanAnalyzer] = None):
Seq[StageDiagnosticResult] = {
val sqlAnalyzer = analyzer match {
case Some(res) => res
case None =>
// for Profiler this is present in ApplicationInfo
app.asInstanceOf[ApplicationInfo].planMetricProcessor
}
val zeroAccumProfileResults =
AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L)

// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// this count includes duplicate task attempts
val numTasks = tasksInStage.size
val nodeNames = sqlAnalyzer.stageToNodeNames.
getOrElse(sm.stageInfo.stageId, Seq.empty[String])
val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics.
getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]).
withDefaultValue(zeroAccumProfileResults)
val srTotalBytesMetrics =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))

StageDiagnosticResult(index,
app.getAppName,
app.appId,
sm.stageInfo.stageId,
sm.duration,
numTasks,
srTotalBytesMetrics.min,
srTotalBytesMetrics.med,
srTotalBytesMetrics.max,
srTotalBytesMetrics.total,
diagnosticMetricsMap(MEMORY_SPILLED_METRIC),
diagnosticMetricsMap(DISK_SPILLED_METRIC),
diagnosticMetricsMap(INPUT_BYTES_READ_METRIC),
diagnosticMetricsMap(OUTPUT_BYTES_WRITTEN_METRIC),
diagnosticMetricsMap(SW_TOTAL_BYTES_METRIC),
diagnosticMetricsMap(SR_FETCH_WAIT_TIME_METRIC),
diagnosticMetricsMap(SW_WRITE_TIME_METRIC),
diagnosticMetricsMap(GPU_SEMAPHORE_WAIT_METRIC),
nodeNames)
Review discussion on this block:

kuhushukla (Collaborator): Can we make an encapsulating object for this so that we don't have a large arg list, as well as a single place to hold the metrics we care about -- easier to update it.

cindyyuanjiang (Author, Oct 30, 2024): Thanks @kuhushukla! Can you elaborate a bit more on this? I thought StageDiagnosticResult is the encapsulating object. It has a similar structure as other profiler results, for example - https://github.com/NVIDIA/spark-rapids-tools/blob/dev/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala#L417.

kuhushukla (Collaborator): The arg list is very large and that on its own would be nice to abstract away in a case class etc.

cindyyuanjiang (Author, Nov 6, 2024): Thanks @kuhushukla! I experimented with a few things like encapsulating part of the arg list into a separate case class, but overall I think this presentation has the best readability. It also aligns with other classes in this file and current unit tests. We can discuss more offline if there is something else we should try.
}.toSeq
}
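For reference, the kind of wrapper suggested in the review discussion above could look like the sketch below. This is purely illustrative -- none of these names exist in the PR, which kept the flat argument list on StageDiagnosticResult instead.

// Hypothetical grouping of the per-stage diagnostic accumulators (not adopted in this PR).
case class StageDiagnosticAccums(
  memorySpilled: AccumProfileResults,
  diskSpilled: AccumProfileResults,
  inputBytesRead: AccumProfileResults,
  outputBytesWritten: AccumProfileResults,
  swTotalBytes: AccumProfileResults,
  srFetchWaitTime: AccumProfileResults,
  swWriteTime: AccumProfileResults,
  gpuSemaphoreWait: AccumProfileResults)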

/**
* Aggregates the SparkMetrics by stage. This is an internal method to populate the cached metrics
* to be used by other aggregators.
@@ -336,8 +394,8 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// Note:
// - A HashMap could be used instead of separate mutable.ArrayBuffer for each metric type,
// but avoiding it for readability.
val photonPeakMemoryAccumInfos = mutable.ArrayBuffer[AccumInfo]()
val photonShuffleWriteTimeAccumInfos = mutable.ArrayBuffer[AccumInfo]()
val photonPeakMemoryAccumInfos = ArrayBuffer[AccumInfo]()
val photonShuffleWriteTimeAccumInfos = ArrayBuffer[AccumInfo]()

if (app.isPhoton) {
app.accumManager.applyToAccumInfoMap { accumInfo =>
@@ -434,6 +492,23 @@ object AppSparkMetricsAnalyzer {
}
}

/**
* Given an input iterable, returns its min, median, max and sum.
*/
def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
  if (arr.isEmpty) {
    // return zeros for empty input instead of falling through to sortedArr.head
    StatisticsMetrics(0L, 0L, 0L, 0L)
  } else {
    val sortedArr = arr.toSeq.sorted
    val len = sortedArr.size
    val med = if (len % 2 == 0) {
      (sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
    } else {
      sortedArr(len / 2)
    }
    StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
  }
}
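// Example (values invented): getStatistics(Seq(100L, 400L, 200L)) returns
// StatisticsMetrics(100, 200, 400, 700), i.e. min 100, median 200, max 400, total 700.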

def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
if (arr.isEmpty) {
0L
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

/**
* Stores Profiler diagnostic info.
* TODO: We plan to add two more fields/views in upcoming PRs.
*/
case class DiagnosticSummaryInfo(
stageDiagnostics: Seq[StageDiagnosticResult]
)
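A minimal sketch of how this container might be filled once the stage-level diagnostics have been aggregated (hypothetical wiring; the Profiler's actual view generation is not shown in this excerpt):

// Hypothetical wiring, not from this PR: aggMetrics would come from
// AppSparkMetricsAggTrait.getAggRawMetrics(...); an import of
// com.nvidia.spark.rapids.tool.analysis.AggRawMetricsResult is assumed.
def buildDiagnosticSummary(aggMetrics: AggRawMetricsResult): DiagnosticSummaryInfo =
  DiagnosticSummaryInfo(stageDiagnostics = aggMetrics.stageDiagnostics)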