diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index 33194644e..3a862097b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -16,17 +16,16 @@ package com.nvidia.spark.rapids.tool.analysis -import java.util.concurrent.TimeUnit - import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap} import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._ +import com.nvidia.spark.rapids.tool.analysis.util.{AggAccumHelper, AggAccumPhotonHelper} import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper -import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} +import com.nvidia.spark.rapids.tool.profiling._ import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo -import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef, TaskModel} +import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef} /** * Does analysis on the DataFrames from object of AppBase. @@ -84,52 +83,47 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { if (jc.stageIds.isEmpty) { None } else { - val profResultsInJob = stageLevelSparkMetrics(index).filterKeys(jc.stageIds.contains).values - if (profResultsInJob.isEmpty) { + val jobAggAccumulator = new AggAccumHelper() + val perJobRec = jobAggAccumulator.accumPerJob( + jc.stageIds.collect { + case stageId if stageLevelSparkMetrics(index).contains(stageId) => + stageLevelSparkMetrics(index)(stageId) + }) + if (perJobRec.isEmptyAggregates) { None } else { - // Recalculate the duration sum, max, min, avg for the job based on the cached - // stage Profiling results - val tasksInJob = profResultsInJob.map(_.numTasks).sum - val durSum = profResultsInJob.map(_.durationSum).sum - val durMax = - AppSparkMetricsAnalyzer.maxWithEmptyHandling(profResultsInJob.map(_.durationMax)) - val durMin = - AppSparkMetricsAnalyzer.minWithEmptyHandling(profResultsInJob.map(_.durationMin)) - val durAvg = ToolUtils.calculateAverage(durSum, tasksInJob, 1) Some(JobAggTaskMetricsProfileResult(index, id, - tasksInJob, + perJobRec.numTasks, jc.duration, - profResultsInJob.map(_.diskBytesSpilledSum).sum, - durSum, - durMax, - durMin, - durAvg, - profResultsInJob.map(_.executorCPUTimeSum).sum, - profResultsInJob.map(_.executorDeserializeCpuTimeSum).sum, - profResultsInJob.map(_.executorDeserializeTimeSum).sum, - profResultsInJob.map(_.executorRunTimeSum).sum, - profResultsInJob.map(_.inputBytesReadSum).sum, - profResultsInJob.map(_.inputRecordsReadSum).sum, - profResultsInJob.map(_.jvmGCTimeSum).sum, - profResultsInJob.map(_.memoryBytesSpilledSum).sum, - profResultsInJob.map(_.outputBytesWrittenSum).sum, - profResultsInJob.map(_.outputRecordsWrittenSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling( - profResultsInJob.map(_.peakExecutionMemoryMax)), - profResultsInJob.map(_.resultSerializationTimeSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling(profResultsInJob.map(_.resultSizeMax)), - profResultsInJob.map(_.srFetchWaitTimeSum).sum, - profResultsInJob.map(_.srLocalBlocksFetchedSum).sum, - profResultsInJob.map(_.srcLocalBytesReadSum).sum, - profResultsInJob.map(_.srRemoteBlocksFetchSum).sum, - profResultsInJob.map(_.srRemoteBytesReadSum).sum, - profResultsInJob.map(_.srRemoteBytesReadToDiskSum).sum, - profResultsInJob.map(_.srTotalBytesReadSum).sum, - profResultsInJob.map(_.swBytesWrittenSum).sum, - profResultsInJob.map(_.swRecordsWrittenSum).sum, - profResultsInJob.map(_.swWriteTimeSum).sum)) + perJobRec.diskBytesSpilledSum, + perJobRec.durationSum, + perJobRec.durationMax, + perJobRec.durationMin, + perJobRec.durationAvg, + perJobRec.executorCPUTimeSum, + perJobRec.executorDeserializeCpuTimeSum, + perJobRec.executorDeserializeTimeSum, + perJobRec.executorRunTimeSum, + perJobRec.inputBytesReadSum, + perJobRec.inputRecordsReadSum, + perJobRec.jvmGCTimeSum, + perJobRec.memoryBytesSpilledSum, + perJobRec.outputBytesWrittenSum, + perJobRec.outputRecordsWrittenSum, + perJobRec.peakExecutionMemoryMax, + perJobRec.resultSerializationTimeSum, + perJobRec.resultSizeMax, + perJobRec.srFetchWaitTimeSum, + perJobRec.srLocalBlocksFetchedSum, + perJobRec.srLocalBytesReadSum, + perJobRec.srRemoteBlocksFetchSum, + perJobRec.srRemoteBytesReadSum, + perJobRec.srRemoteBytesReadToDiskSum, + perJobRec.srTotalBytesReadSum, + perJobRec.swBytesWrittenSum, + perJobRec.swRecordsWrittenSum, + perJobRec.swWriteTimeSum)) } } } @@ -182,66 +176,55 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { if (app.sqlIdToStages.contains(sqlId)) { val stagesInSQL = app.sqlIdToStages(sqlId) // TODO: Should we only consider successful tasks? - val cachedResBySQL = stageLevelSparkMetrics(index).filterKeys(stagesInSQL.contains).values - if (cachedResBySQL.isEmpty) { + val sqlAggAccumulator = new AggAccumHelper() + val preSqlRec = sqlAggAccumulator.accumPerSQL( + stagesInSQL.collect { + case stageId if stageLevelSparkMetrics(index).contains(stageId) => + stageLevelSparkMetrics(index)(stageId) + }) + if (preSqlRec.isEmptyAggregates) { None } else { - // Recalculate the duration sum, max, min, avg for the job based on the cached - // stage Profiling results - val tasksInSql = cachedResBySQL.map(_.numTasks).sum - val durSum = cachedResBySQL.map(_.durationSum).sum - val durMax = - AppSparkMetricsAnalyzer.maxWithEmptyHandling(cachedResBySQL.map(_.durationMax)) - val durMin = - AppSparkMetricsAnalyzer.minWithEmptyHandling(cachedResBySQL.map(_.durationMin)) - val durAvg = ToolUtils.calculateAverage(durSum, tasksInSql, 1) - val diskBytes = cachedResBySQL.map(_.diskBytesSpilledSum).sum - val execCpuTime = cachedResBySQL.map(_.executorCPUTimeSum).sum - val execRunTime = cachedResBySQL.map(_.executorRunTimeSum).sum - val execCPURatio = ToolUtils.calculateDurationPercent(execCpuTime, execRunTime) - val inputBytesRead = cachedResBySQL.map(_.inputBytesReadSum).sum // set this here, so make sure we don't get it again until later - sqlCase.sqlCpuTimePercent = execCPURatio - + sqlCase.sqlCpuTimePercent = preSqlRec.executorCpuRatio Some(SQLTaskAggMetricsProfileResult(index, app.appId, sqlId, sqlCase.description, - tasksInSql, + preSqlRec.numTasks, sqlCase.duration, - execCpuTime, - execRunTime, - execCPURatio, - diskBytes, - durSum, - durMax, - durMin, - durAvg, - execCpuTime, - cachedResBySQL.map(_.executorDeserializeCpuTimeSum).sum, - cachedResBySQL.map(_.executorDeserializeTimeSum).sum, - execRunTime, - inputBytesRead, - inputBytesRead * 1.0 / tasksInSql, - cachedResBySQL.map(_.inputRecordsReadSum).sum, - cachedResBySQL.map(_.jvmGCTimeSum).sum, - cachedResBySQL.map(_.memoryBytesSpilledSum).sum, - cachedResBySQL.map(_.outputBytesWrittenSum).sum, - cachedResBySQL.map(_.outputRecordsWrittenSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling( - cachedResBySQL.map(_.peakExecutionMemoryMax)), - cachedResBySQL.map(_.resultSerializationTimeSum).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling(cachedResBySQL.map(_.resultSizeMax)), - cachedResBySQL.map(_.srFetchWaitTimeSum).sum, - cachedResBySQL.map(_.srLocalBlocksFetchedSum).sum, - cachedResBySQL.map(_.srcLocalBytesReadSum).sum, - cachedResBySQL.map(_.srRemoteBlocksFetchSum).sum, - cachedResBySQL.map(_.srRemoteBytesReadSum).sum, - cachedResBySQL.map(_.srRemoteBytesReadToDiskSum).sum, - cachedResBySQL.map(_.srTotalBytesReadSum).sum, - cachedResBySQL.map(_.swBytesWrittenSum).sum, - cachedResBySQL.map(_.swRecordsWrittenSum).sum, - cachedResBySQL.map(_.swWriteTimeSum).sum)) + preSqlRec.executorCPUTimeSum, + preSqlRec.executorRunTimeSum, + preSqlRec.executorCpuRatio, + preSqlRec.diskBytesSpilledSum, + preSqlRec.durationSum, + preSqlRec.durationMax, + preSqlRec.durationMin, + preSqlRec.durationAvg, + preSqlRec.executorCPUTimeSum, + preSqlRec.executorDeserializeCpuTimeSum, + preSqlRec.executorDeserializeTimeSum, + preSqlRec.executorRunTimeSum, + preSqlRec.inputBytesReadSum, + preSqlRec.inputBytesReadAvg, + preSqlRec.inputRecordsReadSum, + preSqlRec.jvmGCTimeSum, + preSqlRec.memoryBytesSpilledSum, + preSqlRec.outputBytesWrittenSum, + preSqlRec.outputRecordsWrittenSum, + preSqlRec.peakExecutionMemoryMax, + preSqlRec.resultSerializationTimeSum, + preSqlRec.resultSizeMax, + preSqlRec.srFetchWaitTimeSum, + preSqlRec.srLocalBlocksFetchedSum, + preSqlRec.srLocalBytesReadSum, + preSqlRec.srRemoteBlocksFetchSum, + preSqlRec.srRemoteBytesReadSum, + preSqlRec.srRemoteBytesReadToDiskSum, + preSqlRec.srTotalBytesReadSum, + preSqlRec.swBytesWrittenSum, + preSqlRec.swRecordsWrittenSum, + preSqlRec.swWriteTimeSum)) } } else { None @@ -339,8 +322,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { app.asInstanceOf[ApplicationInfo].planMetricProcessor } val zeroAccumProfileResults = - AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L) - + AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L) + val emptyNodeNames = Seq.empty[String] + val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults] // TODO: this has stage attempts. we should handle different attempts app.stageManager.getAllStages.map { sm => // TODO: Should we only consider successful tasks? @@ -348,11 +332,11 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { sm.stageInfo.attemptNumber()) // count duplicate task attempts val numTasks = tasksInStage.size - val nodeNames = sqlAnalyzer.stageToNodeNames. - getOrElse(sm.stageInfo.stageId, Seq.empty[String]) - val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics. - getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]). - withDefaultValue(zeroAccumProfileResults) + val nodeNames = sqlAnalyzer.stageToNodeNames.getOrElse(sm.stageInfo.stageId, emptyNodeNames) + val diagnosticMetricsMap = + sqlAnalyzer.stageToDiagnosticMetrics + .getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics) + .withDefaultValue(zeroAccumProfileResults) val srTotalBytesMetrics = AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead)) @@ -417,10 +401,8 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { // TODO: Should we only consider successful tasks? val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId, sm.stageInfo.attemptNumber()) - // count duplicate task attempts - val numAttempts = tasksInStage.size - val (peakMemoryMax, shuffleWriteTimeSum) = if (app.isPhoton) { + val accumHelperObj = if (app.isPhoton) { // If this a photon app, use the photonHelper // For max peak memory, we need to look at the accumulators at the task level. val peakMemoryValues = tasksInStage.flatMap { taskModel => photonPeakMemoryAccumInfos.flatMap { accumInfo => @@ -431,50 +413,45 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo => accumInfo.stageValuesMap.get(sm.stageInfo.stageId) } - (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues), - TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)) + new AggAccumPhotonHelper(shuffleWriteValues, peakMemoryValues) } else { // For non-Photon apps, use the task metrics directly. - val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory) - val shuffleWriteTime = tasksInStage.map(_.sw_writeTime) - (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues), - shuffleWriteTime.sum) + new AggAccumHelper() } - - val (durSum, durMax, durMin, durAvg) = AppSparkMetricsAnalyzer.getDurations(tasksInStage) + val perStageRec = accumHelperObj.accumPerStage(tasksInStage) val stageRow = StageAggTaskMetricsProfileResult(index, sm.stageInfo.stageId, - numAttempts, // TODO: why is this numAttempts and not numTasks? + // numTasks includes duplicate task attempts + perStageRec.numTasks, sm.duration, - tasksInStage.map(_.diskBytesSpilled).sum, - durSum, - durMax, - durMin, - durAvg, - tasksInStage.map(_.executorCPUTime).sum, - tasksInStage.map(_.executorDeserializeCPUTime).sum, - tasksInStage.map(_.executorDeserializeTime).sum, - tasksInStage.map(_.executorRunTime).sum, - tasksInStage.map(_.input_bytesRead).sum, - tasksInStage.map(_.input_recordsRead).sum, - tasksInStage.map(_.jvmGCTime).sum, - tasksInStage.map(_.memoryBytesSpilled).sum, - tasksInStage.map(_.output_bytesWritten).sum, - tasksInStage.map(_.output_recordsWritten).sum, - peakMemoryMax, - tasksInStage.map(_.resultSerializationTime).sum, - AppSparkMetricsAnalyzer.maxWithEmptyHandling(tasksInStage.map(_.resultSize)), - tasksInStage.map(_.sr_fetchWaitTime).sum, - tasksInStage.map(_.sr_localBlocksFetched).sum, - tasksInStage.map(_.sr_localBytesRead).sum, - tasksInStage.map(_.sr_remoteBlocksFetched).sum, - tasksInStage.map(_.sr_remoteBytesRead).sum, - tasksInStage.map(_.sr_remoteBytesReadToDisk).sum, - tasksInStage.map(_.sr_totalBytesRead).sum, - tasksInStage.map(_.sw_bytesWritten).sum, - tasksInStage.map(_.sw_recordsWritten).sum, - shuffleWriteTimeSum - ) + perStageRec.diskBytesSpilledSum, + perStageRec.durationSum, + perStageRec.durationMax, + perStageRec.durationMin, + perStageRec.durationAvg, + perStageRec.executorCPUTimeSum, + perStageRec.executorDeserializeCpuTimeSum, + perStageRec.executorDeserializeTimeSum, + perStageRec.executorRunTimeSum, + perStageRec.inputBytesReadSum, + perStageRec.inputRecordsReadSum, + perStageRec.jvmGCTimeSum, + perStageRec.memoryBytesSpilledSum, + perStageRec.outputBytesWrittenSum, + perStageRec.outputRecordsWrittenSum, + perStageRec.peakExecutionMemoryMax, + perStageRec.resultSerializationTimeSum, + perStageRec.resultSizeMax, + perStageRec.srFetchWaitTimeSum, + perStageRec.srLocalBlocksFetchedSum, + perStageRec.srLocalBytesReadSum, + perStageRec.srRemoteBlocksFetchSum, + perStageRec.srRemoteBytesReadSum, + perStageRec.srRemoteBytesReadToDiskSum, + perStageRec.srTotalBytesReadSum, + perStageRec.swBytesWrittenSum, + perStageRec.swRecordsWrittenSum, + perStageRec.swWriteTimeSum) stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow) } } @@ -482,16 +459,6 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { object AppSparkMetricsAnalyzer { - def getDurations(tcs: Iterable[TaskModel]): (Long, Long, Long, Double) = { - val durations = tcs.map(_.duration) - if (durations.nonEmpty) { - (durations.sum, durations.max, durations.min, - ToolUtils.calculateAverage(durations.sum, durations.size, 1)) - } else { - (0L, 0L, 0L, 0.toDouble) - } - } - /** * Given an input iterable, returns its min, median, max and sum. */ @@ -509,20 +476,4 @@ object AppSparkMetricsAnalyzer { StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum) } } - - def maxWithEmptyHandling(arr: Iterable[Long]): Long = { - if (arr.isEmpty) { - 0L - } else { - arr.max - } - } - - def minWithEmptyHandling(arr: Iterable[Long]): Long = { - if (arr.isEmpty) { - 0L - } else { - arr.min - } - } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala new file mode 100644 index 000000000..b42ac08b4 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult + +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * A helper class to facilitate the accumulation of aggregate metrics. + * This is a separate class to allow further customization in the future. For example, + * a parellel processor can be used to split the iterables without changing the caller side. + */ +class AggAccumHelper { + + private def accumCachedRecords[R <: TaskMetricsAccumRec]( + stageRecords: Iterable[StageAggTaskMetricsProfileResult], + rec: R): Unit = { + stageRecords.foreach(rec.addRecord) + rec.finalizeAggregation() + } + + protected def createStageAccumRecord(): TaskMetricsAccumRec = { + StageAggAccum() + } + + def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = { + val resRec = createStageAccumRecord() + taskRecords.foreach(resRec.addRecord) + resRec.finalizeAggregation() + resRec + } + + def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = { + val resRec = SQLAggAccum() + accumCachedRecords(stageRecords, resRec) + resRec + } + + def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = { + val resRec = JobAggAccum() + accumCachedRecords(stageRecords, resRec) + resRec + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala new file mode 100644 index 000000000..4f1356960 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +/** + * Implementation of AggAccumHelper for Photon. + * It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those + * values are not available in the TaskModel. + */ +class AggAccumPhotonHelper( + shuffleWriteValues: Iterable[Long], + peakMemValues: Iterable[Long]) extends AggAccumHelper { + + override def createStageAccumRecord(): TaskMetricsAccumRec = { + StageAggPhoton(shuffleWriteValues, peakMemValues) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala new file mode 100644 index 000000000..a8e5b78db --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * Accumulator for Job Aggregates. + * This is an optimization to avoid using the Scala collections API on each field for the entire + * number of tasks/stages in a Job. + */ +case class JobAggAccum() extends TaskMetricsAccumRec { + override def addRecord(rec: TaskModel): Unit = { + throw new UnsupportedOperationException( + "Not implemented: JobAggAccum accepts only cached records") + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala new file mode 100644 index 000000000..b8222679f --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import org.apache.spark.sql.rapids.tool.ToolUtils +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * Accumulator for SQL Aggregates. + * This is an optimization to avoid using the Scala collections API on each field for the entire + * number of tasks/stages in a SQL. + */ +case class SQLAggAccum( + var executorCpuRatio: Double = 0, + // Not added to the output since it is used only by the AutoTuner + var inputBytesReadAvg: Double = 0) extends TaskMetricsAccumRec { + + override def finalizeAggregation(): Unit = { + super.finalizeAggregation() + executorCpuRatio = ToolUtils.calculateDurationPercent(executorCPUTimeSum, executorRunTimeSum) + inputBytesReadAvg = ToolUtils.calculateAverage(inputBytesReadSum, numTasks, 1) + } + + override def addRecord(rec: TaskModel): Unit = { + throw new UnsupportedOperationException( + "Not implemented: SQLAggAccum accepts only cached records") + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala new file mode 100644 index 000000000..c88f1a77d --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult + +/** + * Accumulator for Stage Aggregates. + * This is an optimization to avoid using the Scala collections API on each field for the entire + * number of tasks in a Stage. + */ +case class StageAggAccum() extends TaskMetricsAccumRec { + override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = { + throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" + + "calculate stage aggregates") + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala new file mode 100644 index 000000000..ed7127050 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import java.util.concurrent.TimeUnit + +import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult + +/** + * Implementation of Accumulator object for Photon. + * It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those + * values are not available in the TaskModel. + */ +case class StageAggPhoton( + shuffleWriteValues: Iterable[Long], + peakMemValues: Iterable[Long]) extends TaskMetricsAccumRec { + + override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = { + throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" + + "calculate stage aggregates") + } + + override def finalizeAggregation(): Unit = { + // Fix the shuffleWriteTimes and the peakMemoryValues to use the shuffleWriteValues and + // the peakMemValues. + swWriteTimeSum = 0 + peakExecutionMemoryMax = 0 + if (!isEmptyAggregates) { + // Re-calculate the photon specific fields only if the accumulator has tasks. + // Otherwise, leave it as 0. + if (shuffleWriteValues.nonEmpty) { + swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum) + } + if (peakMemValues.nonEmpty) { + peakExecutionMemoryMax = peakMemValues.max + } + } + super.finalizeAggregation() + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala new file mode 100644 index 000000000..b5d98b9ac --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.analysis.util + +import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult + +import org.apache.spark.sql.rapids.tool.ToolUtils +import org.apache.spark.sql.rapids.tool.store.TaskModel + +/** + * Accumulator used for task metrics. + * This is an optimization decision to avoid using Scala builtin collections on every field in the + * taskModel. + */ +class TaskMetricsAccumRec { + var numTasks: Int = 0 + var diskBytesSpilledSum: Long = 0 + var durationSum: Long = 0 + var durationMax: Long = Long.MinValue + var durationMin: Long = Long.MaxValue + var durationAvg: Double = 0.0 + var executorCPUTimeSum: Long = 0 + var executorDeserializeCpuTimeSum: Long = 0 + var executorDeserializeTimeSum: Long = 0 + var executorRunTimeSum: Long = 0 + var inputBytesReadSum: Long = 0 + var inputRecordsReadSum: Long = 0 + var jvmGCTimeSum: Long = 0 + var memoryBytesSpilledSum: Long = 0 + var outputBytesWrittenSum: Long = 0 + var outputRecordsWrittenSum: Long = 0 + var peakExecutionMemoryMax: Long = Long.MinValue + var resultSerializationTimeSum: Long = 0 + var resultSizeMax: Long = Long.MinValue + var srFetchWaitTimeSum: Long = 0 + var srLocalBlocksFetchedSum: Long = 0 + var srLocalBytesReadSum: Long = 0 + var srRemoteBlocksFetchSum: Long = 0 + var srRemoteBytesReadSum: Long = 0 + var srRemoteBytesReadToDiskSum: Long = 0 + var srTotalBytesReadSum: Long = 0 + var swBytesWrittenSum: Long = 0 + var swRecordsWrittenSum: Long = 0 + var swWriteTimeSum: Long = 0 + + /** + * Assumption that 0-tasks implies no aggregations on metrics. This means that metrics on + * job/SQL levels won't be accumulated as long as no tasks are accounted for. + */ + def isEmptyAggregates: Boolean = numTasks == 0 + + /** + * Reset all fields to 0. This is used to reset the fields when the Task iterator is empty. + * When the iterator is empty, then fields such as "max" should be reset to 0. + */ + def resetFields(): Unit = { + durationMax = 0 + durationMin = 0 + peakExecutionMemoryMax = 0 + resultSizeMax = 0 + } + + def addRecord(rec: TaskModel): Unit = { + numTasks += 1 + // SumFields + diskBytesSpilledSum += rec.diskBytesSpilled + durationSum += rec.duration + executorCPUTimeSum += rec.executorCPUTime + executorDeserializeCpuTimeSum += rec.executorDeserializeCPUTime + executorDeserializeTimeSum += rec.executorDeserializeTime + executorRunTimeSum += rec.executorRunTime + inputBytesReadSum += rec.input_bytesRead + inputRecordsReadSum += rec.input_recordsRead + jvmGCTimeSum += rec.jvmGCTime + memoryBytesSpilledSum += rec.memoryBytesSpilled + outputBytesWrittenSum += rec.output_bytesWritten + outputRecordsWrittenSum += rec.output_recordsWritten + resultSerializationTimeSum += rec.resultSerializationTime + srFetchWaitTimeSum += rec.sr_fetchWaitTime + srLocalBlocksFetchedSum += rec.sr_localBlocksFetched + srLocalBytesReadSum += rec.sr_localBytesRead + srRemoteBlocksFetchSum += rec.sr_remoteBlocksFetched + srRemoteBytesReadSum += rec.sr_remoteBytesRead + srRemoteBytesReadToDiskSum += rec.sr_remoteBytesReadToDisk + srTotalBytesReadSum += rec.sr_totalBytesRead + swBytesWrittenSum += rec.sw_bytesWritten + swRecordsWrittenSum += rec.sw_recordsWritten + swWriteTimeSum += rec.sw_writeTime + // Max fields + durationMax = math.max(durationMax, rec.duration) + peakExecutionMemoryMax = math.max(peakExecutionMemoryMax, rec.peakExecutionMemory) + resultSizeMax = math.max(resultSizeMax, rec.resultSize) + // Min Fields + durationMin = math.min(durationMin, rec.duration) + } + + def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = { + // Sums + numTasks += rec.numTasks + durationSum += rec.durationSum + diskBytesSpilledSum += rec.diskBytesSpilledSum + executorCPUTimeSum += rec.executorCPUTimeSum + executorRunTimeSum += rec.executorRunTimeSum + inputBytesReadSum += rec.inputBytesReadSum + executorDeserializeCpuTimeSum += rec.executorDeserializeCpuTimeSum + executorDeserializeTimeSum += rec.executorDeserializeTimeSum + inputRecordsReadSum += rec.inputRecordsReadSum + jvmGCTimeSum += rec.jvmGCTimeSum + memoryBytesSpilledSum += rec.memoryBytesSpilledSum + outputBytesWrittenSum += rec.outputBytesWrittenSum + outputRecordsWrittenSum += rec.outputRecordsWrittenSum + resultSerializationTimeSum += rec.resultSerializationTimeSum + srFetchWaitTimeSum += rec.srFetchWaitTimeSum + srLocalBlocksFetchedSum += rec.srLocalBlocksFetchedSum + srLocalBytesReadSum += rec.srcLocalBytesReadSum + srRemoteBlocksFetchSum += rec.srRemoteBlocksFetchSum + srRemoteBytesReadSum += rec.srRemoteBytesReadSum + srRemoteBytesReadToDiskSum += rec.srRemoteBytesReadToDiskSum + srTotalBytesReadSum += rec.srTotalBytesReadSum + swBytesWrittenSum += rec.swBytesWrittenSum + swRecordsWrittenSum += rec.swRecordsWrittenSum + swWriteTimeSum += rec.swWriteTimeSum + // Max + durationMax = math.max(durationMax, rec.durationMax) + peakExecutionMemoryMax = math.max(peakExecutionMemoryMax, rec.peakExecutionMemoryMax) + resultSizeMax = math.max(resultSizeMax, rec.resultSizeMax) + // Min + durationMin = math.min(durationMin, rec.durationMin) + } + + /** + * This method should be called to finalize the accumulations of all the metrics. + * For example, calculating averages and doing any last transformations on a field before the + * results are consumed. + */ + def finalizeAggregation(): Unit = { + durationAvg = ToolUtils.calculateAverage(durationSum, numTasks, 1) + if (numTasks < 1) { + // number of tasks is 0, then we should reset fields such as (max, min) to 0. + resetFields() + } + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala index 7b70bedb2..35c9c19e1 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala @@ -27,6 +27,7 @@ case class AccumMetaRef(id: Long, name: AccumNameRef) { } object AccumMetaRef { + val EMPTY_ACCUM_META_REF: AccumMetaRef = new AccumMetaRef(0L, AccumNameRef.EMPTY_ACC_NAME_REF) def apply(id: Long, name: Option[String]): AccumMetaRef = new AccumMetaRef(id, AccumNameRef.getOrCreateAccumNameRef(name)) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala index 0172f5229..4ce41e4a5 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala @@ -42,7 +42,7 @@ case class AccumNameRef(value: String) { object AccumNameRef { // Dummy AccNameRef to represent None accumulator names. This is an optimization to avoid // storing an option[string] for all accumulable names which leads to "get-or-else" everywhere. - private val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A") + val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A") // A global table to store reference to all accumulator names. The map is accessible by all // threads (different applications) running in parallel. This avoids duplicate work across // different threads.