diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AnalysisUtils.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AnalysisUtils.scala deleted file mode 100644 index 01f6989c7..000000000 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AnalysisUtils.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.tool.analysis - -object StageAccumDiagnosticMetrics { - val MEMORY_SPILLED_METRIC = "internal.metrics.memoryBytesSpilled" - val DISK_SPILLED_METRIC = "internal.metrics.diskBytesSpilled" - val INPUT_BYTES_READ_METRIC = "internal.metrics.input.bytesRead" - val OUTPUT_BYTES_WRITTEN_METRIC = "internal.metrics.output.bytesWritten" - val SW_TOTAL_BYTES_METRIC = "internal.metrics.shuffle.write.bytesWritten" - val SR_FETCH_WAIT_TIME_METRIC = "internal.metrics.shuffle.read.fetchWaitTime" - val SW_WRITE_TIME_METRIC = "internal.metrics.shuffle.write.writeTime" - val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait" - - /** - * Get all diagnostic metrics - */ - def getAllDiagnosticMetrics: Set[String] = Set(MEMORY_SPILLED_METRIC, - DISK_SPILLED_METRIC, INPUT_BYTES_READ_METRIC, OUTPUT_BYTES_WRITTEN_METRIC, - SW_TOTAL_BYTES_METRIC, SR_FETCH_WAIT_TIME_METRIC, SW_WRITE_TIME_METRIC, - GPU_SEMAPHORE_WAIT_METRIC) -} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala index 7ca4bbb5b..05c27648b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala @@ -19,7 +19,8 @@ package com.nvidia.spark.rapids.tool.analysis import scala.collection.breakOut import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet} -import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults} +import com.nvidia.spark.rapids.tool.analysis.util.IOAccumDiagnosticMetrics._ +import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IODiagnosticResult, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults} import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer import org.apache.spark.sql.execution.SparkPlanInfo @@ -27,7 +28,7 @@ import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo -import org.apache.spark.sql.rapids.tool.store.DataSourceRecord +import org.apache.spark.sql.rapids.tool.store.{AccumInfo, 
DataSourceRecord} import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph /** @@ -58,21 +59,78 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]() // A map between stage ID and a set of node names val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]] - // A map between stage ID and diagnostic metrics results (stored as a map between metric name - // and AccumProfileResults) + + // A mapping from stage ID to diagnostic metrics results. + // Each stage ID maps to another HashMap, where: + // - The key is the diagnostic metric name (String). + // - The value is an AccumProfileResults object containing the diagnostic data for that metric. val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, AccumProfileResults]] = HashMap.empty[Long, HashMap[String, AccumProfileResults]] + // A mapping from a unique combination of SQL execution identifiers to a list of IO diagnostic + // metrics results. + // The key is a tuple consisting of: + // - sqlID (Long): The unique identifier for the SQL query. + // - nodeID (Long): The unique identifier for the node. + // The value is an ArrayBuffer of SQLAccumProfileResults objects, storing the IO diagnostic + // metrics for the given key. + val IODiagnosticMetricsMap: HashMap[(Long, Long), ArrayBuffer[SQLAccumProfileResults]] = + HashMap.empty[(Long, Long), ArrayBuffer[SQLAccumProfileResults]] + /** - * Given an input diagnostic metric result, update stageToDiagnosticMetrics mapping - * @param accum AccumProfileResults to be analyzed + * Updates the stageToDiagnosticMetrics mapping with the provided AccumProfileResults. + * @param accum AccumProfileResults instance containing diagnostic metrics to be added + * to the stageToDiagnosticMetrics mapping. */ private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = { - val stageId = accum.stageId - if (!stageToDiagnosticMetrics.contains(stageId)) { - stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults] + // Initialize an empty mapping for the stage if it doesn't already exist + if (!stageToDiagnosticMetrics.contains(accum.stageId)) { + stageToDiagnosticMetrics(accum.stageId) = HashMap.empty[String, AccumProfileResults] } - stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum + + stageToDiagnosticMetrics(accum.stageId)(accum.accMetaRef.getName()) = accum + } + + /** + * Updates the IODiagnosticMetricsMap with the provided SQLAccumProfileResults. + * @param accum SQLAccumProfileResults instance containing IO diagnostic metrics + * to be added to the IODiagnosticMetricsMap. + */ + private def updateIODiagnosticMetricsMap(accum: SQLAccumProfileResults): Unit = { + val key = (accum.sqlID, accum.nodeID) + + // Initialize an entry if the key does not exist + if (!IODiagnosticMetricsMap.contains(key)) { + IODiagnosticMetricsMap(key) = ArrayBuffer[SQLAccumProfileResults]() + } + + IODiagnosticMetricsMap(key) += accum + } + + /** + * Retrieves the task IDs associated with a specific stage. + * + * @param stageId The ID of the stage. + * @return A sequence of task IDs corresponding to the given stage ID. + */ + private def getStageTaskIds(stageId: Int): Seq[Long] = { + app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut).distinct + } + + /** + * Retrieves task update values from the accumulator info for the tasks in the specified stage. + * + * @param accumInfo AccumInfo object containing the task updates map. 
+ * @param stageTaskIds The IDs of the tasks in the specified stage whose updates need to be retrieved. + * @return An array of task update values (`Long`) corresponding to the tasks + * in the specified stage. + */ + private def filterAccumTaskUpdatesForStage(accumInfo: AccumInfo, stageTaskIds: Seq[Long]) + : Array[Long] = { + stageTaskIds.collect { + case taskId if accumInfo.taskUpdatesMap.contains(taskId) => + accumInfo.taskUpdatesMap(taskId) + }(breakOut) } /** @@ -310,15 +368,101 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap val med = Math.max(taskInfo.med, driverInfo.med) val total = Math.max(taskInfo.total, driverInfo.total) - Some(SQLAccumProfileResults(appIndex, metric.sqlID, + val sqlAccumProfileResult = SQLAccumProfileResults(appIndex, metric.sqlID, metric.nodeID, metric.nodeName, metric.accumulatorId, metric.name, - min, med, max, total, metric.metricType, metric.stageIds.mkString(","))) + min, med, max, total, metric.metricType, metric.stageIds) + + if (isIODiagnosticMetricName(metric.name)) { + updateIODiagnosticMetricsMap(sqlAccumProfileResult) + } + + Some(sqlAccumProfileResult) } else { None } }(breakOut) } + /** + * Generates IO-related diagnostic metrics for the SQL plan. Metrics include: + * - Output rows + * - Scan time + * - Output batches + * - Buffer time + * - Shuffle write time + * - Fetch wait time + * - GPU decode time + * + * This method processes accumulator information for each SQL stage and node and + * computes statistical results (min, median, max, sum) for IO-related metrics. + * + * @return A sequence of `IODiagnosticResult` objects, one per SQL stage and node. + */ + def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = { + // Transform the diagnostic metrics map into a sequence of results + IODiagnosticMetricsMap.flatMap { case ((sqlId, nodeId), sqlAccums) => + // Process each stage ID and compute diagnostic results + // TODO: currently if the set of stage IDs is empty, the result is skipped + val stageIds = sqlAccums.head.stageIds + stageIds.flatMap { stageId => + val stageTaskIds = getStageTaskIds(stageId) + val nodeName = sqlAccums.head.nodeName + + // Initialize a map to store statistics for each IO metric + val metricNameToStatistics = HashMap.empty[String, StatisticsMetrics]. 
+ withDefaultValue(StatisticsMetrics.ZERO_RECORD) + + // Process each accumulator for the current SQL stage + sqlAccums.foreach { sqlAccum => + // TODO: check if accumulator ID is in driverAccumMap, currently skipped + val accumInfo = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId) + + val metricStats: Option[StatisticsMetrics] = + if (accumInfo.isEmpty || !accumInfo.get.stageValuesMap.contains(stageId)) { + None + } else if (stageIds.size == 1) { + // Skip computing statistics when there is only one stage + Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total)) + } else { + // Retrieve task updates which correspond to the current stage + val filteredTaskUpdates = filterAccumTaskUpdatesForStage(accumInfo.get, stageTaskIds) + StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates) + } + + // Compute the metric's statistics and store the results if available + metricStats match { + case Some(stat) => + val metricKey = normalizeToIODiagnosticMetricKey(sqlAccum.name) + metricNameToStatistics(metricKey) = stat + case _ => () + } + } + + if (metricNameToStatistics.isEmpty) { + // No IO metric statistics were computed for this stage + None + } else { + Some(IODiagnosticResult( + appIndex, + app.getAppName, + app.appId, + sqlId, + stageId, + app.stageManager.getDurationById(stageId), + nodeId, + nodeName, + metricNameToStatistics(OUTPUT_ROWS_METRIC_KEY), + metricNameToStatistics(SCAN_TIME_METRIC_KEY), + metricNameToStatistics(OUTPUT_BATCHES_METRIC_KEY), + metricNameToStatistics(BUFFER_TIME_METRIC_KEY), + metricNameToStatistics(SHUFFLE_WRITE_TIME_METRIC_KEY), + metricNameToStatistics(FETCH_WAIT_TIME_METRIC_KEY), + metricNameToStatistics(GPU_DECODE_TIME_METRIC_KEY))) + } + } + }(breakOut) + } + /** * Generate the stage level metrics for the SQL plan including GPU metrics if applicable. 
* Along with Spark defined metrics, below is the list of GPU metrics that are collected if they @@ -333,11 +477,12 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap app.accumManager.accumInfoMap.flatMap { accumMapEntry => val accumInfo = accumMapEntry._2 accumInfo.stageValuesMap.keys.flatMap( stageId => { - val stageTaskIds: Set[Long] = - app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut) + // Retrieve task updates correspond to the current stage + val filteredTaskUpdates = + filterAccumTaskUpdatesForStage(accumInfo, getStageTaskIds(stageId)) + // Get the task updates that belong to that stage - StatisticsMetrics.createOptionalFromArr( - accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match { + StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates) match { case Some(stat) => // Reuse AccumProfileResults to avoid generating allocating new objects val accumProfileResults = AccumProfileResults( diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index 6b8c3d5e5..990d9d21a 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -19,8 +19,8 @@ package com.nvidia.spark.rapids.tool.analysis import scala.collection.breakOut import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap} -import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._ import com.nvidia.spark.rapids.tool.analysis.util.{AggAccumHelper, AggAccumPhotonHelper} +import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics._ import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper import com.nvidia.spark.rapids.tool.profiling._ diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/DiagnosticMetrics.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/DiagnosticMetrics.scala new file mode 100644 index 000000000..323d3a245 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/DiagnosticMetrics.scala @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.tool.analysis.util + +object StageAccumDiagnosticMetrics { + val MEMORY_SPILLED_METRIC = "internal.metrics.memoryBytesSpilled" + val DISK_SPILLED_METRIC = "internal.metrics.diskBytesSpilled" + val INPUT_BYTES_READ_METRIC = "internal.metrics.input.bytesRead" + val OUTPUT_BYTES_WRITTEN_METRIC = "internal.metrics.output.bytesWritten" + val SW_TOTAL_BYTES_METRIC = "internal.metrics.shuffle.write.bytesWritten" + val SR_FETCH_WAIT_TIME_METRIC = "internal.metrics.shuffle.read.fetchWaitTime" + val SW_WRITE_TIME_METRIC = "internal.metrics.shuffle.write.writeTime" + val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait" + + /** + * Set of all diagnostic metrics + */ + lazy val allDiagnosticMetrics: Set[String] = Set(MEMORY_SPILLED_METRIC, + DISK_SPILLED_METRIC, INPUT_BYTES_READ_METRIC, OUTPUT_BYTES_WRITTEN_METRIC, + SW_TOTAL_BYTES_METRIC, SR_FETCH_WAIT_TIME_METRIC, SW_WRITE_TIME_METRIC, + GPU_SEMAPHORE_WAIT_METRIC) +} + +object IOAccumDiagnosticMetrics { + // Metric keys to support variations in metric naming + val OUTPUT_ROWS_METRIC_KEY = "output rows" + val SCAN_TIME_METRIC_KEY = "scan time" + val OUTPUT_BATCHES_METRIC_KEY = "output batches" + val BUFFER_TIME_METRIC_KEY = "buffer time" + val SHUFFLE_WRITE_TIME_METRIC_KEY = "shuffle write time" + val FETCH_WAIT_TIME_METRIC_KEY = "fetch wait time" + val GPU_DECODE_TIME_METRIC_KEY = "GPU decode time" + + val OUTPUT_ROW_METRIC_NAMES = Set( + "number of output rows", // common across all Spark eventlogs + "output rows", // only in GPU eventlogs + "join output rows" // only in GPU eventlogs + ) + + val SCAN_TIME_METRIC_NAMES = Set( + "scan time" // common across all Spark eventlogs + ) + + val OUTPUT_BATCHES_METRIC_NAMES = Set( + "number of output batches", // only in Photon eventlogs + "output columnar batches" // only in GPU eventlogs + ) + + val BUFFER_TIME_METRIC_NAMES = Set( + "buffer time" // common across all Spark eventlogs + ) + + val SHUFFLE_WRITE_TIME_METRIC_NAMES = Set( + "shuffle write time", // common across all Spark eventlogs + "rs. shuffle write time" // only in GPU eventlogs + ) + + val FETCH_WAIT_TIME_METRIC_NAMES = Set( + "fetch wait time" // common across all Spark eventlogs + ) + + val GPU_DECODE_TIME_METRIC_NAMES = Set( + "GPU decode time" // only in GPU eventlogs + ) + + private val metricNamesToKeyMap: Map[String, String] = ( + OUTPUT_ROW_METRIC_NAMES.map(_ -> OUTPUT_ROWS_METRIC_KEY) ++ + SCAN_TIME_METRIC_NAMES.map(_ -> SCAN_TIME_METRIC_KEY) ++ + OUTPUT_BATCHES_METRIC_NAMES.map(_ -> OUTPUT_BATCHES_METRIC_KEY) ++ + BUFFER_TIME_METRIC_NAMES.map(_ -> BUFFER_TIME_METRIC_KEY) ++ + SHUFFLE_WRITE_TIME_METRIC_NAMES.map(_ -> SHUFFLE_WRITE_TIME_METRIC_KEY) ++ + FETCH_WAIT_TIME_METRIC_NAMES.map(_ -> FETCH_WAIT_TIME_METRIC_KEY) ++ + GPU_DECODE_TIME_METRIC_NAMES.map(_ -> GPU_DECODE_TIME_METRIC_KEY)).toMap + + /** + * Set of all IO diagnostic metrics names + */ + lazy val allIODiagnosticMetrics: Set[String] = metricNamesToKeyMap.keys.toSet + + /** + * Check if a metric name belongs to IO diagnostic metrics + */ + def isIODiagnosticMetricName(metric: String): Boolean = { + allIODiagnosticMetrics.contains(metric) + } + + /** + * Normalize a metric name to its IO diagnostic metric constant because we want to + * support variations in metric naming, e.g. "join output rows", "number of output rows" + * are different names for output rows metric. 
+ */ + def normalizeToIODiagnosticMetricKey(metric: String): String = { + // input metric is already known to be an IO diagnostic metric + metricNamesToKeyMap(metric) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala index b9c622334..6467f7feb 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala @@ -95,6 +95,10 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { ProfStageMetricView.getRawView(apps) } + def getIODiagnosticMetrics: Seq[IODiagnosticResult] = { + ProfIODiagnosticMetricsView.getRawView(apps) + } + /** * This function is meant to clean up Delta log execs so that you could align * SQL ids between CPU and GPU eventlogs. It attempts to remove any delta log diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DiagnosticSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DiagnosticSummaryInfo.scala index 6ba34f7f5..e714b123b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DiagnosticSummaryInfo.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DiagnosticSummaryInfo.scala @@ -21,5 +21,6 @@ package com.nvidia.spark.rapids.tool.profiling * TODO: We plan to add two more fields/views in upcoming PRs. */ case class DiagnosticSummaryInfo( - stageDiagnostics: Seq[StageDiagnosticResult] + stageDiagnostics: Seq[StageDiagnosticResult], + IODiagnostics: Seq[IODiagnosticResult] ) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index ab5abdd2b..4ed1c4911 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -18,6 +18,8 @@ package com.nvidia.spark.rapids.tool.profiling import scala.collection.Map +import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics + import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest} import org.apache.spark.sql.rapids.tool.store.AccumMetaRef import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils} @@ -74,16 +76,33 @@ case class JobInfoProfileResult( sqlID: Option[Long], startTime: Long, endTime: Option[Long]) extends ProfileResult { - override val outputHeaders = Seq("appIndex", "jobID", "stageIds", "sqlID", "startTime", "endTime") + + override val outputHeaders = { + Seq("appIndex", + "jobID", + "stageIds", + "sqlID", + "startTime", + "endTime") + } + override def convertToSeq: Seq[String] = { val stageIdStr = s"[${stageIds.mkString(",")}]" - Seq(appIndex.toString, jobID.toString, stageIdStr, sqlID.map(_.toString).getOrElse(null), - startTime.toString, endTime.map(_.toString).getOrElse(null)) + Seq(appIndex.toString, + jobID.toString, + stageIdStr, + sqlID.map(_.toString).getOrElse(null), + startTime.toString, + endTime.map(_.toString).getOrElse(null)) } + override def convertToCSVSeq: Seq[String] = { val stageIdStr = s"[${stageIds.mkString(",")}]" - Seq(appIndex.toString, jobID.toString, StringUtils.reformatCSVString(stageIdStr), - sqlID.map(_.toString).getOrElse(null), startTime.toString, + Seq(appIndex.toString, + jobID.toString, + 
StringUtils.reformatCSVString(stageIdStr), + sqlID.map(_.toString).getOrElse(null), + startTime.toString, endTime.map(_.toString).getOrElse(null)) } } @@ -202,38 +221,106 @@ class SQLExecutionInfoClass( } } -case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long, - nodeName: String, accumulatorId: Long, name: String, min: Long, median:Long, - max: Long, total: Long, metricType: String, stageIds: String) extends ProfileResult { - override val outputHeaders = Seq("appIndex", "sqlID", "nodeID", "nodeName", "accumulatorId", - "name", "min", "median", "max", "total", "metricType", "stageIds") +case class SQLAccumProfileResults( + appIndex: Int, + sqlID: Long, + nodeID: Long, + nodeName: String, + accumulatorId: Long, + name: String, + min: Long, + median: Long, + max: Long, + total: Long, + metricType: String, + stageIds: Set[Int]) extends ProfileResult { + + override val outputHeaders = { + Seq("appIndex", + "sqlID", + "nodeID", + "nodeName", + "accumulatorId", + "name", + "min", + "median", + "max", + "total", + "metricType", + "stageIds") + } + override def convertToSeq: Seq[String] = { - Seq(appIndex.toString, sqlID.toString, nodeID.toString, nodeName, accumulatorId.toString, - name, min.toString, median.toString, max.toString, total.toString, metricType, stageIds) + Seq(appIndex.toString, + sqlID.toString, + nodeID.toString, + nodeName, + accumulatorId.toString, + name, + min.toString, + median.toString, + max.toString, + total.toString, + metricType, + stageIds.mkString(",")) } + override def convertToCSVSeq: Seq[String] = { - Seq(appIndex.toString, sqlID.toString, nodeID.toString, - StringUtils.reformatCSVString(nodeName), accumulatorId.toString, - StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString, - total.toString, StringUtils.reformatCSVString(metricType), - StringUtils.reformatCSVString(stageIds)) + Seq(appIndex.toString, + sqlID.toString, + nodeID.toString, + StringUtils.reformatCSVString(nodeName), + accumulatorId.toString, + StringUtils.reformatCSVString(name), + min.toString, + median.toString, + max.toString, + total.toString, + StringUtils.reformatCSVString(metricType), + StringUtils.reformatCSVString(stageIds.mkString(","))) } } -case class AccumProfileResults(appIndex: Int, stageId: Int, accMetaRef: AccumMetaRef, - min: Long, median: Long, max: Long, total: Long) extends ProfileResult { - override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min", - "median", "max", "total") +case class AccumProfileResults( + appIndex: Int, + stageId: Int, + accMetaRef: AccumMetaRef, + min: Long, + median: Long, + max: Long, + total: Long) extends ProfileResult { + + override val outputHeaders = { + Seq("appIndex", + "stageId", + "accumulatorId", + "name", + "min", + "median", + "max", + "total") + } override def convertToSeq: Seq[String] = { - Seq(appIndex.toString, stageId.toString, accMetaRef.id.toString, accMetaRef.getName(), - min.toString, median.toString, max.toString, total.toString) + Seq(appIndex.toString, + stageId.toString, + accMetaRef.id.toString, + accMetaRef.getName(), + min.toString, + median.toString, + max.toString, + total.toString) } override def convertToCSVSeq: Seq[String] = { - Seq(appIndex.toString, stageId.toString, accMetaRef.id.toString, - accMetaRef.name.csvValue, min.toString, - median.toString, max.toString, total.toString) + Seq(appIndex.toString, + stageId.toString, + accMetaRef.id.toString, + accMetaRef.name.csvValue, + min.toString, + median.toString, + max.toString, + 
total.toString) } } @@ -573,6 +660,20 @@ case class StageAggTaskMetricsProfileResult( override def idHeader = "stageId" } +/** + * Represents diagnostic metrics results at task/stage level in a Spark SQL execution plan. + * Output file: stage_level_diagnostic_metrics.csv. + * Collected metrics include: + * - Memory spilled (MB) + * - Disk spilled (MB) + * - Input bytes read + * - Output bytes written + * - Shuffle read total bytes (remote + local) + * - Shuffle write bytes + * - Shuffle read fetch wait time (ms) + * - Shuffle write time (ms) + * - GPU semaphore time (ns) + */ case class StageDiagnosticResult( appIndex: Int, appName: String, @@ -878,6 +979,153 @@ case class SQLTaskAggMetricsProfileResult( } } +/** + * Represents IO-related diagnostic metrics results in a Spark SQL execution plan. + * Output file: io_diagnostic_metrics.csv. + * Collected metrics include: + * - Output rows + * - Scan time (ns) + * - Output batches + * - Buffer time (ns) + * - Shuffle write time (ns) + * - Fetch wait time (ns) + * - GPU decode time (ns) + */ +case class IODiagnosticResult( + appIndex: Int, + appName: String, + appId: String, + sqlId: Long, + stageId: Long, + duration: Long, + nodeId: Long, + nodeName: String, + outputRows: StatisticsMetrics, + scanTime: StatisticsMetrics, + outputBatches: StatisticsMetrics, + bufferTime: StatisticsMetrics, + shuffleWriteTime: StatisticsMetrics, + fetchWaitTime: StatisticsMetrics, + gpuDecodeTime: StatisticsMetrics) extends ProfileResult { + + override val outputHeaders = { + Seq("appIndex", + "appName", + "appId", + "sqlId", + "stageId", + "stageDurationMs", + "nodeId", + "nodeName", + "outputRowsMin", + "outputRowsMedian", + "outputRowsMax", + "outputRowsTotal", + "scanTimeMin", + "scanTimeMedian", + "scanTimeMax", + "scanTimeTotal", + "outputBatchesMin", + "outputBatchesMedian", + "outputBatchesMax", + "outputBatchesTotal", + "bufferTimeMin", + "bufferTimeMedian", + "bufferTimeMax", + "bufferTimeTotal", + "shuffleWriteTimeMin", + "shuffleWriteTimeMedian", + "shuffleWriteTimeMax", + "shuffleWriteTimeTotal", + "fetchWaitTimeMin", + "fetchWaitTimeMedian", + "fetchWaitTimeMax", + "fetchWaitTimeTotal", + "gpuDecodeTimeMin", + "gpuDecodeTimeMedian", + "gpuDecodeTimeMax", + "gpuDecodeTimeTotal") + } + + override def convertToSeq: Seq[String] = { + Seq(appIndex.toString, + appName, + appId, + sqlId.toString, + stageId.toString, + duration.toString, + nodeId.toString, + nodeName, + outputRows.min.toString, + outputRows.med.toString, + outputRows.max.toString, + outputRows.total.toString, + scanTime.min.toString, + scanTime.med.toString, + scanTime.max.toString, + scanTime.total.toString, + outputBatches.min.toString, + outputBatches.med.toString, + outputBatches.max.toString, + outputBatches.total.toString, + bufferTime.min.toString, + bufferTime.med.toString, + bufferTime.max.toString, + bufferTime.total.toString, + shuffleWriteTime.min.toString, + shuffleWriteTime.med.toString, + shuffleWriteTime.max.toString, + shuffleWriteTime.total.toString, + fetchWaitTime.min.toString, + fetchWaitTime.med.toString, + fetchWaitTime.max.toString, + fetchWaitTime.total.toString, + gpuDecodeTime.min.toString, + gpuDecodeTime.med.toString, + gpuDecodeTime.max.toString, + gpuDecodeTime.total.toString) + } + + override def convertToCSVSeq: Seq[String] = { + Seq(appIndex.toString, + appName, + appId, + sqlId.toString, + stageId.toString, + duration.toString, + nodeId.toString, + StringUtils.reformatCSVString(nodeName), + outputRows.min.toString, + outputRows.med.toString, + 
outputRows.max.toString, + outputRows.total.toString, + scanTime.min.toString, + scanTime.med.toString, + scanTime.max.toString, + scanTime.total.toString, + outputBatches.min.toString, + outputBatches.med.toString, + outputBatches.max.toString, + outputBatches.total.toString, + bufferTime.min.toString, + bufferTime.med.toString, + bufferTime.max.toString, + bufferTime.total.toString, + shuffleWriteTime.min.toString, + shuffleWriteTime.med.toString, + shuffleWriteTime.max.toString, + shuffleWriteTime.total.toString, + fetchWaitTime.min.toString, + fetchWaitTime.med.toString, + fetchWaitTime.max.toString, + fetchWaitTime.total.toString, + gpuDecodeTime.min.toString, + gpuDecodeTime.med.toString, + gpuDecodeTime.max.toString, + gpuDecodeTime.total.toString) + } +} + case class IOAnalysisProfileResult( appIndex: Int, appId: String, diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 75b4c4590..b05447d3d 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -403,7 +403,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea failedTasks, failedStages, failedJobs, removedBMs, removedExecutors, unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo, appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo), - compareRes, DiagnosticSummaryInfo(analysis.stageDiagnostics)) + compareRes, DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics)) } /** @@ -570,12 +570,15 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea } // Write diagnostic related results to CSV files val diagnostics = if (outputCombined) { - Seq(DiagnosticSummaryInfo(diagnosticSum.flatMap(_.stageDiagnostics))) + Seq(DiagnosticSummaryInfo(diagnosticSum.flatMap(_.stageDiagnostics), + diagnosticSum.flatMap(_.IODiagnostics))) } else { diagnosticSum } diagnostics.foreach { diagnostoic => profileOutputWriter.writeCSVTable(STAGE_DIAGNOSTICS_LABEL, diagnostoic.stageDiagnostics) + profileOutputWriter.writeCSVTable(ProfIODiagnosticMetricsView.getLabel, + diagnostoic.IODiagnostics) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/SQLView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/SQLView.scala index e52340dce..c1f2b5675 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/SQLView.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/SQLView.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids.tool.views import com.nvidia.spark.rapids.tool.analysis.{AppSQLPlanAnalyzer, ProfAppIndexMapperTrait, QualAppIndexMapperTrait} -import com.nvidia.spark.rapids.tool.profiling.{SQLAccumProfileResults, SQLCleanAndAlignIdsProfileResult, SQLPlanClassifier, WholeStageCodeGenResults} +import com.nvidia.spark.rapids.tool.profiling.{IODiagnosticResult, SQLAccumProfileResults, SQLCleanAndAlignIdsProfileResult, SQLPlanClassifier, WholeStageCodeGenResults} import org.apache.spark.sql.rapids.tool.AppBase import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo @@ -109,6 +109,25 @@ object ProfSQLPlanMetricsView extends AppSQLPlanMetricsViewTrait with ProfAppInd } } +object ProfIODiagnosticMetricsView extends ViewableTrait[IODiagnosticResult] + with ProfAppIndexMapperTrait { + override def getLabel: String = "IO Diagnostic Metrics" 
+ override def getDescription: String = "IO Diagnostic Metrics" + + override def sortView( + rows: Seq[IODiagnosticResult]): Seq[IODiagnosticResult] = { + rows.sortBy(cols => (cols.appIndex, -cols.duration, cols.stageId, cols.sqlId, cols.nodeId)) + } + + override def getRawView(app: AppBase, index: Int): Seq[IODiagnosticResult] = { + app match { + case app: ApplicationInfo => + sortView(app.planMetricProcessor.generateIODiagnosticAccums()) + case _ => Seq.empty + } + } +} + object QualSQLPlanMetricsView extends AppSQLPlanMetricsViewTrait with QualAppIndexMapperTrait { override def getRawView(app: AppBase, index: Int): Seq[SQLAccumProfileResults] = { // TODO: Fix this implementation when we have a better way to get bind between App and diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala index 4ce41e4a5..75a55635d 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.tool.store import java.util.concurrent.ConcurrentHashMap -import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics.getAllDiagnosticMetrics +import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics.allDiagnosticMetrics import org.apache.spark.sql.rapids.tool.util.EventUtils.normalizeMetricName import org.apache.spark.sql.rapids.tool.util.StringUtils @@ -36,7 +36,7 @@ case class AccumNameRef(value: String) { // create a new CSV string even though they represent the same AccumulatorName. val csvValue: String = StringUtils.reformatCSVString(value) - def isDiagnosticMetrics(): Boolean = getAllDiagnosticMetrics.contains(value) + def isDiagnosticMetrics(): Boolean = allDiagnosticMetrics.contains(value) } object AccumNameRef { diff --git a/core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_iodiagnosticmetrics_expectation.csv b/core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_iodiagnosticmetrics_expectation.csv new file mode 100644 index 000000000..8b1d3093e --- /dev/null +++ b/core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_iodiagnosticmetrics_expectation.csv @@ -0,0 +1,16 @@ +appIndex,appName,appId,sqlId,stageId,stageDurationMs,nodeId,nodeName,outputRowsMin,outputRowsMedian,outputRowsMax,outputRowsTotal,scanTimeMin,scanTimeMedian,scanTimeMax,scanTimeTotal,outputBatchesMin,outputBatchesMedian,outputBatchesMax,outputBatchesTotal,bufferTimeMin,bufferTimeMedian,bufferTimeMax,bufferTimeTotal,shuffleWriteTimeMin,shuffleWriteTimeMedian,shuffleWriteTimeMax,shuffleWriteTimeTotal,fetchWaitTimeMin,fetchWaitTimeMedian,fetchWaitTimeMax,fetchWaitTimeTotal,gpuDecodeTimeMin,gpuDecodeTimeMedian,gpuDecodeTimeMax,gpuDecodeTimeTotal +1,Spark shell,local-1622814619968,0,0,1743,16,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,41434653,60830365,100858775,400284505,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,0,1743,21,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,1,1631,8,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,37444140,92128351,108992798,508750471,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,1,1631,13,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark 
shell,local-1622814619968,0,2,688,3,"GpuColumnarExchange",1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,139875,230038,9747416,93193331,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,2,688,4,"GpuHashAggregate",1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,2,688,6,"GpuShuffledHashJoin",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,2,688,7,"GpuShuffleCoalesce",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,2,688,8,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,2,688,14,"GpuCoalesceBatches",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,2,688,15,"GpuShuffleCoalesce",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,2,688,16,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,3,83,1,"GpuHashAggregate",1,1,1,1,0,0,0,0,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,3,83,2,"GpuShuffleCoalesce",200,200,200,200,0,0,0,0,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +1,Spark shell,local-1622814619968,0,3,83,3,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 diff --git a/core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_stagediagnosticmetricsagg_expectation.csv b/core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_stagediagnosticmetrics_expectation.csv similarity index 100% rename from core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_stagediagnosticmetricsagg_expectation.csv rename to core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_stagediagnosticmetrics_expectation.csv diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index 2b8c3bf12..17b0623c0 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -67,30 +67,135 @@ case class TestStageDiagnosticResult( gpuSemaphoreWaitSum: Long, nodeNames: Seq[String]) +case class TestIODiagnosticResult( + appIndex: Int, + appName: String, + appId: String, + sqlId: Long, + stageId: Long, + duration: Long, + nodeId: Long, + nodeName: String, + outputRowsMin: Long, + outputRowsMed: Long, + outputRowsMax: Long, + outputRowsSum: Long, + scanTimeMin: Long, + scanTimeMed: Long, + scanTimeMax: Long, + scanTimeSum: Long, + outputBatchesMin: Long, + outputBatchesMed: Long, + outputBatchesMax: Long, + outputBatchesSum: Long, + bufferTimeMin: Long, + bufferTimeMed: Long, + bufferTimeMax: Long, + bufferTimeSum: Long, + shuffleWriteTimeMin: Long, + shuffleWriteTimeMed: Long, + shuffleWriteTimeMax: Long, + shuffleWriteTimeSum: Long, + fetchWaitTimeMin: Long, + fetchWaitTimeMed: Long, + fetchWaitTimeMax: Long, + fetchWaitTimeSum: Long, + gpuDecodeTimeMin: Long, + gpuDecodeTimeMed: Long, + gpuDecodeTimeMax: Long, + gpuDecodeTimeSum: Long) + class AnalysisSuite extends FunSuite { private def createTestStageDiagnosticResult(diagnosticsResults: Seq[StageDiagnosticResult]): Seq[TestStageDiagnosticResult] = { def bytesToMB(numBytes: Long): Long = numBytes / (1024 * 1024) def 
nanoToMilliSec(numNano: Long): Long = numNano / 1000000 + diagnosticsResults.map { result => + TestStageDiagnosticResult( + result.appIndex, + result.appName, + result.appId, + result.stageId, + result.duration, + result.numTasks, + bytesToMB(result.memoryBytesSpilled.min), + bytesToMB(result.memoryBytesSpilled.median), + bytesToMB(result.memoryBytesSpilled.max), + bytesToMB(result.memoryBytesSpilled.total), + bytesToMB(result.diskBytesSpilled.min), + bytesToMB(result.diskBytesSpilled.median), + bytesToMB(result.diskBytesSpilled.max), + bytesToMB(result.diskBytesSpilled.total), + result.inputBytesRead.min, + result.inputBytesRead.median, + result.inputBytesRead.max, + result.inputBytesRead.total, + result.outputBytesWritten.min, + result.outputBytesWritten.median, + result.outputBytesWritten.max, + result.outputBytesWritten.total, + result.srTotalBytesReadMin, + result.srTotalBytesReadMed, + result.srTotalBytesReadMax, + result.srTotalBytesReadSum, + result.swBytesWritten.min, + result.swBytesWritten.median, + result.swBytesWritten.max, + result.swBytesWritten.total, + nanoToMilliSec(result.srFetchWaitTime.min), + nanoToMilliSec(result.srFetchWaitTime.median), + nanoToMilliSec(result.srFetchWaitTime.max), + nanoToMilliSec(result.srFetchWaitTime.total), + nanoToMilliSec(result.swWriteTime.min), + nanoToMilliSec(result.swWriteTime.median), + nanoToMilliSec(result.swWriteTime.max), + nanoToMilliSec(result.swWriteTime.total), + result.gpuSemaphoreWait.total, + result.nodeNames) + } + } + + private def createTestIODiagnosticResult(diagnosticsResults: Seq[IODiagnosticResult]): + Seq[TestIODiagnosticResult] = { diagnosticsResults.map {result => - TestStageDiagnosticResult(result.appIndex, result.appName, result.appId, result.stageId, - result.duration, result.numTasks, bytesToMB(result.memoryBytesSpilled.min), - bytesToMB(result.memoryBytesSpilled.median), bytesToMB(result.memoryBytesSpilled.max), - bytesToMB(result.memoryBytesSpilled.total), bytesToMB(result.diskBytesSpilled.min), - bytesToMB(result.diskBytesSpilled.median), bytesToMB(result.diskBytesSpilled.max), - bytesToMB(result.diskBytesSpilled.total), result.inputBytesRead.min, - result.inputBytesRead.median, result.inputBytesRead.max, result.inputBytesRead.total, - result.outputBytesWritten.min, result.outputBytesWritten.median, - result.outputBytesWritten.max, result.outputBytesWritten.total, - result.srTotalBytesReadMin, result.srTotalBytesReadMed, result.srTotalBytesReadMax, - result.srTotalBytesReadSum, result.swBytesWritten.min, result.swBytesWritten.median, - result.swBytesWritten.max, result.swBytesWritten.total, - nanoToMilliSec(result.srFetchWaitTime.min), nanoToMilliSec(result.srFetchWaitTime.median), - nanoToMilliSec(result.srFetchWaitTime.max), nanoToMilliSec(result.srFetchWaitTime.total), - nanoToMilliSec(result.swWriteTime.min), nanoToMilliSec(result.swWriteTime.median), - nanoToMilliSec(result.swWriteTime.max), nanoToMilliSec(result.swWriteTime.total), - result.gpuSemaphoreWait.total, result.nodeNames) + TestIODiagnosticResult( + result.appIndex, + result.appName, + result.appId, + result.sqlId, + result.stageId, + result.duration, + result.nodeId, + result.nodeName, + result.outputRows.min, + result.outputRows.med, + result.outputRows.max, + result.outputRows.total, + result.scanTime.min, + result.scanTime.med, + result.scanTime.max, + result.scanTime.total, + result.outputBatches.min, + result.outputBatches.med, + result.outputBatches.max, + result.outputBatches.total, + result.bufferTime.min, + result.bufferTime.med, + 
result.bufferTime.max, + result.bufferTime.total, + result.shuffleWriteTime.min, + result.shuffleWriteTime.med, + result.shuffleWriteTime.max, + result.shuffleWriteTime.total, + result.fetchWaitTime.min, + result.fetchWaitTime.med, + result.fetchWaitTime.max, + result.fetchWaitTime.total, + result.gpuDecodeTime.min, + result.gpuDecodeTime.med, + result.gpuDecodeTime.max, + result.gpuDecodeTime.total) } } @@ -142,8 +247,8 @@ class AnalysisSuite extends FunSuite { expectFile("sql"), expectFile("job"), expectFile("stage")) } - test("test stage-level diagnostic aggregation simple") { - val expectFile = "rapids_join_eventlog_stagediagnosticmetricsagg_expectation.csv" + test("test stage-level diagnostic metrics") { + val expectFile = "rapids_join_eventlog_stagediagnosticmetrics_expectation.csv" val logs = Array(s"$logDir/rapids_join_eventlog.zstd") val apps = ToolTestUtils.processProfileApps(logs, sparkSession) assert(apps.size == logs.size) @@ -154,14 +259,30 @@ class AnalysisSuite extends FunSuite { collect.getSQLToStage collect.getStageLevelMetrics - val aggResults = RawMetricProfilerView.getAggMetrics(apps) + val diagnosticResults = RawMetricProfilerView.getAggMetrics(apps) import org.apache.spark.sql.functions._ import sparkSession.implicits._ - val actualDf = createTestStageDiagnosticResult(aggResults.stageDiagnostics).toDF. + val actualDf = createTestStageDiagnosticResult(diagnosticResults.stageDiagnostics).toDF. withColumn("nodeNames", concat_ws(",", col("nodeNames"))) compareMetrics(actualDf, expectFile) } + test("test IO diagnostic metrics") { + val expectFile = "rapids_join_eventlog_iodiagnosticmetrics_expectation.csv" + val logs = Array(s"$logDir/rapids_join_eventlog.zstd") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + assert(apps.size == logs.size) + + val collect = new CollectInformation(apps) + // Computes IO diagnostic metrics mapping which is later used in getIODiagnosticMetrics + collect.getSQLPlanMetrics + val diagnosticResults = collect.getIODiagnosticMetrics + + import sparkSession.implicits._ + val actualDf = createTestIODiagnosticResult(diagnosticResults).toDF + compareMetrics(actualDf, expectFile) + } + private def testSqlMetricsAggregation(logs: Array[String], expectFileSQL: String, expectFileJob: String, expectFileStage: String): Unit = { val apps = ToolTestUtils.processProfileApps(logs, sparkSession) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index 7ff03a943..40c7ea99f 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -195,11 +195,10 @@ class ApplicationInfoSuite extends FunSuite with Logging { val resultExpectation = new File(expRoot, "rapids_join_eventlog_sqlmetrics_expectation.csv") assert(sqlMetrics.size == 83) - val sqlMetricsWithDelim = sqlMetrics.map { metrics => - metrics.copy(stageIds = ProfileUtils.replaceDelimiter(metrics.stageIds, ",")) - } + + import org.apache.spark.sql.functions._ import sparkSession.implicits._ - val df = sqlMetricsWithDelim.toDF + val df = sqlMetrics.toDF.withColumn("stageIds", concat_ws(",", col("stageIds"))) val dfExpect = ToolTestUtils.readExpectationCSV(sparkSession, resultExpectation.getPath()) ToolTestUtils.compareDataFrames(df, dfExpect) } @@ -841,7 +840,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { 
f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 21) + assert(dotDirs.length === 22) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -875,7 +874,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 17) + assert(dotDirs.length === 18) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -912,7 +911,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 21) + assert(dotDirs.length === 22) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -949,7 +948,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 19) + assert(dotDirs.length === 20) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly