AccumManager Usage + Refactoring #14

Open · wants to merge 19 commits into base: rapids-tools-815-1202
@@ -20,7 +20,7 @@ import java.io.FileNotFoundException
import java.time.LocalDateTime
import java.util.zip.ZipOutputStream

import scala.collection.mutable.{LinkedHashMap, ListBuffer}
import scala.collection.mutable.LinkedHashMap
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
@@ -102,35 +102,38 @@ object EventLogPathProcessor extends Logging {
(dbLogFiles.size > 1)
}

// If the user provides wildcard in the eventlogs, then the path processor needs
// to process the path as a pattern. Otherwise, HDFS throws an exception mistakenly that
// no such file exists.
// Once the glob path is checked, the list of eventlogs is the result of the flattenMap
// of all the processed files.
/**
* If the user provides a wildcard in the eventlogs, then the path processor needs
* to process the path as a pattern. Otherwise, HDFS mistakenly throws an exception
* that no such file exists.
* Once the glob path is expanded, the list of eventlogs is the result of flat-mapping
* all the processed files.
*
* @return List of (rawPath, List[processedPaths])
*/
private def processWildcardsLogs(eventLogsPaths: List[String],
hadoopConf: Configuration): List[String] = {
val processedLogs = ListBuffer[String]()
eventLogsPaths.foreach { rawPath =>
hadoopConf: Configuration): List[(String, List[String])] = {
eventLogsPaths.map { rawPath =>
if (!rawPath.contains("*")) {
processedLogs += rawPath
(rawPath, List(rawPath))
} else {
try {
val globPath = new Path(rawPath)
val fileContext = FileContext.getFileContext(globPath.toUri(), hadoopConf)
val fileStatuses = fileContext.util().globStatus(globPath)
processedLogs ++= fileStatuses.map(_.getPath.toString)
val paths = fileStatuses.map(_.getPath.toString).toList
(rawPath, paths)
} catch {
case _ : Throwable =>
// Do not fail in this block.
// Instead, ignore the error and return an empty list; the caller is then
// responsible for reporting the path as a failed event log.
// This keeps error handling consistent during the analysis.
logWarning(s"Processing pathLog with wildCard has failed: $rawPath")
processedLogs += rawPath
(rawPath, List.empty)
}
}
}
processedLogs.toList
}
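For reference, a minimal self-contained sketch (not part of this PR) of the glob-expansion pattern used by processWildcardsLogs above: each raw path is paired with whatever the Hadoop glob matched, and an empty list signals that expansion found nothing. The object name and the example pattern are illustrative.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}

import scala.util.Try

object GlobExpansionSketch {
  // Pair the raw pattern with its matches; an empty list means the glob matched
  // nothing or expansion failed, mirroring the behavior in processWildcardsLogs.
  def expand(rawPath: String, hadoopConf: Configuration): (String, List[String]) = {
    if (!rawPath.contains("*")) {
      (rawPath, List(rawPath))
    } else {
      val matched = Try {
        val globPath = new Path(rawPath)
        val fc = FileContext.getFileContext(globPath.toUri, hadoopConf)
        fc.util().globStatus(globPath).map(_.getPath.toString).toList
      }.getOrElse(List.empty[String])
      (rawPath, matched)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical local pattern; on a cluster this would typically be an HDFS URI.
    println(expand("file:/tmp/eventlogs/app-*", new Configuration()))
  }
}
```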

def getEventLogInfo(pathString: String,
@@ -226,7 +229,13 @@ object EventLogPathProcessor extends Logging {
eventLogsPaths: List[String],
hadoopConf: Configuration): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
val logsPathNoWildCards = processWildcardsLogs(eventLogsPaths, hadoopConf)
val logsWithTimestamp = logsPathNoWildCards.flatMap(getEventLogInfo(_, hadoopConf)).toMap
val logsWithTimestamp = logsPathNoWildCards.flatMap {
case (rawPath, processedPaths) if processedPaths.isEmpty =>
// If no event logs are found in the path after wildcard expansion, return a failed event
Map(FailedEventLog(new Path(rawPath), s"No event logs found in $rawPath") -> None)
case (_, processedPaths) =>
processedPaths.flatMap(getEventLogInfo(_, hadoopConf))
}.toMap

logDebug("Paths after stringToPath: " + logsWithTimestamp)
// Filter the event logs to be processed based on the criteria. If it is not provided in the
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SQLMetricsStats, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
@@ -66,6 +66,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
/**
* Connects Operators to Stages using AccumulatorIDs.
* TODO: This function can be fused in the visitNode function to avoid the extra iteration.
*
* @param cb function that creates a SparkPlanGraph. This can be used as a cacheHolder for the
* object created to be used later.
*/
@@ -87,7 +88,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* Both Qual/Prof analysis.
* For Qual apps, the app.sqlIDtoProblematic won't be set because it is done later during the
* aggregation phase.
* @param sqlId the SQL ID being analyzed
*
* @param sqlId the SQL ID being analyzed
* @param potentialProblems a set of strings that represent the potential problems found in the
* SQL plan.
*/
@@ -119,26 +121,26 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* 1- allSQLMetrics: a list of SQLMetricInfoCase
* 2- wholeStage: a list of WholeStageCodeGenResults
* 3- unsupportedSQLPlan: a list of UnsupportedSQLPlan that contains the SQL ID, node ID,
* node name.
* TODO: Consider handling the construction of this list in a different way for the
* Qualification
* node name.
* TODO: Consider handling the construction of this list in a different way for the
* Qualification
* 4- sqlPlanNodeIdToStageIds: A map between (SQL ID, Node ID) and the set of stage IDs
*
* It has the following effect on the visitor object:
* 1- It updates the sqlIsDsOrRDD argument to True when the visited node is an RDD or Dataset.
* 2- If the SQLID is an RDD, the potentialProblems is cleared because once SQL is marked as RDD,
* all the other problems are ignored. Note that we need to set that flag only once to True
* for the given SQLID.
* all the other problems are ignored. Note that we need to set that flag only once to True
* for the given SQLID.
* 3- It appends the current node's potential problems to the SQLID problems only if the SQL is
* visitor.sqlIsDsOrRDD is False. Otherwise, it is kind of redundant to keep checking for
* potential problems for every node when they get to be ignored.
* visitor.sqlIsDsOrRDD is False. Otherwise, it is kind of redundant to keep checking for
* potential problems for every node when they get to be ignored.
*
* It has the following effect on the app object:
* 1- it updates dataSourceInfo with V2 and V1 data sources
* 2- it updates sqlIDtoProblematic the map between SQL ID and potential problems
*
*
* @param visitor the visitor context defined per SQLPlan
* @param node the node being currently visited.
* @param node the node being currently visited.
*/
protected def visitNode(visitor: SQLPlanVisitorContext,
node: SparkPlanGraphNode): Unit = {
@@ -161,7 +163,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
if (nodeIsDsOrRDD) {
// we want to report every node that is an RDD
val thisPlan = UnsupportedSQLPlan(visitor.sqlPIGEntry.sqlID, node.id, node.name, node.desc,
"Contains Dataset or RDD")
"Contains Dataset or RDD")
unsupportedSQLPlan += thisPlan
// If one node is RDD, the Sql should be set too
if (!visitor.sqlIsDsOrRDD) { // We need to set the flag only once for the given sqlID
@@ -274,41 +276,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap

def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
allSQLMetrics.flatMap { metric =>
val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
// Avoid getOrElse to reduce memory allocations
jc.sqlID.isDefined && jc.sqlID.get == metric.sqlID
}
val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSet
val accumsOpt = app.taskStageAccumMap.get(metric.accumulatorId)
val taskMax = accumsOpt match {
case Some(accums) =>
val filtered = accums.filter { a =>
stageIdsForSQL.contains(a.stageId)
}
// If metricType is size, average or timing, we want to read field `update` value
// to get the min, median, max, and total. Otherwise, we want to use field `value`.
if (SQLMetricsStats.hasStats(metric.metricType)) {
val accumValues = filtered.map(_.update.getOrElse(0L)).sortWith(_ < _)
if (accumValues.isEmpty) {
None
}
else if (accumValues.length <= 1) {
Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
} else {
Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
accumValues(accumValues.size - 1), accumValues.sum))
}
} else {
val accumValues = filtered.map(_.value.getOrElse(0L))
if (accumValues.isEmpty) {
None
} else {
Some(StatisticsMetrics(0L, 0L, 0L, accumValues.max))
}
}
case None => None
}

val accumTaskStats = app.accumManager.calculateAccStats(metric.accumulatorId)
// local mode driver gets updates
val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
val driverMax = driverAccumsOpt match {
Expand All @@ -329,8 +297,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
None
}

if (taskMax.isDefined || driverMax.isDefined) {
val taskInfo = taskMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
if (accumTaskStats.isDefined || driverMax.isDefined) {
val taskInfo = accumTaskStats.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
val driverInfo = driverMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))

val max = Math.max(taskInfo.max, driverInfo.max)
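Since AccumManager.calculateAccStats itself is not shown in this diff, here is a hedged sketch of the merge step that follows it: whichever side (task or driver) produced no statistics falls back to zeroed values, and the reported maximum is the larger of the two. The StatisticsMetrics case class below is a stand-in with the (min, med, max, total) shape used in this file.

```scala
// Stand-in for the real StatisticsMetrics used by the tools code.
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

def mergeTaskAndDriverMax(
    taskStats: Option[StatisticsMetrics],
    driverStats: Option[StatisticsMetrics]): Option[Long] = {
  if (taskStats.isDefined || driverStats.isDefined) {
    // Fall back to zeroed stats for whichever side produced no accumulator updates.
    val t = taskStats.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
    val d = driverStats.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
    Some(math.max(t.max, d.max))
  } else {
    None
  }
}
```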
@@ -358,46 +326,44 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* @return a sequence of AccumProfileResults
*/
def generateStageLevelAccums(): Seq[AccumProfileResults] = {

def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
// drop the metrics if there are no values
if (updates.isEmpty) {
def computeStatistics(taskUpdates: Seq[Long]): Option[StatisticsMetrics] = {
if (taskUpdates.isEmpty) {
None
} else if (updates.length == 1) {
Some(StatisticsMetrics(0L, 0L, 0L, updates.sum))
} else {
Some(StatisticsMetrics(
min = updates.head,
med = updates(updates.size / 2),
max = updates.last,
total = updates.sum
))
val min = taskUpdates.min
val max = taskUpdates.max
val sum = taskUpdates.sum
val median = if (taskUpdates.size % 2 == 0) {
val mid = taskUpdates.size / 2
(taskUpdates(mid) + taskUpdates(mid - 1)) / 2
} else {
taskUpdates(taskUpdates.size / 2)
}
Some(StatisticsMetrics(min, median, max, sum))
}
}

// Process taskStageAccumMap to get all the accumulators
val stageLevelAccums = app.taskStageAccumMap.values.flatten
val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId)
groupedByAccumulatorId.flatMap { case (accumulatorId, accums) =>
// Extract and sort the update values, defaulting to 0 if not present
val sortedUpdates = accums.flatMap(_.update).toSeq.sorted

// Compute the statistics for the accumulator if applicable
computeStatistics(sortedUpdates).map { stats =>
val sampleAccum = accums.head
AccumProfileResults(
appIndex = appIndex,
stageId = sampleAccum.stageId.toString,
accumulatorId = accumulatorId,
name = sampleAccum.name.getOrElse("Unknown"),
min = stats.min,
median = stats.med,
max = stats.max,
total = stats.total
)
}
}.toSeq
}
app.accumManager.accumInfoMap.flatMap{ accumMapEntry =>
val accumInfo = accumMapEntry._2
accumInfo.stageValuesMap.keySet.flatMap( stageId => {
val tasks = app.taskManager.getAllTasksStageAttempt(stageId)
val taskUpdatesSorted = tasks.map( task => {
accumInfo.taskUpdatesMap.getOrElse(task.taskId, 0L)
}).toSeq.sorted
computeStatistics(taskUpdatesSorted).map { stats =>
AccumProfileResults(
appIndex,
stageId,
accumInfo.infoRef,
min = stats.min,
median = stats.med,
max = stats.max,
total = stats.total
)
}
})
}
}.toSeq
}
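The rewritten computeStatistics uses the conventional even/odd median (averaging the two middle elements of an even-length sorted sequence, with integer division) rather than always taking the upper-middle element. A small worked example of that rule, using made-up task update values:

```scala
// Median rule as implemented in computeStatistics above.
def median(sorted: Seq[Long]): Long = {
  if (sorted.size % 2 == 0) {
    val mid = sorted.size / 2
    (sorted(mid) + sorted(mid - 1)) / 2
  } else {
    sorted(sorted.size / 2)
  }
}

val even = Seq(2L, 4L, 6L, 8L) // mid = 2 -> (6 + 4) / 2 = 5
val odd = Seq(1L, 3L, 9L)      // size / 2 = 1 -> 3
assert(median(even) == 5L)
assert(median(odd) == 3L)
```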

object AppSQLPlanAnalyzer {
@@ -443,10 +443,7 @@ object SQLPlanParser extends Logging {

def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Set[Int] = {
val nodeAccums = node.metrics.map(_.accumulatorId)
nodeAccums.flatMap { nodeAccumId =>
// val res = app.accumManager.getAccStageIds(nodeAccumId)
app.stageManager.getStagesIdsByAccumId(nodeAccumId)
}.toSet
nodeAccums.flatMap(app.accumManager.getAccStageIds).toSet
}
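The lookup shape is unchanged by swapping stageManager.getStagesIdsByAccumId for accumManager.getAccStageIds: flatten the stage IDs reachable from the node's accumulator IDs. A tiny sketch with a stand-in map, since AccumManager's internals are not part of this diff:

```scala
// Stand-in for whatever AccumManager tracks internally (accumulator ID -> stage IDs).
val accumIdToStages: Map[Long, Set[Int]] =
  Map(1L -> Set(0, 1), 2L -> Set(1), 3L -> Set.empty[Int])

def stagesForNode(nodeAccumIds: Seq[Long]): Set[Int] =
  nodeAccumIds.flatMap(id => accumIdToStages.getOrElse(id, Set.empty[Int])).toSet

// stagesForNode(Seq(1L, 2L, 3L)) == Set(0, 1)
```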

// Set containing execs that refers to other expressions. We need this to be a list to allow
@@ -614,15 +611,10 @@ object SQLPlanParser extends Logging {
* the duration.
*/
def getTotalDuration(accumId: Option[Long], app: AppBase): Option[Long] = {
val taskForAccum = accumId.flatMap(id => app.taskStageAccumMap.get(id))
.getOrElse(ArrayBuffer.empty)
val accumValues = taskForAccum.map(_.value.getOrElse(0L))
val maxDuration = if (accumValues.isEmpty) {
None
} else {
Some(accumValues.max)
accumId match {
case Some(x) => app.accumManager.getMaxStageValue(x)
case _ => None
}
maxDuration
}
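getMaxStageValue is referenced here but not defined in this diff. Assuming AccumInfo keeps a per-stage value map (the stageValuesMap used elsewhere in this PR), an equivalent helper could look like the sketch below; the class name is hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the AccumInfo entries kept by AccumManager.
class AccumInfoSketch(val stageValuesMap: mutable.HashMap[Int, Long]) {
  // Largest per-stage value seen for this accumulator, or None if no stage reported one.
  def getMaxStageValue: Option[Long] =
    if (stageValuesMap.isEmpty) None else Some(stageValuesMap.values.max)
}

val info = new AccumInfoSketch(mutable.HashMap(1 -> 7L, 2 -> 42L))
assert(info.getMaxStageValue.contains(42L))
```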

def getDriverTotalDuration(accumId: Option[Long], app: AppBase): Option[Long] = {
@@ -721,11 +721,31 @@ class AutoTuner(
recommendFileCache()
recommendMaxPartitionBytes()
recommendShufflePartitions()
recommendKryoSerializerSetting()
recommendGCProperty()
recommendClassPathEntries()
recommendSystemProperties()
}

// If the user set the serializer to Kryo, make sure we recommend registering the
// RAPIDS GpuKryoRegistrator along with it.
def recommendKryoSerializerSetting(): Unit = {
getPropertyValue("spark.serializer") match {
case Some(f) if f.contains("org.apache.spark.serializer.KryoSerializer") =>
val existingRegistrars = getPropertyValue("spark.kryo.registrator")
val regToUse = if (existingRegistrars.isDefined && !existingRegistrars.get.isEmpty) {
// spark.kryo.registrator is a comma separated list. If the user set some then
// we need to append our GpuKryoRegistrator to ones they specified.
existingRegistrars.get + ",com.nvidia.spark.rapids.GpuKryoRegistrator"
} else {
"com.nvidia.spark.rapids.GpuKryoRegistrator"
}
appendRecommendation("spark.kryo.registrator", regToUse)
case _ =>
// Not using the Kryo serializer (or unset): nothing to recommend.
}
}
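To illustrate the append-rather-than-replace behavior above: if the application already runs with Kryo and its own registrator, the recommended value keeps the user's registrator and adds the RAPIDS one. The pre-existing registrator class below is a made-up example.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Hypothetical registrator the user already configured.
  .set("spark.kryo.registrator", "com.example.MyRegistrator")

// The recommendation appends GpuKryoRegistrator to the existing comma-separated list.
val recommended =
  conf.get("spark.kryo.registrator") + ",com.nvidia.spark.rapids.GpuKryoRegistrator"
assert(recommended == "com.example.MyRegistrator,com.nvidia.spark.rapids.GpuKryoRegistrator")
```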

def getShuffleManagerClassName() : Option[String] = {
appInfoProvider.getSparkVersion.map { sparkVersion =>
val shuffleManagerVersion = sparkVersion.filterNot("().".toSet)
@@ -33,7 +33,7 @@ class CompareApplications(apps: Seq[ApplicationInfo]) extends Logging {
val normalizedByAppId = apps.map { app =>
val normalized = app.sqlPlans.mapValues { plan =>
SparkPlanInfoWithStage(plan,
app.stageManager.getAccumToSingleStage()).normalizeForStageComparison
app.accumManager.getAccumSingleStage).normalizeForStageComparison
}
(app.appId, normalized)
}.toMap
@@ -88,7 +88,7 @@ object GenerateDot {
val accumSummary = accums.map { a =>
Seq(a.sqlID, a.accumulatorId, a.total)
}
val accumIdToStageId = app.stageManager.getAccumToSingleStage()
val accumIdToStageId = app.accumManager.getAccumSingleStage
val formatter = java.text.NumberFormat.getIntegerInstance
val stageIdToStageMetrics = app.taskManager.stageAttemptToTasks.collect { case (stageId, _) =>
val tasks = app.taskManager.getAllTasksStageAttempt(stageId)