-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize implementation of getAggregateRawMetrics in core-tools #1468
Conversation
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]> Contributes to NVIDIA#1461 This commit improves the implementation of aggregation accross raw metrics by replacing the builtin scala collections with accumulators.
@@ -517,12 +485,4 @@ object AppSparkMetricsAnalyzer { | |||
arr.max | |||
} | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed because it is not used anymore
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow) | ||
} | ||
} | ||
} | ||
|
||
|
||
object AppSparkMetricsAnalyzer { | ||
def getDurations(tcs: Iterable[TaskModel]): (Long, Long, Long, Double) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed because it is not used anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @amahussein for this design refactor. Minor comments.
@@ -182,66 +176,55 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { | |||
if (app.sqlIdToStages.contains(sqlId)) { | |||
val stagesInSQL = app.sqlIdToStages(sqlId) | |||
// TODO: Should we only consider successful tasks? | |||
val cachedResBySQL = stageLevelSparkMetrics(index).filterKeys(stagesInSQL.contains).values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we combine filter
and map
and use collect
to process in single pass?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
if (profResultsInJob.isEmpty) { | ||
val jobAggAccumulator = new AggAccumHelper() | ||
val perJobRec = jobAggAccumulator.accumPerJob( | ||
jc.stageIds.filter(stageLevelSparkMetrics(index).contains) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, can we replace filter
and map
by collect
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
*/ | ||
def isEmptyAggregates: Boolean = numTasks == 0 | ||
|
||
def resetFields(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we add a comment on why do we need to reset fields here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! Also refactored the code to do that within the class which is better OOP
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala
Show resolved
Hide resolved
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala
Show resolved
Hide resolved
*/ | ||
def isEmptyAggregates: Boolean = numTasks == 0 | ||
|
||
def resetFields(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! Also refactored the code to do that within the class which is better OOP
@@ -182,66 +176,55 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { | |||
if (app.sqlIdToStages.contains(sqlId)) { | |||
val stagesInSQL = app.sqlIdToStages(sqlId) | |||
// TODO: Should we only consider successful tasks? | |||
val cachedResBySQL = stageLevelSparkMetrics(index).filterKeys(stagesInSQL.contains).values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
if (profResultsInJob.isEmpty) { | ||
val jobAggAccumulator = new AggAccumHelper() | ||
val perJobRec = jobAggAccumulator.accumPerJob( | ||
jc.stageIds.filter(stageLevelSparkMetrics(index).contains) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
} | ||
} | ||
|
||
def minWithEmptyHandling(arr: Iterable[Long]): Long = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed because it is unused
val nodeNames = sqlAnalyzer.stageToNodeNames.getOrElse(sm.stageInfo.stageId, emptyNodeNames) | ||
val diagnosticMetricsMap = | ||
sqlAnalyzer.stageToDiagnosticMetrics | ||
.getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reformated the code because it was not easy to read that withDefaultValue
is applied on getOrElse
AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L) | ||
val emptyNodeNames = Seq.empty[String] | ||
val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- It is better to avoid creating metrics/nodeNames with empty Strings. Because it is harder to notice them and then it could lead to other problems in the CSV files or on joining based on metric names when the string is empty. That's why I replaced empty string with "
N/A
" - Moved the creation of default values outside the map block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @amahussein!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @amahussein ! This is a great refactor. Runtime down by 80% and memory usage optimization is nice indeed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @amahussein. LGTME.
@@ -45,22 +40,19 @@ class AggAccumHelper { | |||
|
|||
def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = { | |||
val resRec = createStageAccumRecord() | |||
initializeRecord(resRec, taskRecords) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was also wondering the need to initializeRecord()
before.
Signed-off-by: Ahmed Hussein (amahussein) [email protected]
Contributes to #1461
This commit improves the implementation of aggregation accross raw metrics by replacing the builtin scala collections with accumulators.
In legacy implementation:
The cost of aggregation was too high:
O(m X n)
: where n is the number of the tasks and m is the number of metric fields inTaskModel
filterKeys
turned to be very expensive becauseO(m X n^2)
where m is the number of keys in the hashMap andn
is the number of stages per SQL/StageIn new implementation:
O(Kn)
wheren
is the number of tasks, andK
is a constant (number of fields)filterKeys
with a loop on the stageIDs and usingfilter
andcontains
:Impact of performance
Code changes in details
This pull request introduces significant changes to the
AppSparkMetricsAnalyzer
class and related utility classes to improve the aggregation of Spark metrics. The changes include the addition of helper classes for accumulating metrics and the refactoring of existing methods to use these helpers. The most important changes are outlined below:Refactoring and Code Simplification:
AppSparkMetricsAnalyzer
class to use the newAggAccumHelper
andAggAccumPhotonHelper
classes for aggregating metrics, simplifying the code and improving readability. [1] [2] [3] [4]New Helper Classes:
AggAccumHelper
class to facilitate the accumulation of aggregate metrics, allowing for future customization and parallel processing.AggAccumPhotonHelper
class to extendAggAccumHelper
for Photon-specific metrics, handling shuffle write values and peak memory values.New Accumulator Classes:
JobAggAccum
class to optimize the aggregation of job-level metrics by avoiding the use of the Scala collections API on each field for the entire number of tasks/stages in a job.SQLAggAccum
class to optimize the aggregation of SQL-level metrics, including the calculation of executor CPU ratio and average input bytes read.Import and Dependency Cleanup:
AppSparkMetricsAnalyzer.scala
, removing unused imports and consolidating others for better organization. [1] [2]These changes collectively improve the maintainability and performance of the
AppSparkMetricsAnalyzer
class by leveraging the new helper and accumulator classes.