diff --git a/core/pom.xml b/core/pom.xml
index c71551a21..94614472c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -524,6 +524,13 @@
${delta.core.version}
test
+
+
+ org.apache.spark
+ spark-hive_${scala.binary.version}
+ ${spark.version}
+ test
+
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
index 446eb389c..a75975a3a 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -68,10 +68,12 @@ object DataWritingCommandExecParser {
// - InsertIntoHadoopFsRelationCommand is a logical command that does not have an entry in the
// supported execs file
// - SaveIntoDataSourceCommand is generated by DeltaLake Table writes
+ // - InsertIntoHiveTable is generated by Hive catalogue implementation
private val logicalWriteCommands = Set(
dataWriteCMD,
insertIntoHadoopCMD,
- saveIntoDataSrcCMD
+ saveIntoDataSrcCMD,
+ HiveParseHelper.INSERT_INTO_HIVE_LABEL
)
// Note: Defines a list of the execs that include formatted data.
@@ -82,7 +84,8 @@ object DataWritingCommandExecParser {
// have speedup entry for the deltaLake write operation
private val logicalToPhysicalCmdMap = Map(
insertIntoHadoopCMD -> defaultPhysicalCMD,
- saveIntoDataSrcCMD -> defaultPhysicalCMD
+ saveIntoDataSrcCMD -> defaultPhysicalCMD,
+ HiveParseHelper.INSERT_INTO_HIVE_LABEL-> defaultPhysicalCMD
)
// Map to hold the relation between writeExecCmd and the format.
@@ -105,7 +108,15 @@ object DataWritingCommandExecParser {
// Otherwise, fallback to the string parser
val dataFormat = specialWriteFormatMap.get(wCmd) match {
case Some(f) => f
- case None => getWriteFormatString(node.desc)
+ case None =>
+ if (HiveParseHelper.isHiveTableInsertNode(node.name)) {
+ // Use Hive Utils to extract the format from insertIntoHiveTable based on the SerDe
+ // class.
+ HiveParseHelper.getWriteFormat(node)
+ } else {
+ // Use the default parser to extract the write-format
+ getWriteFormatString(node.desc)
+ }
}
val physicalCmd = logicalToPhysicalCmdMap(wCmd)
Some(DataWritingCmdWrapper(wCmd, physicalCmd, dataFormat))
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala
index f1b70b9c3..eb6fb98a3 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,12 +37,17 @@ case class FileSourceScanExecParser(
val nodeName = node.name.trim
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)
-
- val readInfo = ReadParser.parseReadNode(node)
+ val (execName, readInfo) = if (HiveParseHelper.isHiveTableScanNode(node)) {
+ // Use the hive parser
+ (HiveParseHelper.SCAN_HIVE_EXEC_NAME, HiveParseHelper.parseReadNode(node))
+ } else {
+ // Use the default parser
+ (fullExecName, ReadParser.parseReadNode(node))
+ }
+ val speedupFactor = checker.getSpeedupFactor(execName)
// don't use the isExecSupported because we have finer grain.
val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
- val speedupFactor = checker.getSpeedupFactor(fullExecName)
- val overallSpeedup = Math.max((speedupFactor * score), 1.0)
+ val overallSpeedup = Math.max(speedupFactor * score, 1.0)
// TODO - add in parsing expressions - average speedup across?
new ExecInfo(sqlID, nodeName, "", overallSpeedup, maxDuration, node.id, score > 0, None)
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala
new file mode 100644
index 000000000..2826bfe5e
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.planparser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
+import org.apache.spark.sql.rapids.tool.util.EventUtils
+
+// A wrapper class to map between a Hive SerDe class name and its data format label.
+case class HiveScanSerdeClasses(className: String, format: String) extends Logging {
+ def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
+ logDebug(s"Parsing node as ScanHiveTable: ${node.desc}")
+ // Schema, pushedFilters empty for now as we cannot extract them yet from eventlogs
+ ReadMetaData("", "HiveTableRelation", "unknown", format)
+ }
+}
+
+// Utilities used to handle Hive Ops.
+object HiveParseHelper extends Logging {
+ val SCAN_HIVE_LABEL = "scan hive"
+ val SCAN_HIVE_EXEC_NAME = "HiveTableScanExec"
+ val INSERT_INTO_HIVE_LABEL = "InsertIntoHiveTable"
+
+ // The following is a list of the SerDe classes we can look for in the node description.
+ // We should maintain this table with custom classes as needed.
+ // Note that we map each SerDe to a format "Hive*" because the hive formats are still different
+ // compared to the native ones according to the documentation. For example, this is why
+ // the "supportedDataSource.csv" has a "HiveText" entry.
+ private val LOADED_SERDE_CLASSES = Seq(
+ HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "HiveText"),
+ HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+ "HiveParquet"),
+ HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.avro.AvroSerDe", "HiveAvro"),
+ HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.OpenCSVSerde", "HiveCSV"),
+ HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.orc.OrcSerde", "HiveORC")
+ )
+
+ def isHiveTableInsertNode(nodeName: String): Boolean = {
+ nodeName.contains(INSERT_INTO_HIVE_LABEL)
+ }
+
+ def isHiveTableScanNode(nodeName: String): Boolean = {
+ nodeName.toLowerCase.startsWith(SCAN_HIVE_LABEL)
+ }
+
+ def isHiveTableScanNode(node: SparkPlanGraphNode): Boolean = {
+ isHiveTableScanNode(node.name)
+ }
+
+ // Given a "scan hive" NodeGraph, construct the MetaData based on the SerDe class.
+ // If the SerDe class does not match the lookups, it returns an "unknown" format.
+ def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
+ LOADED_SERDE_CLASSES.find(k => node.desc.contains(k.className)).map(
+ _.parseReadNode(node)).getOrElse(ReadMetaData("", "HiveTableRelation", "unknown", "unknown"))
+ }
+
+ // Given an "InsertIntoHiveTable" node, return the write format based on the SerDe class found
+ // in its description. If no known SerDe class matches, the format is "unknown".
+ def getWriteFormat(node: SparkPlanGraphNode): String = {
+ val readMetaData = parseReadNode(node)
+ readMetaData.format
+ }
+
+ def isHiveEnabled(properties: collection.Map[String, String]): Boolean = {
+ EventUtils.isPropertyMatch(properties, "spark.sql.catalogImplementation", "", "hive")
+ }
+
+ // Keep for future improvement as we can pass this information to the AutoTuner/user to suggest
+ // recommendations regarding ORC optimizations.
+ def isORCNativeEnabled(properties: collection.Map[String, String]): Boolean = {
+ EventUtils.isPropertyMatch(properties, "spark.sql.orc.impl", "native", "native") ||
+ EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreOrc", "true", "true")
+ }
+
+ // Keep for future improvement as we can pass this information to the AutoTuner/user to suggest
+ // recommendations regarding Parquet optimizations.
+ def isConvertParquetEnabled(properties: collection.Map[String, String]): Boolean = {
+ EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreParquet", "true", "true")
+ }
+
+ // Keep for future improvement as we can pass this information to the AutoTuner/user to suggest
+ // recommendations regarding Text optimizations for GPU.
+ def isRAPIDSTextHiveEnabled(properties: collection.Map[String, String]): Boolean = {
+ EventUtils.isPropertyMatch(properties,
+ "spark.rapids.sql.format.hive.text.enabled", "true", "true")
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala
index 3778718fd..eb15bea7c 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,29 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
case class ReadMetaData(schema: String, location: String, filters: String, format: String)
object ReadParser extends Logging {
+ // It was found that some eventlogs could have "NativeScan" instead of "Scan"
+ val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan")
+ // DatasourceV2 node names that exactly match the following labels
+ val DATASOURCE_V2_NODE_EXACT_PREF = Set(
+ "BatchScan")
+ // DatasourceV2 node names that match partially on the following labels
+ val DATASOURCE_V2_NODE_PREF = Set(
+ "GpuScan",
+ "GpuBatchScan",
+ "JDBCRelation")
+
+ def isScanNode(nodeName: String): Boolean = {
+ SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_))
+ }
+
+ def isScanNode(node: SparkPlanGraphNode): Boolean = {
+ isScanNode(node.name)
+ }
+
+ def isDataSourceV2Node(node: SparkPlanGraphNode): Boolean = {
+ DATASOURCE_V2_NODE_EXACT_PREF.exists(node.name.equals(_)) ||
+ DATASOURCE_V2_NODE_PREF.exists(node.name.contains(_))
+ }
// strip off the struct<> part that Spark adds to the ReadSchema
def formatSchemaStr(schema: String): String = {
@@ -48,60 +71,65 @@ object ReadParser extends Logging {
}
def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
- val schemaTag = "ReadSchema: "
- val schema = if (node.desc.contains(schemaTag)) {
- formatSchemaStr(getFieldWithoutTag(node.desc, schemaTag))
+ if (HiveParseHelper.isHiveTableScanNode(node)) {
+ HiveParseHelper.parseReadNode(node)
} else {
- ""
- }
- val locationTag = "Location: "
- val location = if (node.desc.contains(locationTag)) {
- val stringWithBrackets = getFieldWithoutTag(node.desc, locationTag)
- // Remove prepended InMemoryFileIndex[ or PreparedDeltaFileIndex[
- // and return only location
- // Ex: InMemoryFileIndex[hdfs://bdbl-rpm-1106-57451/numbers.parquet,
- // PreparedDeltaFileIndex[file:/tmp/deltatable/delta-table1]
- if (stringWithBrackets.contains("[")) {
- stringWithBrackets.split("\\[", 2).last.replace("]", "")
+ val schemaTag = "ReadSchema: "
+ val schema = if (node.desc.contains(schemaTag)) {
+ formatSchemaStr(getFieldWithoutTag(node.desc, schemaTag))
} else {
- stringWithBrackets
+ ""
}
- } else if (node.name.contains("JDBCRelation")) {
- // see if we can report table or query
- val JDBCPattern = raw".*JDBCRelation\((.*)\).*".r
- node.name match {
- case JDBCPattern(tableName) => tableName
- case _ => "unknown"
+ val locationTag = "Location: "
+ val location = if (node.desc.contains(locationTag)) {
+ val stringWithBrackets = getFieldWithoutTag(node.desc, locationTag)
+ // Remove prepended InMemoryFileIndex[ or PreparedDeltaFileIndex[
+ // and return only location
+ // Ex: InMemoryFileIndex[hdfs://bdbl-rpm-1106-57451/numbers.parquet,
+ // PreparedDeltaFileIndex[file:/tmp/deltatable/delta-table1]
+ if (stringWithBrackets.contains("[")) {
+ stringWithBrackets.split("\\[", 2).last.replace("]", "")
+ } else {
+ stringWithBrackets
+ }
+ } else if (node.name.contains("JDBCRelation")) {
+ // see if we can report table or query
+ val JDBCPattern = raw".*JDBCRelation\((.*)\).*".r
+ node.name match {
+ case JDBCPattern(tableName) => tableName
+ case _ => "unknown"
+ }
+ } else {
+ "unknown"
}
- } else {
- "unknown"
- }
- val pushedFilterTag = "PushedFilters: "
- val pushedFilters = if (node.desc.contains(pushedFilterTag)) {
- val stringWithBrackets = getFieldWithoutTag(node.desc, pushedFilterTag)
- // Remove prepended [ from the string if exists
- if (stringWithBrackets.contains("[")) {
- stringWithBrackets.split("\\[", 2).last.replace("]", "")
+ val pushedFilterTag = "PushedFilters: "
+ val pushedFilters = if (node.desc.contains(pushedFilterTag)) {
+ val stringWithBrackets = getFieldWithoutTag(node.desc, pushedFilterTag)
+ // Remove prepended [ from the string if exists
+ if (stringWithBrackets.contains("[")) {
+ stringWithBrackets.split("\\[", 2).last.replace("]", "")
+ } else {
+ stringWithBrackets
+ }
} else {
- stringWithBrackets
+ "unknown"
}
- } else {
- "unknown"
- }
- val formatTag = "Format: "
- val fileFormat = if (node.desc.contains(formatTag)) {
- val format = getFieldWithoutTag(node.desc, formatTag)
- if (node.name.startsWith("Gpu")) {
- s"${format}(GPU)"
+ val formatTag = "Format: "
+ val fileFormat = if (node.desc.contains(formatTag)) {
+ val format = getFieldWithoutTag(node.desc, formatTag)
+ if (node.name.startsWith("Gpu")) {
+ s"${format}(GPU)"
+ } else {
+ format
+ }
+ } else if (node.name.contains("JDBCRelation")) {
+ "JDBC"
} else {
- format
+ "unknown"
}
- } else if (node.name.contains("JDBCRelation")) {
- "JDBC"
- } else {
- "unknown"
+ ReadMetaData(schema, location, pushedFilters, fileFormat)
}
- ReadMetaData(schema, location, pushedFilters, fileFormat)
+
}
// For the read score we look at the read format and datatypes for each
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
index 47b38d847..ce556b8a3 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -257,7 +257,7 @@ object SQLPlanParser extends Logging {
ShuffledHashJoinExecParser(node, checker, sqlID, app).parse
case "Sort" =>
SortExecParser(node, checker, sqlID).parse
- case s if (s.startsWith("Scan")) =>
+ case s if ReadParser.isScanNode(s) =>
FileSourceScanExecParser(node, checker, sqlID, app).parse
case "SortAggregate" =>
SortAggregateExecParser(node, checker, sqlID).parse
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index 43ced9605..3a4423bdc 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.qualification
import scala.collection.mutable.{ArrayBuffer, Buffer, LinkedHashMap, ListBuffer}
import com.nvidia.spark.rapids.tool.ToolTextFileWriter
-import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo}
+import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, HiveParseHelper, PlanInfo}
import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER}
import org.apache.hadoop.conf.Configuration
@@ -1015,11 +1015,17 @@ object QualOutputWriter {
// Unsupported Execs and Execs that are not supported due to unsupported expressions, or if
// the operation is from a dataset, or if the operation contains a UDF.
+ // Note that we remove "scan hive" and "insertIntoHive" execs because they are already reported
+ // by the readFormatTypes and writeFormatTypes. Otherwise, we end up reporting the same exec
+ // twice.
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet
val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains)
val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
- || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x))
+ || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x)
+ || HiveParseHelper.isHiveTableScanNode(x)
+ || HiveParseHelper.isHiveTableInsertNode(x)
+ )
val unsupportedExecRows = actualunsupportedExecs.map { exec =>
// If the exec is in the ignore list, then set the ignore operator to true.
if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
index 7ffea6673..e645bb763 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,27 +19,49 @@ package org.apache.spark.sql.rapids.tool
import java.io.InputStream
import java.util.zip.GZIPInputStream
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.io.{Codec, Source}
import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo}
-import com.nvidia.spark.rapids.tool.planparser.ReadParser
+import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser}
+import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode
import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, DriverAccumCase, JobInfoClass, SQLExecutionInfoClass, StageInfoClass, TaskStageAccumCase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.history.{EventLogFileReader, EventLogFileWriter}
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListenerEvent, StageInfo}
+import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerJobStart, StageInfo}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.qualification.MLFunctions
import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil
import org.apache.spark.util.Utils
+// Handles updating and caching Spark Properties for a Spark application.
+// Properties stored in this container can be accessed to make decisions about
+// certain analyses that depend on the context of the Spark properties.
+// TODO: we need to migrate SparkProperties, GpuMode to this trait.
+trait CacheableProps {
+ // A flag indicating whether hive is enabled. Note that we assume that the
+ // property is global to the entire application once it is set, i.e., it cannot be disabled
+ // once it was set to true.
+ var hiveEnabled = false
+ def handleEnvUpdateForCachedProps(event: SparkListenerEnvironmentUpdate): Unit = {
+ val sparkProperties = event.environmentDetails("Spark Properties").toMap
+ hiveEnabled ||= HiveParseHelper.isHiveEnabled(sparkProperties)
+ }
+
+ def handleJobStartForCachedProps(event: SparkListenerJobStart): Unit = {
+ // TODO: we need to improve this in order to support per-job-level
+ hiveEnabled ||= HiveParseHelper.isHiveEnabled(event.properties.asScala)
+ }
+}
+
abstract class AppBase(
val eventLogInfo: Option[EventLogInfo],
- val hadoopConf: Option[Configuration]) extends Logging {
+ val hadoopConf: Option[Configuration]) extends Logging with CacheableProps {
var sparkVersion: String = ""
var appEndTime: Option[Long] = None
@@ -272,6 +294,16 @@ abstract class AppBase(
}
}
+ // Finds all the nodes that scan a hive table
+ def getPlanInfoWithHiveScan(planInfo: SparkPlanInfo): Seq[SparkPlanInfo] = {
+ val childRes = planInfo.children.flatMap(getPlanInfoWithHiveScan(_))
+ if (isHiveTableScanNode(planInfo.nodeName)) {
+ childRes :+ planInfo
+ } else {
+ childRes
+ }
+ }
+
private def trimSchema(str: String): String = {
val index = str.lastIndexOf(",")
if (index != -1 && str.contains("...")) {
@@ -295,7 +327,7 @@ abstract class AppBase(
// Get ReadSchema of each Node and sanitize it for comparison
val trimmedNode = trimSchema(ReadParser.parseReadNode(node).schema)
readSchema.contains(trimmedNode)
- }).filter(x => x.name.startsWith("Scan")).head
+ }).filter(ReadParser.isScanNode(_)).head
dataSourceInfo += DataSourceCase(sqlID,
scanNode.id,
@@ -305,15 +337,29 @@ abstract class AppBase(
readSchema
)
}
+ // "scan hive" has no "ReadSchema" defined. So, we need to look explicitly for nodes
+ // that are scan hive and add them one by one to the dataSource
+ if (hiveEnabled) { // only scan for hive when the CatalogImplementation is using hive
+ val allPlanWithHiveScan = getPlanInfoWithHiveScan(planInfo)
+ allPlanWithHiveScan.foreach { hiveReadPlan =>
+ val sqlGraph = SparkPlanGraph(hiveReadPlan)
+ val hiveScanNode = sqlGraph.allNodes.head
+ val scanHiveMeta = HiveParseHelper.parseReadNode(hiveScanNode)
+ dataSourceInfo += DataSourceCase(sqlID,
+ hiveScanNode.id,
+ scanHiveMeta.format,
+ scanHiveMeta.location,
+ scanHiveMeta.filters,
+ scanHiveMeta.schema
+ )
+ }
+ }
}
// This will find scans for DataSource V2, if the schema is very large it
// will likely be incomplete and have ... at the end.
protected def checkGraphNodeForReads(sqlID: Long, node: SparkPlanGraphNode): Unit = {
- if (node.name.equals("BatchScan") ||
- node.name.contains("GpuScan") ||
- node.name.contains("GpuBatchScan") ||
- node.name.contains("JDBCRelation")) {
+ if (ReadParser.isDataSourceV2Node(node)) {
val res = ReadParser.parseReadNode(node)
dataSourceInfo += DataSourceCase(sqlID,
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala
index 15c1b30ef..8bad42407 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -304,6 +304,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
app: T,
event: SparkListenerJobStart): Unit = {
logDebug("Processing event: " + event.getClass)
+ app.handleJobStartForCachedProps(event)
val sqlIDString = event.properties.getProperty("spark.sql.execution.id")
val sqlID = ProfileUtils.stringToLong(sqlIDString)
sqlID.foreach(app.jobIdToSqlID(event.jobId) = _)
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala
index b275a8c6a..36dc4081f 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala
@@ -46,6 +46,7 @@ class FilterAppInfo(
def doSparkListenerEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
logDebug("Processing event: " + event.getClass)
val envSparkProperties = event.environmentDetails("Spark Properties").toMap
+ handleEnvUpdateForCachedProps(event)
sparkProperties = Some(EnvironmentInfo(envSparkProperties))
}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
index 270303f40..6ebdb93cc 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -139,6 +139,7 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati
app: ApplicationInfo,
event: SparkListenerEnvironmentUpdate): Unit = {
logDebug("Processing event: " + event.getClass)
+ app.handleEnvUpdateForCachedProps(event)
app.sparkProperties = event.environmentDetails("Spark Properties").toMap
app.classpathEntries = event.environmentDetails("Classpath Entries").toMap
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
index 390251177..3752fadac 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
app: QualificationAppInfo,
event: SparkListenerEnvironmentUpdate): Unit = {
logDebug("Processing event: " + event.getClass)
+ app.handleEnvUpdateForCachedProps(event)
val sparkProperties = event.environmentDetails("Spark Properties").toMap
if (ToolUtils.isPluginEnabled(sparkProperties)) {
throw GpuEventLogException(s"Cannot parse event logs from GPU run")
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala
index bdcae9aa5..4674d363b 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,4 +81,11 @@ object EventUtils extends Logging {
taskId, accuInfo.id, accuInfo.name, value, update, accuInfo.internal))
}
}
+
+ // A utility function used to read a Spark property and compare it to a given target value.
+ // Note that it takes a default argument as well in case the property is not available.
+ def isPropertyMatch(properties: collection.Map[String, String], propKey: String,
+ defValue: String, targetValue: String): Boolean = {
+ properties.getOrElse(propKey, defValue).equals(targetValue)
+ }
}
diff --git a/core/src/test/resources/key-value-pairs.txt b/core/src/test/resources/key-value-pairs.txt
new file mode 100644
index 000000000..157527249
--- /dev/null
+++ b/core/src/test/resources/key-value-pairs.txt
@@ -0,0 +1,499 @@
+203val_203
+491val_491
+325val_325
+401val_401
+281val_281
+498val_498
+498val_498
+430val_430
+321val_321
+72val_72
+325val_325
+114val_114
+84val_84
+395val_395
+289val_289
+466val_466
+431val_431
+76val_76
+401val_401
+439val_439
+477val_477
+462val_462
+26val_26
+177val_177
+382val_382
+309val_309
+399val_399
+404val_404
+396val_396
+77val_77
+194val_194
+229val_229
+262val_262
+401val_401
+468val_468
+199val_199
+345val_345
+176val_176
+469val_469
+65val_65
+54val_54
+223val_223
+87val_87
+282val_282
+230val_230
+342val_342
+169val_169
+216val_216
+167val_167
+478val_478
+469val_469
+19val_19
+260val_260
+492val_492
+67val_67
+208val_208
+403val_403
+316val_316
+348val_348
+197val_197
+409val_409
+411val_411
+160val_160
+311val_311
+356val_356
+229val_229
+181val_181
+369val_369
+18val_18
+164val_164
+90val_90
+317val_317
+217val_217
+454val_454
+285val_285
+137val_137
+226val_226
+296val_296
+397val_397
+137val_137
+100val_100
+138val_138
+51val_51
+256val_256
+413val_413
+103val_103
+217val_217
+138val_138
+485val_485
+318val_318
+316val_316
+34val_34
+183val_183
+165val_165
+310val_310
+126val_126
+466val_466
+97val_97
+247val_247
+424val_424
+323val_323
+454val_454
+5val_5
+316val_316
+113val_113
+265val_265
+69val_69
+205val_205
+189val_189
+453val_453
+306val_306
+384val_384
+11val_11
+51val_51
+195val_195
+242val_242
+317val_317
+167val_167
+258val_258
+278val_278
+9val_9
+237val_237
+475val_475
+209val_209
+280val_280
+197val_197
+205val_205
+134val_134
+193val_193
+20val_20
+64val_64
+233val_233
+493val_493
+360val_360
+463val_463
+277val_277
+213val_213
+342val_342
+362val_362
+406val_406
+367val_367
+348val_348
+311val_311
+418val_418
+149val_149
+24val_24
+28val_28
+336val_336
+172val_172
+469val_469
+318val_318
+407val_407
+284val_284
+155val_155
+274val_274
+333val_333
+33val_33
+327val_327
+86val_86
+199val_199
+466val_466
+431val_431
+286val_286
+242val_242
+400val_400
+244val_244
+438val_438
+469val_469
+248val_248
+397val_397
+273val_273
+457val_457
+275val_275
+235val_235
+103val_103
+43val_43
+406val_406
+273val_273
+481val_481
+129val_129
+223val_223
+42val_42
+168val_168
+131val_131
+186val_186
+53val_53
+364val_364
+208val_208
+460val_460
+95val_95
+280val_280
+437val_437
+394val_394
+15val_15
+111val_111
+478val_478
+167val_167
+292val_292
+5val_5
+230val_230
+175val_175
+42val_42
+133val_133
+384val_384
+70val_70
+348val_348
+35val_35
+480val_480
+82val_82
+341val_341
+436val_436
+392val_392
+26val_26
+153val_153
+417val_417
+201val_201
+369val_369
+382val_382
+146val_146
+165val_165
+344val_344
+150val_150
+70val_70
+459val_459
+446val_446
+339val_339
+439val_439
+118val_118
+120val_120
+170val_170
+66val_66
+8val_8
+468val_468
+230val_230
+175val_175
+256val_256
+467val_467
+221val_221
+282val_282
+489val_489
+333val_333
+404val_404
+174val_174
+479val_479
+84val_84
+298val_298
+216val_216
+18val_18
+72val_72
+191val_191
+375val_375
+395val_395
+187val_187
+489val_489
+322val_322
+207val_207
+489val_489
+230val_230
+406val_406
+118val_118
+353val_353
+393val_393
+327val_327
+470val_470
+365val_365
+100val_100
+105val_105
+172val_172
+98val_98
+199val_199
+145val_145
+309val_309
+219val_219
+374val_374
+76val_76
+237val_237
+152val_152
+228val_228
+427val_427
+218val_218
+421val_421
+176val_176
+318val_318
+409val_409
+15val_15
+128val_128
+331val_331
+496val_496
+0val_0
+401val_401
+462val_462
+353val_353
+492val_492
+480val_480
+438val_438
+138val_138
+12val_12
+272val_272
+468val_468
+119val_119
+119val_119
+298val_298
+96val_96
+419val_419
+414val_414
+288val_288
+413val_413
+35val_35
+463val_463
+47val_47
+146val_146
+70val_70
+449val_449
+468val_468
+78val_78
+200val_200
+44val_44
+35val_35
+495val_495
+338val_338
+30val_30
+255val_255
+58val_58
+366val_366
+369val_369
+0val_0
+386val_386
+224val_224
+239val_239
+308val_308
+483val_483
+116val_116
+12val_12
+265val_265
+379val_379
+443val_443
+169val_169
+452val_452
+351val_351
+344val_344
+497val_497
+179val_179
+287val_287
+90val_90
+458val_458
+5val_5
+373val_373
+134val_134
+213val_213
+190val_190
+119val_119
+307val_307
+331val_331
+409val_409
+192val_192
+435val_435
+193val_193
+152val_152
+396val_396
+401val_401
+219val_219
+97val_97
+459val_459
+92val_92
+10val_10
+283val_283
+482val_482
+128val_128
+384val_384
+288val_288
+307val_307
+469val_469
+277val_277
+406val_406
+157val_157
+444val_444
+377val_377
+95val_95
+266val_266
+17val_17
+311val_311
+193val_193
+429val_429
+169val_169
+399val_399
+252val_252
+129val_129
+432val_432
+179val_179
+277val_277
+291val_291
+195val_195
+396val_396
+321val_321
+57val_57
+273val_273
+348val_348
+368val_368
+83val_83
+480val_480
+4val_4
+85val_85
+2val_2
+414val_414
+67val_67
+322val_322
+104val_104
+221val_221
+255val_255
+489val_489
+378val_378
+424val_424
+174val_174
+490val_490
+196val_196
+163val_163
+277val_277
+429val_429
+37val_37
+403val_403
+484val_484
+454val_454
+315val_315
+431val_431
+438val_438
+90val_90
+149val_149
+222val_222
+113val_113
+200val_200
+367val_367
+187val_187
+430val_430
+187val_187
+143val_143
+257val_257
+169val_169
+472val_472
+80val_80
+0val_0
+104val_104
+27val_27
+402val_402
+417val_417
+180val_180
+162val_162
+207val_207
+403val_403
+191val_191
+327val_327
+41val_41
+498val_498
+458val_458
+136val_136
+348val_348
+209val_209
+494val_494
+166val_166
+417val_417
+448val_448
+202val_202
+74val_74
+430val_430
+138val_138
+214val_214
+305val_305
+164val_164
+120val_120
+158val_158
+272val_272
+208val_208
+335val_335
+203val_203
+230val_230
+302val_302
+128val_128
+58val_58
+263val_263
+281val_281
+178val_178
+239val_239
+332val_332
+249val_249
+278val_278
+24val_24
+389val_389
+98val_98
+455val_455
+37val_37
+238val_238
+83val_83
+241val_241
+298val_298
+233val_233
+224val_224
+125val_125
+156val_156
+487val_487
+125val_125
\ No newline at end of file
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
index 8214b3de6..dc63f27cd 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -74,7 +74,8 @@ object ToolTestUtils extends Logging {
}
def generateEventLog(eventLogDir: File, appName: String,
- confs: Option[Map[String, String]] = None)
+ confs: Option[Map[String, String]] = None,
+ enableHive: Boolean = false)
(fun: SparkSession => DataFrame): (String, String) = {
// we need to close any existing sessions to ensure that we can
@@ -87,8 +88,11 @@ object ToolTestUtils extends Logging {
.appName(appName)
.config("spark.eventLog.enabled", "true")
.config("spark.eventLog.dir", eventLogDir.getAbsolutePath)
+ if (enableHive) {
+ sparkBuilder.enableHiveSupport()
+ }
confs.foreach(_.foreach {case (k, v) => sparkBuilder.config(k, v)})
- lazy val spark = sparkBuilder.getOrCreate()
+ lazy val spark = sparkBuilder.getOrCreate()
// execute the query and generate events
val df = fun(spark)
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index ca13bd7e4..df43104ee 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1479,6 +1479,70 @@ class QualificationSuite extends BaseTestSuite {
}
}
}
+
+ test("scan hive text-format is supported") {
+   // The unit test loads a text file into a Hive table, then runs a Hive SQL query that
+   // generates "Scan hive". If the Qualification fails to support "Scan hive", the format would
+   // appear in the unsupportedOperators.csv file or the "non-supported read format" column.
+   TrampolineUtil.withTempDir { warehouseDir =>
+     // each line of the text file is a key-value pair: the key followed by "val_$key"
+     val textFilePath = ToolTestUtils.getTestResourcePath("key-value-pairs.txt")
+     // keep the Derby store under the temp dir. NOTE: the JVM-wide property is never restored.
+     TrampolineUtil.withTempDir { outpath =>
+       val derbyDir = s"${outpath.getAbsolutePath}/derby"
+       System.setProperty("derby.system.home", derbyDir)
+       val sparkConfs = Map(
+         "spark.sql.warehouse.dir" -> warehouseDir.getAbsolutePath,
+         "spark.driver.extraJavaOptions" -> s"-Dderby.system.home='$derbyDir'")
+       val allArgs = Array(
+         "--report-read-schema", // enable report read schema
+         "--output-directory",
+         outpath.getAbsolutePath)
+
+       TrampolineUtil.withTempDir { eventLogDir =>
+         // the app name uses "hiv3" on purpose so that it never matches the "hive" check below.
+         val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "scanHiv3App",
+           Some(sparkConfs), enableHive = true) { spark =>
+           // scalastyle:off line.size.limit
+           // the following set of queries will generate the following physical plan:
+           // [{"nodeName":"Scan hive default.src","simpleString":"Scan hive default.src [key#6, value#7],
+           // HiveTableRelation [`default`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
+           // Data Cols: [key#6, value#7], Partition Cols: []]","children":[],"metadata":{},
+           // "metrics":[{"name":"number of output rows","accumulatorId":12,"metricType":"sum"}]}]
+           // scalastyle:on line.size.limit
+           spark.sql("DROP TABLE IF EXISTS src")
+           spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
+           spark.sql(s"LOAD DATA LOCAL INPATH '$textFilePath' INTO TABLE src")
+           spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
+         }
+
+         val appArgs = new QualificationArgs(allArgs ++ Array(eventLog))
+         val (exit, appSum) = QualificationMain.mainInternal(appArgs)
+
+         assert(exit == 0)
+         // Verify that the results contain at least one app (this also guards the .head below)
+         assert(appSum.nonEmpty)
+         // Verify that "hivetext" is listed in the readFileFormats column
+         // The "readSchema" column should be "hivetext[]:hivetext[]:hivetext[]:hivetext[]"
+         assert(appSum.head.readFileFormats.exists(_.contains("hivetext")))
+         // Verify that "hivetext" is not listed in the unsupported formats
+         // The "Unsupported Read File Formats and Types" column should be empty
+         assert(appSum.head.readFileFormatAndTypesNotSupported.isEmpty)
+         // Next, we check that the content of the unsupportedOps has no entry for "hive".
+         val unsupportedOpsCSV = s"$outpath/rapids_4_spark_qualification_output/" +
+           "rapids_4_spark_qualification_output_unsupportedOperators.csv"
+         val inputSource = Source.fromFile(unsupportedOpsCSV)
+         try {
+           val unsupportedRows = inputSource.getLines().toList
+           assert(unsupportedRows.headOption.exists(_.contains("App ID,Unsupported Type,")))
+           assert(!unsupportedRows.exists(_.contains("hive")))
+         } finally {
+           inputSource.close()
+         }
+       }
+     }
+   }
+ }
}
class ToolTestListener extends SparkListener {