
Fix inconsistent shuffle write time sum results in Profiler output #1450

Open
wants to merge 4 commits into base: dev

Conversation

@cindyyuanjiang (Collaborator) commented Dec 5, 2024

Fixes #1408

Changes

  • In the TaskModel class, keep shuffle write time in nanoseconds
  • Convert it to milliseconds when generating output

This improves the shuffle write time metrics output by avoiding the potential precision loss that comes from converting each task's nanoseconds to milliseconds and then summing the converted values. It also separates TaskModel from output reporting, so that all metrics keep their original units until output generation.
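As a rough illustration of the precision concern (hypothetical values, not the project's actual code): summing per-task millisecond conversions truncates each task's sub-millisecond remainder, whereas converting the nanosecond sum truncates only once.

```scala
import java.util.concurrent.TimeUnit

object ShuffleWriteTimePrecisionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-task shuffle write times in nanoseconds (1.9, 1.8, 1.7, 1.6 ms).
    val taskWriteTimesNs = Seq(1900000L, 1800000L, 1700000L, 1600000L)

    // Previous behavior: convert each task value to ms first, then sum.
    // Every conversion truncates, so the sub-millisecond remainders are lost.
    val sumOfConverted = taskWriteTimesNs.map(ns => TimeUnit.NANOSECONDS.toMillis(ns)).sum // 4 ms

    // New behavior: keep nanoseconds in TaskModel, sum first, convert once at output time.
    val convertedSum = TimeUnit.NANOSECONDS.toMillis(taskWriteTimesNs.sum) // 7 ms

    println(s"sum of per-task conversions: $sumOfConverted ms")
    println(s"conversion of the sum:       $convertedSum ms")
  }
}
```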

Testing

  • Existing unit tests
  • Manually confirmed that the shuffle write time values are consistent across all places in the Profiler output

Before/After Values (shuffle write time sum)

core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_jobmetricsaggmulti_expectation.csv
944 --> 1001
849 --> 901

core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_sqlmetricsaggmulti_expectation.csv
944 --> 1001
849 --> 901

core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_stagemetricsaggmulti_expectation.csv
397 --> 400
505 --> 508
42 --> 93
373 --> 376
473 --> 475
3 --> 50

@cindyyuanjiang cindyyuanjiang self-assigned this Dec 5, 2024
@cindyyuanjiang cindyyuanjiang added bug Something isn't working core_tools Scope the core module (scala) labels Dec 5, 2024
nartal1 previously approved these changes Dec 9, 2024

@nartal1 (Collaborator) left a comment


LGTM. Thanks @cindyyuanjiang for the fix!
Nit: It would be nice to include the before and after values in the description. I understand that we can confirm the fix from the expected_files.


parthosa previously approved these changes Dec 9, 2024

@parthosa (Collaborator) left a comment


Thanks @cindyyuanjiang for this change.

@amahussein Unrelated, but should we take a similar approach for executorCpuTime and executorDeserializeCpuTime?

@cindyyuanjiang (Collaborator, Author)

Thanks @nartal1! Updated the before/after values in the PR description.

@cindyyuanjiang (Collaborator, Author) commented Dec 9, 2024

Thanks @parthosa! Agree we should discuss the requirements for executorCpuTime and executorDeserializeCpuTime.

@amahussein (Collaborator)

> Thanks @cindyyuanjiang for this change.
>
> @amahussein Unrelated, but should we take a similar approach for executorCpuTime and executorDeserializeCpuTime?

Thanks @parthosa. Yes, it would have been better to fix the inconsistency for the other metrics within this very PR, since the change is small compared to the overhead of filing another bug and then dealing with a new PR.

@amahussein (Collaborator)

@cindyyuanjiang
Is this ready to merge? Or is there something you are going to address?

@amahussein (Collaborator) left a comment


The implementation is still not accurate, because we need to convert the units after all the tasks are aggregated at each level.

@@ -438,7 +438,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
       val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory)
       val shuffleWriteTime = tasksInStage.map(_.sw_writeTime)
       (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
-        shuffleWriteTime.sum)
+        TimeUnit.NANOSECONDS.toMillis(shuffleWriteTime.sum))
Collaborator

This still does not fix the problem because the conversion is done at the stage level.
The correct way is to convert after the metrics are aggregated at each level,
for example perStage/perSql/perJob.

Collaborator Author

The per SQL and per job results are computed based on cached per stage results. Please correct me if I am wrong.

Collaborator

Correct!
But when we are aggregating perSql, this PR actually aggregates the stages per SQL after the time is converted to milliseconds.
If we want to be more accurate, the cached per-stage results should still be in nanoseconds; the per-SQL value is then the sum in nanoseconds; and finally it gets converted to milliseconds.
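A minimal sketch of the ordering being described here (hypothetical names, not the project's code): the cached per-stage values stay in nanoseconds, the per-SQL/per-job sum is taken over those nanosecond values, and the millisecond conversion happens once at output time.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical cached per-stage aggregate kept in nanoseconds.
case class StageAggSketch(stageId: Int, shuffleWriteTimeNs: Long)

object AggregationOrderSketch {
  def main(args: Array[String]): Unit = {
    val cachedStageAggs = Seq(
      StageAggSketch(0, 1600000L),
      StageAggSketch(1, 1900000L),
      StageAggSketch(2, 1700000L))

    // Per-SQL aggregation sums the per-stage values while they are still nanoseconds...
    val perSqlNs = cachedStageAggs.map(_.shuffleWriteTimeNs).sum

    // ...and the value is converted to milliseconds only when the output row is produced.
    val perSqlMs = TimeUnit.NANOSECONDS.toMillis(perSqlNs)
    println(s"per-SQL shuffle write time: $perSqlMs ms") // 5 ms, versus 3 ms if each stage were rounded first
  }
}
```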

Collaborator Author

understood, thanks @amahussein! I will address this now.

Collaborator Author

Discussed offline. We will keep the current implementation to avoid potential overflow if we aggregate at the SQL/job level.
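For context on the overflow concern, a rough back-of-the-envelope bound (illustrative arithmetic, not from the PR): a signed 64-bit nanosecond sum overflows only past roughly 292 years of accumulated task time, so the risk applies mainly to very large aggregations.

```scala
// Rough bound on when a Long sum of nanosecond values could overflow (illustrative arithmetic only).
object NanosecondOverflowBound {
  def main(args: Array[String]): Unit = {
    val seconds = Long.MaxValue / 1000000000L   // ~9.2e9 seconds
    val years = seconds / (365L * 24 * 60 * 60) // ~292 years of accumulated task time
    println(s"a Long nanosecond sum overflows after roughly $years years of summed time")
  }
}
```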

@cindyyuanjiang cindyyuanjiang dismissed stale reviews from parthosa and nartal1 via 8e03d94 December 17, 2024 02:33
@cindyyuanjiang (Collaborator, Author)

Applied the same approach for executorCpuTime and executorDeserializeCpuTime.

Signed-off-by: cindyyuanjiang <[email protected]>
}

override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString, StringUtils.reformatCSVString(appId), rootsqlID.getOrElse("").toString,
sqlID.toString, durStr, containsDataset.toString, appDurStr,
StringUtils.reformatCSVString(potentialStr), execCpuTimePercent)
Collaborator Author

Updated format only for better readability.

@@ -950,14 +992,27 @@ case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
}

override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, rootsqlID.getOrElse("").toString, appId, sqlID.toString, durStr,
containsDataset.toString, appDurStr, potentialStr, execCpuTimePercent)
Collaborator Author

Updated format only for better readability.

"resultSerializationTime_sum", "resultSize_max", "sr_fetchWaitTime_sum",
"sr_localBlocksFetched_sum", "sr_localBytesRead_sum", "sr_remoteBlocksFetched_sum",
"sr_remoteBytesRead_sum", "sr_remoteBytesReadToDisk_sum", "sr_totalBytesRead_sum",
"sw_bytesWritten_sum", "sw_recordsWritten_sum", "sw_writeTime_sum")
Collaborator Author

Updated format only for better readability.

@@ -924,12 +951,27 @@ case class IOAnalysisProfileResult(
}
}

case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
rootsqlID: Option[Long], sqlID: Long, duration: Option[Long], containsDataset: Boolean,
appDuration: Option[Long], potentialProbs: String,
Collaborator Author

Updated format only for better readability.

executorCpuRatio: Double) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "App ID", "RootSqlID", "sqlID", "SQL Duration",
"Contains Dataset or RDD Op", "App Duration", "Potential Problems", "Executor CPU Time Percent")
Collaborator Author

Updated format only for better readability.

@cindyyuanjiang (Collaborator, Author) commented Dec 17, 2024

@amahussein @parthosa @nartal1
Question: After we make the changes, I see an Executor CPU Time Percent of 103.45 > 100 in core/src/test/resources/ProfilingExpectations/rapids_duration_and_cpu_expectation.csv. Do we want to limit/upper-bound this ratio to 100.0, or is it okay to have >100 percentages?

@amahussein (Collaborator)

> @amahussein @parthosa @nartal1 Question: After we make the changes, I see an Executor CPU Time Percent of 103.45 > 100 in core/src/test/resources/ProfilingExpectations/rapids_duration_and_cpu_expectation.csv. Do we want to limit/upper-bound this ratio to 100.0, or is it okay to have >100 percentages?

It seems more like a bug.

amahussein previously approved these changes Dec 17, 2024

@amahussein (Collaborator) left a comment


Thanks @cindyyuanjiang. Minor styling issue.

@@ -473,7 +471,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
         tasksInStage.map(_.sr_totalBytesRead).sum,
         tasksInStage.map(_.sw_bytesWritten).sum,
         tasksInStage.map(_.sw_recordsWritten).sum,
-        shuffleWriteTimeSum
+        TimeUnit.NANOSECONDS.toMillis(shuffleWriteTimeSum) // nanoseconds
Collaborator

nit:
// nanoseconds -> It took me a minute until I realized that this is a comment and not a division :)
Can we move that to a separate line above the conversion?

Collaborator Author

thanks, removed this.

executorCPUTime: Long,
executorDeserializeCPUTime: Long, // nanoseconds
executorRunTime: Long, // milliseconds
executorCPUTime: Long, // nanoseconds
Collaborator

💯
Thanks for doing that

@@ -443,20 +443,44 @@ trait BaseJobStageAggTaskMetricsProfileResult extends ProfileResult {
   def srTotalBytesReadSum: Long
   def swBytesWrittenSum: Long
   def swRecordsWrittenSum: Long
-  def swWriteTimeSum: Long
+  def swWriteTimeSum: Long // milliseconds
Collaborator

💯

Signed-off-by: cindyyuanjiang <[email protected]>
@cindyyuanjiang (Collaborator, Author) commented Dec 17, 2024

> It seems more like a bug.

thanks @amahussein! Filed issue: #1469

@parthosa (Collaborator) left a comment


Thanks @cindyyuanjiang for this change. The discussion above about overflow concerns makes sense.

@amahussein (Collaborator) left a comment


> thanks @amahussein! Filed issue: #1469

I am not sure we should fix the percentage in a follow-up issue. That would mean we fix the inconsistent view across the two files while introducing another bug.

@cindyyuanjiang (Collaborator, Author)

> I am not sure we should fix the percentage in a follow-up issue. That would mean we fix the inconsistent view across the two files while introducing another bug.

I investigated this. It looks more like a rounding issue than a bug to me:

  1. executorRunTime is in milliseconds in its raw form while executorCpuTime is in nanoseconds, so executorRunTime may already have lost precision before we take the sum over all tasks.
  2. The runtime is very low where execCPURatio = 103.45: execCpuTime = 30 ms and execRunTime = 29 ms (see the worked example after this comment).

WDYT? @amahussein @nartal1 @parthosa
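A small worked example (hypothetical task values, chosen to mirror the 30 ms / 29 ms numbers above) of how the unit mismatch can push the ratio over 100% even when CPU time never exceeds run time:

```scala
import java.util.concurrent.TimeUnit

// Illustrative only: executorRunTime is reported per task already truncated to milliseconds,
// while executorCpuTime stays in nanoseconds and is converted only after the sum.
object CpuRatioRoundingSketch {
  def main(args: Array[String]): Unit = {
    // (run time ns, cpu time ns) for two hypothetical tasks; cpu <= run for both.
    val tasks = Seq((14900000L, 14800000L), (15900000L, 15800000L))

    // Run time loses its sub-millisecond remainder per task before the sum: 14 + 15 = 29 ms.
    val runTimeMsSum = tasks.map { case (runNs, _) => TimeUnit.NANOSECONDS.toMillis(runNs) }.sum

    // CPU time is summed in nanoseconds and converted once: 30.6 ms -> 30 ms.
    val cpuTimeMsSum = TimeUnit.NANOSECONDS.toMillis(tasks.map(_._2).sum)

    val ratio = 100.0 * cpuTimeMsSum / runTimeMsSum
    println(f"executor CPU time percent: $ratio%.2f") // 103.45
  }
}
```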

@parthosa (Collaborator) commented Dec 20, 2024

> I investigated this. It looks more like a rounding issue than a bug to me:
>
> 1. executorRunTime is in milliseconds in its raw form while executorCpuTime is in nanoseconds, so executorRunTime may already have lost precision before we take the sum over all tasks.
> 2. The runtime is very low where execCPURatio = 103.45: execCpuTime = 30 ms and execRunTime = 29 ms.
>
> WDYT? @amahussein @nartal1 @parthosa

Thanks @cindyyuanjiang for looking into this. I think this was always a bug, but we are now able to catch it due to the changes in this PR. If they are measured in different units in their raw form, I do not think we can fix this problem.

I could not find a reason why Spark reports runtime in ms and CPU time in ns.

Ref:
https://github.com/apache/spark/blob/a2e3188b4997001f4dbc1eb364d61ca55d438208/core/src/main/scala/org/apache/spark/executor/Executor.scala#L715-L720

Development

Successfully merging this pull request may close these issues.

[BUG] Profiler output shows inconsistent shuffleWriteTime results