Add support for disk spill metrics in Photon jobs #1405

Closed
@@ -19,8 +19,9 @@ package com.nvidia.spark.rapids.tool.analysis
 import java.util.concurrent.TimeUnit

 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

-import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
+import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper.{PHOTON_METRIC_DISK_SPILL_SIZE_LABEL, PHOTON_METRIC_PEAK_MEMORY_LABEL, PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL}
 import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}

 import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
@@ -328,29 +329,24 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
   private def aggregateSparkMetricsByStageInternal(index: Int): Unit = {
     // TODO: this has stage attempts. we should handle different attempts

-    // For Photon apps, peak memory and shuffle write time need to be calculated from accumulators
+    // For Photon apps, certain metrics need to be calculated from accumulators
     // instead of task metrics.
     // Approach:
     // 1. Collect accumulators for each metric type.
     // 2. For each stage, retrieve the relevant accumulators and calculate aggregated values.
-    // Note:
-    // - A HashMap could be used instead of separate mutable.ArrayBuffer for each metric type,
-    //   but avoiding it for readability.
-    val photonPeakMemoryAccumInfos = mutable.ArrayBuffer[AccumInfo]()
-    val photonShuffleWriteTimeAccumInfos = mutable.ArrayBuffer[AccumInfo]()
+    val photonAccumInfos = mutable.HashMap[String, ArrayBuffer[AccumInfo]](
+      PHOTON_METRIC_PEAK_MEMORY_LABEL -> ArrayBuffer[AccumInfo](),
+      PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL -> ArrayBuffer[AccumInfo](),
+      PHOTON_METRIC_DISK_SPILL_SIZE_LABEL -> ArrayBuffer[AccumInfo]()
+    )

     if (app.isPhoton) {
       app.accumManager.applyToAccumInfoMap { accumInfo =>
-        accumInfo.infoRef.name.value match {
-          case name if name.contains(
-              DatabricksParseHelper.PHOTON_METRIC_PEAK_MEMORY_LABEL) =>
-            // Collect accumulators for peak memory
-            photonPeakMemoryAccumInfos += accumInfo
-          case name if name.contains(
-              DatabricksParseHelper.PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL) =>
-            // Collect accumulators for shuffle write time
-            photonShuffleWriteTimeAccumInfos += accumInfo
-          case _ => // Ignore other accumulators
+        val metricName = accumInfo.infoRef.name.value
+        // Append to the corresponding metric buffer
+        photonAccumInfos.keys.find(metricName.contains) match {
+          case Some(metricLabel) => photonAccumInfos(metricLabel) += accumInfo
+          case None => // Ignore other accumulators
         }
       }
     }
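For reference, the routing introduced in this hunk keys each metric buffer by its Photon label and appends an accumulator to the first buffer whose label occurs inside the accumulator's name. A minimal, self-contained sketch of that lookup (the AccumInfo case class and the accumulator names here are simplified stand-ins, not the tool's real types):

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object LabelRoutingSketch {
  // Simplified stand-in for the tool's AccumInfo: only the accumulator name matters here.
  case class AccumInfo(name: String)

  def main(args: Array[String]): Unit = {
    // The three Photon metric labels from DatabricksParseHelper.
    val labels = Seq("peak memory usage", "part of shuffle file write", "num bytes spilled to disk")
    val buffers = mutable.HashMap[String, ArrayBuffer[AccumInfo]](
      labels.map(_ -> ArrayBuffer[AccumInfo]()): _*)

    // Hypothetical accumulator names; real Photon names are not shown in this diff.
    val accums = Seq(
      AccumInfo("photon: num bytes spilled to disk"),
      AccumInfo("photon: peak memory usage"),
      AccumInfo("some unrelated accumulator"))

    accums.foreach { accumInfo =>
      // Same routing as the hunk above: the first label contained in the name wins.
      buffers.keys.find(accumInfo.name.contains) match {
        case Some(label) => buffers(label) += accumInfo
        case None => // ignore accumulators that match no Photon metric label
      }
    }
    buffers.foreach { case (label, infos) => println(s"$label -> ${infos.size} accumulator(s)") }
  }
}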
@@ -362,33 +358,40 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
       // count duplicate task attempts
       val numAttempts = tasksInStage.size

-      val (peakMemoryMax, shuffleWriteTimeSum) = if (app.isPhoton) {
+      val (peakMemoryMax, shuffleWriteTimeSum, diskSpillSizeSum) = if (app.isPhoton) {
         // For max peak memory, we need to look at the accumulators at the task level.
         val peakMemoryValues = tasksInStage.flatMap { taskModel =>
-          photonPeakMemoryAccumInfos.flatMap { accumInfo =>
-            accumInfo.taskUpdatesMap.get(taskModel.taskId)
+          photonAccumInfos(PHOTON_METRIC_PEAK_MEMORY_LABEL).flatMap {
+            accumInfo => accumInfo.taskUpdatesMap.get(taskModel.taskId)
           }
         }
         // For sum of shuffle write time, we need to look at the accumulators at the stage level.
-        val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo =>
-          accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
+        val shuffleWriteValues = photonAccumInfos(PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL).flatMap {
+          accumInfo => accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
         }
+        // For sum of disk spill size, we need to look at the accumulators at the stage level.
+        val diskSpillSizeValues = photonAccumInfos(PHOTON_METRIC_DISK_SPILL_SIZE_LABEL).flatMap {
+          accumInfo => accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
+        }
         (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
-          TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum))
+          TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum),
+          diskSpillSizeValues.sum)
       } else {
         // For non-Photon apps, use the task metrics directly.
         val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory)
-        val shuffleWriteTime = tasksInStage.map(_.sw_writeTime)
+        val shuffleWriteValues = tasksInStage.map(_.sw_writeTime)
+        val diskSpillSizeValues = tasksInStage.map(_.diskBytesSpilled)
         (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
-          shuffleWriteTime.sum)
+          shuffleWriteValues.sum,
+          diskSpillSizeValues.sum)
       }

       val (durSum, durMax, durMin, durAvg) = AppSparkMetricsAnalyzer.getDurations(tasksInStage)
       val stageRow = StageAggTaskMetricsProfileResult(index,
         sm.stageInfo.stageId,
         numAttempts, // TODO: why is this numAttempts and not numTasks?
         sm.duration,
-        tasksInStage.map(_.diskBytesSpilled).sum,
+        diskSpillSizeSum,
         durSum,
         durMax,
         durMin,
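Note that the three metrics in the hunk above are reduced differently: peak memory is a max over task-level accumulator updates, while shuffle write time and disk spill size are sums over stage-level accumulator values, with shuffle write time additionally converted from nanoseconds to milliseconds. A standalone sketch of those reductions (the input values are hypothetical, and maxWithEmptyHandling is re-implemented here under the assumption that it returns 0 for empty input):

import java.util.concurrent.TimeUnit

object StageAggregationSketch {
  // Assumed behavior of AppSparkMetricsAnalyzer.maxWithEmptyHandling:
  // return 0 for an empty collection instead of throwing on .max.
  def maxWithEmptyHandling(values: Iterable[Long]): Long =
    if (values.isEmpty) 0L else values.max

  def main(args: Array[String]): Unit = {
    // Hypothetical per-stage values, in the units the accumulators report.
    val peakMemoryValues = Seq(155563380L, 134218344L)      // bytes, one update per task
    val shuffleWriteValues = Seq(1800000000L, 120000000L)   // nanoseconds, one value per accumulator
    val diskSpillSizeValues = Seq(2147483648L, 1073741824L) // bytes, one value per accumulator

    val peakMemoryMax = maxWithEmptyHandling(peakMemoryValues)                      // max at task level
    val shuffleWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum) // sum, ns to ms
    val diskSpillSizeSum = diskSpillSizeValues.sum                                  // sum, bytes

    println(s"peakMemoryMax=$peakMemoryMax bytes, " +
      s"shuffleWriteTimeSum=$shuffleWriteTimeSum ms, diskSpillSizeSum=$diskSpillSizeSum bytes")
  }
}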
@@ -57,6 +57,7 @@ object DatabricksParseHelper extends Logging {
   val PHOTON_METRIC_CUMULATIVE_TIME_LABEL = "cumulative time" // Alternative for "scan time"
   val PHOTON_METRIC_PEAK_MEMORY_LABEL = "peak memory usage" // Alternative for "peak execution memory"
   val PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL = "part of shuffle file write" // Alternative for "shuffle write time"
+  val PHOTON_METRIC_DISK_SPILL_SIZE_LABEL = "num bytes spilled to disk" // Alternative for "spill size"
   // scalastyle:on

   private val PHOTON_PATTERN: Regex = "Photon[a-zA-Z]*".r
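These labels are matched by substring containment (name.contains(label)), not by equality, so each label only needs to occur somewhere inside a Photon accumulator's name. A tiny illustration with a hypothetical accumulator name (the exact Photon naming is not shown in this diff):

val PHOTON_METRIC_DISK_SPILL_SIZE_LABEL = "num bytes spilled to disk"
// Hypothetical Photon accumulator name; real names may carry extra prefix/suffix text.
val accumName = "total num bytes spilled to disk"
assert(accumName.contains(PHOTON_METRIC_DISK_SPILL_SIZE_LABEL)) // substring match, not equality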
@@ -27,9 +27,9 @@ appIndex,jobId,numTasks,Duration,diskBytesSpilled_sum,duration_sum,duration_max,
 1,25,200,4603,0,70379,569,314,351.9,4106,200,216,69040,0,0,168,0,0,0,0,14,5708,0,0,0,0,0,0,0,0,0,0
 1,36,1,4556,0,4332,4332,4332,4332.0,3359,95,401,3907,30328,7200,39,0,0,0,155245068,1,10552,0,0,0,0,0,0,0,7719,1920,0
 1,29,200,4555,0,69682,423,310,348.4,3730,200,218,68521,0,0,168,0,0,0,0,9,5748,0,0,0,0,0,0,0,0,0,0
-1,32,1,4515,0,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,32,1,4515,2147483648,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,39,1,4488,0,4322,4322,4322,4322.0,112,124,392,3907,349526,86400,39,0,0,0,155563380,1,9926,0,0,0,0,0,0,0,7239,1800,0
-1,37,1,4481,0,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,37,1,4481,1073741824,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,40,1,4476,0,4327,4327,4327,4327.0,98,147,394,3907,349526,86400,39,0,0,0,155563380,1,9895,0,0,0,0,0,0,0,7239,1800,0
 1,56,1,1055,0,1022,1022,1022,1022.0,758,77,95,901,0,0,0,0,0,0,134218344,6,10091,5,218,9592,220,9680,0,19272,0,0,0
 1,19,200,803,0,11895,145,38,59.5,943,209,252,10017,0,0,56,0,0,0,0,22,2739,0,0,0,0,0,0,0,0,0,0
@@ -1,2 +1,2 @@
 appIndex,appID,sqlID,description,numTasks,Duration,executorCPUTime,executorRunTime,executorCPURatio,diskBytesSpilled_sum,duration_sum,duration_max,duration_min,duration_avg,executorCPUTime_sum,executorDeserializeCPUTime_sum,executorDeserializeTime_sum,executorRunTime_sum,input_bytesRead_sum,input_recordsRead_sum,jvmGCTime_sum,memoryBytesSpilled_sum,output_bytesWritten_sum,output_recordsWritten_sum,peakExecutionMemory_max,resultSerializationTime_sum,resultSize_max,sr_fetchWaitTime_sum,sr_localBlocksFetched_sum,sr_localBytesRead_sum,sr_remoteBlocksFetched_sum,sr_remoteBytesRead_sum,sr_remoteBytesReadToDisk_sum,sr_totalBytesRead_sum,sw_bytesWritten_sum,sw_recordsWritten_sum,sw_writeTime_sum
-1,"app-20240919162642-0000",26,"query88",3472,250542,2883837,3818106,75.53,0,3858136,6743,54,1111.2,2883837,12349,18186,3818106,52997115316,69120188398,16100,0,0,0,250840493,181,16203,1394,1759,201596,1750,201200,0,402796,218614,19946,154
+1,"app-20240919162642-0000",26,"query88",3472,250542,2883837,3818106,75.53,3221225472,3858136,6743,54,1111.2,2883837,12349,18186,3818106,52997115316,69120188398,16100,0,0,0,250840493,181,16203,1394,1759,201596,1750,201200,0,402796,218614,19946,154
@@ -27,10 +27,10 @@ appIndex,stageId,numTasks,Duration,diskBytesSpilled_sum,duration_sum,duration_ma
 1,25,200,4599,0,70379,569,314,351.9,4106,200,216,69040,0,0,168,0,0,0,0,14,5708,0,0,0,0,0,0,0,0,0,0
 1,29,200,4552,0,69682,423,310,348.4,3730,200,218,68521,0,0,168,0,0,0,0,9,5748,0,0,0,0,0,0,0,0,0,0
 1,35,1,4525,0,4332,4332,4332,4332.0,3359,95,401,3907,30328,7200,39,0,0,0,155245068,1,10552,0,0,0,0,0,0,0,7719,1920,0
-1,36,1,4509,0,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,36,1,4509,2147483648,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,39,1,4474,0,4322,4322,4322,4322.0,112,124,392,3907,349526,86400,39,0,0,0,155563380,1,9926,0,0,0,0,0,0,0,7239,1800,0
 1,40,1,4469,0,4327,4327,4327,4327.0,98,147,394,3907,349526,86400,39,0,0,0,155563380,1,9895,0,0,0,0,0,0,0,7239,1800,0
-1,37,1,4464,0,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
+1,37,1,4464,1073741824,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
 1,107,1,1052,0,1022,1022,1022,1022.0,758,77,95,901,0,0,0,0,0,0,134218344,6,10091,5,218,9592,220,9680,0,19272,0,0,0
 1,19,200,794,0,11895,145,38,59.5,943,209,252,10017,0,0,56,0,0,0,0,22,2739,0,0,0,0,0,0,0,0,0,0
 1,26,1,315,0,312,312,312,312.0,2,1,1,306,0,0,0,0,0,0,0,0,3777,0,0,0,0,0,0,0,0,0,0
Binary file not shown.