Merge branch 'dev' into spark-rapids-tools-1420
# Conflicts:
#	core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
parthosa committed Dec 10, 2024
2 parents 383b71c + 9604133 commit 68cdf83
Showing 139 changed files with 3,435 additions and 1,630 deletions.
50 changes: 50 additions & 0 deletions .github/workflows/license-header-check.yml
@@ -0,0 +1,50 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# A workflow to check copyright/license header
name: license header check

on:
  pull_request:
    types: [opened, synchronize, reopened]

jobs:
  license-header-check:
    runs-on: ubuntu-latest
    if: "!contains(github.event.pull_request.title, '[bot]')"
    steps:
      - name: Get checkout depth
        run: |
          echo "PR_FETCH_DEPTH=$(( ${{ github.event.pull_request.commits }} + 10 ))" >> $GITHUB_ENV
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: ${{ env.PR_FETCH_DEPTH }}

      - name: license-header-check
        uses: NVIDIA/spark-rapids-common/license-header-check@main
        with:
          included_file_patterns: |
            *.py,
            *.toml,
            *.ini,
            *.yml,
            *.yaml,
            *.sh,
            *.properties,
            *.xml,
            *.feature,
            *.scala
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -23,7 +23,7 @@
<artifactId>rapids-4-spark-tools_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark tools</name>
<description>RAPIDS Accelerator for Apache Spark tools</description>
<version>24.10.1-SNAPSHOT</version>
<version>24.10.3-SNAPSHOT</version>
<packaging>jar</packaging>
<url>http://github.com/NVIDIA/spark-rapids-tools</url>

@@ -378,6 +378,11 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
*/
def isPlatformCSP: Boolean = false

/**
* Indicate if the platform requires path recommendations
*/
def requirePathRecommendations: Boolean = true

/**
* The maximum number of Gpus any instance in this platform supports.
*/
@@ -658,6 +663,7 @@ class EmrPlatform(gpuDevice: Option[GpuDevice],
override val defaultGpuDevice: GpuDevice = A10GGpu

override def isPlatformCSP: Boolean = true
override def requirePathRecommendations: Boolean = false

override def getRetainedSystemProps: Set[String] = Set("EMR_CLUSTER_ID")

@@ -70,7 +70,7 @@ class ToolTextFileWriter(
// No need to close the outputStream.
// Java should handle nested streams automatically.
utf8Writer.foreach { writer =>
logInfo(s"$finalLocationText output location: $textOutputLoc")
logDebug(s"$finalLocationText output location: $textOutputLoc")
writer.flush()
writer.close()
}
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

/**
* The result of the aggregation of the raw metrics. It contains the aggregated metrics for an
@@ -32,6 +32,7 @@ import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTa
* @param ioAggs lists the SQLs along their IO metrics
* @param sqlDurAggs the aggregated duration and CPU time for SQLs
* @param maxTaskInputSizes a sequence of SQLMaxTaskInputSizes that contains the maximum input size
* @param stageDiagnostics the stage level Spark metrics for diagnostic purposes
*/
case class AggRawMetricsResult(
jobAggs: Seq[JobAggTaskMetricsProfileResult],
@@ -40,4 +41,5 @@ case class AggRawMetricsResult(
sqlAggs: Seq[SQLTaskAggMetricsProfileResult],
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes])
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnostics: Seq[StageDiagnosticResult])
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis

object StageAccumDiagnosticMetrics {
val MEMORY_SPILLED_METRIC = "internal.metrics.memoryBytesSpilled"
val DISK_SPILLED_METRIC = "internal.metrics.diskBytesSpilled"
val INPUT_BYTES_READ_METRIC = "internal.metrics.input.bytesRead"
val OUTPUT_BYTES_WRITTEN_METRIC = "internal.metrics.output.bytesWritten"
val SW_TOTAL_BYTES_METRIC = "internal.metrics.shuffle.write.bytesWritten"
val SR_FETCH_WAIT_TIME_METRIC = "internal.metrics.shuffle.read.fetchWaitTime"
val SW_WRITE_TIME_METRIC = "internal.metrics.shuffle.write.writeTime"
val GPU_SEMAPHORE_WAIT_METRIC = "gpuSemaphoreWait"

/**
* Get all diagnostic metrics
*/
def getAllDiagnosticMetrics: Set[String] = Set(MEMORY_SPILLED_METRIC,
DISK_SPILLED_METRIC, INPUT_BYTES_READ_METRIC, OUTPUT_BYTES_WRITTEN_METRIC,
SW_TOTAL_BYTES_METRIC, SR_FETCH_WAIT_TIME_METRIC, SW_WRITE_TIME_METRIC,
GPU_SEMAPHORE_WAIT_METRIC)
}
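As a quick illustration of how this constant set can be consumed, the standalone sketch below (not part of the commit) filters a list of accumulator names down to the diagnostic ones. The import assumes the tools jar is on the classpath, and the sample accumulator names other than the diagnostic constants are assumptions for illustration only.

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics

object DiagnosticFilterSketch {
  def main(args: Array[String]): Unit = {
    // A mix of diagnostic and non-diagnostic accumulator names (sample data).
    val observed = Seq(
      "internal.metrics.memoryBytesSpilled",
      "internal.metrics.executorRunTime",
      "gpuSemaphoreWait")
    // Keep only the names tracked by getAllDiagnosticMetrics.
    val diagnostic = observed.filter(StageAccumDiagnosticMetrics.getAllDiagnosticMetrics.contains)
    println(diagnostic) // List(internal.metrics.memoryBytesSpilled, gpuSemaphoreWait)
  }
}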
@@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.tool.analysis

import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

@@ -56,6 +55,24 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// SQLPlanParser.
var unsupportedSQLPlan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()
var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
// A map between stage ID and the sequence of node names in that stage
val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
// A map between stage ID and diagnostic metrics results (stored as a map between metric name
// and AccumProfileResults)
val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, AccumProfileResults]] =
HashMap.empty[Long, HashMap[String, AccumProfileResults]]

/**
* Given an input diagnostic metric result, update stageToDiagnosticMetrics mapping
* @param accum AccumProfileResults to be analyzed
*/
private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
val stageId = accum.stageId
if (!stageToDiagnosticMetrics.contains(stageId)) {
stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults]
}
stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum
}

/**
* Connects Operators to Stages using AccumulatorIDs.
@@ -70,7 +87,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// Maps stages to operators by checking for non-zero intersection
// between nodeMetrics and stageAccumulateIDs
val nodeIdToStage = planGraph.allNodes.map { node =>
val mappedStages = SQLPlanParser.getStagesInSQLNode(node, app)
val nodeAccums = node.metrics.map(_.accumulatorId)
val mappedStages = app.getStageIDsFromAccumIds(nodeAccums)
((sqlId, node.id), mappedStages)
}.toMap
sqlPlanNodeIdToStageIds ++= nodeIdToStage
@@ -261,6 +279,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}
validNodes.map(n => s"${n.name}(${n.id.toString})")
}.getOrElse(Seq.empty)
stageToNodeNames(sModel.stageInfo.stageId) = nodeNames
SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
}
@@ -339,14 +358,19 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
Some(AccumProfileResults(
// Build the AccumProfileResults once and reuse it, avoiding an extra object allocation
val accumProfileResults = AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = min,
median = median,
max = max,
total = sum))
total = sum)
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Some(accumProfileResults)
}
})
}
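For readers following the new stageToDiagnosticMetrics plumbing above, here is a minimal standalone model of the two-level map update performed by updateStageDiagnosticMetrics. It is not part of the commit, and the real AccumProfileResults payload is replaced by a plain String for brevity.

import scala.collection.mutable.HashMap

object StageDiagMapSketch {
  // stage ID -> (metric name -> payload), mirroring the shape used above
  val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, String]] = HashMap.empty

  def update(stageId: Long, metricName: String, payload: String): Unit = {
    if (!stageToDiagnosticMetrics.contains(stageId)) {
      stageToDiagnosticMetrics(stageId) = HashMap.empty[String, String]
    }
    stageToDiagnosticMetrics(stageId)(metricName) = payload
  }

  def main(args: Array[String]): Unit = {
    update(3L, "internal.metrics.memoryBytesSpilled", "min=0 med=1 max=2 total=3")
    update(3L, "gpuSemaphoreWait", "min=0 med=5 max=9 total=14")
    println(stageToDiagnosticMetrics(3L).keySet) // both metric names are now recorded for stage 3
  }
}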
@@ -26,10 +26,14 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
* object to aggregate the Raw metrics and returns the result
* @param app the AppBase to be analyzed
* @param index the application index
* @param sqlAnalyzer optional AppSQLPlanAnalyzer used to aggregate diagnostic metrics;
* it is already present in ApplicationInfo for the Profiler, but must be
* provided explicitly for Qualification.
* @return a single record of AggRawMetricsResult containing all the raw aggregated Spark
* metrics
*/
def getAggRawMetrics(app: AppBase, index: Int): AggRawMetricsResult = {
def getAggRawMetrics(app: AppBase, index: Int, sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None):
AggRawMetricsResult = {
val analysisObj = new AppSparkMetricsAnalyzer(app)
AggRawMetricsResult(
analysisObj.aggregateSparkMetricsByJob(index),
@@ -38,7 +42,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
analysisObj.aggregateSparkMetricsBySql(index),
analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)))
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer))
}

/**
@@ -59,7 +64,8 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
agg1.sqlAggs ++ agg2.sqlAggs,
agg1.ioAggs ++ agg2.ioAggs,
agg1.sqlDurAggs ++ agg2.sqlDurAggs,
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes)
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes,
agg1.stageDiagnostics ++ agg2.stageDiagnostics)
}
}
}
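The optional sqlAnalyzer argument follows a simple provided-or-built-in fallback: Qualification passes its own analyzer, while the Profiler path resolves one from ApplicationInfo. Below is a standalone sketch of that pattern, using illustrative stand-in types rather than the real tools classes.

object OptionalAnalyzerSketch {
  final case class Analyzer(name: String)
  final case class App(builtInAnalyzer: Analyzer)

  // Mirrors the match on sqlAnalyzer: use the provided analyzer if any,
  // otherwise fall back to the one owned by the app object.
  def resolveAnalyzer(app: App, provided: Option[Analyzer]): Analyzer =
    provided.getOrElse(app.builtInAnalyzer)

  def main(args: Array[String]): Unit = {
    val app = App(Analyzer("profiler-built-in"))
    println(resolveAnalyzer(app, None))                            // Profiler path
    println(resolveAnalyzer(app, Some(Analyzer("qual-provided")))) // Qualification path
  }
}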
@@ -18,13 +18,15 @@ package com.nvidia.spark.rapids.tool.analysis

import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, TaskModel}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef, TaskModel}

/**
* Does analysis on the DataFrames from object of AppBase.
@@ -50,14 +52,14 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// Hashmap to cache the stage level metrics. It is initialized to None just in case the caller
// does not call methods in order starting with stage level metrics.
private var stageLevelCache:
Option[mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult]] = None
Option[LinkedHashMap[Int, StageAggTaskMetricsProfileResult]] = None

// Getter method used to protect the cache from out-of-order calls.
// If the stage-level metrics are not generated yet, generates and add them to the cache
private def stageLevelSparkMetrics(
index: Int): mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult] = {
index: Int): LinkedHashMap[Int, StageAggTaskMetricsProfileResult] = {
if (stageLevelCache.isEmpty) {
stageLevelCache = Some(mutable.LinkedHashMap[Int, StageAggTaskMetricsProfileResult]())
stageLevelCache = Some(LinkedHashMap[Int, StageAggTaskMetricsProfileResult]())
aggregateSparkMetricsByStageInternal(index)
}
stageLevelCache.get
@@ -320,6 +322,62 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sqlRows.toSeq
}

/**
* Aggregates the diagnostic SparkMetrics by stage.
* @param index the App-index (used by the profiler tool)
* @param analyzer optional AppSQLPlanAnalyzer used to pull stage-level information
* such as node names and diagnostic metrics results; only Qualification
* needs to provide this argument.
* @return sequence of StageDiagnosticResult
*/
def aggregateDiagnosticMetricsByStage(index: Int, analyzer: Option[AppSQLPlanAnalyzer] = None):
Seq[StageDiagnosticResult] = {
val sqlAnalyzer = analyzer match {
case Some(res) => res
case None =>
// for Profiler this is present in ApplicationInfo
app.asInstanceOf[ApplicationInfo].planMetricProcessor
}
val zeroAccumProfileResults =
AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L)

// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
val numTasks = tasksInStage.size
val nodeNames = sqlAnalyzer.stageToNodeNames.
getOrElse(sm.stageInfo.stageId, Seq.empty[String])
val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics.
getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]).
withDefaultValue(zeroAccumProfileResults)
val srTotalBytesMetrics =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))

StageDiagnosticResult(index,
app.getAppName,
app.appId,
sm.stageInfo.stageId,
sm.duration,
numTasks,
srTotalBytesMetrics.min,
srTotalBytesMetrics.med,
srTotalBytesMetrics.max,
srTotalBytesMetrics.total,
diagnosticMetricsMap(MEMORY_SPILLED_METRIC),
diagnosticMetricsMap(DISK_SPILLED_METRIC),
diagnosticMetricsMap(INPUT_BYTES_READ_METRIC),
diagnosticMetricsMap(OUTPUT_BYTES_WRITTEN_METRIC),
diagnosticMetricsMap(SW_TOTAL_BYTES_METRIC),
diagnosticMetricsMap(SR_FETCH_WAIT_TIME_METRIC),
diagnosticMetricsMap(SW_WRITE_TIME_METRIC),
diagnosticMetricsMap(GPU_SEMAPHORE_WAIT_METRIC),
nodeNames)
}.toSeq
}

/**
* Aggregates the SparkMetrics by stage. This is an internal method to populate the cached metrics
* to be used by other aggregators.
@@ -336,8 +394,8 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// Note:
// - A HashMap could be used instead of separate mutable.ArrayBuffer for each metric type,
// but avoiding it for readability.
val photonPeakMemoryAccumInfos = mutable.ArrayBuffer[AccumInfo]()
val photonShuffleWriteTimeAccumInfos = mutable.ArrayBuffer[AccumInfo]()
val photonPeakMemoryAccumInfos = ArrayBuffer[AccumInfo]()
val photonShuffleWriteTimeAccumInfos = ArrayBuffer[AccumInfo]()

if (app.isPhoton) {
app.accumManager.applyToAccumInfoMap { accumInfo =>
@@ -434,6 +492,24 @@ object AppSparkMetricsAnalyzer {
}
}

/**
* Given an input iterable, returns its min, median, max and sum.
*/
def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
if (arr.isEmpty) {
StatisticsMetrics(0L, 0L, 0L, 0L)
} else {
val sortedArr = arr.toSeq.sorted
val len = sortedArr.size
val med = if (len % 2 == 0) {
(sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
} else {
sortedArr(len / 2)
}
StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
}
}

def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
if (arr.isEmpty) {
0L
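To make the getStatistics helper above concrete, here is a standalone re-implementation sketch with a worked call. It is not part of the commit: StatisticsMetrics is replaced by a plain tuple, since the real case class lives elsewhere in the tools codebase.

object StatsSketch {
  // Returns (min, median, max, sum); mirrors the logic of getStatistics above.
  def stats(arr: Iterable[Long]): (Long, Long, Long, Long) = {
    if (arr.isEmpty) {
      (0L, 0L, 0L, 0L)
    } else {
      val sorted = arr.toSeq.sorted
      val len = sorted.size
      // Even length: average the two middle values; odd length: take the middle one.
      val med = if (len % 2 == 0) (sorted(len / 2) + sorted(len / 2 - 1)) / 2 else sorted(len / 2)
      (sorted.head, med, sorted.last, sorted.sum)
    }
  }

  def main(args: Array[String]): Unit = {
    println(stats(Seq(40L, 10L, 30L, 20L))) // (10,25,40,100): median of an even-length input
  }
}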

