forked from NVIDIA/spark-rapids-tools
AccumManager Usage + Refactoring #14

Status: Open

bilalbari wants to merge 19 commits into rapids-tools-815-1202 from tools_1202_bilal
Changes from 5 of 19 commits
Commits:
e4cca3d  fix instance file tyle error (#1232)  (cindyyuanjiang)
fa9d11e  Improve console output from python tool (#1235)  (parthosa)
0b65f8b  Handle event logs with wildcards in status report generation (#1237)  (parthosa)
6b2b0a8  Improvements in Cluster Config Recommender (#1241)  (parthosa)
0f5ff5e  updated dataproc instance catalog for n1 series (#1242)  (cindyyuanjiang)
941100c  Changes for removing taskStageAccumMap  (bilalbari)
9c98024  Qualification tool should print Kryo related recommendations (#1204)  (tgravescs)
c6c62fa  Add footnotes for config recommendations and speedup category (#1243)  (parthosa)
c926094  Removing unused imports  (bilalbari)
81b98c0  Removing accum to stage map  (bilalbari)
377407e  variables name refactoring  (bilalbari)
4baa9ba  Making access private  (bilalbari)
04875ae  Update xgboost models and metrics (#1244)  (leewyang)
d157a41  Review comment changes  (bilalbari)
4e12f99  Fix stage level metrics output csv file (#1251)  (nartal1)
31b82d1  Replace split_nds with split_train_val (#1252)  (leewyang)
5339dd9  Adding comments  (bilalbari)
22c6db4  Latest dev pull + changes  (bilalbari)
4c85f65  Changes for correcting reformating usage  (bilalbari)
Diff (AppSQLPlanAnalyzer):

@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
-import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SQLMetricsStats, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
+import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
 import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
 import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
 import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph

@@ -274,41 +274,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
   def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
     allSQLMetrics.flatMap { metric =>
-      val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
-        // Avoid getOrElse to reduce memory allocations
-        jc.sqlID.isDefined && jc.sqlID.get == metric.sqlID
-      }
-      val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSet
-      val accumsOpt = app.taskStageAccumMap.get(metric.accumulatorId)
-      val taskMax = accumsOpt match {
-        case Some(accums) =>
-          val filtered = accums.filter { a =>
-            stageIdsForSQL.contains(a.stageId)
-          }
-          // If metricType is size, average or timing, we want to read field `update` value
-          // to get the min, median, max, and total. Otherwise, we want to use field `value`.
-          if (SQLMetricsStats.hasStats(metric.metricType)) {
-            val accumValues = filtered.map(_.update.getOrElse(0L)).sortWith(_ < _)
-            if (accumValues.isEmpty) {
-              None
-            } else if (accumValues.length <= 1) {
-              Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
-            } else {
-              Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
-                accumValues(accumValues.size - 1), accumValues.sum))
-            }
-          } else {
-            val accumValues = filtered.map(_.value.getOrElse(0L))
-            if (accumValues.isEmpty) {
-              None
-            } else {
-              Some(StatisticsMetrics(0L, 0L, 0L, accumValues.max))
-            }
-          }
-        case None => None
-      }
+      val accumTaskStats = app.accumManager.calculateAccStats(metric.accumulatorId)
       // local mode driver gets updates
       val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
       val driverMax = driverAccumsOpt match {

@@ -329,8 +295,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
           None
       }

-      if (taskMax.isDefined || driverMax.isDefined) {
-        val taskInfo = taskMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
+      if (accumTaskStats.isDefined || driverMax.isDefined) {
+        val taskInfo = accumTaskStats.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
         val driverInfo = driverMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))

         val max = Math.max(taskInfo.max, driverInfo.max)

@@ -358,45 +324,21 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
    * @return a sequence of AccumProfileResults
    */
   def generateStageLevelAccums(): Seq[AccumProfileResults] = {
-    def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
-      // drop the metrics if there are no values
-      if (updates.isEmpty) {
-        None
-      } else if (updates.length == 1) {
-        Some(StatisticsMetrics(0L, 0L, 0L, updates.sum))
-      } else {
-        Some(StatisticsMetrics(
-          min = updates.head,
-          med = updates(updates.size / 2),
-          max = updates.last,
-          total = updates.sum
-        ))
-      }
-    }
-
-    // Process taskStageAccumMap to get all the accumulators
-    val stageLevelAccums = app.taskStageAccumMap.values.flatten
-    val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId)
-    groupedByAccumulatorId.flatMap { case (accumulatorId, accums) =>
-      // Extract and sort the update values, defaulting to 0 if not present
-      val sortedUpdates = accums.flatMap(_.update).toSeq.sorted
-
-      // Compute the statistics for the accumulator if applicable
-      computeStatistics(sortedUpdates).map { stats =>
-        val sampleAccum = accums.head
-        AccumProfileResults(
-          appIndex = appIndex,
-          stageId = sampleAccum.stageId.toString,
-          accumulatorId = accumulatorId,
-          name = sampleAccum.name.getOrElse("Unknown"),
-          min = stats.min,
-          median = stats.med,
-          max = stats.max,
-          total = stats.total
-        )
-      }
-    }.toSeq
+    app.accumManager.accumInfoMap.map( entry => {
+      val accumId = entry._1
+      val accumInfo = entry._2
+      val accumStats = app.accumManager.calculateAccStats(accumId)
+      AccumProfileResults(
+        appIndex = appIndex,
+        stageId = accumInfo.stageValuesMap.keySet.head.toString,
+        accumulatorId = accumId,
+        name = accumInfo.infoRef.name.value,
+        min = accumStats.map(_.min).getOrElse(0L),
+        median = accumStats.map(_.med).getOrElse(0L),
+        max = accumStats.map(_.max).getOrElse(0L),
+        total = accumStats.map(_.total).getOrElse(0L)
+      )
+    }).toSeq
   }
 }

Inline review exchange on the `total = ...` line: reviewer: "too many"; author: "Updated".
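For reference, the per-accumulator statistics that the removed inline code computed (and that `calculateAccStats` is assumed to centralize in AccumManager) follow this min/median/max/total pattern. A minimal sketch, where `StatisticsMetrics` is a simplified stand-in for the tool's case class:

```scala
// Simplified stand-in for the tool's StatisticsMetrics case class.
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

// Aggregate a sequence of accumulator update values into summary statistics.
// Mirrors the removed computeStatistics helper: no values yields None, a
// single value reports only the total, otherwise min/median/max/total are
// taken from the sorted values.
def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
  val sorted = updates.sorted
  if (sorted.isEmpty) {
    None // drop the metric if there are no values
  } else if (sorted.length == 1) {
    // single value: min/med/max reported as 0, only the total is meaningful
    Some(StatisticsMetrics(0L, 0L, 0L, sorted.sum))
  } else {
    Some(StatisticsMetrics(sorted.head, sorted(sorted.size / 2), sorted.last, sorted.sum))
  }
}
```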
Review comment:
This does not look correct. `calculateAccStats(accumId)` gets the stats by accumulating the task map. To get stage-level accumulable values, we need to filter for the stage IDs that belong to the specific stage, because an accumulable can be updated by more than one stage.
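The reviewer's point can be illustrated with a small sketch. Here `stageValuesMap` (stage ID to task update values) is a hypothetical stand-in for the per-accumulator structure in AccumManager; the names and shape are assumptions for illustration only:

```scala
// An accumulator updated by two different stages.
val stageValuesMap: Map[Int, Seq[Long]] =
  Map(1 -> Seq(10L, 20L), 2 -> Seq(100L))

// Aggregating across the whole task map mixes updates from both stages,
// which is what calculateAccStats(accumId) alone would do.
val globalTotal = stageValuesMap.values.flatten.sum

// A stage-level result must first restrict to the stage of interest.
val stage1Total = stageValuesMap.getOrElse(1, Seq.empty[Long]).sum
```

Restricting to one stage before aggregating gives a different (and, for stage-level reporting, the correct) result than the global aggregation.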
Reply (bilalbari):
These changes have been corrected in the latest code to do stage-level metric aggregation.