Skip to content

Commit

Permalink
Adding comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sayed Bilal Bari <[email protected]>
  • Loading branch information
bilalbari committed Aug 2, 2024
1 parent d157a41 commit 5339dd9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package org.apache.spark.sql.rapids.tool.store

/**
 * Accumulator Meta Reference.
 * Immutable holder for the metadata associated with an accumulable:
 * its numeric id and a reference to its name.
 * @param id the accumulable id
 * @param name reference to the accumulator name (see [[AccNameRef]])
 */
case class AccMetaRef(id: Long, name: AccNameRef)

object AccMetaRef {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.rapids.tool.util.EventUtils.normalizeMetricName

/**
 * Accumulator Name Reference.
 * Immutable wrapper around an accumulator name so the same name can be
 * referenced via a single object rather than duplicated strings.
 * NOTE(review): the companion object (not fully shown here) presumably
 * normalizes and caches instances — confirm against its implementation.
 * @param value the accumulator name
 */
case class AccNameRef(value: String)

object AccNameRef {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.sql.rapids.tool.util.EventUtils.parseAccumFieldToLong

/**
* Maintains the accumulator information for a single accumulator
 * This maintains the following information:
* 1. Task updates for the accumulator - a map of all taskIds and their update values
* 2. Stage values for the accumulator - a map of all stageIds and their total values
 * 3. AccMetaRef for the accumulator - a reference to the meta information
 * @param infoRef - AccMetaRef for the accumulator
*/
class AccumInfo(val infoRef: AccMetaRef) {
// TODO: Should we use sorted maps for stageIDs and taskIds?
val taskUpdatesMap: mutable.HashMap[Long, Long] =
Expand Down Expand Up @@ -51,8 +59,6 @@ class AccumInfo(val infoRef: AccMetaRef) {
parsedUpdateValue match {
case Some(v) =>
taskUpdatesMap.put(taskId, v + existingUpdateValue)
        // update the stage if the task's update is non-zero
updateStageFlag ||= v != 0
case None =>
taskUpdatesMap.put(taskId, existingUpdateValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics

import org.apache.spark.scheduler.AccumulableInfo

/**
* A class that manages all accumulables -
* maintains a map of accumulable id to AccumInfo
*/
class AccumManager {
val accumInfoMap: mutable.HashMap[Long, AccumInfo] = {
new mutable.HashMap[Long, AccumInfo]()
}

def getOrCreateAccumInfo(id: Long, name: Option[String]): AccumInfo = {
private def getOrCreateAccumInfo(id: Long, name: Option[String]): AccumInfo = {
accumInfoMap.getOrElseUpdate(id, new AccumInfo(AccMetaRef(id, name)))
}

Expand All @@ -52,7 +56,6 @@ class AccumManager {
}.toMap
}


def removeAccumInfo(id: Long): Option[AccumInfo] = {
accumInfoMap.remove(id)
}
Expand Down

0 comments on commit 5339dd9

Please sign in to comment.