[FEA] Add IO diagnostic output for GPU slowness in Profiler tool #1451
base: dev
Changes from 24 commits
@@ -19,15 +19,16 @@ package com.nvidia.spark.rapids.tool.analysis
import scala.collection.breakOut
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.analysis.util.IOAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IODiagnosticResult, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.store.DataSourceRecord
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, DataSourceRecord}
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph

/**

@@ -58,23 +59,82 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
  var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
  // A map between stage ID and a set of node names
  val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
  // A map between stage ID and diagnostic metrics results (stored as a map between metric name
  // and AccumProfileResults)

  // A mapping from stage ID to diagnostic metrics results.
  // Each stage ID maps to another HashMap, where:
  // - The key is the diagnostic metric name (String).
  // - The value is an AccumProfileResults object containing the diagnostic data for that metric.
  val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, AccumProfileResults]] =
    HashMap.empty[Long, HashMap[String, AccumProfileResults]]

  // A mapping from a unique combination of SQL execution identifiers to a list of IO diagnostic
  // metrics results.
  // The key is a tuple consisting of:
  // - sqlID (Long): The unique identifier for the SQL query.
  // - nodeID (Long): The unique identifier for the node.
  // The value is an ArrayBuffer of SQLAccumProfileResults objects, storing the IO diagnostic
  // metrics for the given key.
  val IODiagnosticMetricsMap: HashMap[(Long, Long), ArrayBuffer[SQLAccumProfileResults]] =
    HashMap.empty[(Long, Long), ArrayBuffer[SQLAccumProfileResults]]

  /**
   * Given an input diagnostic metric result, update stageToDiagnosticMetrics mapping
   * @param accum AccumProfileResults to be analyzed
   * Updates the stageToDiagnosticMetrics mapping with the provided AccumProfileResults.
   * @param accum AccumProfileResults instance containing diagnostic metrics to be added
   *              to stageToDiagnosticMetrics mapping.
   */
  private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
    val stageId = accum.stageId

    // Initialize an empty mapping for the stage if it doesn't already exist
    if (!stageToDiagnosticMetrics.contains(stageId)) {
      stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults]
    }

    stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum
  }

  /**
   * Updates the IODiagnosticMetricsMap with the provided SQLAccumProfileResults.
   * @param accum SQLAccumProfileResults instance containing IO diagnostics metrics
   *              to be added to IODiagnosticMetricsMap.
   */
  private def updateIODiagnosticMetricsMap(accum: SQLAccumProfileResults): Unit = {
    val key = (accum.sqlID, accum.nodeID)

    // Initialize an entry if the key does not exist
    if (!IODiagnosticMetricsMap.contains(key)) {
      IODiagnosticMetricsMap(key) = ArrayBuffer[SQLAccumProfileResults]()
    }

    IODiagnosticMetricsMap(key) += accum
  }

  /**
   * Retrieves the task IDs associated with a specific stage.
   *
   * @param stageId The ID of the stage.
   * @return A sequence of task IDs corresponding to the given stage ID.
   */
  private def getStageTaskIds(stageId: Int): Seq[Long] = {
    app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut).distinct
  }

  /**
   * Retrieves task update values from the accumulator info for the tasks of a given stage.
   *
   * @param accumInfo AccumInfo object containing the task updates map.
   * @param stageTaskIds The task IDs of the stage whose task updates need to be retrieved.
   * @return An array of task update values (`Long`) corresponding to the tasks
   *         in the specified stage.
   */
  private def filterAccumTaskUpdatesForStage(accumInfo: AccumInfo, stageTaskIds: Seq[Long])
    : Array[Long] = {
    stageTaskIds.collect {
      case taskId if accumInfo.taskUpdatesMap.contains(taskId) =>
        accumInfo.taskUpdatesMap(taskId)
    }(breakOut)
  }

  /**
   * Connects Operators to Stages using AccumulatorIDs.
   * TODO: This function can be fused in the visitNode function to avoid the extra iteration.
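As an aside to the diff above: both update helpers follow the same check-then-insert pattern on a nested mutable HashMap. Below is a minimal, self-contained sketch of that pattern, using a simplified stand-in type since AccumProfileResults is defined elsewhere in the codebase; the names in the sketch are hypothetical and not part of the PR.

import scala.collection.mutable.HashMap

object DiagnosticMapSketch {
  // Simplified stand-in for AccumProfileResults, keeping only what the pattern needs.
  final case class StageMetric(stageId: Long, name: String, total: Long)

  // stage ID -> (diagnostic metric name -> metric result), mirroring stageToDiagnosticMetrics
  val stageToMetrics: HashMap[Long, HashMap[String, StageMetric]] =
    HashMap.empty[Long, HashMap[String, StageMetric]]

  def register(metric: StageMetric): Unit = {
    // getOrElseUpdate creates the inner map the first time a stage ID is seen,
    // then the metric entry is inserted or overwritten by name.
    val perStage = stageToMetrics.getOrElseUpdate(metric.stageId, HashMap.empty[String, StageMetric])
    perStage(metric.name) = metric
  }

  def main(args: Array[String]): Unit = {
    register(StageMetric(3L, "gpuSemaphoreWait", 1200L))
    register(StageMetric(3L, "internal.metrics.memoryBytesSpilled", 0L))
    println(stageToMetrics(3L).keySet) // both metrics land under stage 3
  }
}

The explicit contains-check used in the PR is equivalent; getOrElseUpdate simply folds the existence check and the initialization into one call.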
@@ -310,15 +370,101 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
        val med = Math.max(taskInfo.med, driverInfo.med)
        val total = Math.max(taskInfo.total, driverInfo.total)

        Some(SQLAccumProfileResults(appIndex, metric.sqlID,
        val sqlAccumProfileResult = SQLAccumProfileResults(appIndex, metric.sqlID,
          metric.nodeID, metric.nodeName, metric.accumulatorId, metric.name,
          min, med, max, total, metric.metricType, metric.stageIds.mkString(",")))
          min, med, max, total, metric.metricType, metric.stageIds)

        if (isIODiagnosticMetricName(metric.name)) {
          updateIODiagnosticMetricsMap(sqlAccumProfileResult)
        }

        Some(sqlAccumProfileResult)
      } else {
        None
      }
    }(breakOut)
  }

  /**
   * Generates IO-related diagnostic metrics for the SQL plan. Metrics include:
   * - Output rows
   * - Scan time
   * - Output batches
   * - Buffer time
   * - Shuffle write time
   * - Fetch wait time
   * - GPU decode time
   *
   * This method processes accumulator information for each SQL stage and node and
   * computes statistical results (min, median, max, sum) for IO-related metrics.
   *
   * @return A sequence of `IODiagnosticResult` objects, one per SQL stage and node.
   */
  def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = {
    // val emptyAccumInfo = new AccumInfo(AccumMetaRef.EMPTY_ACCUM_META_REF)
    // Transform the diagnostic metrics map into a sequence of results
    IODiagnosticMetricsMap.toSeq.flatMap { case ((sqlId, nodeId), sqlAccums) =>
      // Process each stage ID and compute diagnostic results
      val stageIds = sqlAccums.head.stageIds
      val stageIdsSize = stageIds.size
      stageIds.flatMap { stageId =>
        val stageTaskIds = getStageTaskIds(stageId)
        val nodeName = sqlAccums.head.nodeName

        // Initialize a map to store statistics for each IO metric
        val metricNameToStatistics = HashMap.empty[String, StatisticsMetrics].
          withDefaultValue(StatisticsMetrics.ZERO_RECORD)

        // Process each accumulator for the current SQL stage
        sqlAccums.foreach { sqlAccum =>
          // TODO: need to check if accum ID is in driverAccumMap, currently skipped
          val accumInfo = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId)

          val metricStats: Option[StatisticsMetrics] =
[Review comment] nit: Can we use …
            if (accumInfo.isEmpty || !accumInfo.get.stageValuesMap.contains(stageId)) {
              None
            } else if (stageIdsSize == 1) {
              Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total))
            } else {
              // Retrieve task updates corresponding to the current stage
              val filteredTaskUpdates = filterAccumTaskUpdatesForStage(accumInfo.get, stageTaskIds)
              StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates)
            }

          // Compute the metric's statistics and store the results if available
          metricStats match {
[Review comment] nit: Can we use …
            case Some(stat) =>
              val metricKey = normalizeToIODiagnosticMetricKey(sqlAccum.name)
              metricNameToStatistics(metricKey) = stat
            case _ => ()
          }
        }

        if (metricNameToStatistics.isEmpty) {
          // No IO metric statistics were computed for this stage
          None
        } else {
          Some(IODiagnosticResult(
            appIndex,
            app.getAppName,
            app.appId,
            sqlId,
            stageId,
            app.stageManager.getDurationById(stageId),
            nodeId,
            nodeName,
            metricNameToStatistics(OUTPUT_ROWS_METRIC_KEY),
            metricNameToStatistics(SCAN_TIME_METRIC_KEY),
            metricNameToStatistics(OUTPUT_BATCHES_METRIC_KEY),
            metricNameToStatistics(BUFFER_TIME_METRIC_KEY),
            metricNameToStatistics(SHUFFLE_WRITE_TIME_METRIC_KEY),
            metricNameToStatistics(FETCH_WAIT_TIME_METRIC_KEY),
            metricNameToStatistics(GPU_DECODE_TIME_METRIC_KEY)))
        }
      }
    }.toSeq
  }

  /**
   * Generate the stage level metrics for the SQL plan including GPU metrics if applicable.
   * Along with Spark defined metrics, below is the list of GPU metrics that are collected if they

@@ -333,11 +479,12 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
    app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
      val accumInfo = accumMapEntry._2
      accumInfo.stageValuesMap.keys.flatMap( stageId => {
        val stageTaskIds: Set[Long] =
          app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
        // Retrieve task updates corresponding to the current stage
        val filteredTaskUpdates =
          filterAccumTaskUpdatesForStage(accumInfo, getStageTaskIds(stageId))

        // Get the task updates that belong to that stage
        StatisticsMetrics.createOptionalFromArr(
          accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
        StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates) match {
          case Some(stat) =>
            // Reuse AccumProfileResults to avoid allocating new objects
            val accumProfileResults = AccumProfileResults(
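Both generateIODiagnosticAccums and the updated generateStageLevelAccums reduce the filtered per-task updates through StatisticsMetrics.createOptionalFromArr, whose definition sits outside this diff. The sketch below is a hypothetical stand-in for the behavior implied by the call sites, shown only for orientation; the field names and the exact median convention are assumptions, not the tool's actual implementation.

// Stand-in result type mirroring how StatisticsMetrics is used above.
case class StatsSketch(min: Long, med: Long, max: Long, total: Long)

// Hypothetical stand-in for StatisticsMetrics.createOptionalFromArr: no statistics row
// when a stage has no task updates, otherwise min, median, max and sum of the updates.
def createOptionalFromArrSketch(taskUpdates: Array[Long]): Option[StatsSketch] = {
  if (taskUpdates.isEmpty) {
    None
  } else {
    val sorted = taskUpdates.sorted
    Some(StatsSketch(
      min = sorted.head,
      med = sorted(sorted.length / 2),
      max = sorted.last,
      total = sorted.sum))
  }
}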
@@ -0,0 +1,110 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.tool.analysis.util

object StageAccumDiagnosticMetrics {
  val MEMORY_SPILLED_METRIC = "internal.metrics.memoryBytesSpilled"
  val DISK_SPILLED_METRIC = "internal.metrics.diskBytesSpilled"
  val INPUT_BYTES_READ_METRIC = "internal.metrics.input.bytesRead"
  val OUTPUT_BYTES_WRITTEN_METRIC = "internal.metrics.output.bytesWritten"
  val SW_TOTAL_BYTES_METRIC = "internal.metrics.shuffle.write.bytesWritten"
  val SR_FETCH_WAIT_TIME_METRIC = "internal.metrics.shuffle.read.fetchWaitTime"
  val SW_WRITE_TIME_METRIC = "internal.metrics.shuffle.write.writeTime"
  val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait"

  /**
   * Set of all diagnostic metrics
   */
  lazy val allDiagnosticMetrics: Set[String] = Set(MEMORY_SPILLED_METRIC,
    DISK_SPILLED_METRIC, INPUT_BYTES_READ_METRIC, OUTPUT_BYTES_WRITTEN_METRIC,
    SW_TOTAL_BYTES_METRIC, SR_FETCH_WAIT_TIME_METRIC, SW_WRITE_TIME_METRIC,
    GPU_SEMAPHORE_WAIT_METRIC)
}

object IOAccumDiagnosticMetrics {
  // Metric keys to support variations in metric naming
  val OUTPUT_ROWS_METRIC_KEY = "output rows"
  val SCAN_TIME_METRIC_KEY = "scan time"
  val OUTPUT_BATCHES_METRIC_KEY = "output batches"
  val BUFFER_TIME_METRIC_KEY = "buffer time"
  val SHUFFLE_WRITE_TIME_METRIC_KEY = "shuffle write time"
  val FETCH_WAIT_TIME_METRIC_KEY = "fetch wait time"
  val GPU_DECODE_TIME_METRIC_KEY = "GPU decode time"

  val OUTPUT_ROW_METRIC_NAMES = Set(
    "number of output rows", // common across all Spark eventlogs
    "output rows", // only in GPU eventlogs
    "join output rows" // only in GPU eventlogs
  )

  val SCAN_TIME_METRIC_NAMES = Set(
    "scan time" // common across all Spark eventlogs
  )

  val OUTPUT_BATCHES_METRIC_NAMES = Set(
    "number of output batches", // only in Photon eventlogs
    "output columnar batches" // only in GPU eventlogs
  )

  val BUFFER_TIME_METRIC_NAMES = Set(
    "buffer time" // common across all Spark eventlogs
  )

  val SHUFFLE_WRITE_TIME_METRIC_NAMES = Set(
    "shuffle write time", // common across all Spark eventlogs
    "rs. shuffle write time" // only in GPU eventlogs
  )

  val FETCH_WAIT_TIME_METRIC_NAMES = Set(
    "fetch wait time" // common across all Spark eventlogs
  )

  val GPU_DECODE_TIME_METRIC_NAMES = Set(
    "GPU decode time" // only in GPU eventlogs
  )

  private val metricNamesToKeyMap: Map[String, String] = (
    OUTPUT_ROW_METRIC_NAMES.map(_ -> OUTPUT_ROWS_METRIC_KEY) ++
    SCAN_TIME_METRIC_NAMES.map(_ -> SCAN_TIME_METRIC_KEY) ++
    OUTPUT_BATCHES_METRIC_NAMES.map(_ -> OUTPUT_BATCHES_METRIC_KEY) ++
    BUFFER_TIME_METRIC_NAMES.map(_ -> BUFFER_TIME_METRIC_KEY) ++
    SHUFFLE_WRITE_TIME_METRIC_NAMES.map(_ -> SHUFFLE_WRITE_TIME_METRIC_KEY) ++
    FETCH_WAIT_TIME_METRIC_NAMES.map(_ -> FETCH_WAIT_TIME_METRIC_KEY) ++
    GPU_DECODE_TIME_METRIC_NAMES.map(_ -> GPU_DECODE_TIME_METRIC_KEY)).toMap

  /**
   * Set of all IO diagnostic metrics names
   */
  lazy val allIODiagnosticMetrics: Set[String] = metricNamesToKeyMap.keys.toSet

  /**
   * Check if a metric name belongs to IO diagnostic metrics
   */
  def isIODiagnosticMetricName(metric: String): Boolean = {
[Review comment] I saw a similar method. Can we improve the consistency (maybe include isDiagnosticMetrics in its object)? See spark-rapids-tools/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala, lines 37 to 39 in cc80b48.
    allIODiagnosticMetrics.contains(metric)
  }

  /**
   * Normalize a metric name to its IO diagnostic metric constant because we want to
   * support variations in metric naming, e.g. "join output rows" and "number of output rows"
   * are both names for the output rows metric.
   */
  def normalizeToIODiagnosticMetricKey(metric: String): String = {
    // input metric is already known to be an IO diagnostic metric
    metricNamesToKeyMap(metric)
  }
}
[Review comment] Good use of breakOut here.
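For orientation, a small usage sketch of the two public helpers of IOAccumDiagnosticMetrics. The snippet is illustrative only and not part of the diff; it simply exercises the object shown above: raw accumulator names from different kinds of eventlogs are first filtered, then collapsed onto one canonical diagnostic key.

import com.nvidia.spark.rapids.tool.analysis.util.IOAccumDiagnosticMetrics._

// Raw accumulator names as they may appear in CPU, GPU or Photon eventlogs.
val rawNames = Seq("number of output rows", "join output rows", "scan time", "peak memory")

// Keep only IO diagnostic metrics and normalize each name to its canonical key.
val normalized = rawNames.collect {
  case name if isIODiagnosticMetricName(name) => name -> normalizeToIODiagnosticMetricKey(name)
}
// "number of output rows" and "join output rows" both normalize to "output rows",
// "scan time" stays "scan time", and "peak memory" is dropped.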