diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index c84261c18..0bddd520a 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -80,10 +80,14 @@ object ToolUtils extends Logging { val targetEx = i.getTargetException if (targetEx != null) { targetEx match { - case j: com.fasterxml.jackson.core.JsonParseException => + case j: com.fasterxml.jackson.core.io.JsonEOFException => + // Spark 3.4.1+ embeds JsonEOFException in the InvocationTargetException + // We need to show a warning message instead of failing the entire app. + logWarning(s"Incomplete eventlog, ${j.getMessage}") + case k: com.fasterxml.jackson.core.JsonParseException => // this is a parser error thrown by spark-3.4+ which indicates the log is // malformed - throw j + throw k case z: ClassNotFoundException if z.getMessage != null => logWarning(s"ClassNotFoundException while parsing an event: ${z.getMessage}") case t: Throwable => @@ -94,10 +98,15 @@ object ToolUtils extends Logging { // Normally it should not happen that invocation target is null. logError(s"Unknown exception while parsing an event", i) } - case j: com.fasterxml.jackson.core.JsonParseException => + case j: com.fasterxml.jackson.core.io.JsonEOFException => + // Note that JsonEOFException is a child of JsonParseException + // In case the eventlog is incomplete (i.e., inprogress), we show a warning message + // because we do not want to cause the entire app to fail. 
+ logWarning(s"Incomplete eventlog, ${j.getMessage}") + case k: com.fasterxml.jackson.core.JsonParseException => // this is a parser error thrown by version prior to spark-3.4+ which indicates the // log is malformed - throw j + throw k } None } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 08c0fead2..a1b441a96 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.tool.qualification -import java.io.File +import java.io.{File, PrintWriter} import java.util.concurrent.TimeUnit.NANOSECONDS import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -460,6 +460,67 @@ class QualificationSuite extends BaseTestSuite { runQualificationTest(logFiles, "nds_q86_test_expectation.csv", expectedStatus = expectedStatus) } + test("incomplete json file does not cause entire app to fail") { + // The purpose of this test is to make sure that the app is not skipped when the JSON parser + // encounters an unexpected EOF. 
+ // There are two cases to evaluate: + // 1- An eventlog that has an end-to-end application but for some reason the EOF is incorrect + // 2- An eventlog of an unfinished app (missing SparkListenerApplicationEnd) + + TrampolineUtil.withTempDir { eventLogDir => + // generate the original eventlog + val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, + "WholeStageFilterProject") { spark => + import spark.implicits._ + val df = spark.sparkContext.makeRDD(1 to 100, 3).toDF + val df2 = spark.sparkContext.makeRDD(1 to 100, 3).toDF + df.select($"value" as "a") + .join(df2.select($"value" as "b"), $"a" === $"b") + .filter("(((b < 100) AND (a > 50)) OR (a = 0))") + .sort($"b") + } + // create the following files: + // 1- inprogress eventlog that does not contain "SparkListenerApplicationEnd" (unfinished) + // 2- inprogress eventlog with a terminated app (incomplete) + val unfinishedLog = new File(s"$eventLogDir/unfinished.inprogress") + val incompleteLog = new File(s"$eventLogDir/eventlog.inprogress") + val pwList = Array(new PrintWriter(unfinishedLog), new PrintWriter(incompleteLog)) + val bufferedSource = Source.fromFile(eventLog) + try { + val allEventLines = bufferedSource.getLines.toList + val selectedLines: List[String] = allEventLines.dropRight(1) + selectedLines.foreach { line => + pwList.foreach(pw => pw.println(line)) + } + // add the "SparkListenerApplicationEnd" to the incompleteLog + pwList(1).println(allEventLines.last) + pwList.foreach( pw => + pw.print("{\"Event\":\"SparkListenerEnvironmentUpdate\"," + + "\"JVM Information\":{\"Java Home:") + ) + } finally { + bufferedSource.close() + pwList.foreach(pw => pw.close()) + } + // All the eventlogs should be parsed successfully + // Status counts: 3 SUCCESS, 0 FAILURE, 0 UNKNOWN + val logFiles = Array(eventLog, incompleteLog.getAbsolutePath, unfinishedLog.getAbsolutePath) + // test Qualification + val outpath = new File(s"$eventLogDir/output_folder") + val allArgs = Array( + "--output-directory", 
+ outpath.getAbsolutePath()) + + val appArgs = new QualificationArgs(allArgs ++ logFiles) + val (exit, appSum) = QualificationMain.mainInternal(appArgs) + assert(exit == 0) + assert(appSum.size == 3) + // test Profiler + val apps = ToolTestUtils.processProfileApps(logFiles, sparkSession) + assert(apps.size == 3) + } + } + test("spark2 eventlog") { val profileLogDir = ToolTestUtils.getTestResourcePath("spark-events-profiling") val log = s"$profileLogDir/spark2-eventlog.zstd"