Fix inconsistent shuffle write time sum results in Profiler output #1450
base: dev
Conversation
… ms only in output
Signed-off-by: cindyyuanjiang <[email protected]>
LGTM. Thanks @cindyyuanjiang for the fix!
Nit: It would be nice to include the before and after values in the description. I understand that we can confirm the fix from the expected_files.
Thanks @cindyyuanjiang for this change.
@amahussein Unrelated, but should we have a similar approach for executorCpuTime and executorDeserializeCpuTime?
Thanks @nartal1! Updated the before/after values in the PR description.
Thanks @parthosa! Agree we should discuss the requirements for those metrics as well.
Thanks @parthosa. Yes, it would have been better to fix the inconsistency for the other metrics within this very PR, since the change is not big compared to the overhead we would have to go through filing another bug and then dealing with a new PR.
@cindyyuanjiang
The implementation is still not accurate, because we need to convert the units after all the tasks are aggregated at each level.
@@ -438,7 +438,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
        val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory)
        val shuffleWriteTime = tasksInStage.map(_.sw_writeTime)
        (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
-         shuffleWriteTime.sum)
+         TimeUnit.NANOSECONDS.toMillis(shuffleWriteTime.sum))
This still does not fix the problem because the conversion is done at the stage level.
The correct way is to convert after the metrics are aggregated at each level, for example perStage/perSql/perJob.
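For illustration only (hypothetical numbers, not taken from this PR's event logs): TimeUnit.NANOSECONDS.toMillis truncates, so converting each stage's value before summing can drift from summing in nanoseconds and converting once.

import java.util.concurrent.TimeUnit

// Three hypothetical per-stage shuffle write times of 0.6 ms each, kept in nanoseconds.
val stageWriteTimesNs = Seq(600000L, 600000L, 600000L)

// Convert each stage first, then sum: each 0.6 ms truncates to 0 ms, so the total is 0 ms.
val convertThenSum = stageWriteTimesNs.map(t => TimeUnit.NANOSECONDS.toMillis(t)).sum

// Sum in nanoseconds first, then convert once: 1.8 ms truncates to 1 ms.
val sumThenConvert = TimeUnit.NANOSECONDS.toMillis(stageWriteTimesNs.sum)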
The per SQL and per job results are computed based on cached per stage results. Please correct me if I am wrong.
Correct!
But when we are aggregating perSql, this PR is actually aggregating the stages per SQL after the time is converted to milliseconds.
If we want to be more accurate, then the cached per-stage results should still be in nanoseconds; the per-SQL value would then be the sum in nanoseconds; and finally it gets converted to milliseconds.
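A minimal sketch of that ordering, with hypothetical names rather than the actual caches in AppSparkMetricsAnalyzer:

import java.util.concurrent.TimeUnit

// Hypothetical cache: stageId -> shuffle write time, kept in nanoseconds.
val cachedStageWriteTimeNs: Map[Int, Long] = Map(1 -> 600000L, 2 -> 600000L)

// Hypothetical mapping from a SQL ID to the stages it executed.
val stagesForSql: Map[Long, Seq[Int]] = Map(0L -> Seq(1, 2))

// Aggregate per SQL in nanoseconds and only convert to milliseconds at output time.
def sqlShuffleWriteTimeMs(sqlId: Long): Long = {
  val sumNs = stagesForSql.getOrElse(sqlId, Seq.empty)
    .map(stageId => cachedStageWriteTimeNs.getOrElse(stageId, 0L))
    .sum
  TimeUnit.NANOSECONDS.toMillis(sumNs)
}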
understood, thanks @amahussein! I will address this now.
Discussed offline. We will keep the current implementation to avoid potential overflow if we aggregate at SQL/job level.
Signed-off-by: cindyyuanjiang <[email protected]>
8e03d94
Applied same approach for …
Signed-off-by: cindyyuanjiang <[email protected]>
  }

  override def convertToCSVSeq: Seq[String] = {
    Seq(appIndex.toString, StringUtils.reformatCSVString(appId), rootsqlID.getOrElse("").toString,
      sqlID.toString, durStr, containsDataset.toString, appDurStr,
      StringUtils.reformatCSVString(potentialStr), execCpuTimePercent)
Updated format only for better readability.
@@ -950,14 +992,27 @@ case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
  }

  override def convertToSeq: Seq[String] = {
    Seq(appIndex.toString, rootsqlID.getOrElse("").toString, appId, sqlID.toString, durStr,
      containsDataset.toString, appDurStr, potentialStr, execCpuTimePercent)
Updated format only for better readability.
"resultSerializationTime_sum", "resultSize_max", "sr_fetchWaitTime_sum", | ||
"sr_localBlocksFetched_sum", "sr_localBytesRead_sum", "sr_remoteBlocksFetched_sum", | ||
"sr_remoteBytesRead_sum", "sr_remoteBytesReadToDisk_sum", "sr_totalBytesRead_sum", | ||
"sw_bytesWritten_sum", "sw_recordsWritten_sum", "sw_writeTime_sum") |
Updated format only for better readability.
@@ -924,12 +951,27 @@ case class IOAnalysisProfileResult(
  }
}

case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
    rootsqlID: Option[Long], sqlID: Long, duration: Option[Long], containsDataset: Boolean,
    appDuration: Option[Long], potentialProbs: String,
Updated format only for better readability.
    executorCpuRatio: Double) extends ProfileResult {
  override val outputHeaders = Seq("appIndex", "App ID", "RootSqlID", "sqlID", "SQL Duration",
    "Contains Dataset or RDD Op", "App Duration", "Potential Problems", "Executor CPU Time Percent")
Updated format only for better readability.
@amahussein @parthosa @nartal1 Question: After we make the changes, I see Executor CPU Time Percent of 103.45 > 100 in core/src/test/resources/ProfilingExpectations/rapids_duration_and_cpu_expectation.csv. Do we want to limit/upper-bound this ratio to 100.0, or is it okay to have >100 percentages?
It seems more like a bug.
Thanks @cindyyuanjiang. Minor styling issue:
@@ -473,7 +471,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
        tasksInStage.map(_.sr_totalBytesRead).sum,
        tasksInStage.map(_.sw_bytesWritten).sum,
        tasksInStage.map(_.sw_recordsWritten).sum,
-       shuffleWriteTimeSum
+       TimeUnit.NANOSECONDS.toMillis(shuffleWriteTimeSum) // nanoseconds
Nit: the trailing // nanoseconds comment: it took me a minute until I realized that this is a comment and not a division :)
Can we move that to a separate line above the conversion?
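For illustration, the suggested layout could look something like this (hypothetical standalone value, just to show the comment placement):

import java.util.concurrent.TimeUnit

val shuffleWriteTimeSum: Long = 600000L  // hypothetical sum, in nanoseconds

// shuffleWriteTimeSum is in nanoseconds; convert to milliseconds for the report.
val shuffleWriteTimeSumMs = TimeUnit.NANOSECONDS.toMillis(shuffleWriteTimeSum)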
thanks, removed this.
    executorCPUTime: Long,
    executorDeserializeCPUTime: Long, // nanoseconds
    executorRunTime: Long, // milliseconds
    executorCPUTime: Long, // nanoseconds
💯
Thanks for doing that
@@ -443,20 +443,44 @@ trait BaseJobStageAggTaskMetricsProfileResult extends ProfileResult {
  def srTotalBytesReadSum: Long
  def swBytesWrittenSum: Long
  def swRecordsWrittenSum: Long
- def swWriteTimeSum: Long
+ def swWriteTimeSum: Long // milliseconds
💯
Signed-off-by: cindyyuanjiang <[email protected]>
thanks @amahussein! Filed issue: #1469
Thanks @cindyyuanjiang for this change. The discussion above about overflow concerns makes sense.
@amahussein @parthosa @nartal1 Question: After we make the changes, I see Executor CPU Time Percent of 103.45 > 100 in core/src/test/resources/ProfilingExpectations/rapids_duration_and_cpu_expectation.csv. Do we want to limit/upper-bound this ratio to 100.0, or is it okay to have >100 percentages?
It seems more like a bug.
thanks @amahussein! Filed issue: #1469
I am not sure we should fix the percentage in a follow-up issue. That means we would be fixing the inconsistent view across 2 files while introducing another bug.
Investigated this. It looks more like a rounding issue than a bug to me; a sketch of how that can happen is below.
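A hypothetical illustration (made-up values, not necessarily the row behind the 103.45 in the expectation file): Spark reports executorRunTime in milliseconds but executorCpuTime in nanoseconds, so sub-millisecond precision is kept on one side and dropped on the other, and the ratio can land above 100.

import java.util.concurrent.TimeUnit

// A task that ran for roughly 29.9 ms of wall-clock time and used roughly 30 ms of CPU.
val executorRunTimeMs = 29L          // run time is reported in ms, so the fractional ms is lost
val executorCpuTimeNs = 30000000L    // CPU time is reported in ns (30,000,000 ns == 30 ms)

val cpuTimeMs = TimeUnit.NANOSECONDS.toMillis(executorCpuTimeNs)
val cpuTimePercent = math.round(cpuTimeMs.toDouble / executorRunTimeMs * 10000) / 100.0
// cpuTimePercent == 103.45, i.e. above 100 even though the real CPU time never exceeded the real run time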
WDYT? @amahussein @nartal1 @parthosa
Thanks @cindyyuanjiang for looking into this. I think this was always a bug, but we are now able to catch it due to the changes in this PR. If, in the raw form, they are measured in different units, I do not think we can fix this problem. I could not find a reason why Spark reports run time in ms and CPU time in ns.
Fixes #1408
Changes
In the TaskModel class, keep using nanoseconds for shuffle write time.
This improves shuffle write time metrics output to avoid potential precision loss from converting nanoseconds to milliseconds and then taking the sum of the converted numbers. It also separates TaskModel and output reporting so that we know all metrics are in their original values before output generation.
Testing
Before/After Values (shuffle write time sum)
core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_jobmetricsaggmulti_expectation.csv
944 --> 1001
849 --> 901
core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_sqlmetricsaggmulti_expectation.csv
944 --> 1001
849 --> 901
core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_stagemetricsaggmulti_expectation.csv
397 --> 400
505 --> 508
42 --> 93
373 --> 376
473 --> 475
3 --> 50