AccumManager Usage + Refactoring #14

Open
wants to merge 19 commits into base: rapids-tools-815-1202
Changes from 5 commits
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SQLMetricsStats, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
@@ -274,41 +274,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap

def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
allSQLMetrics.flatMap { metric =>
val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
// Avoid getOrElse to reduce memory allocations
jc.sqlID.isDefined && jc.sqlID.get == metric.sqlID
}
val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSet
val accumsOpt = app.taskStageAccumMap.get(metric.accumulatorId)
val taskMax = accumsOpt match {
case Some(accums) =>
val filtered = accums.filter { a =>
stageIdsForSQL.contains(a.stageId)
}
// If metricType is size, average or timing, we want to read field `update` value
// to get the min, median, max, and total. Otherwise, we want to use field `value`.
if (SQLMetricsStats.hasStats(metric.metricType)) {
val accumValues = filtered.map(_.update.getOrElse(0L)).sortWith(_ < _)
if (accumValues.isEmpty) {
None
}
else if (accumValues.length <= 1) {
Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
} else {
Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
accumValues(accumValues.size - 1), accumValues.sum))
}
} else {
val accumValues = filtered.map(_.value.getOrElse(0L))
if (accumValues.isEmpty) {
None
} else {
Some(StatisticsMetrics(0L, 0L, 0L, accumValues.max))
}
}
case None => None
}

val accumTaskStats = app.accumManager.calculateAccStats(metric.accumulatorId)
// local mode driver gets updates
val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
val driverMax = driverAccumsOpt match {
@@ -329,8 +295,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
None
}

if (taskMax.isDefined || driverMax.isDefined) {
val taskInfo = taskMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
if (accumTaskStats.isDefined || driverMax.isDefined) {
val taskInfo = accumTaskStats.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
val driverInfo = driverMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))

val max = Math.max(taskInfo.max, driverInfo.max)
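
The inline min/median/max/total logic removed above is replaced by a single call to app.accumManager.calculateAccStats. The AccumManager implementation itself is not part of this diff; the following is only a rough sketch of what that helper is assumed to compute, reusing the AccumInfo and taskUpdatesMap names that appear in later hunks.

// Hypothetical sketch — not the PR's actual AccumManager code.
// Assumed shapes, consistent with names used elsewhere in this diff:
//   case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)
//   accumInfoMap: mutable.Map[Long, AccumInfo], where AccumInfo.taskUpdatesMap
//   holds the per-task update values keyed by task id.
def calculateAccStats(accumId: Long): Option[StatisticsMetrics] = {
  accumInfoMap.get(accumId).flatMap { accumInfo =>
    // Sort all per-task update values recorded for this accumulator
    val sorted = accumInfo.taskUpdatesMap.values.toSeq.sorted
    if (sorted.isEmpty) {
      None
    } else {
      Some(StatisticsMetrics(
        min = sorted.head,
        med = sorted(sorted.size / 2),
        max = sorted.last,
        total = sorted.sum))
    }
  }
}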
@@ -358,45 +324,21 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* @return a sequence of AccumProfileResults
*/
def generateStageLevelAccums(): Seq[AccumProfileResults] = {

def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
// drop the metrics if there are no values
if (updates.isEmpty) {
None
} else if (updates.length == 1) {
Some(StatisticsMetrics(0L, 0L, 0L, updates.sum))
} else {
Some(StatisticsMetrics(
min = updates.head,
med = updates(updates.size / 2),
max = updates.last,
total = updates.sum
))
}
}

// Process taskStageAccumMap to get all the accumulators
val stageLevelAccums = app.taskStageAccumMap.values.flatten
val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId)
groupedByAccumulatorId.flatMap { case (accumulatorId, accums) =>
// Extract and sort the update values, defaulting to 0 if not present
val sortedUpdates = accums.flatMap(_.update).toSeq.sorted

// Compute the statistics for the accumulator if applicable
computeStatistics(sortedUpdates).map { stats =>
val sampleAccum = accums.head
AccumProfileResults(
appIndex = appIndex,
stageId = sampleAccum.stageId.toString,
accumulatorId = accumulatorId,
name = sampleAccum.name.getOrElse("Unknown"),
min = stats.min,
median = stats.med,
max = stats.max,
total = stats.total
)
}
}.toSeq
app.accumManager.accumInfoMap.map( entry => {
Owner:

This does not look correct. calculateAccStats(accumId) gets the stats by accumulating the task map. To get stage-level accumulable values we need to filter down to the updates that belong to the specific stage, because an accumulable can be updated by more than one stage (an illustrative per-stage sketch appears after this hunk).

Collaborator Author:

This has been corrected in the latest code to do stage-level metric aggregation.

val accumId = entry._1
val accumInfo = entry._2
val accumStats = app.accumManager.calculateAccStats(accumId)
AccumProfileResults(
appIndex = appIndex,
stageId = accumInfo.stageValuesMap.keySet.head.toString,
accumulatorId = accumId,
name = accumInfo.infoRef.name.value,
min = accumStats.map(_.min).getOrElse(0L),
median = accumStats.map(_.med).getOrElse(0L),
max = accumStats.map(_.max).getOrElse(0L),
total = accumStats.map(_.total).getOrElse(0L)
Owner:

There are too many accumStats.map calls here; we can define the stats once after val accumStats and reuse them (as in the sketch after this hunk).

Collaborator Author:

Updated.

)
}).toSeq
}
}
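
Putting the two review comments above together, here is a minimal sketch of per-stage aggregation with the stats bound once. stageValuesMap, taskUpdatesMap, infoRef, StatisticsMetrics, and AccumProfileResults are names taken from this diff; the taskIdsForStage helper (mapping a stage id to the task ids that ran in it) is a hypothetical stand-in, and the whole body is illustrative rather than the PR's final implementation.

// Sketch only: emit one row per stage that touched the accumulable, since an
// accumulable can be updated by more than one stage.
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
  app.accumManager.accumInfoMap.flatMap { case (accumId, accumInfo) =>
    accumInfo.stageValuesMap.keySet.toSeq.map { stageId =>
      // Keep only the task updates that belong to this stage
      // (taskIdsForStage is a hypothetical stageId -> task-ids lookup).
      val updatesForStage = accumInfo.taskUpdatesMap.collect {
        case (taskId, update) if taskIdsForStage(stageId).contains(taskId) => update
      }.toSeq.sorted
      // Bind the stats once instead of repeating accumStats.map for each field
      val stats =
        if (updatesForStage.isEmpty) StatisticsMetrics(0L, 0L, 0L, 0L)
        else StatisticsMetrics(
          updatesForStage.head,
          updatesForStage(updatesForStage.size / 2),
          updatesForStage.last,
          updatesForStage.sum)
      AccumProfileResults(
        appIndex = appIndex,
        stageId = stageId.toString,
        accumulatorId = accumId,
        name = accumInfo.infoRef.name.value,
        min = stats.min,
        median = stats.med,
        max = stats.max,
        total = stats.total)
    }
  }.toSeq
}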

@@ -443,10 +443,7 @@ object SQLPlanParser extends Logging {

def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Set[Int] = {
val nodeAccums = node.metrics.map(_.accumulatorId)
nodeAccums.flatMap { nodeAccumId =>
// val res = app.accumManager.getAccStageIds(nodeAccumId)
app.stageManager.getStagesIdsByAccumId(nodeAccumId)
}.toSet
nodeAccums.flatMap(app.accumManager.getAccStageIds).toSet
}

// Set containing execs that refers to other expressions. We need this to be a list to allow
@@ -614,15 +611,10 @@ object SQLPlanParser extends Logging {
* the duration.
*/
def getTotalDuration(accumId: Option[Long], app: AppBase): Option[Long] = {
val taskForAccum = accumId.flatMap(id => app.taskStageAccumMap.get(id))
.getOrElse(ArrayBuffer.empty)
val accumValues = taskForAccum.map(_.value.getOrElse(0L))
val maxDuration = if (accumValues.isEmpty) {
None
} else {
Some(accumValues.max)
accumId match {
case Some(x) => app.accumManager.getMaxStageValue(x)
case _ => None
}
maxDuration
}

def getDriverTotalDuration(accumId: Option[Long], app: AppBase): Option[Long] = {
@@ -33,7 +33,7 @@ class CompareApplications(apps: Seq[ApplicationInfo]) extends Logging {
val normalizedByAppId = apps.map { app =>
val normalized = app.sqlPlans.mapValues { plan =>
SparkPlanInfoWithStage(plan,
app.stageManager.getAccumToSingleStage()).normalizeForStageComparison
app.accumManager.getAccumSingleStage).normalizeForStageComparison
}
(app.appId, normalized)
}.toMap
@@ -88,7 +88,7 @@ object GenerateDot {
val accumSummary = accums.map { a =>
Seq(a.sqlID, a.accumulatorId, a.total)
}
val accumIdToStageId = app.stageManager.getAccumToSingleStage()
val accumIdToStageId = app.accumManager.getAccumSingleStage
val formatter = java.text.NumberFormat.getIntegerInstance
val stageIdToStageMetrics = app.taskManager.stageAttemptToTasks.collect { case (stageId, _) =>
val tasks = app.taskManager.getAllTasksStageAttempt(stageId)
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import com.nvidia.spark.rapids.tool.ToolTextFileWriter

import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.store.{AccNameRef, AccumInfo}

abstract class TimelineTiming(
val startTime: Long,
@@ -273,27 +274,22 @@ object GenerateTimeline {
}
}

val semMetricsNs = semWaitIds.toList.flatMap { id =>
app.taskStageAccumMap.get(id)
}.flatten
val semMetricsNs = semWaitIds.toList
.flatMap(app.accumManager.accumInfoMap.get)
.flatMap(_.taskUpdatesMap.values).sum

val semMetricsMs = app.taskStageAccumMap.values.filter { buffer =>
buffer.headOption.exists { h =>
h.name.exists(_ == "gpuSemaphoreWait")
}
}.flatten
val semMetricsMs = app.accumManager.accumInfoMap.flatMap{
case (_,accumInfo: AccumInfo)
if accumInfo.infoRef.name == AccNameRef.NAMES_TABLE.get("gpuSemaphoreWait") =>
Some(accumInfo.taskUpdatesMap.values.sum)
case _ => None
}.sum

val readMetrics = readTimeIds.toList.flatMap { id =>
app.taskStageAccumMap.get(id)
}.flatten
val readMetrics = readTimeIds.toList.flatMap(app.accumManager.accumInfoMap.get)

val opMetrics = opTimeIds.toList.flatMap { id =>
app.taskStageAccumMap.get(id)
}.flatten
val opMetrics = opTimeIds.toList.flatMap(app.accumManager.accumInfoMap.get)

val writeMetrics = writeTimeIds.toList.flatMap { id =>
app.taskStageAccumMap.get(id)
}.flatten
val writeMetrics = writeTimeIds.toList.flatMap(app.accumManager.accumInfoMap.get)

app.taskManager.getAllTasks().foreach { tc =>
val host = tc.host
@@ -303,20 +299,12 @@
val launchTime = tc.launchTime
val finishTime = tc.finishTime
val duration = tc.duration
val semTimeMs = (semMetricsNs.filter { m =>
m.stageId == stageId && m.taskId.contains(taskId) && m.update.isDefined
}.flatMap(_.update).sum / 1000000) + (semMetricsMs.filter{ m =>
m.stageId == stageId && m.taskId.contains(taskId) && m.update.isDefined
}.flatMap(_.update).sum)
val readTimeMs = readMetrics.filter { m =>
m.stageId == stageId && m.taskId.contains(taskId) && m.update.isDefined
}.flatMap(_.update).sum / 1000000 + tc.sr_fetchWaitTime
val opTimeMs = opMetrics.filter { m =>
m.stageId == stageId && m.taskId.contains(taskId) && m.update.isDefined
}.flatMap(_.update).sum / 1000000
val writeTimeMs = writeMetrics.filter { m =>
m.stageId == stageId && m.taskId.contains(taskId) && m.update.isDefined
}.flatMap(_.update).sum / 1000000 + tc.sw_writeTime
val semTimeMs = (semMetricsNs / 1000000) + semMetricsMs
val readTimeMs = readMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000 +
tc.sr_fetchWaitTime
val opTimeMs = opMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000
val writeTimeMs = writeMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000 +
tc.sw_writeTime
val taskInfo = new TimelineTaskInfo(stageId, taskId, launchTime, finishTime, duration,
tc.executorDeserializeTime, readTimeMs, semTimeMs, opTimeMs, writeTimeMs)
val execHost = s"$execId/$host"
@@ -24,7 +24,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet, M
import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo}
import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser}
import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode
import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DataSourceCase, DriverAccumCase, JobInfoClass, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase, TaskStageAccumCase}
import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DataSourceCase, DriverAccumCase, JobInfoClass, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

@@ -79,8 +79,6 @@ abstract class AppBase(
var sqlPlanMetricsAdaptive: ArrayBuffer[SQLPlanMetricsCase] = ArrayBuffer[SQLPlanMetricsCase]()

// accum id to task stage accum info
var taskStageAccumMap: HashMap[Long, ArrayBuffer[TaskStageAccumCase]] =
HashMap[Long, ArrayBuffer[TaskStageAccumCase]]()
lazy val accumManager: AccumManager = new AccumManager()

lazy val stageManager: StageModelManager = new StageModelManager()
@@ -181,9 +179,8 @@ }
}

def cleanupAccumId(accId: Long): Unit = {
taskStageAccumMap.remove(accId)
accumManager.removeAccumInfo(accId)
driverAccumMap.remove(accId)
stageManager.removeAccumulatorId(accId)
}

def cleanupStages(stageIds: Set[Int]): Unit = {
@@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids.tool
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DriverAccumCase, JobInfoClass, ProfileUtils, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase, TaskStageAccumCase}
import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DriverAccumCase, JobInfoClass, ProfileUtils, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase}

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
@@ -349,18 +349,9 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
app: T,
event: SparkListenerTaskEnd): Unit = {
// TODO: this implementation needs to be updated to use attemptID
// Update the map between accumulators and stages
app.stageManager.addAccumIdToStage(
event.stageId, event.taskInfo.accumulables.map(_.id))
// Parse task accumulables
for (res <- event.taskInfo.accumulables) {
try {
EventUtils.buildTaskStageAccumFromAccumInfo(res,
event.stageId, event.stageAttemptId, Some(event.taskInfo.taskId)).foreach { thisMetric =>
val arrBuf = app.taskStageAccumMap.getOrElseUpdate(res.id,
ArrayBuffer[TaskStageAccumCase]())
arrBuf += thisMetric
}
app.accumManager.addAccToTask(event.stageId, event.taskInfo.taskId, res)
} catch {
case NonFatal(e) =>
@@ -470,12 +461,6 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
for (res <- event.stageInfo.accumulables) {
try {
val accumInfo = res._2
EventUtils.buildTaskStageAccumFromAccumInfo(accumInfo,
event.stageInfo.stageId, event.stageInfo.attemptNumber()).foreach { thisMetric =>
val arrBuf = app.taskStageAccumMap.getOrElseUpdate(accumInfo.id,
ArrayBuffer[TaskStageAccumCase]())
arrBuf += thisMetric
}
app.accumManager.addAccToStage(event.stageInfo.stageId, accumInfo)
} catch {
case NonFatal(e) =>
@@ -16,9 +16,7 @@

package org.apache.spark.sql.rapids.tool.store

case class AccMetaRef(id: Long, name: AccNameRef) {

}
case class AccMetaRef(id: Long, name: AccNameRef)

object AccMetaRef {
def apply(id: Long, name: Option[String]): AccMetaRef =
@@ -20,22 +20,25 @@ import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.rapids.tool.util.EventUtils.normalizeMetricName

case class AccNameRef(value: String) {

}
case class AccNameRef(value: String)

object AccNameRef {
val EMPTY_ACC_NAME_REF: AccNameRef = new AccNameRef("N/A")
val namesTable: ConcurrentHashMap[String, AccNameRef] =
new ConcurrentHashMap[String, AccNameRef]()
private val EMPTY_ACC_NAME_REF: AccNameRef = new AccNameRef("N/A")
val NAMES_TABLE: ConcurrentHashMap[String, AccNameRef] = {
val initMap = new ConcurrentHashMap[String, AccNameRef]()
initMap.put("gpuSemaphoreWait", fromString("gpuSemaphoreWait"))
initMap
}

def internAccName(name: Option[String]): AccNameRef = {
name match {
case Some(n) =>
namesTable.computeIfAbsent(n, AccNameRef.fromString)
NAMES_TABLE.computeIfAbsent(n, AccNameRef.fromString)
case _ =>
AccNameRef.EMPTY_ACC_NAME_REF
}
}

def fromString(value: String): AccNameRef =
new AccNameRef(normalizeMetricName(value))
}