diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DriverLogProcessor.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DriverLogProcessor.scala index 6ead13466..b10e19741 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DriverLogProcessor.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/DriverLogProcessor.scala @@ -16,21 +16,26 @@ package com.nvidia.spark.rapids.tool.profiling -import scala.io.Source +import java.io.{BufferedReader, InputStreamReader} + +import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path} import org.apache.spark.internal.Logging class DriverLogProcessor(driverlogPath: String) extends Logging { - def processDriverLog(): Seq[DriverLogUnsupportedOperators] = { - val source = Source.fromFile(driverlogPath) + def processDriverLog(fs : FileSystem): Seq[DriverLogUnsupportedOperators] = { + var source : FSDataInputStream = null // Create a map to store the counts for each operator and reason var countsMap = Map[(String, String), Int]().withDefaultValue(0) try { + source = fs.open(new Path(driverlogPath)) + val reader = new BufferedReader(new InputStreamReader(source)) // Process each line in the file - for (line <- source.getLines()) { + val lines = reader.lines() + lines.forEach { line => // condition to check if the line contains unsupported operators - if (line.contains("cannot run on GPU") && - !line.contains("not all expressions can be replaced")) { + if (line.mkString.contains("cannot run on GPU") && + !line.mkString.contains("not all expressions can be replaced")) { val operatorName = line.split("<")(1).split(">")(0) val reason = line.split("because")(1).trim() val key = (operatorName, reason) @@ -41,7 +46,9 @@ class DriverLogProcessor(driverlogPath: String) extends Logging { case e: Exception => logError(s"Unexpected exception processing driver log: $driverlogPath", e) } finally { - source.close() + if (source != null) { + source.close() + } } countsMap.map(x => DriverLogUnsupportedOperators(x._1._1, x._2, x._1._2)).toSeq } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala index 71759a9ae..8bf0c6f76 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids.tool.profiling +import java.io.IOException + import com.nvidia.spark.rapids.tool.EventLogPathProcessor import org.apache.spark.internal.Logging @@ -78,7 +80,15 @@ object ProfileMain extends Logging { val profiler = new Profiler(hadoopConf, appArgs, enablePB) if (driverLog.nonEmpty) { - profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty) + val dLogPath = new org.apache.hadoop.fs.Path(driverLog) + try { + val fs = dLogPath.getFileSystem(hadoopConf) + profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty, fs) + } catch { + case e : IOException => + logError(s"Unexpected exception processing driver log: $driverLog," + + s" check the provided path", e) + } } profiler.profile(eventLogFsFiltered) (0, filteredLogs.size) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 479d29784..d278893d6 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal import com.nvidia.spark.rapids.ThreadFactoryBuilder import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformFactory} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo @@ -124,12 +125,12 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea progressBar.foreach(_.finishAll()) } - def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = { + def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean, fs : FileSystem): Unit = { val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver", Profiler.DRIVER_LOG_NAME, numOutputRows, true) try { val driverLogProcessor = new DriverLogProcessor(driverLogInfos) - val unsupportedDriverOperators = driverLogProcessor.processDriverLog() + val unsupportedDriverOperators = driverLogProcessor.processDriverLog(fs) profileOutputWriter.write(s"Unsupported operators in driver log", unsupportedDriverOperators) if (eventLogsEmpty && useAutoTuner) {