Skip to content

Commit

Permalink
Profiler may crash if driverLog is not accessible
Browse files Browse the repository at this point in the history
Signed-off-by: Kuhu Shukla <[email protected]>
  • Loading branch information
kuhushukla committed Jan 16, 2024
1 parent 8ed191a commit 1097b0c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,26 @@

package com.nvidia.spark.rapids.tool.profiling

import scala.io.Source
import java.io.{BufferedReader, InputStreamReader}

import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}

import org.apache.spark.internal.Logging

class DriverLogProcessor(driverlogPath: String) extends Logging {
def processDriverLog(): Seq[DriverLogUnsupportedOperators] = {
val source = Source.fromFile(driverlogPath)
def processDriverLog(fs : FileSystem): Seq[DriverLogUnsupportedOperators] = {
var source : FSDataInputStream = null
// Create a map to store the counts for each operator and reason
var countsMap = Map[(String, String), Int]().withDefaultValue(0)
try {
source = fs.open(new Path(driverlogPath))
val reader = new BufferedReader(new InputStreamReader(source))
// Process each line in the file
for (line <- source.getLines()) {
val lines = reader.lines()
lines.forEach { line =>
// condition to check if the line contains unsupported operators
if (line.contains("cannot run on GPU") &&
!line.contains("not all expressions can be replaced")) {
if (line.mkString.contains("cannot run on GPU") &&
!line.mkString.contains("not all expressions can be replaced")) {
val operatorName = line.split("<")(1).split(">")(0)
val reason = line.split("because")(1).trim()
val key = (operatorName, reason)
Expand All @@ -41,7 +46,9 @@ class DriverLogProcessor(driverlogPath: String) extends Logging {
case e: Exception =>
logError(s"Unexpected exception processing driver log: $driverlogPath", e)
} finally {
source.close()
if (source != null) {
source.close()
}
}
countsMap.map(x => DriverLogUnsupportedOperators(x._1._1, x._2, x._1._2)).toSeq
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.nvidia.spark.rapids.tool.profiling

import java.io.IOException

import com.nvidia.spark.rapids.tool.EventLogPathProcessor

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl
import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil


/**
* A profiling tool to parse Spark Event Log
*/
Expand Down Expand Up @@ -78,7 +81,15 @@ object ProfileMain extends Logging {

val profiler = new Profiler(hadoopConf, appArgs, enablePB)
if (driverLog.nonEmpty) {
profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
val dLogPath = new org.apache.hadoop.fs.Path(driverLog)
try {
val fs = dLogPath.getFileSystem(hadoopConf)
profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty, fs)
} catch {
case e : IOException =>
logError(s"Unexpected exception processing driver log: $driverLog," +
s" check the provided path", e)
}
}
profiler.profile(eventLogFsFiltered)
(0, filteredLogs.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
Expand Down Expand Up @@ -124,12 +125,12 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
progressBar.foreach(_.finishAll())
}

def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = {
def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean, fs : FileSystem): Unit = {
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
Profiler.DRIVER_LOG_NAME, numOutputRows, true)
try {
val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
val unsupportedDriverOperators = driverLogProcessor.processDriverLog()
val unsupportedDriverOperators = driverLogProcessor.processDriverLog(fs)
profileOutputWriter.write(s"Unsupported operators in driver log",
unsupportedDriverOperators)
if (eventLogsEmpty && useAutoTuner) {
Expand Down

0 comments on commit 1097b0c

Please sign in to comment.