Improve implementation of finding median in StatisticsMetrics #1474

Merged · 2 commits · Dec 20, 2024
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import scala.collection.breakOut
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
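The newly imported scala.collection.breakOut (available up to Scala 2.12, removed in 2.13) is what lets the rewritten map/flatMap calls in the hunks below build their result Seq directly instead of materializing an intermediate collection and calling .toSeq. A minimal sketch of the pattern, using hypothetical data rather than anything from this PR:

import scala.collection.breakOut

val jobNames = Map(1 -> "job-a", 2 -> "job-b")

// Without breakOut: map builds an intermediate collection, then toSeq copies it.
val viaToSeq: Seq[String] = jobNames.map(_._2).toSeq

// With breakOut: the implicit CanBuildFrom picks a Seq builder up front,
// so the result is assembled in a single pass with no intermediate copy.
val viaBreakOut: Seq[String] = jobNames.map(_._2)(breakOut)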
@@ -265,7 +266,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val jobsWithSQL = app.jobIdToInfo.filter { case (_, j) =>
j.sqlID.nonEmpty
}
val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
jobsWithSQL.flatMap { case (jobId, j) =>
val stages = j.stageIds
val stagesInJob = app.stageManager.getStagesByIds(stages)
stagesInJob.map { sModel =>
@@ -283,8 +284,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
}
}
sqlToStages.toSeq
}(breakOut)
}

def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
@@ -294,20 +294,11 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
val driverMax = driverAccumsOpt match {
case Some(accums) =>
val filtered = accums.filter { a =>
a.sqlID == metric.sqlID
}
val accumValues = filtered.map(_.value).sortWith(_ < _)
if (accumValues.isEmpty) {
None
} else if (accumValues.length <= 1) {
Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
} else {
Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
accumValues(accumValues.size - 1), accumValues.sum))
}
case None =>
None
StatisticsMetrics.createOptionalFromArr(accums.collect {
case a if a.sqlID == metric.sqlID =>
a.value
}(breakOut))
case _ => None
}

if (accumTaskStats.isDefined || driverMax.isDefined) {
@@ -325,7 +316,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
None
}
}
}(breakOut)
}

/**
@@ -341,40 +332,31 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
val accumInfo = accumMapEntry._2
accumInfo.stageValuesMap.keySet.flatMap( stageId => {
val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet
// get the task updates that belong to that stage
val taskUpatesSubset =
accumInfo.taskUpdatesMap.filterKeys(stageTaskIds.contains).values.toSeq.sorted
if (taskUpatesSubset.isEmpty) {
None
} else {
val min = taskUpatesSubset.head
val max = taskUpatesSubset.last
val sum = taskUpatesSubset.sum
val median = if (taskUpatesSubset.size % 2 == 0) {
val mid = taskUpatesSubset.size / 2
(taskUpatesSubset(mid) + taskUpatesSubset(mid - 1)) / 2
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
// reuse AccumProfileResults to avoid generating extra memory from allocating new objects
val accumProfileResults = AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = min,
median = median,
max = max,
total = sum)
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Some(accumProfileResults)
accumInfo.stageValuesMap.keys.flatMap( stageId => {
val stageTaskIds: Set[Long] =
app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
// Get the task updates that belong to that stage
StatisticsMetrics.createOptionalFromArr(
accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
case Some(stat) =>
// Reuse AccumProfileResults to avoid allocating new objects
val accumProfileResults = AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = stat.min,
median = stat.med,
max = stat.max,
total = stat.total)
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Some(accumProfileResults)
case _ => None
}
})
}
}.toSeq
}(breakOut)
}
}

object AppSQLPlanAnalyzer {
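One detail in generateStageLevelAccums above: the per-stage task filter is now written filterKeys(stageTaskIds) rather than filterKeys(stageTaskIds.contains), which works because a Set[Long] is itself a Long => Boolean. A standalone illustration with made-up values, not taken from the PR:

val stageTaskIds: Set[Long] = Set(1L, 2L, 3L)
val taskUpdates = Map(1L -> 10L, 3L -> 30L, 4L -> 40L)

// A Set[Long] extends Long => Boolean, so it can be passed directly as the
// filterKeys predicate; only the entries for task ids 1 and 3 are kept.
val updatesForStage = taskUpdates.filterKeys(stageTaskIds)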
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import scala.collection.breakOut
import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}

import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
@@ -79,7 +80,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return sequence of JobAggTaskMetricsProfileResult that contains only Job Ids
*/
def aggregateSparkMetricsByJob(index: Int): Seq[JobAggTaskMetricsProfileResult] = {
val jobRows = app.jobIdToInfo.flatMap { case (id, jc) =>
app.jobIdToInfo.flatMap { case (id, jc) =>
if (jc.stageIds.isEmpty) {
None
} else {
@@ -126,8 +127,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
perJobRec.swWriteTimeSum))
}
}
}
jobRows.toSeq
}(breakOut)
}

private case class AverageStageInfo(avgDuration: Double, avgShuffleReadBytes: Double)
@@ -163,7 +163,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
tc.taskId, tc.attempt, tc.duration, avg.avgDuration, tc.sr_totalBytesRead,
avg.avgShuffleReadBytes, tc.peakExecutionMemory, tc.successful, tc.endReason)
}
}.toSeq
}(breakOut)
}

/**
Expand All @@ -172,7 +172,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return sequence of SQLTaskAggMetricsProfileResult
*/
def aggregateSparkMetricsBySql(index: Int): Seq[SQLTaskAggMetricsProfileResult] = {
val sqlRows = app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) =>
app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) =>
if (app.sqlIdToStages.contains(sqlId)) {
val stagesInSQL = app.sqlIdToStages(sqlId)
// TODO: Should we only consider successful tasks?
@@ -229,8 +229,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
} else {
None
}
}
sqlRows.toSeq
}(breakOut)
}

/**
Expand All @@ -241,7 +240,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
*/
def aggregateIOMetricsBySql(
sqlMetricsAggs: Seq[SQLTaskAggMetricsProfileResult]): Seq[IOAnalysisProfileResult] = {
val sqlIORows = sqlMetricsAggs.map { sqlAgg =>
sqlMetricsAggs.map { sqlAgg =>
IOAnalysisProfileResult(sqlAgg.appIndex,
app.appId,
sqlAgg.sqlId,
Expand All @@ -253,8 +252,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sqlAgg.memoryBytesSpilledSum,
sqlAgg.srTotalBytesReadSum,
sqlAgg.swBytesWrittenSum)
}
sqlIORows
}(breakOut)
}

/**
Expand Down Expand Up @@ -289,7 +287,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return a sequence of SQLDurationExecutorTimeProfileResult or Empty if None.
*/
def aggregateDurationAndCPUTimeBySql(index: Int): Seq[SQLDurationExecutorTimeProfileResult] = {
val sqlRows = app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
// First, build the SQLIssues string by retrieving the potential issues from the
// app.sqlIDtoProblematic map.
val sqlIssues = if (app.sqlIDtoProblematic.contains(sqlId)) {
@@ -301,8 +299,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
SQLDurationExecutorTimeProfileResult(index, app.appId, sqlCase.rootExecutionID,
sqlId, sqlCase.duration, sqlCase.hasDatasetOrRDD,
app.getAppDuration.orElse(Option(0L)), sqlIssues, sqlCase.sqlCpuTimePercent)
}
sqlRows.toSeq
}(breakOut)
}

/**
Expand Down Expand Up @@ -338,7 +335,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
.getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics)
.withDefaultValue(zeroAccumProfileResults)
val srTotalBytesMetrics =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))
StatisticsMetrics.createFromArr(tasksInStage.map(_.sr_totalBytesRead)(breakOut))

StageDiagnosticResult(index,
app.getAppName,
@@ -359,7 +356,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
diagnosticMetricsMap(SW_WRITE_TIME_METRIC),
diagnosticMetricsMap(GPU_SEMAPHORE_WAIT_METRIC),
nodeNames)
}.toSeq
}(breakOut)
}

/**
Expand Down Expand Up @@ -456,24 +453,3 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
}
}
}


object AppSparkMetricsAnalyzer {
/**
* Given an input iterable, returns its min, median, max and sum.
*/
def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
if (arr.isEmpty) {
StatisticsMetrics(0L, 0L, 0L, 0L)
} else {
val sortedArr = arr.toSeq.sorted
val len = sortedArr.size
val med = if (len % 2 == 0) {
(sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
} else {
sortedArr(len / 2)
}
StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
}
}
}
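The deleted getStatistics helper sorted the entire input (O(n log n)) just to read the median off the sorted sequence; its callers now go through StatisticsMetrics.createFromArr, which selects the median in place. A hypothetical sanity check of the intended equivalence on an odd-sized input (not part of the PR; the clone() is there because the in-place selection may reorder its argument):

val samples = Array(7L, 3L, 9L, 1L, 5L)
val sorted = samples.sorted
// Old approach: sort, then take head, middle element, last and sum.
val oldStats = StatisticsMetrics(sorted.head, sorted(sorted.length / 2), sorted.last, sorted.sum)
// New approach: single helper, median found by in-place selection.
val newStats = StatisticsMetrics.createFromArr(samples.clone())
assert(oldStats == newStats) // both should be StatisticsMetrics(1, 5, 9, 25)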
@@ -16,11 +16,40 @@

package com.nvidia.spark.rapids.tool.analysis

import org.apache.spark.sql.rapids.tool.util.InPlaceMedianArrView.{chooseMidpointPivotInPlace, findMedianInPlace}

// Store (min, median, max, total) for a given metric
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

object StatisticsMetrics {
// a static variable used to represent zero-statistics instead of allocating a dummy record
// on every calculation.
val ZERO_RECORD: StatisticsMetrics = StatisticsMetrics(0L, 0L, 0L, 0L)

def createFromArr(arr: Array[Long]): StatisticsMetrics = {
if (arr.isEmpty) {
return ZERO_RECORD
}
val medV = findMedianInPlace(arr)(chooseMidpointPivotInPlace)
var minV = Long.MaxValue
var maxV = Long.MinValue
var totalV = 0L
arr.foreach { v =>
if (v < minV) {
minV = v
}
if (v > maxV) {
maxV = v
}
totalV += v
}
StatisticsMetrics(minV, medV, maxV, totalV)
}

def createOptionalFromArr(arr: Array[Long]): Option[StatisticsMetrics] = {
if (arr.isEmpty) {
return None
}
Some(createFromArr(arr))
}
}
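The median itself comes from InPlaceMedianArrView.findMedianInPlace with the chooseMidpointPivotInPlace strategy, neither of which appears in this diff. As a rough mental model only (simplified signature; the real helper is curried over the pivot strategy, and its tie-breaking for even-length input may differ from the removed code, which averaged the two middle values), a midpoint-pivot quickselect looks like this:

// Hypothetical sketch, not the actual InPlaceMedianArrView implementation.
object MedianSketch {
  // Returns the k-th smallest element, partially reordering arr in place.
  private def quickSelect(arr: Array[Long], k: Int): Long = {
    var left = 0
    var right = arr.length - 1
    while (left < right) {
      val pivot = arr(left + (right - left) / 2) // midpoint pivot
      var i = left
      var j = right
      while (i <= j) {
        while (arr(i) < pivot) i += 1
        while (arr(j) > pivot) j -= 1
        if (i <= j) {
          val tmp = arr(i); arr(i) = arr(j); arr(j) = tmp
          i += 1
          j -= 1
        }
      }
      if (k <= j) right = j     // target index lies in the left partition
      else if (k >= i) left = i // target index lies in the right partition
      else return arr(k)        // arr(k) already holds a pivot-valued element
    }
    arr(left)
  }

  // Median taken as the element at index length / 2.
  def findMedianInPlace(arr: Array[Long]): Long = {
    require(arr.nonEmpty, "median of an empty array is undefined")
    quickSelect(arr, arr.length / 2)
  }
}

Either way, the two public helpers above differ only in how they report empty input: createFromArr falls back to the shared ZERO_RECORD, while createOptionalFromArr returns None, which is what lets generateSQLAccums and generateStageLevelAccums keep their Option-based flow.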
@@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.tool.store

import scala.collection.mutable
import scala.collection.{breakOut, mutable}

import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics

@@ -98,22 +98,8 @@ class AccumInfo(val infoRef: AccumMetaRef) {
}

def calculateAccStats(): StatisticsMetrics = {
val sortedTaskUpdates = taskUpdatesMap.values.toSeq.sorted
if (sortedTaskUpdates.isEmpty) {
// do not check stage values because the stats is only meant for task updates
StatisticsMetrics.ZERO_RECORD
} else {
val min = sortedTaskUpdates.head
val max = sortedTaskUpdates.last
val sum = sortedTaskUpdates.sum
val median = if (sortedTaskUpdates.size % 2 == 0) {
val mid = sortedTaskUpdates.size / 2
(sortedTaskUpdates(mid) + sortedTaskUpdates(mid - 1)) / 2
} else {
sortedTaskUpdates(sortedTaskUpdates.size / 2)
}
StatisticsMetrics(min, median, max, sum)
}
// do not check stage values because the stats is only meant for task updates
StatisticsMetrics.createFromArr(taskUpdatesMap.map(_._2)(breakOut))
}

def getMaxStageValue: Option[Long] = {