diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
index 48723739a..e36d9ec8f 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
@@ -19,8 +19,9 @@ package com.nvidia.spark.rapids.tool.analysis
 import java.util.concurrent.TimeUnit

 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

-import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
+import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper.{PHOTON_METRIC_DISK_SPILL_SIZE_LABEL, PHOTON_METRIC_PEAK_MEMORY_LABEL, PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL}
 import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}

 import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
@@ -328,29 +329,24 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
   private def aggregateSparkMetricsByStageInternal(index: Int): Unit = {
     // TODO: this has stage attempts. we should handle different attempts

-    // For Photon apps, peak memory and shuffle write time need to be calculated from accumulators
+    // For Photon apps, certain metrics need to be calculated from accumulators
     // instead of task metrics.
     // Approach:
     // 1. Collect accumulators for each metric type.
     // 2. For each stage, retrieve the relevant accumulators and calculate aggregated values.
-    // Note:
-    // - A HashMap could be used instead of separate mutable.ArrayBuffer for each metric type,
-    //   but avoiding it for readability.
-    val photonPeakMemoryAccumInfos = mutable.ArrayBuffer[AccumInfo]()
-    val photonShuffleWriteTimeAccumInfos = mutable.ArrayBuffer[AccumInfo]()
+    val photonAccumInfos = mutable.HashMap[String, ArrayBuffer[AccumInfo]](
+      PHOTON_METRIC_PEAK_MEMORY_LABEL -> ArrayBuffer[AccumInfo](),
+      PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL -> ArrayBuffer[AccumInfo](),
+      PHOTON_METRIC_DISK_SPILL_SIZE_LABEL -> ArrayBuffer[AccumInfo]()
+    )

     if (app.isPhoton) {
       app.accumManager.applyToAccumInfoMap { accumInfo =>
-        accumInfo.infoRef.name.value match {
-          case name if name.contains(
-              DatabricksParseHelper.PHOTON_METRIC_PEAK_MEMORY_LABEL) =>
-            // Collect accumulators for peak memory
-            photonPeakMemoryAccumInfos += accumInfo
-          case name if name.contains(
-              DatabricksParseHelper.PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL) =>
-            // Collect accumulators for shuffle write time
-            photonShuffleWriteTimeAccumInfos += accumInfo
-          case _ => // Ignore other accumulators
+        val metricName = accumInfo.infoRef.name.value
+        // Append to the corresponding metric buffer
+        photonAccumInfos.keys.find(metricName.contains) match {
+          case Some(metricLabel) => photonAccumInfos(metricLabel) += accumInfo
+          case None => // Ignore other accumulators
         }
       }
     }
@@ -362,25 +358,32 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
       // count duplicate task attempts
      val numAttempts = tasksInStage.size

-      val (peakMemoryMax, shuffleWriteTimeSum) = if (app.isPhoton) {
+      val (peakMemoryMax, shuffleWriteTimeSum, diskSpillSizeSum) = if (app.isPhoton) {
         // For max peak memory, we need to look at the accumulators at the task level.
         val peakMemoryValues = tasksInStage.flatMap { taskModel =>
-          photonPeakMemoryAccumInfos.flatMap { accumInfo =>
-            accumInfo.taskUpdatesMap.get(taskModel.taskId)
+          photonAccumInfos(PHOTON_METRIC_PEAK_MEMORY_LABEL).flatMap {
+            accumInfo => accumInfo.taskUpdatesMap.get(taskModel.taskId)
           }
         }
         // For sum of shuffle write time, we need to look at the accumulators at the stage level.
-        val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo =>
-          accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
+        val shuffleWriteValues = photonAccumInfos(PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL).flatMap {
+          accumInfo => accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
+        }
+        // For sum of disk spill size, we need to look at the accumulators at the stage level.
+        val diskSpillSizeValues = photonAccumInfos(PHOTON_METRIC_DISK_SPILL_SIZE_LABEL).flatMap {
+          accumInfo => accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
         }
         (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
-          TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum))
+          TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum),
+          diskSpillSizeValues.sum)
       } else {
         // For non-Photon apps, use the task metrics directly.
         val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory)
-        val shuffleWriteTime = tasksInStage.map(_.sw_writeTime)
+        val shuffleWriteValues = tasksInStage.map(_.sw_writeTime)
+        val diskSpillSizeValues = tasksInStage.map(_.diskBytesSpilled)
         (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
-          shuffleWriteTime.sum)
+          shuffleWriteValues.sum,
+          diskSpillSizeValues.sum)
       }

       val (durSum, durMax, durMin, durAvg) = AppSparkMetricsAnalyzer.getDurations(tasksInStage)
@@ -388,7 +391,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
         sm.stageInfo.stageId,
         numAttempts, // TODO: why is this numAttempts and not numTasks?
         sm.duration,
-        tasksInStage.map(_.diskBytesSpilled).sum,
+        diskSpillSizeSum,
         durSum,
         durMax,
         durMin,
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala
index 886c9986b..2b2e3a7a3 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DatabricksParseHelper.scala
@@ -57,6 +57,7 @@ object DatabricksParseHelper extends Logging {
   val PHOTON_METRIC_CUMULATIVE_TIME_LABEL = "cumulative time"               // Alternative for "scan time"
   val PHOTON_METRIC_PEAK_MEMORY_LABEL = "peak memory usage"                 // Alternative for "peak execution memory"
   val PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL = "part of shuffle file write" // Alternative for "shuffle write time"
+  val PHOTON_METRIC_DISK_SPILL_SIZE_LABEL = "num bytes spilled to disk"     // Alternative for "spill size"
   // scalastyle:on

   private val PHOTON_PATTERN: Regex = "Photon[a-zA-Z]*".r
diff --git a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_job_metrics_agg_expectation.csv b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_job_metrics_agg_expectation.csv
index 97de9ad08..ee122fa65 100644
--- a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_job_metrics_agg_expectation.csv
+++ b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_job_metrics_agg_expectation.csv
@@ -27,9 +27,9 @@ appIndex,jobId,numTasks,Duration,diskBytesSpilled_sum,duration_sum,duration_max,
 1,25,200,4603,0,70379,569,314,351.9,4106,200,216,69040,0,0,168,0,0,0,0,14,5708,0,0,0,0,0,0,0,0,0,0
 1,36,1,4556,0,4332,4332,4332,4332.0,3359,95,401,3907,30328,7200,39,0,0,0,155245068,1,10552,0,0,0,0,0,0,0,7719,1920,0
 1,29,200,4555,0,69682,423,310,348.4,3730,200,218,68521,0,0,168,0,0,0,0,9,5748,0,0,0,0,0,0,0,0,0,0
-1,32,1,4515,0,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,32,1,4515,2147483648,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,39,1,4488,0,4322,4322,4322,4322.0,112,124,392,3907,349526,86400,39,0,0,0,155563380,1,9926,0,0,0,0,0,0,0,7239,1800,0
-1,37,1,4481,0,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,37,1,4481,1073741824,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,40,1,4476,0,4327,4327,4327,4327.0,98,147,394,3907,349526,86400,39,0,0,0,155563380,1,9895,0,0,0,0,0,0,0,7239,1800,0
 1,56,1,1055,0,1022,1022,1022,1022.0,758,77,95,901,0,0,0,0,0,0,134218344,6,10091,5,218,9592,220,9680,0,19272,0,0,0
 1,19,200,803,0,11895,145,38,59.5,943,209,252,10017,0,0,56,0,0,0,0,22,2739,0,0,0,0,0,0,0,0,0,0
diff --git a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_sql_metrics_agg_expectation.csv b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_sql_metrics_agg_expectation.csv
index 63aaf7a1b..1aa68d482 100644
--- a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_sql_metrics_agg_expectation.csv
+++ b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_sql_metrics_agg_expectation.csv
@@ -1,2 +1,2 @@
 appIndex,appID,sqlID,description,numTasks,Duration,executorCPUTime,executorRunTime,executorCPURatio,diskBytesSpilled_sum,duration_sum,duration_max,duration_min,duration_avg,executorCPUTime_sum,executorDeserializeCPUTime_sum,executorDeserializeTime_sum,executorRunTime_sum,input_bytesRead_sum,input_recordsRead_sum,jvmGCTime_sum,memoryBytesSpilled_sum,output_bytesWritten_sum,output_recordsWritten_sum,peakExecutionMemory_max,resultSerializationTime_sum,resultSize_max,sr_fetchWaitTime_sum,sr_localBlocksFetched_sum,sr_localBytesRead_sum,sr_remoteBlocksFetched_sum,sr_remoteBytesRead_sum,sr_remoteBytesReadToDisk_sum,sr_totalBytesRead_sum,sw_bytesWritten_sum,sw_recordsWritten_sum,sw_writeTime_sum
-1,"app-20240919162642-0000",26,"query88",3472,250542,2883837,3818106,75.53,0,3858136,6743,54,1111.2,2883837,12349,18186,3818106,52997115316,69120188398,16100,0,0,0,250840493,181,16203,1394,1759,201596,1750,201200,0,402796,218614,19946,154
+1,"app-20240919162642-0000",26,"query88",3472,250542,2883837,3818106,75.53,3221225472,3858136,6743,54,1111.2,2883837,12349,18186,3818106,52997115316,69120188398,16100,0,0,0,250840493,181,16203,1394,1759,201596,1750,201200,0,402796,218614,19946,154
diff --git a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_stage_metrics_agg_expectation.csv b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_stage_metrics_agg_expectation.csv
index ae34c1ac0..2e1ee2d6f 100644
--- a/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_stage_metrics_agg_expectation.csv
+++ b/core/src/test/resources/ProfilingExpectations/nds_q88_photon_db_13_3_stage_metrics_agg_expectation.csv
@@ -27,10 +27,10 @@ appIndex,stageId,numTasks,Duration,diskBytesSpilled_sum,duration_sum,duration_ma
 1,25,200,4599,0,70379,569,314,351.9,4106,200,216,69040,0,0,168,0,0,0,0,14,5708,0,0,0,0,0,0,0,0,0,0
 1,29,200,4552,0,69682,423,310,348.4,3730,200,218,68521,0,0,168,0,0,0,0,9,5748,0,0,0,0,0,0,0,0,0,0
 1,35,1,4525,0,4332,4332,4332,4332.0,3359,95,401,3907,30328,7200,39,0,0,0,155245068,1,10552,0,0,0,0,0,0,0,7719,1920,0
-1,36,1,4509,0,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,36,1,4509,2147483648,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,39,1,4474,0,4322,4322,4322,4322.0,112,124,392,3907,349526,86400,39,0,0,0,155563380,1,9926,0,0,0,0,0,0,0,7239,1800,0
 1,40,1,4469,0,4327,4327,4327,4327.0,98,147,394,3907,349526,86400,39,0,0,0,155563380,1,9895,0,0,0,0,0,0,0,7239,1800,0
-1,37,1,4464,0,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,37,1,4464,1073741824,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,107,1,1052,0,1022,1022,1022,1022.0,758,77,95,901,0,0,0,0,0,0,134218344,6,10091,5,218,9592,220,9680,0,19272,0,0,0
 1,19,200,794,0,11895,145,38,59.5,943,209,252,10017,0,0,56,0,0,0,0,22,2739,0,0,0,0,0,0,0,0,0,0
 1,26,1,315,0,312,312,312,312.0,2,1,1,306,0,0,0,0,0,0,0,0,3777,0,0,0,0,0,0,0,0,0,0
diff --git a/core/src/test/resources/spark-events-qualification/nds_q88_photon_db_13_3.zstd b/core/src/test/resources/spark-events-qualification/nds_q88_photon_db_13_3.zstd
index 505a36b01..7a749567d 100644
Binary files a/core/src/test/resources/spark-events-qualification/nds_q88_photon_db_13_3.zstd and b/core/src/test/resources/spark-events-qualification/nds_q88_photon_db_13_3.zstd differ
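
For reviewers, here is a minimal, self-contained sketch of the label-keyed dispatch and stage-level summation that the new photonAccumInfos map implements. SimpleAccum, its stageValues field, and the accumulator names below are hypothetical stand-ins invented for illustration; the real code operates on AccumInfo, taskUpdatesMap, and stageValuesMap from the tools core. Only the three label strings are copied from DatabricksParseHelper.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object PhotonAccumDispatchSketch {
  // Hypothetical stand-in for AccumInfo: an accumulator name plus per-stage totals.
  final case class SimpleAccum(name: String, stageValues: Map[Int, Long])

  // Label strings copied from DatabricksParseHelper in the diff above.
  val PeakMemoryLabel = "peak memory usage"
  val ShuffleWriteTimeLabel = "part of shuffle file write"
  val DiskSpillSizeLabel = "num bytes spilled to disk"

  def main(args: Array[String]): Unit = {
    // One buffer per metric label, mirroring the new photonAccumInfos map.
    val accumsByLabel = mutable.HashMap[String, ArrayBuffer[SimpleAccum]](
      PeakMemoryLabel -> ArrayBuffer(),
      ShuffleWriteTimeLabel -> ArrayBuffer(),
      DiskSpillSizeLabel -> ArrayBuffer()
    )

    // Made-up accumulator names; only the embedded label substring matters.
    val incoming = Seq(
      SimpleAccum("Photon: num bytes spilled to disk due to memory pressure",
        Map(36 -> 2147483648L)),
      SimpleAccum("Photon: part of shuffle file write", Map(36 -> 5000000L)),
      SimpleAccum("records read", Map(36 -> 86400L)) // no matching label: ignored
    )

    // The keys.find(name.contains) dispatch from the diff: the first label that is
    // a substring of the accumulator name wins; everything else is dropped.
    incoming.foreach { accum =>
      accumsByLabel.keys.find(accum.name.contains) match {
        case Some(label) => accumsByLabel(label) += accum
        case None => // Ignore other accumulators
      }
    }

    // Stage-level aggregation, as done for shuffle write time and disk spill size:
    // pull each matching accumulator's value for the stage and sum.
    val stageId = 36
    val diskSpillSum = accumsByLabel(DiskSpillSizeLabel)
      .flatMap(_.stageValues.get(stageId))
      .sum
    println(s"Stage $stageId disk spill bytes: $diskSpillSum") // 2147483648
  }
}

Note that the lookup uses substring matching (metricName.contains(label)) rather than equality, presumably because Photon accumulator names embed these labels inside longer descriptions; the made-up names in the sketch illustrate that behavior.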