Skip to content

Commit

Permalink
Support parsing of inprogress eventlogs
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Fixes NVIDIA#685

* Added feature: support parsing of "inprogress" eventlogs
* Catches exception thrown by Json Parser when an unexpected EOF occurs
  • Loading branch information
amahussein committed Dec 12, 2023
1 parent 9f60c9e commit 5c4b67f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,15 @@ object ToolUtils extends Logging {
// Normally it should not happen that invocation target is null.
logError(s"Unknown exception while parsing an event", i)
}
case j: com.fasterxml.jackson.core.JsonParseException =>
case j: com.fasterxml.jackson.core.io.JsonEOFException =>
// Note that JsonEOFException is child of JsonParseException
// In case the eventlog is incomplete (i.e., inprogress), we show a warning message
// because we do not want to cause the entire app to fail.
logWarning(s"Incomplete eventlog, ${j.getMessage}")
case k: com.fasterxml.jackson.core.JsonParseException =>
// this is a parser error thrown by version prior to spark-3.4+ which indicates the
// log is malformed
throw j
throw k
}
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.qualification

import java.io.File
import java.io.{BufferedWriter, File, FileWriter, PrintWriter}
import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
Expand Down Expand Up @@ -460,6 +460,67 @@ class QualificationSuite extends BaseTestSuite {
runQualificationTest(logFiles, "nds_q86_test_expectation.csv", expectedStatus = expectedStatus)
}

test("incomplete json file does not cause entire app to fail") {
// The purpose of this test is to make sure that the app is not skipped when the JSON parser
// encounters an unexpected EOF.
// There are two cases to evaluate:
// 1- An eventlog that has an end-to-end application but for some reason the EOF is incorrect
// 2- An eventlog of an unfinished app (missing SparkListenerApplicationEnd)

TrampolineUtil.withTempDir { eventLogDir =>
// generate the original eventlog
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,
"WholeStageFilterProject") { spark =>
import spark.implicits._
val df = spark.sparkContext.makeRDD(1 to 100, 3).toDF
val df2 = spark.sparkContext.makeRDD(1 to 100, 3).toDF
df.select($"value" as "a")
.join(df2.select($"value" as "b"), $"a" === $"b")
.filter("(((b < 100) AND (a > 50)) OR (a = 0))")
.sort($"b")
}
// create the following files:
// 1- inprogress eventlog that does not contain "SparkListenerApplicationEnd" (unfinished)
// 2- inprogress eventlog with a terminated app (incomplete)
val unfinishedLog = new File(s"$eventLogDir/unfinished.inprogress")
val incompleteLog = new File(s"$eventLogDir/eventlog.inprogress")
val pwList = Array(new PrintWriter(unfinishedLog), new PrintWriter(incompleteLog))
val bufferedSource = Source.fromFile(eventLog)
try {
val allEventLines = bufferedSource.getLines.toList
val selectedLines: List[String] = allEventLines.dropRight(1)
selectedLines.foreach { line =>
pwList.foreach(pw => pw.println(line))
}
// add the "SparkListenerApplicationEnd" to the incompleteLog
pwList(1).println(allEventLines.last)
pwList.foreach( pw =>
pw.print("{\"Event\":\"SparkListenerEnvironmentUpdate\"," +
"\"JVM Information\":{\"Java Home:")
)
} finally {
bufferedSource.close()
pwList.foreach(pw => pw.close())
}
// All the eventlogs should be parsed successfully
// Status counts: 3 SUCCESS, 0 FAILURE, 0 UNKNOWN
val logFiles = Array(eventLog, incompleteLog.getAbsolutePath, unfinishedLog.getAbsolutePath)
// test Qualification
val outpath = new File(s"$eventLogDir/output_folder")
val allArgs = Array(
"--output-directory",
outpath.getAbsolutePath())

val appArgs = new QualificationArgs(allArgs ++ logFiles)
val (exit, appSum) = QualificationMain.mainInternal(appArgs)
assert(exit == 0)
assert(appSum.size == 3)
// test Profiler
val apps = ToolTestUtils.processProfileApps(logFiles, sparkSession)
assert(apps.size == 3)
}
}

test("spark2 eventlog") {
val profileLogDir = ToolTestUtils.getTestResourcePath("spark-events-profiling")
val log = s"$profileLogDir/spark2-eventlog.zstd"
Expand Down

0 comments on commit 5c4b67f

Please sign in to comment.