Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qualification tool: Add penalty for row conversions #471

Merged
merged 25 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5008ad8
Qualification tool: Add penalty for row conversions
nartal1 Aug 1, 2023
a32d81c
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into apple…
nartal1 Aug 1, 2023
af962b8
optimize code
nartal1 Aug 2, 2023
1ae0eaa
addressed review comments
nartal1 Aug 8, 2023
7da1acf
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Aug 14, 2023
e2dbb12
addressed review comments and added test
nartal1 Aug 18, 2023
29f0e8c
fix unit test
nartal1 Aug 22, 2023
a52803a
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Aug 22, 2023
7b6803f
addressed review comments
nartal1 Aug 23, 2023
2607489
addressed review comments and updated test results
nartal1 Aug 28, 2023
f8a867f
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Sep 19, 2023
0c5b342
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Sep 22, 2023
89fccf4
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 8, 2023
2ba211a
Address review comments
nartal1 Oct 9, 2023
5e06370
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 9, 2023
8dfa245
address review comments
nartal1 Oct 10, 2023
7e1ebe0
update tests
nartal1 Oct 11, 2023
132605f
Revert "update tests"
nartal1 Oct 11, 2023
f608365
add penalty to durations
nartal1 Oct 12, 2023
f8c8d40
change transitiontime calculation
nartal1 Oct 13, 2023
db6eaf0
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 13, 2023
36f8d56
update variable name
nartal1 Oct 13, 2023
4a8ab84
addressed review comments
nartal1 Oct 13, 2023
b17df32
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 14, 2023
052b22f
change penaly percentage
nartal1 Oct 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ object QualOutputWriter {
val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup"
val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved"
val STAGE_ESTIMATED_STR = "Stage Estimated"
val NUM_TRANSITIONS = "Number of CPU-GPU Transitions"
nartal1 marked this conversation as resolved.
Show resolved Hide resolved
val UNSUPPORTED_EXECS = "Unsupported Execs"
val UNSUPPORTED_EXPRS = "Unsupported Expressions"
val CLUSTER_TAGS = "Cluster Tags"
Expand Down Expand Up @@ -856,7 +857,8 @@ object QualOutputWriter {
AVERAGE_SPEEDUP_STR -> AVERAGE_SPEEDUP_STR.size,
STAGE_DUR_STR -> STAGE_DUR_STR.size,
UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size,
STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size
STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size,
NUM_TRANSITIONS -> NUM_TRANSITIONS.size
)
detailedHeadersAndFields
}
Expand All @@ -878,7 +880,8 @@ object QualOutputWriter {
headersAndSizes(AVERAGE_SPEEDUP_STR),
info.stageTaskTime.toString -> headersAndSizes(STAGE_DUR_STR),
info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR),
info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR))
info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR),
info.numTransitions.toString -> headersAndSizes(NUM_TRANSITIONS))
constructOutputRow(data, delimiter, prettyPrint)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
timeout: Option[Long], nThreads: Int, order: String,
pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean) extends Logging {
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
ignoreTransitions: Boolean) extends Logging {

private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

Expand Down Expand Up @@ -166,7 +167,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
try {
val startTime = System.currentTimeMillis()
val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
reportSqlLevel, mlOpsEnabled)
reportSqlLevel, mlOpsEnabled, ignoreTransitions)
val qualAppResult = appResult match {
case Left(errorMessage: String) =>
// Case when an error occurred during QualificationAppInfo creation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
opt[Boolean](required = false,
descr = "Whether to parse ML functions in the eventlogs. Default is false.",
default = Some(false))
val ignoreTransitions: ScallopOption[Boolean] =
opt[Boolean](required = false,
descr = "Whether to ignore durations for ColumnarToRow and RowToColumnar transitions " +
nartal1 marked this conversation as resolved.
Show resolved Hide resolved
"in the eventlogs while calculating the speedup. Default is false.",
default = Some(false))
val sparkProperty: ScallopOption[List[String]] =
opt[List[String]](required = false,
descr = "Filter applications based on certain Spark properties that were set during " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object QualificationMain extends Logging {
val reportSqlLevel = appArgs.perSql.getOrElse(false)
val platform = appArgs.platform.getOrElse("onprem")
val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false)
val ignoreTransitions = appArgs.ignoreTransitions.getOrElse(false)

val hadoopConf = RapidsToolsConfUtil.newHadoopConf

Expand Down Expand Up @@ -93,7 +94,7 @@ object QualificationMain extends Logging {

val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout,
nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled,
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled)
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, ignoreTransitions)
val res = qual.qualifyApps(filteredLogs)
(0, res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.rapids.tool.qualification

import java.util.concurrent.TimeUnit

import scala.collection.mutable.{ArrayBuffer, HashMap}

import com.nvidia.spark.rapids.tool.EventLogInfo
Expand All @@ -37,7 +39,8 @@ class QualificationAppInfo(
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
perSqlOnly: Boolean = false,
mlOpsEnabled: Boolean = false)
mlOpsEnabled: Boolean = false,
ignoreTransitions: Boolean = false)
extends AppBase(eventLogInfo, hadoopConf) with Logging {

var appId: String = ""
Expand All @@ -51,6 +54,7 @@ class QualificationAppInfo(
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] =
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int]

val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]]
Expand Down Expand Up @@ -147,8 +151,10 @@ class QualificationAppInfo(

// Look at the total task times for all jobs/stages that aren't SQL or
// SQL but dataset or rdd
private def calculateNonSQLTaskDataframeDuration(taskDFDuration: Long): Long = {
val allTaskTime = stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum
private def calculateNonSQLTaskDataframeDuration(
taskDFDuration: Long,
totalTransitionTime: Long): Long = {
val allTaskTime = stageIdToTaskEndSum.values.map(_.totalTaskDuration).sum + totalTransitionTime
val res = allTaskTime - taskDFDuration
assert(res >= 0)
res
Expand Down Expand Up @@ -241,13 +247,78 @@ class QualificationAppInfo(
stages.map { stageId =>
val stageTaskTime = stageIdToTaskEndSum.get(stageId)
.map(_.totalTaskDuration).getOrElse(0L)
val numTransitions = ignoreTransitions match {
case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
case true => 0
}
val transitionsTime = numTransitions match {
case 0 => 0L // no transitions
case gpuCpuTransitions if gpuCpuTransitions > 0 =>
nartal1 marked this conversation as resolved.
Show resolved Hide resolved
// Duration to transfer data from GPU to CPU and vice versa.
// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
// Duration in Spark metrics is in milliseconds and CPU-GPU transfer rate is in bytes/sec.
// So we need to convert the transitions time to milliseconds.
val totalBytesRead = {
stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L)
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
}
if (totalBytesRead > 0) {
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
(TimeUnit.SECONDS.toMillis(totalBytesRead /
QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * gpuCpuTransitions)
} else {
0L
}

case _ => 0L
}
val finalEachStageUnsupported = if (transitionsTime != 0) {
(allStageTaskTime * numUnsupported.size / allFlattenedExecs.size.toDouble).toLong
nartal1 marked this conversation as resolved.
Show resolved Hide resolved
} else {
eachStageUnsupported
}

StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
eachStageUnsupported, estimated)
finalEachStageUnsupported, numTransitions, transitionsTime, estimated)
}.toSet
}

def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)

// Get the total number of transitions between CPU and GPU for each stage and
nartal1 marked this conversation as resolved.
Show resolved Hide resolved
// store it in a Map.
allStagesToExecs.foreach { case (stageId, execs) =>
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
// Flatten all the Execs within a stage.
// Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange
// will be flattened to Exchange;Sort;Exchange;Sort;SortMergeJoin;SortMergeJoin;Exchange;
val allExecs = execs.map(x => if (x.exec.startsWith("WholeStage")) {
x.children.getOrElse(Seq.empty)
} else {
Seq(x)
}).flatten.reverse

// If it's a shuffle stage, then we need to keep the first and last Exchange and remove
// all the intermediate Exchanges as input size is captured in Exchange node.
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
val dedupedExecs = if (allExecs.size > 2) {
allExecs.head +:
allExecs.tail.init.filter(x => x.exec != "Exchange") :+ allExecs.last
} else {
allExecs
}
// Create a list of transitions by zipping allExecs with itself but with the first element
// This will create a list of adjacent pairs.
// Example: If allExecs = (ScanExec, FilterExec, SortExec, ProjectExec), then it will create
// a list of tuples as follows:
// (ScanExec, FilterExec), (FilterExec, SortExec), (SortExec, ProjectExec)
val transitions = dedupedExecs.zip(dedupedExecs.drop(1)).count {
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
// If the current execution (currExec) is supported, and the next execution (nextExec)
// is not supported, or if the current execution is not supported and the next execution
// is supported, then we consider this as a transition.
case (currExec, nextExec) => (currExec.isSupported && !nextExec.isSupported) ||
(!currExec.isSupported && nextExec.isSupported)
}
stageIdToGpuCpuTransitions(stageId) = transitions
}
if (allStagesToExecs.isEmpty) {
// use job level
// also get the job ids associated with the SQLId
Expand Down Expand Up @@ -302,7 +373,7 @@ class QualificationAppInfo(
val numUnsupportedExecs = execInfos.filterNot(_.isSupported).size
// This is a guestimate at how much wall clock was supported
val numExecs = execInfos.size.toDouble
val numSupportedExecs = (numExecs - numUnsupportedExecs).toDouble
val numSupportedExecs = (numExecs - numUnsupportedExecs)
val ratio = numSupportedExecs / numExecs
val estimateWallclockSupported = (sqlWallClockDuration * ratio).toInt
// don't worry about supported execs for these are these are mostly indicator of I/O
Expand Down Expand Up @@ -428,11 +499,12 @@ class QualificationAppInfo(
val allStagesSummary = perSqlStageSummary.flatMap(_.stageSum)
.map(sum => sum.stageId -> sum).toMap.values.toSeq
val sqlDataframeTaskDuration = allStagesSummary.map(s => s.stageTaskTime).sum
val totalTransitionsTime = allStagesSummary.map(s=> s.transitionTime).sum
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
val unsupportedSQLTaskDuration = calculateSQLUnsupportedTaskDuration(allStagesSummary)
val endDurationEstimated = this.appEndTime.isEmpty && appDuration > 0
val jobOverheadTime = calculateJobOverHeadTime(info.startTime)
val nonSQLDataframeTaskDuration =
calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration)
calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration, totalTransitionsTime)
val nonSQLTaskDuration = nonSQLDataframeTaskDuration + jobOverheadTime
// note that these ratios are based off the stage times which may be missing some stage
// overhead or execs that didn't have associated stages
Expand Down Expand Up @@ -489,7 +561,8 @@ class QualificationAppInfo(

// get the ratio based on the Task durations that we will use for wall clock durations
val estimatedGPURatio = if (sqlDataframeTaskDuration > 0) {
supportedSQLTaskDuration.toDouble / sqlDataframeTaskDuration.toDouble
supportedSQLTaskDuration.toDouble / (
sqlDataframeTaskDuration.toDouble + totalTransitionsTime.toDouble)
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
} else {
1
}
Expand Down Expand Up @@ -670,7 +743,8 @@ class StageTaskQualificationSummary(
val stageAttemptId: Int,
var executorRunTime: Long,
var executorCPUTime: Long,
var totalTaskDuration: Long)
var totalTaskDuration: Long,
var totalbytesRead: Long)

case class QualApplicationInfo(
appName: String,
Expand Down Expand Up @@ -736,6 +810,8 @@ case class StageQualSummaryInfo(
averageSpeedup: Double,
stageTaskTime: Long,
unsupportedTaskDur: Long,
numTransitions: Int,
transitionTime: Long,
estimated: Boolean = false)

object QualificationAppInfo extends Logging {
Expand All @@ -746,6 +822,13 @@ object QualificationAppInfo extends Logging {
val NOT_APPLICABLE = "Not Applicable"
val LOWER_BOUND_RECOMMENDED = 1.3
val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5
// Below is the total time taken whenever there are ColumnarToRow or RowToColumnar transitions
// This includes the time taken to convert the data from one format to another and the time taken
// to transfer the data from CPU to GPU and vice versa. Current transfer rate is 1GB/s and is
// based on the testing on few candidate eventlogs.
// TODO: Need to test this on more eventlogs including NDS queries
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
// and come up with a better transfer rate.
val CPU_GPU_TRANSFER_RATE = 1000000000L

private def handleException(e: Exception, path: EventLogInfo): String = {
val message: String = e match {
Expand Down Expand Up @@ -838,10 +921,11 @@ object QualificationAppInfo extends Logging {
hadoopConf: Configuration,
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
mlOpsEnabled: Boolean): Either[String, QualificationAppInfo] = {
mlOpsEnabled: Boolean,
ignoreTransitions: Boolean): Either[String, QualificationAppInfo] = {
try {
val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker,
reportSqlLevel, false, mlOpsEnabled)
reportSqlLevel, false, mlOpsEnabled, ignoreTransitions)
logInfo(s"${path.eventLog.toString} has App: ${app.appId}")
Right(app)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,26 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
super.doSparkListenerTaskEnd(app, event)
// keep all stage task times to see for nonsql duration
val taskSum = app.stageIdToTaskEndSum.getOrElseUpdate(event.stageId, {
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
taskSum.totalTaskDuration += event.taskInfo.duration
// Add the total bytes read from the task if it's available. This is from inputMetrics if
// it is reading from datasource, or shuffleReadMetrics if it is reading from shuffle.
val inputMetrics = event.taskMetrics.inputMetrics
if (inputMetrics != null) {
taskSum.totalbytesRead += inputMetrics.bytesRead
}
val shuffleReadMetrics = event.taskMetrics.shuffleReadMetrics
if (shuffleReadMetrics != null) {
taskSum.totalbytesRead += shuffleReadMetrics.totalBytesRead
}

// Adds in everything (including failures)
app.stageIdToSqlID.get(event.stageId).foreach { sqlID =>
val taskSum = app.sqlIDToTaskEndSum.getOrElseUpdate(sqlID, {
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class SQLPlanParserSuite extends BaseTestSuite {
val pluginTypeChecker = new PluginTypeChecker()
assert(allEventLogs.size == 1)
val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf,
pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false)
pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, ignoreTransitions = false)
appResult match {
case Right(app) => app
case Left(_) => throw new AssertionError("Cannot create application")
Expand Down Expand Up @@ -217,6 +217,51 @@ class SQLPlanParserSuite extends BaseTestSuite {
}
}

test("Parse Execs within WholeStageCodeGen in Order") {
amahussein marked this conversation as resolved.
Show resolved Hide resolved
TrampolineUtil.withTempDir { eventLogDir =>
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,
"Execs within WSCG ") { spark =>
import spark.implicits._
val df = Seq(("foo", 1L, 1.2), ("foo", 2L, 2.2), ("bar", 2L, 3.2),
("bar", 2L, 4.2)).toDF("x", "y", "z")
df.cube($"x", ceil($"y")).count
}
val pluginTypeChecker = new PluginTypeChecker()
val app = createAppFromEventlog(eventLog)
assert(app.sqlPlans.size == 1)
app.sqlPlans.foreach { case (sqlID, plan) =>
val planInfo = SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "",
pluginTypeChecker, app)
val allExecInfo = planInfo.execInfo
val expectedAllExecInfoSize = if (ToolUtils.isSpark320OrLater()) {
// AdaptiveSparkPlan, WholeStageCodegen, AQEShuffleRead, Exchange, WholeStageCodegen
5
} else {
// WholeStageCodegen, Exchange, WholeStageCodegen
3
}
val wholeStages = planInfo.execInfo.filter(_.exec.contains("WholeStageCodegen"))
assert(wholeStages.size == 2)
// Expanding the children of WholeStageCodegen
val allExecs = allExecInfo.map(x => if (x.exec.startsWith("WholeStage")) {
x.children.getOrElse(Seq.empty)
} else {
Seq(x)
}).flatten.reverse
val expectedOrder = if (ToolUtils.isSpark320OrLater()) {
// Order should be: LocalTableScan, Expand, HashAggregate, Exchange,
// AQEShuffleRead, HashAggregate, AdaptiveSparkPlan
Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "AQEShuffleRead",
"HashAggregate", "AdaptiveSparkPlan")
} else {
// Order should be: LocalTableScan, Expand, HashAggregate, Exchange, HashAggregate
Seq("LocalTableScan", "Expand", "HashAggregate", "Exchange", "HashAggregate")
}
assert(allExecs.map(_.exec) == expectedOrder)
}
}
}

test("HashAggregate") {
TrampolineUtil.withTempDir { eventLogDir =>
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,
Expand Down