Profiler may crash if driverLog is not accessible #725

Closed · wants to merge 1 commit
DriverLogProcessor.scala

```diff
@@ -16,21 +16,26 @@
 
 package com.nvidia.spark.rapids.tool.profiling
 
-import scala.io.Source
+import java.io.{BufferedReader, InputStreamReader}
+
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 
 import org.apache.spark.internal.Logging
 
 class DriverLogProcessor(driverlogPath: String) extends Logging {
-  def processDriverLog(): Seq[DriverLogUnsupportedOperators] = {
-    val source = Source.fromFile(driverlogPath)
+  def processDriverLog(fs: FileSystem): Seq[DriverLogUnsupportedOperators] = {
+    var source: FSDataInputStream = null
     // Create a map to store the counts for each operator and reason
     var countsMap = Map[(String, String), Int]().withDefaultValue(0)
     try {
+      source = fs.open(new Path(driverlogPath))
+      val reader = new BufferedReader(new InputStreamReader(source))
       // Process each line in the file
-      for (line <- source.getLines()) {
+      val lines = reader.lines()
+      lines.forEach { line =>
        // condition to check if the line contains unsupported operators
-        if (line.contains("cannot run on GPU") &&
-          !line.contains("not all expressions can be replaced")) {
+        if (line.mkString.contains("cannot run on GPU") &&
+          !line.mkString.contains("not all expressions can be replaced")) {
          val operatorName = line.split("<")(1).split(">")(0)
          val reason = line.split("because")(1).trim()
          val key = (operatorName, reason)
@@ -41,7 +46,9 @@ class DriverLogProcessor(driverlogPath: String) extends Logging {
      case e: Exception =>
        logError(s"Unexpected exception processing driver log: $driverlogPath", e)
    } finally {
-      source.close()
+      if (source != null) {
+        source.close()
+      }
    }
    countsMap.map(x => DriverLogUnsupportedOperators(x._1._1, x._2, x._1._2)).toSeq
  }
```
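For readers unfamiliar with the pattern, the sketch below shows the same Hadoop `FileSystem` read path in isolation: resolve the filesystem from the path, stream lines through a `BufferedReader`, and null-guard the `close()` so a failed `open()` does not turn into a `NullPointerException` in `finally`. This is a minimal illustration, not the tool's code; the object name and log path are hypothetical.

```scala
import java.io.{BufferedReader, InputStreamReader}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}

// Minimal sketch (not the tool's code): open a driver log through the
// Hadoop FileSystem API and scan it line by line.
object DriverLogReadSketch {
  def main(args: Array[String]): Unit = {
    val logPath = new Path("/tmp/driver.log") // hypothetical path
    val fs = logPath.getFileSystem(new Configuration())
    var in: FSDataInputStream = null
    try {
      in = fs.open(logPath)
      val reader = new BufferedReader(new InputStreamReader(in))
      reader.lines().forEach { line =>
        // Same filter the profiler applies to driver-log lines.
        if (line.contains("cannot run on GPU")) {
          println(line)
        }
      }
    } finally {
      // Guarded close: `in` stays null if fs.open() threw.
      if (in != null) {
        in.close()
      }
    }
  }
}
```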
ProfileMain.scala

```diff
@@ -16,6 +16,8 @@
 
 package com.nvidia.spark.rapids.tool.profiling
 
+import java.io.IOException
+
 import com.nvidia.spark.rapids.tool.EventLogPathProcessor
 
 import org.apache.spark.internal.Logging
@@ -78,7 +80,15 @@ object ProfileMain extends Logging {
 
     val profiler = new Profiler(hadoopConf, appArgs, enablePB)
     if (driverLog.nonEmpty) {
-      profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
+      val dLogPath = new org.apache.hadoop.fs.Path(driverLog)
+      try {
+        val fs = dLogPath.getFileSystem(hadoopConf)
+        profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty, fs)
+      } catch {
+        case e: IOException =>
+          logError(s"Unexpected exception processing driver log: $driverLog," +
+            s" check the provided path", e)
+      }
     }
     profiler.profile(eventLogFsFiltered)
     (0, filteredLogs.size)
```
Review thread on the `val dLogPath` line:

Collaborator: @amahussein Regarding the comment in the issue description,

> Furthermore, the class is not using hadoop API to load the files, which means that only files on local path can be processed.

Is this referring to the use case where the driver logs are located in cloud storage directories (s3, dbfs), similar to what we have for event logs? In that case, we might have to use a similar approach to the one used for processing event logs.

Collaborator (reply): Yes, this is exactly what this issue is about. It does not make sense to me that we process event logs on remote storage while asking the user to download the driver logs locally.
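A small sketch of the point discussed in this thread: `Path.getFileSystem` picks the `FileSystem` implementation from the path's URI scheme, so the same reader handles local and cloud-stored driver logs once the matching connector is on the classpath. The object name and paths below are hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: the FileSystem implementation is chosen by the path's scheme.
object SchemeDispatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // A local path resolves to LocalFileSystem with no extra dependencies.
    val local = new Path("file:///tmp/driver.log")
    println(local.getFileSystem(conf).getClass.getSimpleName)
    // With hadoop-aws on the classpath, the same call resolves
    // "s3a://my-bucket/logs/driver.log" to S3AFileSystem, so the
    // driver-log reader needs no storage-specific branches.
  }
}
```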
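The try/catch added in ProfileMain above absorbs the failure mode that gave this PR its title: opening a missing or unreadable driver log surfaces as an `IOException` (for example `FileNotFoundException`), which previously crashed the run. A minimal sketch of that behavior, with a hypothetical path:

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch of the failure mode the new try/catch absorbs: an inaccessible
// driver log is reported instead of taking down the whole profiling run.
object InaccessibleLogSketch {
  def main(args: Array[String]): Unit = {
    val badPath = new Path("/no/such/driver.log") // hypothetical missing file
    try {
      val fs = badPath.getFileSystem(new Configuration())
      fs.open(badPath).close() // throws FileNotFoundException (an IOException)
    } catch {
      case e: IOException =>
        System.err.println(s"Could not read driver log at $badPath: ${e.getMessage}")
    }
  }
}
```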
Profiler.scala

```diff
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
 import com.nvidia.spark.rapids.ThreadFactoryBuilder
 import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformFactory}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
@@ -124,12 +125,12 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
     progressBar.foreach(_.finishAll())
   }
 
-  def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = {
+  def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean, fs: FileSystem): Unit = {
     val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
       Profiler.DRIVER_LOG_NAME, numOutputRows, true)
     try {
       val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
-      val unsupportedDriverOperators = driverLogProcessor.processDriverLog()
+      val unsupportedDriverOperators = driverLogProcessor.processDriverLog(fs)
       profileOutputWriter.write(s"Unsupported operators in driver log",
         unsupportedDriverOperators)
       if (eventLogsEmpty && useAutoTuner) {
```