Generate an output file with runtime and build information
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Fixes #699

This PR dumps the runtime/build information of the Tools jar to the
output folder. Knowing which Tools version was used, and which Spark
version was loaded at runtime, gives developers more insight into the
output and the numbers generated by the Tools.

For Qualification:

- `rapids_4_spark_qualification_output/runtime.properties`

For Profiling:

- `rapids_4_spark_profile/runtime.properties`

A sample of the generated file is as follows:

Notice that:

- `runtime.spark.version` is the Spark version loaded at runtime, while `build.spark.version` is the version used to build the Tools jar.

- `build.version` represents the Tools jar version.

```
build.hadoop.version=3.3.6
build.java.version=1.8.0_322
build.scala.version=2.12.15
build.spark.version=3.1.1
build.version=23.10.2-SNAPSHOT
runtime.spark.version=3.3.3
```
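
For illustration, here is a minimal sketch (not part of this change) of how the generated file could be read back with `java.util.Properties`; the relative path is an assumption based on the Qualification output location above:

```
import java.io.FileReader
import java.util.Properties

// Sketch: load the generated runtime.properties and compare the
// build-time Spark version against the runtime one. The path below
// is hypothetical; point it at the actual output folder of the run.
val props = new Properties()
val reader = new FileReader("rapids_4_spark_qualification_output/runtime.properties")
try {
  props.load(reader)
} finally {
  reader.close()
}
println(s"Built against Spark ${props.getProperty("build.spark.version")}")
println(s"Running on Spark ${props.getProperty("runtime.spark.version")}")
```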
amahussein committed Dec 26, 2023
1 parent 03effc2 commit 90f2997
Showing 8 changed files with 205 additions and 10 deletions.
3 changes: 2 additions & 1 deletion core/pom.xml
@@ -529,7 +529,8 @@
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
<directory>${project.basedir}/src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>${project.basedir}/..</directory>
25 changes: 25 additions & 0 deletions core/src/main/resources/configs/build.properties
@@ -0,0 +1,25 @@
#
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

##################################
# Build Properties
##################################

build.version=${project.version}
build.spark.version=${spark.version}
build.hadoop.version=${hadoop.version}
build.java.version=${java.version}
build.scala.version=${scala.version}
@@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool

import java.io.FileOutputStream
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
@@ -64,6 +65,10 @@ class ToolTextFileWriter(
outFile.foreach(_.writeBytes(stringToWrite))
}

def writeProperties(props: Properties, comment: String): Unit = {
outFile.foreach(props.store(_, comment))
}

def flush(): Unit = {
outFile.foreach { file =>
file.flush()
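
As a hedged illustration of the new `writeProperties` helper, a caller could do the following (the constructor arguments are assumed from how the writer is used later in this diff):

```
import java.util.Properties

import com.nvidia.spark.rapids.tool.ToolTextFileWriter

// Hypothetical caller: writes the properties with a header comment to
// <outputDir>/example.properties. Passing None for the Hadoop
// configuration writes to the local filesystem.
val props = new Properties()
props.setProperty("example.key", "example.value")
val writer = new ToolTextFileWriter("/tmp/output", "example.properties",
  "example properties file", None)
try {
  writer.writeProperties(props, "Example header comment")
} finally {
  writer.close()
}
```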
@@ -26,11 +26,12 @@ import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformFactory}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.ui.ConsoleProgressBar
import org.apache.spark.sql.rapids.tool.util.RuntimeReporter

class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean) extends Logging {
class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
extends RuntimeReporter {

private val nThreads = appArgs.numThreads.getOrElse(
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
@@ -41,8 +42,6 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
.setDaemon(true).setNameFormat("profileTool" + "-%d").build()
private val threadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
.asInstanceOf[ThreadPoolExecutor]
private val outputDir = appArgs.outputDirectory().stripSuffix("/") +
s"/${Profiler.SUBDIR}"
private val numOutputRows = appArgs.numOutputRows.getOrElse(1000)

private val outputCSV: Boolean = appArgs.csv()
@@ -51,6 +50,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
private val useAutoTuner: Boolean = appArgs.autoTuner()
private var progressBar: Option[ConsoleProgressBar] = None

override val outputDir = appArgs.outputDirectory().stripSuffix("/") +
s"/${Profiler.SUBDIR}"

logInfo(s"Threadpool size is $nThreads")

/**
@@ -60,6 +62,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
* what else we can do in parallel.
*/
def profile(eventLogInfos: Seq[EventLogInfo]): Unit = {
generateRuntimeReport()
if (enablePB && eventLogInfos.nonEmpty) { // total count to start the PB cannot be 0
progressBar = Some(new ConsoleProgressBar("Profile Tool", eventLogInfos.length))
}
@@ -25,17 +25,16 @@ import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification._
import org.apache.spark.sql.rapids.tool.ui.{ConsoleProgressBar, QualificationReportGenerator}
import org.apache.spark.sql.rapids.tool.util._

class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
timeout: Option[Long], nThreads: Int, order: String,
pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
penalizeTransitions: Boolean) extends Logging {
penalizeTransitions: Boolean) extends RuntimeReporter {

private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

@@ -52,6 +51,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
// Store application status reports indexed by event log path.
private val appStatusReporter = new ConcurrentHashMap[String, QualAppResult]

override val outputDir = s"$outputPath/rapids_4_spark_qualification_output"
private class QualifyThread(path: EventLogInfo) extends Runnable {
def run: Unit = qualifyApp(path, hadoopConf)
}
@@ -60,6 +60,9 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
if (enablePB && allPaths.nonEmpty) { // total count to start the PB cannot be 0
progressBar = Some(new ConsoleProgressBar("Qual Tool", allPaths.length))
}
// generate metadata
generateRuntimeReport()

allPaths.foreach { path =>
try {
threadPool.submit(new QualifyThread(path))
@@ -78,7 +81,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
progressBar.foreach(_.finishAll())
val allAppsSum = estimateAppFrequency(allApps.asScala.toSeq)

val qWriter = new QualOutputWriter(getReportOutputPath, reportReadSchema, printStdout,
val qWriter = new QualOutputWriter(outputDir, reportReadSchema, printStdout,
order)
// sort order and limit only applies to the report summary text file,
// the csv file we write the entire data in descending order
@@ -105,8 +108,9 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
}
}
if (uiEnabled) {
QualificationReportGenerator.generateDashBoard(getReportOutputPath, allAppsSum)
QualificationReportGenerator.generateDashBoard(outputDir, allAppsSum)
}

sortedDescDetailed
}

@@ -16,6 +16,11 @@

package org.apache.spark.sql.rapids.tool.util

import java.io.FileNotFoundException
import java.util.Properties

import scala.io.Source

import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
@@ -26,6 +31,9 @@ import org.apache.spark.sql.SparkSession
*/
object RapidsToolsConfUtil extends Logging {
private val RAPIDS_TOOLS_HADOOP_CONF_PREFIX = s"${RAPIDS_TOOLS_SYS_PROP_PREFIX}hadoop."
// Directory name inside resources that hosts all the configurations in plain properties format
private val CONFIG_DIR = "/configs"
private val BUILD_PROPS_FILE_NAME = "build.properties"

/**
* Creates a sparkConfiguration object with system properties applied on-top.
@@ -72,4 +80,27 @@ object RapidsToolsConfUtil extends Logging {
destMap.set(k, value)
}
}

/**
* Reads a properties file from resources/configs.
* If the file cannot be loaded, an error message is written to the log.
* Note that this should not happen, because this is internal functionality.
* @param fileName the name of the file in the directory
* @return a Java properties object
*/
private def loadPropFile(fileName: String): Properties = {
val props: Properties = new SortedJProperties
val propsFilePath = s"$CONFIG_DIR/$fileName"
getClass.getResourceAsStream(propsFilePath) match {
case null => // return empty properties if the file cannot be loaded
logError("Cannot load properties from file", new FileNotFoundException(fileName))
case stream =>
props.load(Source.fromInputStream(stream).bufferedReader())
}
props
}

def loadBuildProperties: Properties = {
loadPropFile(BUILD_PROPS_FILE_NAME)
}
}
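
Downstream code can then grab the bundled build properties in one call; a minimal sketch:

```
import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil

// Sketch: read the build-time properties bundled into the jar.
// Keys match those defined in configs/build.properties.
val buildProps = RapidsToolsConfUtil.loadBuildProperties
println(s"Tools build version: ${buildProps.getProperty("build.version", "unknown")}")
```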
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.tool.util

import java.io.{PrintWriter, StringWriter}

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.ToolUtils
trait RuntimeReporter extends Logging {
val outputDir: String
def generateRuntimeReport(hadoopConf: Option[Configuration] = None): Unit = {
RuntimeReportGenerator.generateReport(outputDir, hadoopConf)
}
}

/**
* Generates a file containing the build properties that were loaded.
* In addition, it appends properties from the runtime (i.e., the Spark version).
* The list of properties in that file is expected to grow as more properties prove
* helpful for understanding and investigating the Tools output.
* @param outputDir the directory where the report is generated.
* @param hadoopConf the Hadoop configuration object used to access HDFS, if any.
*/
object RuntimeReportGenerator extends Logging {
private val REPORT_LABEL = "RAPIDS Accelerator for Apache Spark's Build/Runtime Information"
private val REPORT_FILE_NAME = "runtime.properties"
def generateReport(outputDir: String, hadoopConf: Option[Configuration] = None): Unit = {
val buildProps = RapidsToolsConfUtil.loadBuildProperties
// Add the Spark version used in runtime.
// Note that it is different from the Spark version used in the build.
buildProps.setProperty("runtime.spark.version", ToolUtils.sparkRuntimeVersion)
val reportWriter = new ToolTextFileWriter(outputDir, REPORT_FILE_NAME, REPORT_LABEL, hadoopConf)
try {
reportWriter.writeProperties(buildProps, REPORT_LABEL)
} finally {
reportWriter.close()
}
// Write the properties to the log
val writer = new StringWriter
buildProps.list(new PrintWriter(writer))
logInfo(s"\n$REPORT_LABEL\n${writer.getBuffer.toString}")
}
}
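
To show how the trait is meant to be consumed (mirroring the `Profiler` and `Qualification` changes above), here is a minimal sketch of a hypothetical tool that mixes in `RuntimeReporter`:

```
import org.apache.spark.sql.rapids.tool.util.RuntimeReporter

// Hypothetical tool entry point: it only needs to provide outputDir
// and call generateRuntimeReport(), which emits
// <outputDir>/runtime.properties before the real work starts.
class MyTool(outputPath: String) extends RuntimeReporter {
  override val outputDir: String = outputPath.stripSuffix("/") + "/my_tool_output"

  def run(): Unit = {
    generateRuntimeReport()
    // ... tool-specific processing ...
  }
}
```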


@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.tool.util

import java.io.{IOException, OutputStream}
import java.util.{Collections, Comparator, Enumeration, Map, Properties, Set, TreeSet}
/**
* This is an implementation of Java Properties that stores the properties
* into a file after sorting them by key.
* Another approach would be to load the properties into a HashMap; however, that
* would not take serialization rules and comments into consideration.
* This implementation works for Java 8+.
* See the following answer on StackOverflow:
* https://stackoverflow.com/questions/10275862/how-to-sort-properties-in-java/55957344#55957344
*/
class SortedJProperties extends Properties {
@throws[IOException]
override def store(out: OutputStream, comments: String): Unit = {
val sortedProps: Properties = new Properties() {
override def entrySet: Set[Map.Entry[AnyRef, AnyRef]] = {
/*
* Using comparator to avoid the following exception on jdk >=9:
* java.lang.ClassCastException: java.base/java.util.concurrent.ConcurrentHashMap$MapEntry
* cannot be cast to java.base/java.lang.Comparable
*/
val sortedSet: Set[Map.Entry[AnyRef, AnyRef]] =
new TreeSet[Map.Entry[AnyRef, AnyRef]](
new Comparator[Map.Entry[AnyRef, AnyRef]]() {
override def compare(o1: Map.Entry[AnyRef, AnyRef],
o2: Map.Entry[AnyRef, AnyRef]): Int =
o1.getKey.toString.compareTo(o2.getKey.toString)
})
sortedSet.addAll(super.entrySet)
sortedSet
}

override def keySet: Set[AnyRef] = new TreeSet[AnyRef](super.keySet)

override def keys: Enumeration[AnyRef] =
Collections.enumeration(new TreeSet[AnyRef](super.keySet))
}
sortedProps.putAll(this)
sortedProps.store(out, comments)
}
}
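
A quick, hedged way to see the effect: keys inserted out of order come back alphabetized when stored.

```
import java.io.ByteArrayOutputStream

// Sketch: store() writes entries sorted by key, so build.version is
// emitted before runtime.spark.version despite the insertion order.
val props = new SortedJProperties
props.setProperty("runtime.spark.version", "3.3.3")
props.setProperty("build.version", "23.10.2-SNAPSHOT")
val out = new ByteArrayOutputStream()
props.store(out, "sorted example")
print(out.toString("ISO-8859-1"))
```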
