Qualification tool: Add penalty for row conversions #471
Signed-off-by: Niranjan Artal <[email protected]>
Is there any other information in the execs, such as the number of rows, for the examples we are trying to hit?
I would definitely like to see the output on various other workloads. As long as we stay on the conservative side it's OK, I guess. I'm worried about cases where we read a ton of data and then quickly filter it down to something much smaller; the estimate may be very off there.
core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L).toDouble
if (totalBytesRead > 0) {
  val fallback_duration = (totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) *
    QualificationAppInfo.SECONDS_TO_MILLISECONDS * gpuCpuTransitions
There is a toMillis (and other conversions) in java.util.concurrent.TimeUnit.
done
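A minimal sketch of the TimeUnit suggestion above, not the code that landed in the PR. The object name, the method names, and the transfer rate are hypothetical; only `java.util.concurrent.TimeUnit` and its `toMillis` method are real.

```scala
import java.util.concurrent.TimeUnit

object TransferTimeSketch {
  // Hypothetical transfer rate in bytes per second; NOT the value used in the PR.
  val AssumedBytesPerSecond: Double = 1024.0 * 1024 * 1024

  // Whole seconds to milliseconds using TimeUnit (Long arithmetic).
  def secondsToMillis(seconds: Long): Long = TimeUnit.SECONDS.toMillis(seconds)

  // TimeUnit conversions take a Long, so for fractional seconds we scale by the
  // factor obtained from TimeUnit rather than a hand-written constant.
  def transferMillis(totalBytes: Double): Double =
    (totalBytes / AssumedBytesPerSecond) * TimeUnit.SECONDS.toMillis(1)
}
```

Because the estimated transfer time is a fractional number of seconds, the sketch multiplies by `TimeUnit.SECONDS.toMillis(1)` instead of passing the Double to `toMillis`, which would require truncating it first.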
// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
// Duration in Spark metrics is in millisecond, so multiply this by 1000 to make
// it consistent
Expand this comment, as it's left hanging: consistent between which two things?
Updated the comment.
val topLevelExecs = execs.filterNot(x => x.exec.startsWith("WholeStage"))
val childrenExecs = execs.flatMap(_.children).flatten
val allExecs = topLevelExecs ++ childrenExecs
val transitions = allExecs.zip(allExecs.drop(1)).count {
Add a comment about what this is doing. It definitely relies on the order, and I'm not sure we are great about guaranteeing that. If the order is required, we need to document that the execs have to be in the right order. It would also be nice to have a test that checks the order when it gets here, so that someone doesn't break it. I was originally thinking this would live in the plan parser, but if this is easier I'm OK with it as long as it's not brittle.
Added a comment. You are right that the order needs to be preserved. Will add a test.
The order is not preserved in all the cases. Need to fix this.
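To illustrate why the ordering concern in this thread matters, here is a small sketch of counting transitions over adjacent pairs. `ExecSketch` and its `isSupported` field are hypothetical stand-ins, and the predicate (GPU support differs between neighbours) is an assumption, since the diff excerpt is cut off before the body of `count`.

```scala
object TransitionCountSketch {
  // Hypothetical stand-in for the tool's exec info; only the fields needed here.
  final case class ExecSketch(name: String, isSupported: Boolean)

  // Counts adjacent pairs whose GPU support differs (assumed predicate).
  // The input must already be in plan order; the count is only meaningful then.
  def countTransitions(orderedExecs: Seq[ExecSketch]): Int =
    orderedExecs.zip(orderedExecs.drop(1)).count {
      case (currExec, nextExec) => currExec.isSupported != nextExec.isSupported
    }
}
```

With supported flags in the order true, true, false, true the sketch counts two transitions, while the same execs reordered to true, true, true, false yield one, so a test that pins the order protects the result.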
core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
val childrenExecs = execs.flatMap(_.children).flatten
val allExecs = topLevelExecs ++ childrenExecs
val transitions = allExecs.zip(allExecs.drop(1)).count {
  case (exec1, exec2) =>
Rename these to something like currExec and nextExec; it might help readability.
done
StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
eachStageUnsupported, estimated)
eachStageUnsupported + transitionsTime, estimated)
It might be nice to report the number of transitions we expect in the qual tool stage output. The idea behind that output was to be able to debug, or figure out why we came up with a certain recommendation number.
Thanks for the suggestion. Added a column for the number of transitions in the qual tool stage output.
…pu_transition_time
core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -241,13 +262,74 @@ class QualificationAppInfo(
stages.map { stageId =>
val stageTaskTime = stageIdToTaskEndSum.get(stageId)
.map(_.totalTaskDuration).getOrElse(0L)
val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
If we have a flag that forces numTransitions to be 0, then theoretically we can disable the fall-back penalty, right?
Updated the PR to add a config for transitions. The default is true; the penalty can be disabled by setting the config. Please let me know if that's fine.
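As a rough sketch of how such a switch can gate the penalty: when it is off, the per-stage transition count is forced to 0, so no transition time is added. The parameter name `penalizeTransitions` and the object and method names are hypothetical; the actual option name is not shown in this conversation.

```scala
object TransitionFlagSketch {
  // penalizeTransitions is a hypothetical name for the config discussed above.
  // When it is false, every stage reports 0 transitions and the penalty vanishes.
  def numTransitionsFor(
      stageId: Int,
      stageIdToGpuCpuTransitions: Map[Int, Int],
      penalizeTransitions: Boolean = true): Int =
    if (penalizeTransitions) stageIdToGpuCpuTransitions.getOrElse(stageId, 0) else 0
}
```

Stages with no recorded transitions fall back to 0 in either mode, so disabling the switch only changes the result for stages that actually have transitions.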
core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala
Needs an up-merge to get rid of the Python failures.
Thanks @nartal1 !
core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -174,6 +179,23 @@ class QualificationAppInfo(
res
}

private def calculateNoExecsStageDurations(all: Seq[StageQualSummaryInfo]): Long = {
Nit: rename to calculateExecsNoStageDurations. Or are these actually durations due to transitions? If so, the name should say something like that.
Removed this method. Filed a follow-on issue to add penalties for execs not associated with any stage: #514
core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
case true => 0
}
// val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
Remove the commented-out line.
done
core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
// Update totaltaskduration of stageIdToTaskEndSum to include transitions time
val stageIdToTasksMetrics = stageIdToTaskEndSum.get(stageId).orElse(None)
if (stageIdToTasksMetrics.isDefined) {
  stageIdToTasksMetrics.get.totalTaskDuration += transitionsTime
I'm confused by this: why are we changing the task durations here? This has traditionally held the real task durations, and we add or remove things later. Is this because supported + unsupported is now longer due to the transition time? It seems odd to change it in this data structure, which holds the real values from the file.
This is done because we are adding transitionTime to unsupportedTaskDuration, i.e. unsupportedTaskDuration = eachStageUnsupported + transitionsTime. So totalTaskDuration should also include transitionTime; otherwise we could end up with unsupportedTaskDuration > totalTaskDuration, which would be incorrect.
Updated this code. We now account for transitionTime in unsupportedDurations only. stageTaskTime is the totalTaskDuration from the event log. transitionTime is returned from StageQualSummaryInfo so that it can be used in calculateNonSQLTaskDataframeDuration.
@@ -161,11 +166,11 @@ class QualificationAppInfo(
}

private def calculateSQLSupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = {
all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum
all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum - calculateNoExecsStageDurations(all)
I'm a bit unclear on how this works with the job overhead we add later and/or the mapping we already try to do for execs without stages. Is this counting it twice?
Removed this.
private def calculateNoExecsStageDurations(all: Seq[StageQualSummaryInfo]): Long = {
  // If there are Execs not associated with any stage, then some of the Execs may not be
  // supported on GPU. We need to estimate the duration of these Execs and add it to the
  // unsupportedTaskDur. We estimate the duration by taking the average of the unsupportedTaskDur
I'm not sure I follow this estimation. We are trying to give some penalty for execs that have transitions but don't map to a stage (i.e. we don't have a duration), correct?
I'm wondering if we are already accounting for this through either the stages with no execs or the job overhead time.
Marking this as draft while I run more tests to get a better bandwidth number.
core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala
Thanks for the review @tgravescs and @mattahrens! Merging this.
This fixes #385
Until now, the qualification tool did not account for the time spent on CPU fallbacks (ColumnarToRow conversions), which happen when Execs are not supported on the GPU, or on RowToColumnar conversions. This PR adds an estimate of that time to the total durations so that the qualification tool's speedup estimate is closer to the actual speedup for Spark jobs.
There can be multiple transitions within a stage, i.e. some Execs may be supported on the GPU while others are not. We cannot get per-Exec durations from the Spark metrics, because there is no mapping between tasks and Execs. We do know the input size of each stage, whether it is read from an external source or from a shuffle. So the current implementation uses the total input size of each stage and the total number of transitions to estimate the total time taken by the transitions.
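The estimate described above can be sketched as follows. This mirrors the shape of the expression in the review excerpt (bytes read divided by a transfer rate, converted to milliseconds, multiplied by the number of transitions), but the object name, method name, and rate value are assumptions for illustration, not the constants used by the tool.

```scala
object TransitionPenaltySketch {
  // Assumed CPU<->GPU transfer rate in bytes per second; illustrative only.
  val AssumedTransferRate: Double = 1e9
  // Spark task durations are in milliseconds, so the estimate is converted too.
  val MillisPerSecond: Double = 1000.0

  // Estimated time spent on row/columnar transitions for one stage.
  def estimateTransitionMillis(totalBytesRead: Long, numTransitions: Int): Long =
    if (totalBytesRead > 0 && numTransitions > 0) {
      val secondsPerTransition = totalBytesRead.toDouble / AssumedTransferRate
      (secondsPerTransition * MillisPerSecond * numTransitions).toLong
    } else {
      0L
    }
}
```

Under these assumed numbers, a stage that reads 2,000,000,000 bytes and has 3 transitions is charged 6000 ms. The model treats every transition as moving the full stage input, which is why it can overestimate for stages that filter their input down early.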
We still have to run benchmarks to get an accurate transfer speed between CPU and GPU in both directions. For now, the transfer rate is based on the speedups observed on a couple of event logs.