From 90f29975e79a9a99ec37f1aa8f077fc9c114dde1 Mon Sep 17 00:00:00 2001
From: "Ahmed Hussein (amahussein)"
Date: Tue, 26 Dec 2023 11:20:47 -0600
Subject: [PATCH] Generate an output file with runtime and build information
Signed-off-by: Ahmed Hussein (amahussein)
Fixes #699
This PR is to dump the runtime/build information of the tools jar to the
output folder. By knowing which Tools version has been used and which
Spark version has been used in the runtime, developers can have more
insights about the output and the numbers generated by the Tools.
For Qualification:
- `rapids_4_spark_qualification_output/runtime.properties`
For Profiling:
- `rapids_4_spark_profile/runtime.properties`
A sample of the generated file is as follows:
Notice that
- `runtime.spark.version` is the spark version loaded
during runtime, while `build.spark.version` is the version used to build
the tools jar.
- `build.verion` represents the Tools jar version
```
build.hadoop.version=3.3.6
build.java.version=1.8.0_322
build.scala.version=2.12.15
build.spark.version=3.1.1
build.version=23.10.2-SNAPSHOT
runtime.spark.version=3.3.3
```
---
core/pom.xml | 3 +-
.../main/resources/configs/build.properties | 25 ++++++++
.../rapids/tool/ToolTextFileWriter.scala | 5 ++
.../rapids/tool/profiling/Profiler.scala | 11 ++--
.../tool/qualification/Qualification.scala | 14 ++--
.../tool/util/RapidsToolsConfUtil.scala | 31 +++++++++
.../tool/util/RuntimeReportGenerator.scala | 64 +++++++++++++++++++
.../rapids/tool/util/SortedJProperties.scala | 62 ++++++++++++++++++
8 files changed, 205 insertions(+), 10 deletions(-)
create mode 100644 core/src/main/resources/configs/build.properties
create mode 100644 core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala
create mode 100644 core/src/main/scala/org/apache/spark/sql/rapids/tool/util/SortedJProperties.scala
diff --git a/core/pom.xml b/core/pom.xml
index d71f1eb3e..30394d021 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -529,7 +529,8 @@
- ${project.basedir}/src/main/resources
+ ${project.basedir}/src/main/resources
+ true
${project.basedir}/..
diff --git a/core/src/main/resources/configs/build.properties b/core/src/main/resources/configs/build.properties
new file mode 100644
index 000000000..260cfed6e
--- /dev/null
+++ b/core/src/main/resources/configs/build.properties
@@ -0,0 +1,25 @@
+#
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##################################
+# Build Properties
+##################################
+
+build.version=${project.version}
+build.spark.version=${spark.version}
+build.hadoop.version=${hadoop.version}
+build.java.version=${java.version}
+build.scala.version=${scala.version}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala
index 3dcc86a40..b340ed3eb 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala
@@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool
import java.io.FileOutputStream
+import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
@@ -64,6 +65,10 @@ class ToolTextFileWriter(
outFile.foreach(_.writeBytes(stringToWrite))
}
+ def writeProperties(props: Properties, comment: String): Unit = {
+ outFile.foreach(props.store(_, comment))
+ }
+
def flush(): Unit = {
outFile.foreach { file =>
file.flush()
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index 479d29784..31e90a514 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -26,11 +26,12 @@ import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformFactory}
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.ui.ConsoleProgressBar
+import org.apache.spark.sql.rapids.tool.util.RuntimeReporter
-class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean) extends Logging {
+class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
+ extends RuntimeReporter {
private val nThreads = appArgs.numThreads.getOrElse(
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
@@ -41,8 +42,6 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
.setDaemon(true).setNameFormat("profileTool" + "-%d").build()
private val threadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
.asInstanceOf[ThreadPoolExecutor]
- private val outputDir = appArgs.outputDirectory().stripSuffix("/") +
- s"/${Profiler.SUBDIR}"
private val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
private val outputCSV: Boolean = appArgs.csv()
@@ -51,6 +50,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
private val useAutoTuner: Boolean = appArgs.autoTuner()
private var progressBar: Option[ConsoleProgressBar] = None
+ override val outputDir = appArgs.outputDirectory().stripSuffix("/") +
+ s"/${Profiler.SUBDIR}"
+
logInfo(s"Threadpool size is $nThreads")
/**
@@ -60,6 +62,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
* what else we can do in parallel.
*/
def profile(eventLogInfos: Seq[EventLogInfo]): Unit = {
+ generateRuntimeReport()
if (enablePB && eventLogInfos.nonEmpty) { // total count to start the PB cannot be 0
progressBar = Some(new ConsoleProgressBar("Profile Tool", eventLogInfos.length))
}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
index 0286cd582..6b06733e0 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
@@ -25,17 +25,16 @@ import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification._
import org.apache.spark.sql.rapids.tool.ui.{ConsoleProgressBar, QualificationReportGenerator}
import org.apache.spark.sql.rapids.tool.util._
-class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
+class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
timeout: Option[Long], nThreads: Int, order: String,
pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
- penalizeTransitions: Boolean) extends Logging {
+ penalizeTransitions: Boolean) extends RuntimeReporter {
private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()
@@ -52,6 +51,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
// Store application status reports indexed by event log path.
private val appStatusReporter = new ConcurrentHashMap[String, QualAppResult]
+ override val outputDir = s"$outputPath/rapids_4_spark_qualification_output"
private class QualifyThread(path: EventLogInfo) extends Runnable {
def run: Unit = qualifyApp(path, hadoopConf)
}
@@ -60,6 +60,9 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
if (enablePB && allPaths.nonEmpty) { // total count to start the PB cannot be 0
progressBar = Some(new ConsoleProgressBar("Qual Tool", allPaths.length))
}
+ // generate metadata
+ generateRuntimeReport()
+
allPaths.foreach { path =>
try {
threadPool.submit(new QualifyThread(path))
@@ -78,7 +81,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
progressBar.foreach(_.finishAll())
val allAppsSum = estimateAppFrequency(allApps.asScala.toSeq)
- val qWriter = new QualOutputWriter(getReportOutputPath, reportReadSchema, printStdout,
+ val qWriter = new QualOutputWriter(outputDir, reportReadSchema, printStdout,
order)
// sort order and limit only applies to the report summary text file,
// the csv file we write the entire data in descending order
@@ -105,8 +108,9 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
}
}
if (uiEnabled) {
- QualificationReportGenerator.generateDashBoard(getReportOutputPath, allAppsSum)
+ QualificationReportGenerator.generateDashBoard(outputDir, allAppsSum)
}
+
sortedDescDetailed
}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala
index 10735692a..0a28c1342 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala
@@ -16,6 +16,11 @@
package org.apache.spark.sql.rapids.tool.util
+import java.io.FileNotFoundException
+import java.util.Properties
+
+import scala.io.Source
+
import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
@@ -26,6 +31,9 @@ import org.apache.spark.sql.SparkSession
*/
object RapidsToolsConfUtil extends Logging {
private val RAPIDS_TOOLS_HADOOP_CONF_PREFIX = s"${RAPIDS_TOOLS_SYS_PROP_PREFIX}hadoop."
+ // Directory name inside resources that hosts all the configurations in plain properties format
+ private val CONFIG_DIR = "/configs"
+ private val BUILD_PROPS_FILE_NAME = "build.properties"
/**
* Creates a sparkConfiguration object with system properties applied on-top.
@@ -72,4 +80,27 @@ object RapidsToolsConfUtil extends Logging {
destMap.set(k, value)
}
}
+
+ /**
+ * Reads a properties file from resources/configs.
+ * If the file cannot be loaded, an error message will show in the log.
+ * Note that this should not happen because it is an internal functionality.
+ * @param fileName the name of the file in teh directory
+ * @return a Java properties object
+ */
+ private def loadPropFile(fileName: String): Properties = {
+ val props: Properties = new SortedJProperties
+ val propsFilePath = s"$CONFIG_DIR/$fileName"
+ getClass.getResourceAsStream(propsFilePath) match {
+ case null => // return empty properties if the file cannot be loaded
+ logError("Cannot load properties from file", new FileNotFoundException(fileName))
+ case stream =>
+ props.load(Source.fromInputStream(stream).bufferedReader())
+ }
+ props
+ }
+
+ def loadBuildProperties: Properties = {
+ loadPropFile(BUILD_PROPS_FILE_NAME)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala
new file mode 100644
index 000000000..72505b9f1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util
+
+import java.io.{PrintWriter, StringWriter}
+
+import com.nvidia.spark.rapids.tool.ToolTextFileWriter
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.rapids.tool.ToolUtils
+
+
+trait RuntimeReporter extends Logging {
+ val outputDir: String
+ def generateRuntimeReport(hadoopConf: Option[Configuration] = None): Unit = {
+ RuntimeReportGenerator.generateReport(outputDir, hadoopConf)
+ }
+}
+
+/**
+ * Generates a file containing the properties of the build loaded.
+ * In addition, it concatenates properties from the runtime (i.e., SparkVersion).
+ * It is expected that the list of properties in that file will grow depending on whether a
+ * property helps understanding and investigating the tools output.
+ * @param outputDir the directory where the report is generated.
+ * @param hadoopConf the hadoop configuration object used to access the HDFS if any.
+ */
+object RuntimeReportGenerator extends Logging {
+ private val REPORT_LABEL = "RAPIDS Accelerator for Apache Spark's Build/Runtime Information"
+ private val REPORT_FILE_NAME = "runtime.properties"
+ def generateReport(outputDir: String, hadoopConf: Option[Configuration] = None): Unit = {
+ val buildProps = RapidsToolsConfUtil.loadBuildProperties
+ // Add the Spark version used in runtime.
+ // Note that it is different from the Spark version used in the build.
+ buildProps.setProperty("runtime.spark.version", ToolUtils.sparkRuntimeVersion)
+ val reportWriter = new ToolTextFileWriter(outputDir, REPORT_FILE_NAME, REPORT_LABEL, hadoopConf)
+ try {
+ reportWriter.writeProperties(buildProps, REPORT_LABEL)
+ } finally {
+ reportWriter.close()
+ }
+ // Write the properties to the log
+ val writer = new StringWriter
+ buildProps.list(new PrintWriter(writer))
+ logInfo(s"\n$REPORT_LABEL\n${writer.getBuffer.toString}")
+ }
+}
+
+
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/SortedJProperties.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/SortedJProperties.scala
new file mode 100644
index 000000000..e80affe24
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/SortedJProperties.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util
+
+import java.io.{IOException, OutputStream}
+import java.util.{Collections, Comparator, Enumeration, Map, Properties, Set, TreeSet}
+
+
+
+/**
+ * This is an implementation of Java Properties that stores the properties
+ * into a file after sorting them by key.
+ * Another approach would be load the properties into a hashMap. However, this
+ * won't take into consideration serialization rules and comments.
+ * This implementation works for Java8+.
+ * See the following answer on StackOverflow:
+ * https://stackoverflow.com/questions/10275862/how-to-sort-properties-in-java/55957344#55957344
+ */
+class SortedJProperties extends Properties {
+ @throws[IOException]
+ override def store(out: OutputStream, comments: String): Unit = {
+ val sortedProps: Properties = new Properties() {
+ override def entrySet: Set[Map.Entry[AnyRef, AnyRef]] = {
+ /*
+ * Using comparator to avoid the following exception on jdk >=9:
+ * java.lang.ClassCastException: java.base/java.util.concurrent.ConcurrentHashMap$MapEntry
+ * cannot be cast to java.base/java.lang.Comparable
+ */
+ val sortedSet: Set[Map.Entry[AnyRef, AnyRef]] =
+ new TreeSet[Map.Entry[AnyRef, AnyRef]](
+ new Comparator[Map.Entry[AnyRef, AnyRef]]() {
+ override def compare(o1: Map.Entry[AnyRef, AnyRef],
+ o2: Map.Entry[AnyRef, AnyRef]): Int =
+ o1.getKey.toString.compareTo(o2.getKey.toString)
+ })
+ sortedSet.addAll(super.entrySet)
+ sortedSet
+ }
+
+ override def keySet: Set[AnyRef] = new TreeSet[AnyRef](super.keySet)
+
+ override def keys: Enumeration[AnyRef] =
+ Collections.enumeration(new TreeSet[AnyRef](super.keySet))
+ }
+ sortedProps.putAll(this)
+ sortedProps.store(out, comments)
+ }
+}