Skip to content

Commit

Permalink
Review comment changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sayed Bilal Bari <[email protected]>
  • Loading branch information
bilalbari committed Aug 1, 2024
1 parent 4baa9ba commit d157a41
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,19 +324,20 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* @return a sequence of AccumProfileResults
*/
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
app.accumManager.accumInfoMap.map( entry => {
val accumId = entry._1
val accumInfo = entry._2
app.accumManager.accumInfoMap.map( accumMapEntry => {
val accumId = accumMapEntry._1
val accumInfo = accumMapEntry._2
val accumStats = app.accumManager.calculateAccStats(accumId)
.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
AccumProfileResults(
appIndex = appIndex,
stageId = accumInfo.stageValuesMap.keySet.head.toString,
accumulatorId = accumId,
name = accumInfo.infoRef.name.value,
min = accumStats.map(_.min).getOrElse(0L),
median = accumStats.map(_.med).getOrElse(0L),
max = accumStats.map(_.max).getOrElse(0L),
total = accumStats.map(_.total).getOrElse(0L)
min = accumStats.min,
median = accumStats.med,
max = accumStats.max,
total = accumStats.total
)
}).toSeq
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,32 @@ class AccumInfo(val infoRef: AccMetaRef) {
def addAccToStage(stageId: Int,
accumulableInfo: AccumulableInfo,
update: Option[Long] = None): Unit = {
val value = accumulableInfo.value.flatMap(parseAccumFieldToLong)
val parsedValue = accumulableInfo.value.flatMap(parseAccumFieldToLong)
val existingValue = stageValuesMap.getOrElse(stageId, 0L)
value match {
case Some(v) =>
// This assert prevents out of order events to be processed
assert( v >= existingValue,
s"Stage $stageId: Out of order events detected.")
stageValuesMap.put(stageId, v)
case _ =>
val incomingUpdate = update.getOrElse(0L)
assert( incomingUpdate >= existingValue,
s"Stage $stageId: Out of order events detected.")
// this case is for metrics that are not parsed as long
// We track the accumId to stageId and taskId mapping
stageValuesMap.put(stageId, incomingUpdate)
val incomingValue = parsedValue match {
case Some(v) => v
case _ => update.getOrElse(0L)
}
stageValuesMap.put(stageId, Math.max(existingValue, incomingValue))
}

def addAccToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
val update = accumulableInfo.update.flatMap(parseAccumFieldToLong)
val parsedUpdateValue = accumulableInfo.update.flatMap(parseAccumFieldToLong)
// we have to update the stageMap if the stageId does not exist in the map
var updateStageFlag = !stageValuesMap.contains(stageId)
// TODO: Task can update an accum multiple times. Should account for that case.
// This is for cases where same task updates the same accum multiple times
val existingUpdate = taskUpdatesMap.getOrElse(taskId, 0L)
update match {
val existingUpdateValue = taskUpdatesMap.getOrElse(taskId, 0L)
parsedUpdateValue match {
case Some(v) =>
taskUpdatesMap.put(taskId, v + existingUpdate)
taskUpdatesMap.put(taskId, v + existingUpdateValue)
// update teh stage if the task's update is non-zero
updateStageFlag ||= v != 0
case None =>
taskUpdatesMap.put(taskId, existingUpdate)
taskUpdatesMap.put(taskId, existingUpdateValue)
}
// update the stage value map if necessary
if (updateStageFlag) {
addAccToStage(stageId, accumulableInfo, update.map(_ + existingUpdate))
addAccToStage(stageId, accumulableInfo, parsedUpdateValue)
}
}

Expand Down

0 comments on commit d157a41

Please sign in to comment.