[FEA] Add IO diagnostic output for GPU slowness in Profiler tool #1451

Status: Open. This pull request wants to merge 28 commits into base: dev.

Changes from all commits (28 commits):
5d36743
diagnostic view 2 start
cindyyuanjiang Nov 15, 2024
5a62749
Merge branch 'dev' into diagnostic-view-2
cindyyuanjiang Nov 26, 2024
6978f79
update IODiagnosticProfileResult
cindyyuanjiang Nov 26, 2024
e0fe692
working implementation of view 2
cindyyuanjiang Dec 4, 2024
b2a8c0e
add node name in view 2 with some print statements
cindyyuanjiang Dec 5, 2024
e0d6a0c
fix unit tests
cindyyuanjiang Dec 6, 2024
03087c0
Merge branch 'dev' into diagnostic-view-2
cindyyuanjiang Dec 6, 2024
c0e44b2
fix merge conflict
cindyyuanjiang Dec 6, 2024
66c9762
add unit test and clean up
cindyyuanjiang Dec 6, 2024
45712e2
address review feedback
cindyyuanjiang Dec 10, 2024
3786fd9
add comments and rename variables/functions
cindyyuanjiang Dec 10, 2024
f2131ee
Merge branch 'dev' into diagnostic-view-2
cindyyuanjiang Dec 12, 2024
5dabc28
address review feedback
cindyyuanjiang Dec 13, 2024
f668dda
new output batches name
cindyyuanjiang Dec 16, 2024
e9c9508
refactor io diagnostic metrics
cindyyuanjiang Dec 18, 2024
0bd1e7f
remove function getAccumInfoStatisticsInStage
cindyyuanjiang Dec 18, 2024
f8346dd
Merge branch 'dev' into diagnostic-view-2
cindyyuanjiang Dec 18, 2024
550406c
refactor due to new optimizations from dev
cindyyuanjiang Dec 19, 2024
6d0d798
add back getStageTaskIds to avoid computing stage ids multiple times …
cindyyuanjiang Dec 19, 2024
1ae6143
Merge branch 'dev' into diagnostic-view-2
cindyyuanjiang Dec 20, 2024
be428e9
merged dev
cindyyuanjiang Dec 20, 2024
e882518
optimized computation of metric stats
cindyyuanjiang Dec 23, 2024
735546a
change metricNamesToKeyMap from string to string
cindyyuanjiang Dec 23, 2024
43f5c84
move AnalysisUtil file
cindyyuanjiang Dec 23, 2024
e8c2814
minor style updates
cindyyuanjiang Dec 24, 2024
b97e2db
expected file update
cindyyuanjiang Dec 24, 2024
631b9c0
minor updates between seq conversion
cindyyuanjiang Dec 24, 2024
cc80b48
comment on empty stage ids
cindyyuanjiang Dec 30, 2024

@@ -19,15 +19,16 @@ package com.nvidia.spark.rapids.tool.analysis
import scala.collection.breakOut
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.analysis.util.IOAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IODiagnosticResult, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.store.DataSourceRecord
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, DataSourceRecord}
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph

/**
@@ -58,21 +59,78 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
// A map between stage ID and the sequence of node names in that stage
val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
// A map between stage ID and diagnostic metrics results (stored as a map between metric name
// and AccumProfileResults)

// A mapping from stage ID to diagnostic metrics results.
// Each stage ID maps to another HashMap, where:
// - The key is the diagnostic metric name (String).
// - The value is an AccumProfileResults object containing the diagnostic data for that metric.
val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, AccumProfileResults]] =
HashMap.empty[Long, HashMap[String, AccumProfileResults]]

// A mapping from a unique combination of SQL execution identifiers to a list of IO diagnostic
// metrics results.
// The key is a tuple consisting of:
// - sqlID (Long): The unique identifier for the SQL query.
// - nodeID (Long): The unique identifier for the node.
// The value is an ArrayBuffer of SQLAccumProfileResults objects, storing the IO diagnostic
// metrics for the given key.
val IODiagnosticMetricsMap: HashMap[(Long, Long), ArrayBuffer[SQLAccumProfileResults]] =
HashMap.empty[(Long, Long), ArrayBuffer[SQLAccumProfileResults]]

/**
* Given an input diagnostic metric result, update stageToDiagnosticMetrics mapping
* @param accum AccumProfileResults to be analyzed
* Updates the stageToDiagnosticMetrics mapping with the provided AccumProfileResults.
* @param accum AccumProfileResults instance containing diagnostic metrics to be added
* to stageToDiagnosticMetrics mapping.
*/
private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
val stageId = accum.stageId
if (!stageToDiagnosticMetrics.contains(stageId)) {
stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults]
// Initialize an empty mapping for the stage if it doesn't already exist
if (!stageToDiagnosticMetrics.contains(accum.stageId)) {
stageToDiagnosticMetrics(accum.stageId) = HashMap.empty[String, AccumProfileResults]
}
stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum

stageToDiagnosticMetrics(accum.stageId)(accum.accMetaRef.getName()) = accum
}
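// A sketch of a more compact alternative using mutable.HashMap.getOrElseUpdate
// (illustrative only, not the committed code):
//   stageToDiagnosticMetrics
//     .getOrElseUpdate(accum.stageId, HashMap.empty[String, AccumProfileResults])
//     .update(accum.accMetaRef.getName(), accum)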

/**
* Updates the IODiagnosticMetricsMap with the provided SQLAccumProfileResults.
* @param accum SQLAccumProfileResults instance containing IO diagnostics metrics
* to be added to IODiagnosticMetricsMap.
*/
private def updateIODiagnosticMetricsMap(accum: SQLAccumProfileResults): Unit = {
val key = (accum.sqlID, accum.nodeID)

// Initialize an entry if the key does not exist
if (!IODiagnosticMetricsMap.contains(key)) {
IODiagnosticMetricsMap(key) = ArrayBuffer[SQLAccumProfileResults]()
}

IODiagnosticMetricsMap(key) += accum
}

/**
* Retrieves the task IDs associated with a specific stage.
*
* @param stageId The ID of the stage.
* @return A sequence of distinct task IDs corresponding to the given stage ID.
*/
private def getStageTaskIds(stageId: Int): Seq[Long] = {
app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut).distinct
}

/**
* Retrieves task update values from the accumulator info for the tasks of a given stage.
*
* @param accumInfo AccumInfo object containing the task updates map.
* @param stageTaskIds Task IDs of the stage for which task updates need to be retrieved.
* @return An array of task update values (`Long`) corresponding to the tasks
* in the specified stage.
*/
private def filterAccumTaskUpdatesForStage(accumInfo: AccumInfo, stageTaskIds: Seq[Long])
: Array[Long] = {
stageTaskIds.collect {
case taskId if accumInfo.taskUpdatesMap.contains(taskId) =>
accumInfo.taskUpdatesMap(taskId)
}(breakOut)
Review comment (Collaborator): Good use of breakOut here

}
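A minimal standalone sketch (assuming Scala 2.12, where scala.collection.breakOut is available) of why the reviewer praised breakOut: it supplies the CanBuildFrom so that map builds the target Array directly, skipping the intermediate Seq that .map(...).toArray would otherwise allocate.

import scala.collection.breakOut

object BreakOutDemo {
  def main(args: Array[String]): Unit = {
    val stageTaskIds: Seq[Long] = Seq(1L, 2L, 3L)
    // Built as an Array[Long] in a single pass; no intermediate Seq is allocated.
    val updates: Array[Long] = stageTaskIds.map(_ * 2)(breakOut)
    println(updates.mkString(", "))
  }
}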

/**
@@ -310,15 +368,101 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val med = Math.max(taskInfo.med, driverInfo.med)
val total = Math.max(taskInfo.total, driverInfo.total)

Some(SQLAccumProfileResults(appIndex, metric.sqlID,
val sqlAccumProfileResult = SQLAccumProfileResults(appIndex, metric.sqlID,
metric.nodeID, metric.nodeName, metric.accumulatorId, metric.name,
min, med, max, total, metric.metricType, metric.stageIds.mkString(",")))
min, med, max, total, metric.metricType, metric.stageIds)

if (isIODiagnosticMetricName(metric.name)) {
updateIODiagnosticMetricsMap(sqlAccumProfileResult)
}

Some(sqlAccumProfileResult)
} else {
None
}
}(breakOut)
}

/**
* Generates IO-related diagnostic metrics for the SQL plan. Metrics include:
* - Output rows
* - Scan time
* - Output batches
* - Buffer time
* - Shuffle write time
* - Fetch wait time
* - GPU decode time
*
* This method processes accumulator information for each SQL stage and node and
* computes statistical results (min, median, max, sum) for IO-related metrics.
*
* @return A sequence of `IODiagnosticResult` objects, one per SQL stage and node.
*/
def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = {
// Transform the diagnostic metrics map into a sequence of results
IODiagnosticMetricsMap.flatMap { case ((sqlId, nodeId), sqlAccums) =>
// Process each stage ID and compute diagnostic results
// TODO: currently if the stage ID list is empty, the result is skipped
val stageIds = sqlAccums.head.stageIds
stageIds.flatMap { stageId =>
val stageTaskIds = getStageTaskIds(stageId)
val nodeName = sqlAccums.head.nodeName

// Initialize a map to store statistics for each IO metric
val metricNameToStatistics = HashMap.empty[String, StatisticsMetrics].
withDefaultValue(StatisticsMetrics.ZERO_RECORD)

// Process each accumulator for the current SQL stage
sqlAccums.foreach { sqlAccum =>
// TODO: check if accumulator ID is in driverAccumMap, currently skipped
val accumInfo = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId)

val metricStats: Option[StatisticsMetrics] =
Review comment (Collaborator): nit: Can we use accumInfoOpt.flatMap() for a functional approach?

if (accumInfo.isEmpty || !accumInfo.get.stageValuesMap.contains(stageId)) {
None
} else if (stageIds.size == 1) {
// Skip computing statistics when there is only one stage
Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total))
} else {
// Retrieve task updates which correspond to the current stage
val filteredTaskUpdates = filterAccumTaskUpdatesForStage(accumInfo.get, stageTaskIds)
StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates)
}
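// A sketch of the reviewer's flatMap suggestion above (hypothetical refactor,
// not the committed code): the Option checks collapse into a single expression.
//   val metricStats: Option[StatisticsMetrics] = accumInfo.flatMap { info =>
//     if (!info.stageValuesMap.contains(stageId)) {
//       None
//     } else if (stageIds.size == 1) {
//       Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total))
//     } else {
//       StatisticsMetrics.createOptionalFromArr(
//         filterAccumTaskUpdatesForStage(info, stageTaskIds))
//     }
//   }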

// Compute the metric's statistics and store the results if available
metricStats match {
Review comment (Collaborator): nit: Can we use .map() instead of match{} for a more functional approach?

case Some(stat) =>
val metricKey = normalizeToIODiagnosticMetricKey(sqlAccum.name)
metricNameToStatistics(metricKey) = stat
case _ => ()
}
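// A sketch of the reviewer's suggestion above (hypothetical, not the committed
// code): since only the Some case does any work, foreach can replace the match.
//   metricStats.foreach { stat =>
//     metricNameToStatistics(normalizeToIODiagnosticMetricKey(sqlAccum.name)) = stat
//   }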
}

if (metricNameToStatistics.isEmpty) {
// No IO metric statistics were computed for this stage
None
} else {
Some(IODiagnosticResult(
appIndex,
app.getAppName,
app.appId,
sqlId,
stageId,
app.stageManager.getDurationById(stageId),
nodeId,
nodeName,
metricNameToStatistics(OUTPUT_ROWS_METRIC_KEY),
metricNameToStatistics(SCAN_TIME_METRIC_KEY),
metricNameToStatistics(OUTPUT_BATCHES_METRIC_KEY),
metricNameToStatistics(BUFFER_TIME_METRIC_KEY),
metricNameToStatistics(SHUFFLE_WRITE_TIME_METRIC_KEY),
metricNameToStatistics(FETCH_WAIT_TIME_METRIC_KEY),
metricNameToStatistics(GPU_DECODE_TIME_METRIC_KEY)))
}
}
}(breakOut)
}
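A minimal standalone sketch of the per-stage aggregation described above, under the assumption that StatisticsMetrics carries (min, med, max, total); the real computation lives in StatisticsMetrics.createOptionalFromArr:

def summarizeTaskUpdates(values: Array[Long]): Option[(Long, Long, Long, Long)] = {
  if (values.isEmpty) {
    None // no statistics for a stage with no matching task updates
  } else {
    val sorted = values.sorted
    // (min, median, max, total)
    Some((sorted.head, sorted(sorted.length / 2), sorted.last, sorted.sum))
  }
}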

/**
* Generate the stage level metrics for the SQL plan including GPU metrics if applicable.
* Along with Spark defined metrics, below is the list of GPU metrics that are collected if they
@@ -333,11 +477,12 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
val accumInfo = accumMapEntry._2
accumInfo.stageValuesMap.keys.flatMap( stageId => {
val stageTaskIds: Set[Long] =
app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
// Retrieve task updates corresponding to the current stage
val filteredTaskUpdates =
filterAccumTaskUpdatesForStage(accumInfo, getStageTaskIds(stageId))

// Get the task updates that belong to that stage
StatisticsMetrics.createOptionalFromArr(
accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates) match {
case Some(stat) =>
// Reuse AccumProfileResults to avoid allocating new objects
val accumProfileResults = AccumProfileResults(
@@ -19,8 +19,8 @@ package com.nvidia.spark.rapids.tool.analysis
import scala.collection.breakOut
import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.analysis.util.{AggAccumHelper, AggAccumPhotonHelper}
import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling._

@@ -0,0 +1,110 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

object StageAccumDiagnosticMetrics {
val MEMORY_SPILLED_METRIC = "internal.metrics.memoryBytesSpilled"
val DISK_SPILLED_METRIC = "internal.metrics.diskBytesSpilled"
val INPUT_BYTES_READ_METRIC = "internal.metrics.input.bytesRead"
val OUTPUT_BYTES_WRITTEN_METRIC = "internal.metrics.output.bytesWritten"
val SW_TOTAL_BYTES_METRIC = "internal.metrics.shuffle.write.bytesWritten"
val SR_FETCH_WAIT_TIME_METRIC = "internal.metrics.shuffle.read.fetchWaitTime"
val SW_WRITE_TIME_METRIC = "internal.metrics.shuffle.write.writeTime"
val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait"

/**
* Set of all diagnostic metrics
*/
lazy val allDiagnosticMetrics: Set[String] = Set(MEMORY_SPILLED_METRIC,
DISK_SPILLED_METRIC, INPUT_BYTES_READ_METRIC, OUTPUT_BYTES_WRITTEN_METRIC,
SW_TOTAL_BYTES_METRIC, SR_FETCH_WAIT_TIME_METRIC, SW_WRITE_TIME_METRIC,
GPU_SEMAPHORE_WAIT_METRIC)
}

object IOAccumDiagnosticMetrics {
// Metric keys to support variations in metric naming
val OUTPUT_ROWS_METRIC_KEY = "output rows"
val SCAN_TIME_METRIC_KEY = "scan time"
val OUTPUT_BATCHES_METRIC_KEY = "output batches"
val BUFFER_TIME_METRIC_KEY = "buffer time"
val SHUFFLE_WRITE_TIME_METRIC_KEY = "shuffle write time"
val FETCH_WAIT_TIME_METRIC_KEY = "fetch wait time"
val GPU_DECODE_TIME_METRIC_KEY = "GPU decode time"

val OUTPUT_ROW_METRIC_NAMES = Set(
"number of output rows", // common across all Spark eventlogs
"output rows", // only in GPU eventlogs
"join output rows" // only in GPU eventlogs
)

val SCAN_TIME_METRIC_NAMES = Set(
"scan time" // common across all Spark eventlogs
)

val OUTPUT_BATCHES_METRIC_NAMES = Set(
"number of output batches", // only in Photon eventlogs
"output columnar batches" // only in GPU eventlogs
)

val BUFFER_TIME_METRIC_NAMES = Set(
"buffer time" // common across all Spark eventlogs
)

val SHUFFLE_WRITE_TIME_METRIC_NAMES = Set(
"shuffle write time", // common across all Spark eventlogs
"rs. shuffle write time" // only in GPU eventlogs
)

val FETCH_WAIT_TIME_METRIC_NAMES = Set(
"fetch wait time" // common across all Spark eventlogs
)

val GPU_DECODE_TIME_METRIC_NAMES = Set(
"GPU decode time" // only in GPU eventlogs
)

private val metricNamesToKeyMap: Map[String, String] = (
OUTPUT_ROW_METRIC_NAMES.map(_ -> OUTPUT_ROWS_METRIC_KEY) ++
SCAN_TIME_METRIC_NAMES.map(_ -> SCAN_TIME_METRIC_KEY) ++
OUTPUT_BATCHES_METRIC_NAMES.map(_ -> OUTPUT_BATCHES_METRIC_KEY) ++
BUFFER_TIME_METRIC_NAMES.map(_ -> BUFFER_TIME_METRIC_KEY) ++
SHUFFLE_WRITE_TIME_METRIC_NAMES.map(_ -> SHUFFLE_WRITE_TIME_METRIC_KEY) ++
FETCH_WAIT_TIME_METRIC_NAMES.map(_ -> FETCH_WAIT_TIME_METRIC_KEY) ++
GPU_DECODE_TIME_METRIC_NAMES.map(_ -> GPU_DECODE_TIME_METRIC_KEY)).toMap

/**
* Set of all IO diagnostic metrics names
*/
lazy val allIODiagnosticMetrics: Set[String] = metricNamesToKeyMap.keys.toSet

/**
* Check if a metric name belongs to IO diagnostic metrics
*/
def isIODiagnosticMetricName(metric: String): Boolean = {
Review comment (Collaborator): I saw a similar method isDiagnosticMetrics() that has been included in class AccumNameRef, whereas isIODiagnosticMetricName() is part of object IOAccumDiagnosticMetrics. Can we improve the consistency (maybe include isDiagnosticMetrics in its object StageAccumDiagnosticMetrics)?

val csvValue: String = StringUtils.reformatCSVString(value)
def isDiagnosticMetrics(): Boolean = allDiagnosticMetrics.contains(value)

allIODiagnosticMetrics.contains(metric)
}
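A sketch of the reviewer's consistency suggestion (hypothetical placement, not part of this PR): host the stage-level membership check next to the set it consults, mirroring isIODiagnosticMetricName here. Trimmed to one representative constant to stay self-contained.

object StageAccumDiagnosticMetricsSketch {
  val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait"
  lazy val allDiagnosticMetrics: Set[String] = Set(GPU_SEMAPHORE_WAIT_METRIC)
  def isDiagnosticMetricName(metric: String): Boolean =
    allDiagnosticMetrics.contains(metric)
}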

/**
* Normalize a metric name to its IO diagnostic metric constant because we want to
* support variations in metric naming, e.g. "join output rows", "number of output rows"
* are different names for the output rows metric.
*/
def normalizeToIODiagnosticMetricKey(metric: String): String = {
// input metric is already known to be an IO diagnostic metric
metricNamesToKeyMap(metric)
}
}
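A brief usage sketch (illustrative only): callers gate on membership first, then normalize, so naming variants collapse to a single canonical key.

import com.nvidia.spark.rapids.tool.analysis.util.IOAccumDiagnosticMetrics._

val names = Seq("join output rows", "number of output rows", "scan time", "peak memory")
val keys = names.filter(isIODiagnosticMetricName).map(normalizeToIODiagnosticMetricKey)
// keys == Seq("output rows", "output rows", "scan time"); "peak memory" is not an IO metric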
@@ -95,6 +95,10 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
ProfStageMetricView.getRawView(apps)
}

def getIODiagnosticMetrics: Seq[IODiagnosticResult] = {
ProfIODiagnosticMetricsView.getRawView(apps)
}
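// Hypothetical call site (sketch, not part of the diff): the new view is
// retrieved the same way as the existing raw views.
//   val collect = new CollectInformation(apps)
//   val ioDiagnostics: Seq[IODiagnosticResult] = collect.getIODiagnosticMetrics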

/**
* This function is meant to clean up Delta log execs so that you could align
* SQL ids between CPU and GPU eventlogs. It attempts to remove any delta log