Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into spark-rapids-tools-1120

bilalbari committed Jul 12, 2024
2 parents d6bc2da + 38eccc1 commit 3a8d4c5
Showing 36 changed files with 387 additions and 79 deletions.
13 changes: 10 additions & 3 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -62,7 +62,7 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) {
// factors since we don't have speedup factors for all combinations of
// platforms and GPUs. We expect speedup factor usage to be going away
// so this is less of an issue.
def defaultGpuForSpeedupFactor: GpuDevice = getGpuOrDefault
def defaultGpuForSpeedupFactor: GpuDevice = defaultGpuDevice

/**
* Recommendations to be excluded from the list of recommendations.
@@ -113,12 +113,15 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) {
s"operatorsScore-$platformName-$getGpuOrDefaultForSpeedupFactors.csv"
}

def getDefaultOperatorScoreFile: String = {
s"operatorsScore-$platformName-$defaultGpuForSpeedupFactor.csv"
}

final def getGpuOrDefault: GpuDevice = gpuDevice.getOrElse(defaultGpuDevice)

final def getGpuOrDefaultForSpeedupFactors: GpuDevice =
gpuDevice.getOrElse(defaultGpuForSpeedupFactor)


final def setGpuDevice(gpuDevice: GpuDevice): Unit = {
this.gpuDevice = Some(gpuDevice)
}
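
For illustration, a minimal standalone sketch of the fallback chain this hunk changes: speedup factors now always key off the platform default GPU, while getGpuOrDefault still honors a user-selected GPU (simplified types, not the tool's actual classes; the GPU names are illustrative):

sealed trait Gpu
case object T4Gpu extends Gpu
case object A100Gpu extends Gpu

abstract class PlatformSketch(var gpuDevice: Option[Gpu]) {
  val defaultGpuDevice: Gpu
  // After this change: speedup factors resolve to the platform default,
  // regardless of any user-selected GPU.
  def defaultGpuForSpeedupFactor: Gpu = defaultGpuDevice
  final def getGpuOrDefault: Gpu = gpuDevice.getOrElse(defaultGpuDevice)
}

object FallbackDemo extends App {
  val p = new PlatformSketch(Some(A100Gpu)) { val defaultGpuDevice: Gpu = T4Gpu }
  println(p.getGpuOrDefault)            // A100Gpu: the user-selected GPU wins
  println(p.defaultGpuForSpeedupFactor) // T4Gpu: platform default, by design
}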
@@ -148,7 +151,6 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) {

abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice]) extends Platform(gpuDevice) {
override val defaultGpuDevice: GpuDevice = T4Gpu

override def isPlatformCSP: Boolean = true

// note that Databricks generally sets the spark.executor.memory for the user. Our
@@ -279,6 +281,11 @@ object PlatformFactory extends Logging {
def createInstance(platformKey: String = PlatformNames.DEFAULT): Platform = {
val (platformName, gpuName) = extractPlatformGpuName(platformKey)
val gpuDevice = gpuName.flatMap(GpuDevice.createInstance)
// Case where a GPU name was supplied but is not in the device map
if (gpuName.isDefined && gpuDevice.isEmpty) {
throw new IllegalArgumentException(s"Unsupported GPU device: ${gpuName.get}. " +
s"Supported GPU devices are: ${GpuDevice.deviceMap.keys.mkString(", ")}.")
}
val platform = createPlatformInstance(platformName, gpuDevice)
logInfo(s"Using platform: $platform")
platform
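
A hypothetical invocation of the new validation (the exact platform-key format is assumed here, not taken from this diff): a recognized platform paired with an unknown GPU name now fails fast instead of silently falling back.

import com.nvidia.spark.rapids.tool.PlatformFactory

try {
  // "t99" stands in for any GPU name missing from GpuDevice.deviceMap
  PlatformFactory.createInstance("dataproc-t99")
} catch {
  case e: IllegalArgumentException =>
    println(e.getMessage) // "Unsupported GPU device: t99. Supported GPU devices are: ..."
}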
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.analysis
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

import org.apache.spark.sql.execution.SparkPlanInfo
@@ -342,6 +342,58 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}
}
}

/**
* Generate the stage level metrics for the SQL plan including GPU metrics if applicable.
* In addition to the Spark-defined metrics, the following GPU metrics are collected if they
* are present in the eventlog:
* gpuSemaphoreWait, gpuRetryCount, gpuSplitAndRetryCount, gpuRetryBlockTime,
* gpuRetryComputationTime, gpuSpillToHostTime, gpuSpillToDiskTime, gpuReadSpillFromHostTime,
* gpuReadSpillFromDiskTime
*
* @return a sequence of AccumProfileResults
*/
def generateStageLevelAccums(): Seq[AccumProfileResults] = {

def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
// drop the metrics if there are no values
if (updates.isEmpty) {
None
} else if (updates.length == 1) {
Some(StatisticsMetrics(0L, 0L, 0L, updates.sum))
} else {
Some(StatisticsMetrics(
min = updates.head,
med = updates(updates.size / 2),
max = updates.last,
total = updates.sum
))
}
}

// Process taskStageAccumMap to get all the accumulators
val stageLevelAccums = app.taskStageAccumMap.values.flatten
val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId)
groupedByAccumulatorId.flatMap { case (accumulatorId, accums) =>
// Extract and sort the update values, skipping accumulators that have no update
val sortedUpdates = accums.flatMap(_.update).toSeq.sorted

// Compute the statistics for the accumulator if applicable
computeStatistics(sortedUpdates).map { stats =>
val sampleAccum = accums.head
AccumProfileResults(
appIndex = appIndex,
stageId = sampleAccum.stageId.toString,
accumulatorId = accumulatorId,
name = sampleAccum.name.getOrElse("Unknown"),
min = stats.min,
median = stats.med,
max = stats.max,
total = stats.total
)
}
}.toSeq
}
}

object AppSQLPlanAnalyzer {
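As a sanity check on the statistics computed by generateStageLevelAccums, here is a self-contained sketch of the same logic applied to a small sample (illustrative types, not the tool's StatisticsMetrics):

case class Stats(min: Long, med: Long, max: Long, total: Long)

def computeStats(updates: Seq[Long]): Option[Stats] = updates.sorted match {
  case Seq()  => None                       // no values: drop the metric
  case Seq(x) => Some(Stats(0L, 0L, 0L, x)) // single value: only the total is kept
  case s      => Some(Stats(s.head, s(s.size / 2), s.last, s.sum))
}

// computeStats(Seq(8L, 2L, 6L, 4L)) == Some(Stats(2, 6, 8, 20))
//   note the upper median (index size / 2) for even-sized input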
@@ -26,6 +26,7 @@ case class ApplicationSummaryInfo(
rapidsProps: Seq[RapidsPropertyProfileResult],
rapidsJar: Seq[RapidsJarProfileResult],
sqlMetrics: Seq[SQLAccumProfileResults],
stageMetrics: Seq[AccumProfileResults],
jobAggMetrics: Seq[JobAggTaskMetricsProfileResult],
stageAggMetrics: Seq[StageAggTaskMetricsProfileResult],
sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult],
@@ -84,6 +84,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
ProfSQLPlanMetricsView.getRawView(apps)
}

// Get all stage level metrics
def getStageLevelMetrics: Seq[AccumProfileResults] = {
ProfStageMetricView.getRawView(apps)
}

/**
* This function is meant to clean up Delta log execs so that you could align
* SQL ids between CPU and GPU eventlogs. It attempts to remove any delta log
@@ -103,6 +108,12 @@ object CollectInformation extends Logging {
}
}

def generateStageLevelAccums(apps: Seq[ApplicationInfo]): Seq[AccumProfileResults] = {
apps.flatMap { app =>
app.planMetricProcessor.generateStageLevelAccums()
}
}

def printSQLPlans(apps: Seq[ApplicationInfo], outputDir: String): Unit = {
for (app <- apps) {
val planFileWriter = new ToolTextFileWriter(s"$outputDir/${app.appId}",
@@ -219,6 +219,23 @@ case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
}
}

case class AccumProfileResults(appIndex: Int, stageId: String, accumulatorId: Long, name: String,
min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
"median", "max", "total")

override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, stageId, accumulatorId.toString, name, min.toString, median.toString,
max.toString, total.toString)
}

override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString, StringUtils.reformatCSVString(stageId), accumulatorId.toString,
StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
total.toString)
}
}
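
For illustration, one row of the new stage-level output (made-up values; as shown above, the CSV variant additionally escapes stageId and name via StringUtils.reformatCSVString):

val row = AccumProfileResults(appIndex = 1, stageId = "3", accumulatorId = 42L,
  name = "gpuSemaphoreWait", min = 0L, median = 12L, max = 87L, total = 312L)

row.convertToSeq
// Seq("1", "3", "42", "gpuSemaphoreWait", "0", "12", "87", "312")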

case class ResourceProfileInfoCase(
val resourceProfileId: Int,
val executorResources: Map[String, ExecutorResourceRequest],
@@ -329,6 +329,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val systemProps = collect.getSystemProperties
val rapidsJar = collect.getRapidsJARInfo
val sqlMetrics = collect.getSQLPlanMetrics
val stageMetrics = collect.getStageLevelMetrics
val wholeStage = collect.getWholeStageCodeGenMapping
// for compare mode we just add in extra tables for matching across applications
// the rest of the tables simply list all applications specified
@@ -392,7 +393,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
(ApplicationSummaryInfo(appInfo, dsInfo,
collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
rapidsJar, sqlMetrics, analysis.jobAggs, analysis.stageAggs,
rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
@@ -471,6 +472,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
combineProps("rapids", appsSum).sortBy(_.key),
appsSum.flatMap(_.rapidsJar).sortBy(_.appIndex),
appsSum.flatMap(_.sqlMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.stageMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.jobAggMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.stageAggMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.sqlTaskAggMetrics).sortBy(_.appIndex),
@@ -513,6 +515,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
Some(ProfRapidsJarView.getDescription))
profileOutputWriter.write(ProfSQLPlanMetricsView.getLabel, app.sqlMetrics,
Some(ProfSQLPlanMetricsView.getDescription))
profileOutputWriter.write(ProfStageMetricView.getLabel, app.stageMetrics,
Some(ProfStageMetricView.getDescription))
profileOutputWriter.write(ProfSQLCodeGenView.getLabel, app.wholeStage,
Some(ProfSQLCodeGenView.getDescription))
comparedRes.foreach { compareSum =>
@@ -123,10 +123,19 @@ class PluginTypeChecker(val platform: Platform = PlatformFactory.createInstance(
private def readOperatorsScore: Map[String, Double] = {
speedupFactorFile match {
case None =>
logInfo(s"Reading operators scores with platform: $platform")
logInfo(s"Trying to read operators scores with platform: $platform")
val file = platform.getOperatorScoreFile
val source = Source.fromResource(file)
readOperators(source, "score", true).map(x => (x._1, x._2.toDouble))
try {
val source = Source.fromResource(file)
readOperators(source, "score", true).map(x => (x._1, x._2.toDouble))
} catch {
case NonFatal(_) =>
val defaultFile = platform.getDefaultOperatorScoreFile
logWarning(s"Unable to read operator scores from file: $file. " +
s"Using default operator scores file: $defaultFile.")
val source = Source.fromResource(defaultFile)
readOperators(source, "score", true).map(x => (x._1, x._2.toDouble))
}
case Some(file) =>
logInfo(s"Reading operators scores from custom speedup factor file: $file")
try {
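The hunk above introduces a read-with-fallback pattern for classpath resources; a minimal generic sketch of the same idea (hypothetical helper, not part of the tool):

import scala.io.Source
import scala.util.control.NonFatal

// Try the platform-specific resource first; on any non-fatal failure
// (e.g. the resource does not exist), fall back to the default resource.
def readResourceWithFallback(primary: String, fallback: String): List[String] =
  try {
    Source.fromResource(primary).getLines().toList
  } catch {
    case NonFatal(_) => Source.fromResource(fallback).getLines().toList
  }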
@@ -142,9 +142,9 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "Applications which a particular user has submitted." )
val htmlReport : ScallopOption[Boolean] =
toggle("html-report",
default = Some(true),
default = Some(false),
prefix = "no-",
descrYes = "Generates an HTML Report. Enabled by default.",
descrYes = "Generates an HTML Report. Disabled by default.",
descrNo = "Disables generating the HTML report.")
val perSql : ScallopOption[Boolean] =
opt[Boolean](required = false,
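For context on the default flip above, scallop's toggle with a "no-" prefix yields a pair of flags; a minimal sketch (assuming the scallop API as used in this file):

import org.rogach.scallop.{ScallopConf, ScallopOption}

class DemoConf(args: Seq[String]) extends ScallopConf(args) {
  // --html-report enables the report, --no-html-report disables it;
  // after this change the default, when neither flag is given, is false.
  val htmlReport: ScallopOption[Boolean] =
    toggle("html-report", default = Some(false), prefix = "no-")
  verify()
}

// new DemoConf(Seq("--html-report")).htmlReport() // true
// new DemoConf(Seq.empty).htmlReport()            // false (new default)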
@@ -16,6 +16,8 @@

package com.nvidia.spark.rapids.tool.qualification

import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
import com.nvidia.spark.rapids.tool.tuning.TunerContext

@@ -66,8 +68,8 @@ object QualificationMain extends Logging {
val platform = try {
PlatformFactory.createInstance(appArgs.platform())
} catch {
case ie: IllegalStateException =>
logError("Error creating the platform", ie)
case NonFatal(e) =>
logError("Error creating the platform", e)
return (1, Seq[QualificationSummaryInfo]())
}

@@ -0,0 +1,58 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.views

import com.nvidia.spark.rapids.tool.analysis.{AppSQLPlanAnalyzer, ProfAppIndexMapperTrait}
import com.nvidia.spark.rapids.tool.profiling.AccumProfileResults

import org.apache.spark.sql.rapids.tool.AppBase
import org.apache.spark.sql.rapids.tool.annotation.Since
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

trait AppStageMetricsViewTrait extends ViewableTrait[AccumProfileResults] {
override def getLabel: String = "Stage Level All Metrics"
override def getDescription: String = "Stage Level Metrics"

override def sortView(
rows: Seq[AccumProfileResults]): Seq[AccumProfileResults] = {
rows.sortBy(cols => (cols.stageId, cols.appIndex, cols.accumulatorId))
}
}

@Since("24.06.2")
object ProfStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {

override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
app match {
case app: ApplicationInfo =>
app.planMetricProcessor.generateStageLevelAccums()
case _ => Seq.empty
}
}
}

object QualStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {
// Kept for an upcoming refactor to customize the view based on the app type (Qual/Prof)
override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
Seq.empty
}

def getRawViewFromSqlProcessor(
sqlAnalyzer: AppSQLPlanAnalyzer): Seq[AccumProfileResults] = {
sortView(sqlAnalyzer.generateStageLevelAccums())
}
}
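
A hedged sketch of how the new profiler-side view is consumed (mirroring the getRawView call visible elsewhere in this commit; application construction elided):

import com.nvidia.spark.rapids.tool.views.ProfStageMetricView
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

def printStageMetrics(apps: Seq[ApplicationInfo]): Unit = {
  ProfStageMetricView.getRawView(apps)
    .foreach(row => println(row.convertToSeq.mkString(", ")))
}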
@@ -75,6 +75,9 @@ object QualRawReportGenerator {
pWriter.write(QualExecutorView.getLabel, QualExecutorView.getRawView(Seq(app)))
pWriter.write(QualAppJobView.getLabel, QualAppJobView.getRawView(Seq(app)))
generateSQLProcessingView(pWriter, sqlPlanAnalyzer)
pWriter.write(QualStageMetricView.getLabel,
QualStageMetricView.getRawViewFromSqlProcessor(sqlPlanAnalyzer),
Some(QualStageMetricView.getDescription))
pWriter.write(RapidsQualPropertiesView.getLabel,
RapidsQualPropertiesView.getRawView(Seq(app)),
Some(RapidsQualPropertiesView.getDescription))