[FEA] Add stage/task level diagnostic output for GPU slowness in Profiler tool (#1375)

* initial implementation

Signed-off-by: cindyyuanjiang <[email protected]>

* updated output schema based on offline discussion

Signed-off-by: cindyyuanjiang <[email protected]>

* address feedback to merge two tables together

Signed-off-by: cindyyuanjiang <[email protected]>

* update order of columns

Signed-off-by: cindyyuanjiang <[email protected]>

* get gpu semaphore time

Signed-off-by: cindyyuanjiang <[email protected]>

* add benchmark

Signed-off-by: cindyyuanjiang <[email protected]>

* clean up code

Signed-off-by: cindyyuanjiang <[email protected]>

* add unit test

Signed-off-by: cindyyuanjiang <[email protected]>

* new expectation file

Signed-off-by: cindyyuanjiang <[email protected]>

* address review feedback

Signed-off-by: cindyyuanjiang <[email protected]>

* address review feedback

Signed-off-by: cindyyuanjiang <[email protected]>

* add new file

Signed-off-by: cindyyuanjiang <[email protected]>

* remove unnecessary comment

Signed-off-by: cindyyuanjiang <[email protected]>

* address review feedback

Signed-off-by: cindyyuanjiang <[email protected]>

* refactored for memory optimization

Signed-off-by: cindyyuanjiang <[email protected]>

* addressed review feedback

Signed-off-by: cindyyuanjiang <[email protected]>

* refactor stageDiagnosticResults

Signed-off-by: cindyyuanjiang <[email protected]>

* change num attempts to tasks

Signed-off-by: cindyyuanjiang <[email protected]>

* remove diagnostic from applicationsummaryinfo

Signed-off-by: cindyyuanjiang <[email protected]>

* remove unused import

Signed-off-by: cindyyuanjiang <[email protected]>

* new file

Signed-off-by: cindyyuanjiang <[email protected]>

* add diagnostic view in qual tool output

Signed-off-by: cindyyuanjiang <[email protected]>

* remove diagnostic view from qual tool profile.log file

Signed-off-by: cindyyuanjiang <[email protected]>

* address review feedback

Signed-off-by: cindyyuanjiang <[email protected]>

* add profile benchmark class

Signed-off-by: cindyyuanjiang <[email protected]>

* fix profiler benchmark

Signed-off-by: cindyyuanjiang <[email protected]>

---------

Signed-off-by: cindyyuanjiang <[email protected]>
cindyyuanjiang authored Nov 22, 2024
1 parent 9cba927 commit de40e8d
Showing 19 changed files with 560 additions and 87 deletions.
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

/**
* The result of the aggregation of the raw metrics. It contains the aggregated metrics for an
@@ -32,6 +32,7 @@ import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTa
* @param ioAggs lists the SQLs along their IO metrics
* @param sqlDurAggs the aggregated duration and CPU time for SQLs
* @param maxTaskInputSizes a sequence of SQLMaxTaskInputSizes that contains the maximum input size
* @param stageDiagnostics the stage level Spark metrics for diagnostic purposes
*/
case class AggRawMetricsResult(
jobAggs: Seq[JobAggTaskMetricsProfileResult],
@@ -40,4 +41,5 @@ case class AggRawMetricsResult(
sqlAggs: Seq[SQLTaskAggMetricsProfileResult],
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes])
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnostics: Seq[StageDiagnosticResult])
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis

object StageAccumDiagnosticMetrics {
val MEMORY_SPILLED_METRIC = "internal.metrics.memoryBytesSpilled"
val DISK_SPILLED_METRIC = "internal.metrics.diskBytesSpilled"
val INPUT_BYTES_READ_METRIC = "internal.metrics.input.bytesRead"
val OUTPUT_BYTES_WRITTEN_METRIC = "internal.metrics.output.bytesWritten"
val SW_TOTAL_BYTES_METRIC = "internal.metrics.shuffle.write.bytesWritten"
val SR_FETCH_WAIT_TIME_METRIC = "internal.metrics.shuffle.read.fetchWaitTime"
val SW_WRITE_TIME_METRIC = "internal.metrics.shuffle.write.writeTime"
val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait"

/**
* Returns the set of all diagnostic metric names
*/
def getAllDiagnosticMetrics: Set[String] = Set(MEMORY_SPILLED_METRIC,
DISK_SPILLED_METRIC, INPUT_BYTES_READ_METRIC, OUTPUT_BYTES_WRITTEN_METRIC,
SW_TOTAL_BYTES_METRIC, SR_FETCH_WAIT_TIME_METRIC, SW_WRITE_TIME_METRIC,
GPU_SEMAPHORE_WAIT_METRIC)
}
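
These constants are consumed via set membership. A minimal sketch of the check (the `isDiagnosticMetrics()` helper used later in this diff is assumed to reduce to something like this):

    object DiagnosticMetricsCheckSketch {
      import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._

      // Hypothetical helper: true when an accumulator name is one of the
      // diagnostic metrics defined above.
      def isDiagnosticMetric(accumName: String): Boolean =
        getAllDiagnosticMetrics.contains(accumName)

      // e.g. isDiagnosticMetric("gpuSemaphoreWait") returns true,
      //      isDiagnosticMetric("internal.metrics.resultSize") returns false
    }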
@@ -56,6 +56,24 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// SQLPlanParser.
var unsupportedSQLPlan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()
var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
// A map from stage ID to a sequence of node names
val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
// A map between stage ID and diagnostic metrics results (stored as a map between metric name
// and AccumProfileResults)
val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, AccumProfileResults]] =
HashMap.empty[Long, HashMap[String, AccumProfileResults]]

/**
* Given an input diagnostic metric result, updates the stageToDiagnosticMetrics mapping
* @param accum AccumProfileResults to be analyzed
*/
private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
val stageId = accum.stageId
if (!stageToDiagnosticMetrics.contains(stageId)) {
stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults]
}
stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum
}

/**
* Connects Operators to Stages using AccumulatorIDs.
@@ -261,6 +279,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}
validNodes.map(n => s"${n.name}(${n.id.toString})")
}.getOrElse(Seq.empty)
stageToNodeNames(sModel.stageInfo.stageId) = nodeNames
SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
}
Expand Down Expand Up @@ -339,14 +358,19 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
Some(AccumProfileResults(
// build the AccumProfileResults once and reuse it below to avoid allocating duplicate objects
val accumProfileResults = AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = min,
median = median,
max = max,
total = sum))
total = sum)
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Some(accumProfileResults)
}
})
}
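
Editor's note on `updateStageDiagnosticMetrics` above: the contains-then-insert pattern can also be collapsed with `getOrElseUpdate`. A behaviorally equivalent sketch (same `HashMap` types as in the class above; a sketch, not a drop-in patch):

    private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
      // Fetch the per-stage map, creating it on first access, then record
      // the metric keyed by its accumulator name.
      stageToDiagnosticMetrics
        .getOrElseUpdate(accum.stageId, HashMap.empty[String, AccumProfileResults])
        .update(accum.accMetaRef.getName(), accum)
    }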
@@ -26,10 +26,14 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
* object to aggregate the Raw metrics and returns the result
* @param app the AppBase to be analyzed
* @param index the application index
* @param sqlAnalyzer optional AppSQLPlanAnalyzer used to aggregate diagnostic metrics.
* This is already present in ApplicationInfo for the Profiler, but the
* Qualification tool must provide it explicitly.
* @return a single record of AggRawMetricsResult containing all the raw aggregated Spark
* metrics
*/
def getAggRawMetrics(app: AppBase, index: Int): AggRawMetricsResult = {
def getAggRawMetrics(app: AppBase, index: Int, sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None):
AggRawMetricsResult = {
val analysisObj = new AppSparkMetricsAnalyzer(app)
AggRawMetricsResult(
analysisObj.aggregateSparkMetricsByJob(index),
@@ -38,7 +42,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
analysisObj.aggregateSparkMetricsBySql(index),
analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)))
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer))
}

/**
@@ -59,7 +64,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
agg1.sqlAggs ++ agg2.sqlAggs,
agg1.ioAggs ++ agg2.ioAggs,
agg1.sqlDurAggs ++ agg2.sqlDurAggs,
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes)
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes,
agg1.stageDiagnostics ++ agg2.stageDiagnostics)
}
}
}
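
Because `sqlAnalyzer` defaults to `None`, existing Profiler call sites compile unchanged. A hedged sketch of the two call paths (the Qualification wiring shown here is an assumption, not this PR's exact call site):

    // Profiler: the analyzer is recovered internally from
    // ApplicationInfo.planMetricProcessor, so no extra argument is needed.
    val profilerAggs = getAggRawMetrics(app, index)

    // Qualification: no ApplicationInfo exists, so the caller passes the
    // AppSQLPlanAnalyzer it already built for this application.
    val analyzer = new AppSQLPlanAnalyzer(app, index)
    val qualAggs = getAggRawMetrics(app, index, Some(analyzer))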
@@ -18,13 +18,15 @@ package com.nvidia.spark.rapids.tool.analysis

import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, TaskModel}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef, TaskModel}

/**
* Does analysis on the DataFrames from object of AppBase.
@@ -50,14 +52,14 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// Hashmap to cache the stage level metrics. It is initialized to None just in case the caller
// does not call methods in order starting with stage level metrics.
private var stageLevelCache:
Option[mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult]] = None
Option[LinkedHashMap[Int, StageAggTaskMetricsProfileResult]] = None

// Getter method used to protect the cache from out-of-order calls.
// If the stage-level metrics are not generated yet, generates and add them to the cache
private def stageLevelSparkMetrics(
index: Int): mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult] = {
index: Int): LinkedHashMap[Int, StageAggTaskMetricsProfileResult] = {
if (stageLevelCache.isEmpty) {
stageLevelCache = Some(mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult]())
stageLevelCache = Some(LinkedHashMap[Int, StageAggTaskMetricsProfileResult]())
aggregateSparkMetricsByStageInternal(index)
}
stageLevelCache.get
@@ -320,6 +322,62 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sqlRows.toSeq
}

/**
* Aggregates the diagnostic SparkMetrics by stage.
* @param index the App-index (used by the profiler tool)
* @param analyzer optional AppSQLPlanAnalyzer used to pull stage-level information
* such as node names and diagnostic metrics results. Only the
* Qualification tool needs to provide this argument.
* @return sequence of StageDiagnosticResult
*/
def aggregateDiagnosticMetricsByStage(index: Int, analyzer: Option[AppSQLPlanAnalyzer] = None):
Seq[StageDiagnosticResult] = {
val sqlAnalyzer = analyzer match {
case Some(res) => res
case None =>
// for Profiler this is present in ApplicationInfo
app.asInstanceOf[ApplicationInfo].planMetricProcessor
}
val zeroAccumProfileResults =
AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L)

// TODO: this includes stage attempts; we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// this count includes duplicate task attempts
val numTasks = tasksInStage.size
val nodeNames = sqlAnalyzer.stageToNodeNames.
getOrElse(sm.stageInfo.stageId, Seq.empty[String])
val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics.
getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]).
withDefaultValue(zeroAccumProfileResults)
val srTotalBytesMetrics =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))

StageDiagnosticResult(index,
app.getAppName,
app.appId,
sm.stageInfo.stageId,
sm.duration,
numTasks,
srTotalBytesMetrics.min,
srTotalBytesMetrics.med,
srTotalBytesMetrics.max,
srTotalBytesMetrics.total,
diagnosticMetricsMap(MEMORY_SPILLED_METRIC),
diagnosticMetricsMap(DISK_SPILLED_METRIC),
diagnosticMetricsMap(INPUT_BYTES_READ_METRIC),
diagnosticMetricsMap(OUTPUT_BYTES_WRITTEN_METRIC),
diagnosticMetricsMap(SW_TOTAL_BYTES_METRIC),
diagnosticMetricsMap(SR_FETCH_WAIT_TIME_METRIC),
diagnosticMetricsMap(SW_WRITE_TIME_METRIC),
diagnosticMetricsMap(GPU_SEMAPHORE_WAIT_METRIC),
nodeNames)
}.toSeq
}

/**
* Aggregates the SparkMetrics by stage. This is an internal method to populate the cached metrics
* to be used by other aggregators.
@@ -336,8 +394,8 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// Note:
// - A HashMap could be used instead of separate mutable.ArrayBuffer for each metric type,
// but avoiding it for readability.
val photonPeakMemoryAccumInfos = mutable.ArrayBuffer[AccumInfo]()
val photonShuffleWriteTimeAccumInfos = mutable.ArrayBuffer[AccumInfo]()
val photonPeakMemoryAccumInfos = ArrayBuffer[AccumInfo]()
val photonShuffleWriteTimeAccumInfos = ArrayBuffer[AccumInfo]()

if (app.isPhoton) {
app.accumManager.applyToAccumInfoMap { accumInfo =>
@@ -434,6 +492,23 @@ object AppSparkMetricsAnalyzer {
}
}

/**
* Given an input iterable, returns its min, median, max and sum.
*/
def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
if (arr.isEmpty) {
  // empty input yields all zeros
  StatisticsMetrics(0L, 0L, 0L, 0L)
} else {
  val sortedArr = arr.toSeq.sorted
  val len = sortedArr.size
  val med = if (len % 2 == 0) {
    (sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
  } else {
    sortedArr(len / 2)
  }
  StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
}
}
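// Worked example (editor's sketch, REPL-style; not part of the committed diff):
//   scala> AppSparkMetricsAnalyzer.getStatistics(Seq(4L, 1L, 3L, 2L))
//   res0: StatisticsMetrics = StatisticsMetrics(1, 2, 4, 10)
//   (sorted 1,2,3,4: even length, so med = (2 + 3) / 2 = 2 by integer division)
//   scala> AppSparkMetricsAnalyzer.getStatistics(Seq(3L, 1L, 2L))
//   res1: StatisticsMetrics = StatisticsMetrics(1, 2, 3, 6)
//   (sorted 1,2,3: odd length, so med is the middle element)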

def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
if (arr.isEmpty) {
0L
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

/**
* Stores Profiler diagnostic info.
* TODO: We plan to add two more fields/views in upcoming PRs.
*/
case class DiagnosticSummaryInfo(
stageDiagnostics: Seq[StageDiagnosticResult]
)
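
A minimal sketch of how this container is expected to be populated from the aggregation added above (the exact call site is an assumption):

    // stageDiagnostics flows straight out of AggRawMetricsResult.
    val aggMetrics: AggRawMetricsResult = getAggRawMetrics(app, index)
    val diagnostics = DiagnosticSummaryInfo(aggMetrics.stageDiagnostics)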