Improve implementation of finding median in StatisticsMetrics
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Fixes NVIDIA#1461

Adds an in-place median-finding routine to improve the performance of
the metric aggregates.
We used to sort a sequence to create StatisticsMetrics, which turned
out to be very expensive for large eventlogs.

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
amahussein committed Dec 19, 2024
1 parent 18b0472 commit ad9d0a5
Showing 6 changed files with 251 additions and 105 deletions.
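
Before the per-file hunks, some context on the core idea described in the commit message: instead of sorting the whole sequence (O(n log n)) just to read off the middle element, a quickselect-style partition around a midpoint pivot finds the k-th element in place in expected O(n). The sketch below is illustrative only and assumes nothing beyond the standard library; the names (MedianSketch, medianInPlace, partition, select) are hypothetical and are not the InPlaceMedianArrView.findMedianInPlace / chooseMidpointPivotInPlace helpers that the diff actually calls, and picking index n / 2 for even-sized input is just one possible convention (the helper's own convention is not shown in this diff).

import scala.annotation.tailrec

object MedianSketch {
  // Swap two slots of the array in place.
  private def swap(arr: Array[Long], i: Int, j: Int): Unit = {
    val tmp = arr(i); arr(i) = arr(j); arr(j) = tmp
  }

  // Lomuto-style partition around the value at pivotIdx; returns the pivot's final index.
  private def partition(arr: Array[Long], lo: Int, hi: Int, pivotIdx: Int): Int = {
    val pivot = arr(pivotIdx)
    swap(arr, pivotIdx, hi)
    var store = lo
    var i = lo
    while (i < hi) {
      if (arr(i) < pivot) {
        swap(arr, i, store)
        store += 1
      }
      i += 1
    }
    swap(arr, store, hi)
    store
  }

  // Quickselect: returns the k-th smallest value (0-based), reordering arr in place.
  // Expected O(n) work instead of the O(n log n) full sort used previously.
  @tailrec
  private def select(arr: Array[Long], lo: Int, hi: Int, k: Int): Long = {
    if (lo == hi) {
      arr(lo)
    } else {
      val p = partition(arr, lo, hi, lo + (hi - lo) / 2) // midpoint pivot
      if (k == p) arr(k)
      else if (k < p) select(arr, lo, p - 1, k)
      else select(arr, p + 1, hi, k)
    }
  }

  def medianInPlace(arr: Array[Long]): Long = {
    require(arr.nonEmpty, "median of an empty array is undefined")
    select(arr, 0, arr.length - 1, arr.length / 2)
  }
}
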
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import scala.collection.breakOut
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
@@ -29,6 +30,8 @@ import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.store.DataSourceRecord
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph



/**
* This class processes SQL plan to build some information such as: metrics, wholeStage nodes, and
* connecting operators to nodes. The implementation used to be directly under Profiler's
@@ -265,7 +268,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val jobsWithSQL = app.jobIdToInfo.filter { case (_, j) =>
j.sqlID.nonEmpty
}
val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
jobsWithSQL.flatMap { case (jobId, j) =>
val stages = j.stageIds
val stagesInJob = app.stageManager.getStagesByIds(stages)
stagesInJob.map { sModel =>
@@ -283,8 +286,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
}
}
sqlToStages.toSeq
}(breakOut)
}

def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
@@ -294,20 +296,11 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
val driverMax = driverAccumsOpt match {
case Some(accums) =>
val filtered = accums.filter { a =>
a.sqlID == metric.sqlID
}
val accumValues = filtered.map(_.value).sortWith(_ < _)
if (accumValues.isEmpty) {
None
} else if (accumValues.length <= 1) {
Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
} else {
Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
accumValues(accumValues.size - 1), accumValues.sum))
}
case None =>
None
StatisticsMetrics.createOptionalFromArr(accums.collect {
case a if a.sqlID == metric.sqlID =>
a.value
}(breakOut))
case _ => None
}

if (accumTaskStats.isDefined || driverMax.isDefined) {
@@ -325,7 +318,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
None
}
}
}(breakOut)
}

/**
@@ -341,40 +334,31 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
val accumInfo = accumMapEntry._2
accumInfo.stageValuesMap.keySet.flatMap( stageId => {
val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet
// get the task updates that belong to that stage
val taskUpatesSubset =
accumInfo.taskUpdatesMap.filterKeys(stageTaskIds.contains).values.toSeq.sorted
if (taskUpatesSubset.isEmpty) {
None
} else {
val min = taskUpatesSubset.head
val max = taskUpatesSubset.last
val sum = taskUpatesSubset.sum
val median = if (taskUpatesSubset.size % 2 == 0) {
val mid = taskUpatesSubset.size / 2
(taskUpatesSubset(mid) + taskUpatesSubset(mid - 1)) / 2
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
// reuse AccumProfileResults to avoid generating extra memory from allocating new objects
val accumProfileResults = AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = min,
median = median,
max = max,
total = sum)
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Some(accumProfileResults)
accumInfo.stageValuesMap.keys.flatMap( stageId => {
val stageTaskIds: Set[Long] =
app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
// Get the task updates that belong to that stage
StatisticsMetrics.createOptionalFromArr(
accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
case Some(stat) =>
// Reuse AccumProfileResults to avoid allocating new objects
val accumProfileResults = AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = stat.min,
median = stat.med,
max = stat.max,
total = stat.total)
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Some(accumProfileResults)
case _ => None
}
})
}
}.toSeq
}(breakOut)
}
}

object AppSQLPlanAnalyzer {
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import scala.collection.breakOut
import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
Expand Down Expand Up @@ -79,7 +80,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return sequence of JobAggTaskMetricsProfileResult that contains only Job Ids
*/
def aggregateSparkMetricsByJob(index: Int): Seq[JobAggTaskMetricsProfileResult] = {
val jobRows = app.jobIdToInfo.flatMap { case (id, jc) =>
app.jobIdToInfo.flatMap { case (id, jc) =>
if (jc.stageIds.isEmpty) {
None
} else {
@@ -126,8 +127,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
perJobRec.swWriteTimeSum))
}
}
}
jobRows.toSeq
}(breakOut)
}

private case class AverageStageInfo(avgDuration: Double, avgShuffleReadBytes: Double)
@@ -163,7 +163,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
tc.taskId, tc.attempt, tc.duration, avg.avgDuration, tc.sr_totalBytesRead,
avg.avgShuffleReadBytes, tc.peakExecutionMemory, tc.successful, tc.endReason)
}
}.toSeq
}(breakOut)
}

/**
Expand All @@ -172,7 +172,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return sequence of SQLTaskAggMetricsProfileResult
*/
def aggregateSparkMetricsBySql(index: Int): Seq[SQLTaskAggMetricsProfileResult] = {
val sqlRows = app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) =>
app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) =>
if (app.sqlIdToStages.contains(sqlId)) {
val stagesInSQL = app.sqlIdToStages(sqlId)
// TODO: Should we only consider successful tasks?
@@ -229,8 +229,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
} else {
None
}
}
sqlRows.toSeq
}(breakOut)
}

/**
Expand All @@ -241,7 +240,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
*/
def aggregateIOMetricsBySql(
sqlMetricsAggs: Seq[SQLTaskAggMetricsProfileResult]): Seq[IOAnalysisProfileResult] = {
val sqlIORows = sqlMetricsAggs.map { sqlAgg =>
sqlMetricsAggs.map { sqlAgg =>
IOAnalysisProfileResult(sqlAgg.appIndex,
app.appId,
sqlAgg.sqlId,
Expand All @@ -253,8 +252,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sqlAgg.memoryBytesSpilledSum,
sqlAgg.srTotalBytesReadSum,
sqlAgg.swBytesWrittenSum)
}
sqlIORows
}(breakOut)
}

/**
@@ -289,7 +287,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return a sequence of SQLDurationExecutorTimeProfileResult or Empty if None.
*/
def aggregateDurationAndCPUTimeBySql(index: Int): Seq[SQLDurationExecutorTimeProfileResult] = {
val sqlRows = app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
// First, build the SQLIssues string by retrieving the potential issues from the
// app.sqlIDtoProblematic map.
val sqlIssues = if (app.sqlIDtoProblematic.contains(sqlId)) {
Expand All @@ -301,8 +299,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
SQLDurationExecutorTimeProfileResult(index, app.appId, sqlCase.rootExecutionID,
sqlId, sqlCase.duration, sqlCase.hasDatasetOrRDD,
app.getAppDuration.orElse(Option(0L)), sqlIssues, sqlCase.sqlCpuTimePercent)
}
sqlRows.toSeq
}(breakOut)
}

/**
@@ -338,7 +335,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
.getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics)
.withDefaultValue(zeroAccumProfileResults)
val srTotalBytesMetrics =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))
StatisticsMetrics.createFromArr(tasksInStage.map(_.sr_totalBytesRead)(breakOut))

StageDiagnosticResult(index,
app.getAppName,
Expand All @@ -359,7 +356,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
diagnosticMetricsMap(SW_WRITE_TIME_METRIC),
diagnosticMetricsMap(GPU_SEMAPHORE_WAIT_METRIC),
nodeNames)
}.toSeq
}(breakOut)
}

/**
Expand Down Expand Up @@ -456,24 +453,3 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
}
}
}


object AppSparkMetricsAnalyzer {
/**
* Given an input iterable, returns its min, median, max and sum.
*/
def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
if (arr.isEmpty) {
StatisticsMetrics(0L, 0L, 0L, 0L)
} else {
val sortedArr = arr.toSeq.sorted
val len = sortedArr.size
val med = if (len % 2 == 0) {
(sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
} else {
sortedArr(len / 2)
}
StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
}
}
}
@@ -16,11 +16,40 @@

package com.nvidia.spark.rapids.tool.analysis

import org.apache.spark.sql.rapids.tool.util.InPlaceMedianArrView.{chooseMidpointPivotInPlace, findMedianInPlace}

// Store (min, median, max, total) for a given metric
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

object StatisticsMetrics {
// a static variable used to represent zero-statistics instead of allocating a dummy record
// on every calculation.
val ZERO_RECORD: StatisticsMetrics = StatisticsMetrics(0L, 0L, 0L, 0L)

def createFromArr(arr: Array[Long]): StatisticsMetrics = {
if (arr.isEmpty) {
return ZERO_RECORD
}
val medV = findMedianInPlace(arr)(chooseMidpointPivotInPlace)
var minV = Long.MaxValue
var maxV = Long.MinValue
var totalV = 0L
arr.foreach { v =>
if (v < minV) {
minV = v
}
if (v > maxV) {
maxV = v
}
totalV += v
}
StatisticsMetrics(minV, medV, maxV, totalV)
}

def createOptionalFromArr(arr: Array[Long]): Option[StatisticsMetrics] = {
if (arr.isEmpty) {
return None
}
Some(createFromArr(arr))
}
}
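
For reference, a brief usage sketch of the two factory methods added above. The sample values are invented, and the exact median returned for an even-sized array depends on InPlaceMedianArrView's convention, which is not shown in this diff; because the median is found in place, the input array is presumably reordered as a side effect.

import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics

object StatisticsMetricsUsageSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical input; in the tools these values come from task/driver accumulator updates.
    val updates: Array[Long] = Array(40L, 10L, 30L, 20L)

    // Non-empty input: min/max/total are computed in a single pass and the median is
    // selected in place (the input array may be reordered).
    val stats: StatisticsMetrics = StatisticsMetrics.createFromArr(updates)
    println(s"min=${stats.min} med=${stats.med} max=${stats.max} total=${stats.total}")

    // Empty input: the Option-returning variant lets callers skip emitting a result entirely,
    // while createFromArr falls back to the shared ZERO_RECORD.
    assert(StatisticsMetrics.createOptionalFromArr(Array.empty[Long]).isEmpty)
    assert(StatisticsMetrics.createFromArr(Array.empty[Long]) == StatisticsMetrics.ZERO_RECORD)
  }
}
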
@@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.tool.store

import scala.collection.mutable
import scala.collection.{breakOut, mutable}

import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics

@@ -98,22 +98,8 @@ class AccumInfo(val infoRef: AccumMetaRef) {
}

def calculateAccStats(): StatisticsMetrics = {
val sortedTaskUpdates = taskUpdatesMap.values.toSeq.sorted
if (sortedTaskUpdates.isEmpty) {
// do not check stage values because the stats is only meant for task updates
StatisticsMetrics.ZERO_RECORD
} else {
val min = sortedTaskUpdates.head
val max = sortedTaskUpdates.last
val sum = sortedTaskUpdates.sum
val median = if (sortedTaskUpdates.size % 2 == 0) {
val mid = sortedTaskUpdates.size / 2
(sortedTaskUpdates(mid) + sortedTaskUpdates(mid - 1)) / 2
} else {
sortedTaskUpdates(sortedTaskUpdates.size / 2)
}
StatisticsMetrics(min, median, max, sum)
}
// do not check stage values because the stats is only meant for task updates
StatisticsMetrics.createFromArr(taskUpdatesMap.map(_._2)(breakOut))
}

def getMaxStageValue: Option[Long] = {
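
Beyond the median change itself, most hunks above also replace intermediate collections plus toSeq/toSet conversions with scala.collection.breakOut, which supplies the CanBuildFrom so that map/flatMap/collect build the target collection type directly. A small, self-contained illustration under the assumption of Scala 2.12 (breakOut no longer exists in 2.13); the sample values and names here are invented:

import scala.collection.breakOut

object BreakOutSketch {
  def main(args: Array[String]): Unit = {
    // Invented sample: task id -> accumulator update.
    val taskUpdates = Map(1L -> 100L, 2L -> 250L, 3L -> 175L)

    // Two-step version: map materializes an intermediate Iterable, then toSeq copies it.
    val viaToSeq: Seq[Long] = taskUpdates.map(_._2).toSeq

    // breakOut version: the Seq is built directly while mapping, skipping the extra copy.
    val viaBreakOut: Seq[Long] = taskUpdates.map(_._2)(breakOut)

    // The same idiom builds an Array (or a Set) in one pass, as the diff does when it
    // passes collected values straight into StatisticsMetrics.createFromArr.
    val asArray: Array[Long] = taskUpdates.map(_._2)(breakOut)

    println(viaToSeq.sorted == viaBreakOut.sorted && asArray.length == 3)
  }
}
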