Commit

Changes for correcting reformatting usage

Signed-off-by: Sayed Bilal Bari <[email protected]>
bilalbari committed Aug 5, 2024
1 parent 22c6db4 commit 4c85f65
Showing 5 changed files with 25 additions and 12 deletions.
@@ -344,7 +344,6 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
     }
 
     app.accumManager.accumInfoMap.flatMap{ accumMapEntry =>
-      val accumId = accumMapEntry._1
       val accumInfo = accumMapEntry._2
       accumInfo.stageValuesMap.keySet.flatMap( stageId => {
         val tasks = app.taskManager.getAllTasksStageAttempt(stageId)
@@ -355,8 +354,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
           AccumProfileResults(
             appIndex,
             stageId,
-            accumId,
-            accumInfo.infoRef.name.value,
+            accumInfo.infoRef,
             min = stats.min,
             median = stats.med,
             max = stats.max,
@@ -19,6 +19,8 @@ package com.nvidia.spark.rapids.tool.profiling
 import scala.collection.Map
 
 import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
+import org.apache.spark.sql.rapids.tool.store.AccMetaRef
+import org.apache.spark.sql.rapids.tool.store.AccNameRef.getCSVSupportedAccumName
 import org.apache.spark.sql.rapids.tool.util.StringUtils
 
 /**
@@ -219,20 +221,20 @@ case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
   }
 }
 
-case class AccumProfileResults(appIndex: Int, stageId: Int, accumulatorId: Long, name: String,
+case class AccumProfileResults(appIndex: Int, stageId: Int, accumMetaRef: AccMetaRef,
     min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
   override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
     "median", "max", "total")
 
   override def convertToSeq: Seq[String] = {
-    Seq(appIndex.toString, stageId.toString, accumulatorId.toString, name, min.toString,
-      median.toString, max.toString, total.toString)
+    Seq(appIndex.toString, stageId.toString, accumMetaRef.id.toString, accumMetaRef.name.value,
+      min.toString, median.toString, max.toString, total.toString)
   }
 
   override def convertToCSVSeq: Seq[String] = {
-    Seq(appIndex.toString, stageId.toString, accumulatorId.toString,
-      StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
-      total.toString)
+    Seq(appIndex.toString, stageId.toString, accumMetaRef.id.toString,
+      getCSVSupportedAccumName(accumMetaRef.name), min.toString,
+      median.toString, max.toString, total.toString)
   }
 }
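The net effect of the hunk above: the two scalar fields (accumulatorId, name) collapse into one accumMetaRef carrying both, so downstream code reads accumMetaRef.id and accumMetaRef.name.value. A minimal sketch of that shape change, using simplified stand-in types rather than the tool's actual classes:

// Simplified stand-ins for this commit's AccNameRef / AccMetaRef.
case class NameRef(value: String)
case class MetaRef(id: Long, name: NameRef)

// Before: Result(appIndex, stageId, accumulatorId, name, ...)
// After:  one MetaRef carries both the id and the interned name.
case class Result(appIndex: Int, stageId: Int, meta: MetaRef,
    min: Long, median: Long, max: Long, total: Long) {
  def toSeq: Seq[String] =
    Seq(appIndex.toString, stageId.toString, meta.id.toString, meta.name.value,
      min.toString, median.toString, max.toString, total.toString)
}

object ShapeDemo extends App {
  val row = Result(1, 4, MetaRef(17L, NameRef("gpuSemaphoreWait")), 0L, 5L, 9L, 14L)
  println(row.toSeq.mkString(","))  // prints: 1,4,17,gpuSemaphoreWait,0,5,9,14
}

Grouping the id with the interned name in one reference keeps the row type stable if more accumulator metadata is added later.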
@@ -29,7 +29,7 @@ trait AppStageMetricsViewTrait extends ViewableTrait[AccumProfileResults] {
 
   override def sortView(
       rows: Seq[AccumProfileResults]): Seq[AccumProfileResults] = {
-    rows.sortBy(cols => (cols.appIndex, cols.stageId, cols.accumulatorId))
+    rows.sortBy(cols => (cols.appIndex, cols.stageId, cols.accumMetaRef.id))
  }
 }
@@ -26,5 +26,5 @@ case class AccMetaRef(id: Long, name: AccNameRef)
 
 object AccMetaRef {
   def apply(id: Long, name: Option[String]): AccMetaRef =
-    new AccMetaRef(id, AccNameRef.internAccName(name))
+    new AccMetaRef(id, AccNameRef.getInternalAccName(name))
 }
@@ -18,7 +18,10 @@ package org.apache.spark.sql.rapids.tool.store
 
 import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.rapids.tool.util.EventUtils.normalizeMetricName
+import org.apache.spark.sql.rapids.tool.util.StringUtils
 
 /**
  * Accumulator Name Reference
@@ -29,13 +32,15 @@ case class AccNameRef(value: String)
 
 object AccNameRef {
   private val EMPTY_ACC_NAME_REF: AccNameRef = new AccNameRef("N/A")
+  private val CSV_SUPPORTED_NAME_MAP: mutable.WeakHashMap[AccNameRef, String] =
+    mutable.WeakHashMap()
   val NAMES_TABLE: ConcurrentHashMap[String, AccNameRef] = {
     val initMap = new ConcurrentHashMap[String, AccNameRef]()
     initMap.put("gpuSemaphoreWait", fromString("gpuSemaphoreWait"))
     initMap
   }
 
-  def internAccName(name: Option[String]): AccNameRef = {
+  def getInternalAccName(name: Option[String]): AccNameRef = {
     name match {
       case Some(n) =>
         NAMES_TABLE.computeIfAbsent(n, AccNameRef.fromString)
@@ -44,6 +49,14 @@ object AccNameRef {
     }
   }
 
+  def getCSVSupportedAccumName(accNameRef: AccNameRef): String = {
+    synchronized{
+      CSV_SUPPORTED_NAME_MAP.getOrElseUpdate(accNameRef, {
+        StringUtils.reformatCSVString(accNameRef.value)
+      })
+    }
+  }
+
   def fromString(value: String): AccNameRef =
     new AccNameRef(normalizeMetricName(value))
 }
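The new getCSVSupportedAccumName memoizes the CSV-escaped form of each interned name so the escaping work runs once per distinct accumulator name rather than once per output row. A self-contained sketch of the same pattern, assuming a simplified quote-and-escape reformatCSVString in place of the tool's StringUtils:

import scala.collection.mutable

// Simplified stand-in for AccNameRef; not the tool's actual class.
case class NameRef(value: String)

object NameRef {
  // Weak keys let a cached CSV string be collected once its NameRef
  // is no longer referenced anywhere else.
  private val csvCache: mutable.WeakHashMap[NameRef, String] =
    mutable.WeakHashMap()

  // Hypothetical stand-in for StringUtils.reformatCSVString:
  // wrap in quotes and double any embedded quotes.
  private def reformatCSVString(s: String): String =
    "\"" + s.replace("\"", "\"\"") + "\""

  def csvName(ref: NameRef): String = synchronized {
    // Compute on first lookup, return the cached value afterwards.
    csvCache.getOrElseUpdate(ref, reformatCSVString(ref.value))
  }
}

object CacheDemo extends App {
  val ref = NameRef("internal.metrics.shuffle.read.recordsRead")
  println(NameRef.csvName(ref)) // computed once, served from cache on later calls
}

The synchronized wrapper matters because scala.collection.mutable.WeakHashMap is not thread-safe, unlike the ConcurrentHashMap used for NAMES_TABLE.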
