diff --git a/core/pom.xml b/core/pom.xml
index d71f1eb3e..30394d021 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -529,7 +529,8 @@
       <resources>
         <resource>
-          <directory>${project.basedir}/src/main/resources</directory>
+          <directory>${project.basedir}/src/main/resources</directory>
+          <filtering>true</filtering>
         </resource>
         <resource>
           <directory>${project.basedir}/..</directory>
diff --git a/core/src/main/resources/configs/build.properties b/core/src/main/resources/configs/build.properties
new file mode 100644
index 000000000..260cfed6e
--- /dev/null
+++ b/core/src/main/resources/configs/build.properties
@@ -0,0 +1,25 @@
+#
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##################################
+# Build Properties
+##################################
+
+build.version=${project.version}
+build.spark.version=${spark.version}
+build.hadoop.version=${hadoop.version}
+build.java.version=${java.version}
+build.scala.version=${scala.version}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala
index 3dcc86a40..b340ed3eb 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala
@@ -16,6 +16,7 @@
 package com.nvidia.spark.rapids.tool
 
 import java.io.FileOutputStream
+import java.util.Properties
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
@@ -64,6 +65,10 @@
     outFile.foreach(_.writeBytes(stringToWrite))
   }
 
+  def writeProperties(props: Properties, comment: String): Unit = {
+    outFile.foreach(props.store(_, comment))
+  }
+
   def flush(): Unit = {
     outFile.foreach { file =>
       file.flush()
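A note on the new `writeProperties` helper above: `java.util.Properties.store` writes the supplied comment as a `#`-prefixed header line, then a timestamp line, then one `key=value` pair per line, always encoded as ISO 8859-1. A minimal standalone sketch of that behavior (the object name and property value are illustrative, not part of this patch):

```scala
import java.io.ByteArrayOutputStream
import java.util.Properties

object StoreSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("build.version", "24.02.0") // illustrative value
    val out = new ByteArrayOutputStream()
    // store() emits "#<comment>", then "#<date>", then key=value pairs
    props.store(out, "Build Information")
    print(out.toString("ISO-8859-1")) // store() always writes ISO 8859-1
  }
}
```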
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index 479d29784..31e90a514 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -26,11 +26,12 @@
 import com.nvidia.spark.rapids.ThreadFactoryBuilder
 import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor, PlatformFactory}
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
 import org.apache.spark.sql.rapids.tool.ui.ConsoleProgressBar
+import org.apache.spark.sql.rapids.tool.util.RuntimeReporter
 
-class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean) extends Logging {
+class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
+  extends RuntimeReporter {
 
   private val nThreads = appArgs.numThreads.getOrElse(
     Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
@@ -41,8 +42,6 @@
     .setDaemon(true).setNameFormat("profileTool" + "-%d").build()
   private val threadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
     .asInstanceOf[ThreadPoolExecutor]
-  private val outputDir = appArgs.outputDirectory().stripSuffix("/") +
-    s"/${Profiler.SUBDIR}"
   private val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
 
   private val outputCSV: Boolean = appArgs.csv()
@@ -51,6 +50,9 @@
   private val useAutoTuner: Boolean = appArgs.autoTuner()
   private var progressBar: Option[ConsoleProgressBar] = None
 
+  override val outputDir = appArgs.outputDirectory().stripSuffix("/") +
+    s"/${Profiler.SUBDIR}"
+
   logInfo(s"Threadpool size is $nThreads")
 
   /**
@@ -60,6 +62,7 @@
    * what else we can do in parallel.
    */
   def profile(eventLogInfos: Seq[EventLogInfo]): Unit = {
+    generateRuntimeReport()
    if (enablePB && eventLogInfos.nonEmpty) { // total count to start the PB cannot be 0
       progressBar = Some(new ConsoleProgressBar("Profile Tool", eventLogInfos.length))
     }
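The `Profiler` now satisfies the abstract `outputDir` declared on the `RuntimeReporter` trait (added later in this patch) instead of keeping a private val. A minimal sketch of the pattern, with hypothetical names; the usual Scala caveat applies that an `override val` is only initialized when the subclass constructor reaches it, which is why trait methods reading it, like `generateRuntimeReport()`, are called after construction (here, from `profile()`):

```scala
trait Reporter {
  val outputDir: String // abstract; supplied by the mixing class
  def report(): Unit = println(s"writing report under $outputDir")
}

class Tool(baseDir: String) extends Reporter {
  override val outputDir = s"${baseDir.stripSuffix("/")}/tool_output"
  // Safe: report() runs after construction, when outputDir is initialized.
  def run(): Unit = report()
}

object ReporterSketch extends App {
  new Tool("/tmp/results/").run() // prints: writing report under /tmp/results/tool_output
}
```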
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
index 0286cd582..6b06733e0 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
@@ -25,17 +25,16 @@
 import com.nvidia.spark.rapids.tool.EventLogInfo
 import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.qualification._
 import org.apache.spark.sql.rapids.tool.ui.{ConsoleProgressBar, QualificationReportGenerator}
 import org.apache.spark.sql.rapids.tool.util._
 
-class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
+class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
     timeout: Option[Long], nThreads: Int, order: String,
     pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
     printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
     reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
-    penalizeTransitions: Boolean) extends Logging {
+    penalizeTransitions: Boolean) extends RuntimeReporter {
 
   private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()
 
@@ -52,6 +51,7 @@
   // Store application status reports indexed by event log path.
   private val appStatusReporter = new ConcurrentHashMap[String, QualAppResult]
 
+  override val outputDir = s"$outputPath/rapids_4_spark_qualification_output"
   private class QualifyThread(path: EventLogInfo) extends Runnable {
     def run: Unit = qualifyApp(path, hadoopConf)
   }
@@ -60,6 +60,9 @@
     if (enablePB && allPaths.nonEmpty) { // total count to start the PB cannot be 0
       progressBar = Some(new ConsoleProgressBar("Qual Tool", allPaths.length))
     }
+    // generate metadata
+    generateRuntimeReport()
+
     allPaths.foreach { path =>
       try {
         threadPool.submit(new QualifyThread(path))
@@ -78,7 +81,7 @@
     progressBar.foreach(_.finishAll())
     val allAppsSum = estimateAppFrequency(allApps.asScala.toSeq)
 
-    val qWriter = new QualOutputWriter(getReportOutputPath, reportReadSchema, printStdout,
+    val qWriter = new QualOutputWriter(outputDir, reportReadSchema, printStdout,
       order)
     // sort order and limit only applies to the report summary text file,
     // the csv file we write the entire data in descending order
@@ -105,8 +108,9 @@
       }
     }
     if (uiEnabled) {
-      QualificationReportGenerator.generateDashBoard(getReportOutputPath, allAppsSum)
+      QualificationReportGenerator.generateDashBoard(outputDir, allAppsSum)
     }
+    sortedDescDetailed
   }
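With this change both tools emit the runtime report into their own output directory, e.g. `<output>/rapids_4_spark_qualification_output/runtime.properties` for the qualification tool. The result is a plain Java properties file whose keys come out alphabetically sorted (via `SortedJProperties`, added later in this patch); the version values below are illustrative placeholders only, not taken from this patch:

```properties
#RAPIDS Accelerator for Apache Spark's Build/Runtime Information
#Mon Jan 01 00:00:00 UTC 2024
build.hadoop.version=3.3.6
build.java.version=1.8
build.scala.version=2.12.15
build.spark.version=3.5.0
build.version=24.02.0
runtime.spark.version=3.4.1
```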
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala
index 10735692a..0a28c1342 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RapidsToolsConfUtil.scala
@@ -16,6 +16,11 @@
 
 package org.apache.spark.sql.rapids.tool.util
 
+import java.io.FileNotFoundException
+import java.util.Properties
+
+import scala.io.Source
+
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.internal.Logging
@@ -26,6 +31,9 @@
 import org.apache.spark.sql.SparkSession
  */
 object RapidsToolsConfUtil extends Logging {
   private val RAPIDS_TOOLS_HADOOP_CONF_PREFIX = s"${RAPIDS_TOOLS_SYS_PROP_PREFIX}hadoop."
+  // Directory name inside resources that hosts all the configurations in plain properties format
+  private val CONFIG_DIR = "/configs"
+  private val BUILD_PROPS_FILE_NAME = "build.properties"
 
   /**
    * Creates a sparkConfiguration object with system properties applied on-top.
@@ -72,4 +80,27 @@
       destMap.set(k, value)
     }
   }
+
+  /**
+   * Reads a properties file from resources/configs.
+   * If the file cannot be loaded, an error message is logged.
+   * Note that this should not happen because it is an internal functionality.
+   * @param fileName the name of the file in the directory
+   * @return a Java properties object
+   */
+  private def loadPropFile(fileName: String): Properties = {
+    val props: Properties = new SortedJProperties
+    val propsFilePath = s"$CONFIG_DIR/$fileName"
+    getClass.getResourceAsStream(propsFilePath) match {
+      case null => // return empty properties if the file cannot be loaded
+        logError("Cannot load properties from file", new FileNotFoundException(fileName))
+      case stream =>
+        props.load(Source.fromInputStream(stream).bufferedReader())
+    }
+    props
+  }
+
+  def loadBuildProperties: Properties = {
+    loadPropFile(BUILD_PROPS_FILE_NAME)
+  }
 }
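One subtlety in `loadPropFile`: the leading slash in `CONFIG_DIR` matters, because `Class.getResourceAsStream` resolves a name without a leading `/` relative to the class's own package, whereas `ClassLoader.getResourceAsStream` always takes a path with no leading slash from the classpath root. A small sketch of the distinction (the resource name is the one added by this patch; the object name is hypothetical):

```scala
object ResourceSketch {
  def main(args: Array[String]): Unit = {
    // Class-relative API: a leading "/" forces an absolute classpath lookup.
    val viaClass = getClass.getResourceAsStream("/configs/build.properties")
    // ClassLoader API: always absolute, so no leading slash.
    val viaLoader = getClass.getClassLoader.getResourceAsStream("configs/build.properties")
    // Both return null (rather than throwing) when the resource is missing,
    // which is why loadPropFile pattern-matches on null.
    println(s"viaClass found: ${viaClass != null}, viaLoader found: ${viaLoader != null}")
  }
}
```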
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala
new file mode 100644
index 000000000..72505b9f1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util
+
+import java.io.{PrintWriter, StringWriter}
+
+import com.nvidia.spark.rapids.tool.ToolTextFileWriter
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.rapids.tool.ToolUtils
+
+trait RuntimeReporter extends Logging {
+  val outputDir: String
+  def generateRuntimeReport(hadoopConf: Option[Configuration] = None): Unit = {
+    RuntimeReportGenerator.generateReport(outputDir, hadoopConf)
+  }
+}
+
+/**
+ * Generates a file containing the properties of the loaded build.
+ * In addition, it appends properties from the runtime (i.e., the Spark version).
+ * The list of properties in that file is expected to grow as more properties prove
+ * helpful in understanding and investigating the tools' output.
+ * @param outputDir the directory where the report is generated.
+ * @param hadoopConf the Hadoop configuration object used to access HDFS, if any.
+ */
+object RuntimeReportGenerator extends Logging {
+  private val REPORT_LABEL = "RAPIDS Accelerator for Apache Spark's Build/Runtime Information"
+  private val REPORT_FILE_NAME = "runtime.properties"
+  def generateReport(outputDir: String, hadoopConf: Option[Configuration] = None): Unit = {
+    val buildProps = RapidsToolsConfUtil.loadBuildProperties
+    // Add the Spark version used at runtime.
+    // Note that it can differ from the Spark version used in the build.
+    buildProps.setProperty("runtime.spark.version", ToolUtils.sparkRuntimeVersion)
+    val reportWriter = new ToolTextFileWriter(outputDir, REPORT_FILE_NAME, REPORT_LABEL, hadoopConf)
+    try {
+      reportWriter.writeProperties(buildProps, REPORT_LABEL)
+    } finally {
+      reportWriter.close()
+    }
+    // Write the properties to the log as well
+    val writer = new StringWriter
+    buildProps.list(new PrintWriter(writer))
+    logInfo(s"\n$REPORT_LABEL\n${writer.getBuffer.toString}")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/SortedJProperties.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/SortedJProperties.scala
new file mode 100644
index 000000000..e80affe24
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/SortedJProperties.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util
+
+import java.io.{IOException, OutputStream}
+import java.util.{Collections, Comparator, Enumeration, Map, Properties, Set, TreeSet}
+
+/**
+ * An implementation of Java Properties that sorts the entries by key before
+ * storing them to a file.
+ * Another approach would be to load the properties into a HashMap; however, that
+ * would not take serialization rules and comments into consideration.
+ * This implementation works on Java 8+.
+ * See the following answer on StackOverflow:
+ * https://stackoverflow.com/questions/10275862/how-to-sort-properties-in-java/55957344#55957344
+ */
+class SortedJProperties extends Properties {
+  @throws[IOException]
+  override def store(out: OutputStream, comments: String): Unit = {
+    val sortedProps: Properties = new Properties() {
+      override def entrySet: Set[Map.Entry[AnyRef, AnyRef]] = {
+        /*
+         * Using a comparator to avoid the following exception on JDK >= 9:
+         * java.lang.ClassCastException: java.base/java.util.concurrent.ConcurrentHashMap$MapEntry
+         * cannot be cast to java.base/java.lang.Comparable
+         */
+        val sortedSet: Set[Map.Entry[AnyRef, AnyRef]] =
+          new TreeSet[Map.Entry[AnyRef, AnyRef]](
+            new Comparator[Map.Entry[AnyRef, AnyRef]]() {
+              override def compare(o1: Map.Entry[AnyRef, AnyRef],
+                  o2: Map.Entry[AnyRef, AnyRef]): Int =
+                o1.getKey.toString.compareTo(o2.getKey.toString)
+            })
+        sortedSet.addAll(super.entrySet)
+        sortedSet
+      }
+
+      override def keySet: Set[AnyRef] = new TreeSet[AnyRef](super.keySet)
+
+      override def keys: Enumeration[AnyRef] =
+        Collections.enumeration(new TreeSet[AnyRef](super.keySet))
+    }
+    sortedProps.putAll(this)
+    sortedProps.store(out, comments)
+  }
+}
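A quick usage sketch for `SortedJProperties`: because `store` routes through the sorted view (both `entrySet` and `keys` are overridden, covering the JDK 8 and JDK 9+ code paths of `Properties.store`), `Hashtable` iteration order never leaks into the output. Standalone, assuming the class above is on the classpath; keys and values are illustrative:

```scala
import org.apache.spark.sql.rapids.tool.util.SortedJProperties

object SortedStoreSketch {
  def main(args: Array[String]): Unit = {
    val props = new SortedJProperties
    props.setProperty("zeta", "last")
    props.setProperty("alpha", "first")
    props.setProperty("mid", "middle")
    // Keys are emitted alphabetically regardless of insertion order:
    // alpha=first, mid=middle, zeta=last
    props.store(System.out, "sorted demo")
  }
}
```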