Add support for HiveTableScan and InsertIntoHive text format
Fixes #681

- Add unit tests covering scans of Hive text tables
- Check the SerDe class to determine whether the Hive format is supported
  (sketched below)
- Only Hive text format is supported for now
- Update the read-format reporting to include HiveTableScan operations
- Add support for "NativeScan" as an alternative prefix to the "Scan" node
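In essence, the detection keys off the SerDe class embedded in the plan node description. A minimal standalone Scala sketch of the idea (names and the sample description are illustrative; the real logic lives in HiveParseHelper below):

    object SerdeFormatSketch {
      // Map a SerDe class found in a node description to the reported format label.
      private val serdeToFormat = Seq(
        "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe" -> "HiveText",
        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe" -> "HiveParquet")

      def detectFormat(nodeDesc: String): String =
        serdeToFormat.collectFirst {
          case (cls, fmt) if nodeDesc.contains(cls) => fmt
        }.getOrElse("unknown")

      def main(args: Array[String]): Unit = {
        // Text-backed Hive tables carry LazySimpleSerDe in their relation description.
        val desc = "HiveTableRelation ... org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe ..."
        println(detectFormat(desc)) // prints: HiveText
      }
    }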

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
amahussein committed Jan 10, 2024
1 parent 2883413 commit 30ac07a
Showing 16 changed files with 859 additions and 77 deletions.
7 changes: 7 additions & 0 deletions core/pom.xml
@@ -524,6 +524,13 @@
      <version>${delta.core.version}</version>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <!-- add the hive dependency to test detecting Hive ops -->
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
</dependencies>

<build>
core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -68,10 +68,12 @@ object DataWritingCommandExecParser {
  // - InsertIntoHadoopFsRelationCommand is a logical command that does not have an entry in the
  //   supported execs file
  // - SaveIntoDataSourceCommand is generated by DeltaLake Table writes
+  // - InsertIntoHiveTable is generated by the Hive catalog implementation
  private val logicalWriteCommands = Set(
    dataWriteCMD,
    insertIntoHadoopCMD,
-    saveIntoDataSrcCMD
+    saveIntoDataSrcCMD,
+    HiveParseHelper.INSERT_INTO_HIVE_LABEL
)

// Note: Defines a list of the execs that include formatted data.
@@ -82,7 +84,8 @@ object DataWritingCommandExecParser {
// have speedup entry for the deltaLake write operation
private val logicalToPhysicalCmdMap = Map(
    insertIntoHadoopCMD -> defaultPhysicalCMD,
-    saveIntoDataSrcCMD -> defaultPhysicalCMD
+    saveIntoDataSrcCMD -> defaultPhysicalCMD,
+    HiveParseHelper.INSERT_INTO_HIVE_LABEL -> defaultPhysicalCMD
)

// Map to hold the relation between writeExecCmd and the format.
@@ -105,7 +108,15 @@ object DataWritingCommandExecParser {
// Otherwise, fallback to the string parser
    val dataFormat = specialWriteFormatMap.get(wCmd) match {
      case Some(f) => f
-      case None => getWriteFormatString(node.desc)
+      case None =>
+        if (HiveParseHelper.isHiveTableInsertNode(node.name)) {
+          // Use Hive utils to extract the format from insertIntoHiveTable based on the SerDe
+          // class.
+          HiveParseHelper.getWriteFormat(node)
+        } else {
+          // Use the default parser to extract the write format
+          getWriteFormatString(node.desc)
+        }
}
val physicalCmd = logicalToPhysicalCmdMap(wCmd)
Some(DataWritingCmdWrapper(wCmd, physicalCmd, dataFormat))
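The write-side lookup above now resolves a format in three steps: the special-format map first, then the SerDe probe for Hive inserts, then the generic string parser. A condensed standalone model of that dispatch (PlanNode and the fallback literal are illustrative stand-ins, not the tool's API):

    case class PlanNode(name: String, desc: String)

    def resolveWriteFormat(cmd: String, node: PlanNode,
        specialFormats: Map[String, String]): String =
      specialFormats.getOrElse(cmd,
        if (node.name.contains("InsertIntoHiveTable")) {
          // stand-in for HiveParseHelper.getWriteFormat(node): probe the SerDe class
          if (node.desc.contains("LazySimpleSerDe")) "HiveText" else "unknown"
        } else {
          "Parquet" // stand-in for getWriteFormatString(node.desc)
        })

    // e.g. resolveWriteFormat("InsertIntoHiveTable",
    //   PlanNode("Execute InsertIntoHiveTable", "... LazySimpleSerDe ..."), Map.empty)
    // returns "HiveText"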
core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,12 +37,17 @@ case class FileSourceScanExecParser(
val nodeName = node.name.trim
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)

-    val readInfo = ReadParser.parseReadNode(node)
+    val (execName, readInfo) = if (HiveParseHelper.isHiveTableScanNode(node)) {
+      // Use the Hive parser
+      (HiveParseHelper.SCAN_HIVE_EXEC_NAME, HiveParseHelper.parseReadNode(node))
+    } else {
+      // Use the default parser
+      (fullExecName, ReadParser.parseReadNode(node))
+    }
+    val speedupFactor = checker.getSpeedupFactor(execName)
    // don't use the isExecSupported because we have finer grain.
    val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
-    val speedupFactor = checker.getSpeedupFactor(fullExecName)
-    val overallSpeedup = Math.max((speedupFactor * score), 1.0)
+    val overallSpeedup = Math.max(speedupFactor * score, 1.0)

// TODO - add in parsing expressions - average speedup across?
new ExecInfo(sqlID, nodeName, "", overallSpeedup, maxDuration, node.id, score > 0, None)
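The behavioral change to note: the speedup factor is now looked up under the Hive-specific exec name for "scan hive" nodes instead of the generic FileSourceScanExec. A script-style sketch of the name selection (node names are illustrative):

    def chooseExecName(nodeName: String, fullExecName: String): String =
      if (nodeName.trim.toLowerCase.startsWith("scan hive")) "HiveTableScanExec"
      else fullExecName

    assert(chooseExecName("Scan hive default.t1", "FileSourceScanExec") == "HiveTableScanExec")
    assert(chooseExecName("Scan parquet default.t2", "FileSourceScanExec") == "FileSourceScanExec")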
core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala (new file)
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.util.EventUtils

// A wrapper class to map a Hive SerDe class name to the format label reported by the tools.
case class HiveScanSerdeClasses(className: String, format: String) extends Logging {
  def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
    logDebug(s"Parsing node as ScanHiveTable: ${node.desc}")
    // Schema and pushedFilters are left empty for now because they cannot be
    // extracted from eventlogs yet.
    ReadMetaData("", "HiveTableRelation", "unknown", format)
  }
}

// Utilities used to handle Hive ops.
object HiveParseHelper extends Logging {
  val SCAN_HIVE_LABEL = "scan hive"
  val SCAN_HIVE_EXEC_NAME = "HiveTableScanExec"
  val INSERT_INTO_HIVE_LABEL = "InsertIntoHiveTable"

  // The following is a list of SerDe classes to look for in the node description.
  // We should maintain this table with custom classes as needed.
  // Note that we map each SerDe to a "Hive*" format because the Hive formats are still
  // different compared to the native ones according to the documentation. For example,
  // this is why "supportedDataSource.csv" has a "HiveText" entry.
  private val LOADED_SERDE_CLASSES = Seq(
    HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "HiveText"),
    HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
      "HiveParquet"),
    HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.avro.AvroSerDe", "HiveAvro"),
    HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.OpenCSVSerde", "HiveCSV"),
    HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.orc.OrcSerde", "HiveORC")
  )

  def isHiveTableInsertNode(nodeName: String): Boolean = {
    nodeName.contains(INSERT_INTO_HIVE_LABEL)
  }

  def isHiveTableScanNode(nodeName: String): Boolean = {
    nodeName.toLowerCase.startsWith(SCAN_HIVE_LABEL)
  }

  def isHiveTableScanNode(node: SparkPlanGraphNode): Boolean = {
    isHiveTableScanNode(node.name)
  }

  // Given a "scan hive" graph node, construct the ReadMetaData based on the SerDe class.
  // If the SerDe class does not match the lookups, it returns an "unknown" format.
  def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
    LOADED_SERDE_CLASSES.find(k => node.desc.contains(k.className)).map(
      _.parseReadNode(node)).getOrElse(ReadMetaData("", "HiveTableRelation", "unknown", "unknown"))
  }

  // Given an InsertIntoHiveTable graph node, construct the format of the write operation
  // based on the SerDe class. If the SerDe class does not match the lookups, it returns
  // an "unknown" format.
  def getWriteFormat(node: SparkPlanGraphNode): String = {
    val readMetaData = parseReadNode(node)
    readMetaData.format
  }

  def isHiveEnabled(properties: collection.Map[String, String]): Boolean = {
    EventUtils.isPropertyMatch(properties, "spark.sql.catalogImplementation", "", "hive")
  }

  // Keep for future improvement as we can pass this information to the AutoTuner/user to
  // suggest recommendations regarding ORC optimizations.
  def isORCNativeEnabled(properties: collection.Map[String, String]): Boolean = {
    EventUtils.isPropertyMatch(properties, "spark.sql.orc.impl", "native", "native") ||
      EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreOrc", "true", "true")
  }

  // Keep for future improvement as we can pass this information to the AutoTuner/user to
  // suggest recommendations regarding Parquet optimizations.
  def isConvertParquetEnabled(properties: collection.Map[String, String]): Boolean = {
    EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreParquet", "true", "true")
  }

  // Keep for future improvement as we can pass this information to the AutoTuner/user to
  // suggest recommendations regarding text optimizations for GPU.
  def isRAPIDSTextHiveEnabled(properties: collection.Map[String, String]): Boolean = {
    EventUtils.isPropertyMatch(properties,
      "spark.rapids.sql.format.hive.text.enabled", "true", "true")
  }
}
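The property helpers at the bottom of the file all follow one compare-against-default pattern. A plain-Map stand-in for EventUtils.isPropertyMatch (assumed semantics: the property value, or the supplied default when unset, must equal the target):

    val props = Map(
      "spark.sql.catalogImplementation" -> "hive",
      "spark.sql.orc.impl" -> "native")

    def isPropertyMatch(properties: Map[String, String], key: String,
        default: String, target: String): Boolean =
      properties.getOrElse(key, default) == target

    assert(isPropertyMatch(props, "spark.sql.catalogImplementation", "", "hive"))
    // convertMetastoreOrc is unset here, so its default "true" satisfies the check too.
    assert(isPropertyMatch(props, "spark.sql.hive.convertMetastoreOrc", "true", "true"))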
core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,29 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
case class ReadMetaData(schema: String, location: String, filters: String, format: String)

object ReadParser extends Logging {
+  // It was found that some eventlogs could have "NativeScan" instead of "Scan"
+  val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan")
+  // DataSourceV2 node names that exactly match the following labels
+  val DATASOURCE_V2_NODE_EXACT_PREF = Set(
+    "BatchScan")
+  // DataSourceV2 node names that partially match the following labels
+  val DATASOURCE_V2_NODE_PREF = Set(
+    "GpuScan",
+    "GpuBatchScan",
+    "JDBCRelation")
+
+  def isScanNode(nodeName: String): Boolean = {
+    SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_))
+  }
+
+  def isScanNode(node: SparkPlanGraphNode): Boolean = {
+    isScanNode(node.name)
+  }
+
+  def isDataSourceV2Node(node: SparkPlanGraphNode): Boolean = {
+    DATASOURCE_V2_NODE_EXACT_PREF.exists(node.name.equals(_)) ||
+      DATASOURCE_V2_NODE_PREF.exists(node.name.contains(_))
+  }

// strip off the struct<> part that Spark adds to the ReadSchema
def formatSchemaStr(schema: String): String = {
@@ -48,60 +71,65 @@ object ReadParser extends Logging {
}

def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
-    val schemaTag = "ReadSchema: "
-    val schema = if (node.desc.contains(schemaTag)) {
-      formatSchemaStr(getFieldWithoutTag(node.desc, schemaTag))
-    } else {
-      ""
-    }
-    val locationTag = "Location: "
-    val location = if (node.desc.contains(locationTag)) {
-      val stringWithBrackets = getFieldWithoutTag(node.desc, locationTag)
-      // Remove prepended InMemoryFileIndex[ or PreparedDeltaFileIndex[
-      // and return only location
-      // Ex: InMemoryFileIndex[hdfs://bdbl-rpm-1106-57451/numbers.parquet,
-      // PreparedDeltaFileIndex[file:/tmp/deltatable/delta-table1]
-      if (stringWithBrackets.contains("[")) {
-        stringWithBrackets.split("\\[", 2).last.replace("]", "")
-      } else {
-        stringWithBrackets
-      }
-    } else if (node.name.contains("JDBCRelation")) {
-      // see if we can report table or query
-      val JDBCPattern = raw".*JDBCRelation\((.*)\).*".r
-      node.name match {
-        case JDBCPattern(tableName) => tableName
-        case _ => "unknown"
-      }
-    } else {
-      "unknown"
-    }
-    val pushedFilterTag = "PushedFilters: "
-    val pushedFilters = if (node.desc.contains(pushedFilterTag)) {
-      val stringWithBrackets = getFieldWithoutTag(node.desc, pushedFilterTag)
-      // Remove prepended [ from the string if exists
-      if (stringWithBrackets.contains("[")) {
-        stringWithBrackets.split("\\[", 2).last.replace("]", "")
-      } else {
-        stringWithBrackets
-      }
-    } else {
-      "unknown"
-    }
-    val formatTag = "Format: "
-    val fileFormat = if (node.desc.contains(formatTag)) {
-      val format = getFieldWithoutTag(node.desc, formatTag)
-      if (node.name.startsWith("Gpu")) {
-        s"${format}(GPU)"
-      } else {
-        format
-      }
-    } else if (node.name.contains("JDBCRelation")) {
-      "JDBC"
-    } else {
-      "unknown"
-    }
-    ReadMetaData(schema, location, pushedFilters, fileFormat)
+    if (HiveParseHelper.isHiveTableScanNode(node)) {
+      HiveParseHelper.parseReadNode(node)
+    } else {
+      val schemaTag = "ReadSchema: "
+      val schema = if (node.desc.contains(schemaTag)) {
+        formatSchemaStr(getFieldWithoutTag(node.desc, schemaTag))
+      } else {
+        ""
+      }
+      val locationTag = "Location: "
+      val location = if (node.desc.contains(locationTag)) {
+        val stringWithBrackets = getFieldWithoutTag(node.desc, locationTag)
+        // Remove prepended InMemoryFileIndex[ or PreparedDeltaFileIndex[
+        // and return only location
+        // Ex: InMemoryFileIndex[hdfs://bdbl-rpm-1106-57451/numbers.parquet,
+        // PreparedDeltaFileIndex[file:/tmp/deltatable/delta-table1]
+        if (stringWithBrackets.contains("[")) {
+          stringWithBrackets.split("\\[", 2).last.replace("]", "")
+        } else {
+          stringWithBrackets
+        }
+      } else if (node.name.contains("JDBCRelation")) {
+        // see if we can report table or query
+        val JDBCPattern = raw".*JDBCRelation\((.*)\).*".r
+        node.name match {
+          case JDBCPattern(tableName) => tableName
+          case _ => "unknown"
+        }
+      } else {
+        "unknown"
+      }
+      val pushedFilterTag = "PushedFilters: "
+      val pushedFilters = if (node.desc.contains(pushedFilterTag)) {
+        val stringWithBrackets = getFieldWithoutTag(node.desc, pushedFilterTag)
+        // Remove prepended [ from the string if exists
+        if (stringWithBrackets.contains("[")) {
+          stringWithBrackets.split("\\[", 2).last.replace("]", "")
+        } else {
+          stringWithBrackets
+        }
+      } else {
+        "unknown"
+      }
+      val formatTag = "Format: "
+      val fileFormat = if (node.desc.contains(formatTag)) {
+        val format = getFieldWithoutTag(node.desc, formatTag)
+        if (node.name.startsWith("Gpu")) {
+          s"${format}(GPU)"
+        } else {
+          format
+        }
+      } else if (node.name.contains("JDBCRelation")) {
+        "JDBC"
+      } else {
+        "unknown"
+      }
+      ReadMetaData(schema, location, pushedFilters, fileFormat)
+    }

}

// For the read score we look at the read format and datatypes for each
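A quick script-style check of the widened scan-node matching added at the top of this file (node names are illustrative):

    val scanPrefixes = Seq("Scan", "NativeScan")
    def isScan(name: String): Boolean = scanPrefixes.exists(name.startsWith(_))

    assert(isScan("Scan parquet spark_catalog.default.t"))
    assert(isScan("NativeScan parquet spark_catalog.default.t")) // previously unmatched
    assert(!isScan("BatchScan")) // DataSourceV2 nodes are classified separately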
core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -257,7 +257,7 @@ object SQLPlanParser extends Logging {
ShuffledHashJoinExecParser(node, checker, sqlID, app).parse
case "Sort" =>
SortExecParser(node, checker, sqlID).parse
-      case s if (s.startsWith("Scan")) =>
+      case s if ReadParser.isScanNode(s) =>
FileSourceScanExecParser(node, checker, sqlID, app).parse
case "SortAggregate" =>
SortAggregateExecParser(node, checker, sqlID).parse
core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.qualification
import scala.collection.mutable.{ArrayBuffer, Buffer, LinkedHashMap, ListBuffer}

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
-import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo}
+import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, HiveParseHelper, PlanInfo}
import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER}
import org.apache.hadoop.conf.Configuration
@@ -1015,11 +1015,17 @@ object QualOutputWriter {

// Unsupported Execs and Execs that are not supported due to unsupported expressions, or if
// the operation is from a dataset, or if the operation contains a UDF.
+  // Note that we remove the "scan hive" and "insertIntoHive" execs because they are already
+  // reported by the readFormatTypes and writeFormatTypes. Otherwise, we end up reporting the
+  // same exec twice.
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet
val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains)
val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
-      || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x))
+      || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x)
+      || HiveParseHelper.isHiveTableScanNode(x)
+      || HiveParseHelper.isHiveTableInsertNode(x)
+    )
val unsupportedExecRows = actualunsupportedExecs.map { exec =>
// If the exec is in the ignore list, then set the ignore operator to true.
if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) {
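The net effect of the added filter, shown in isolation (exec labels are illustrative):

    val unsupportedExecs = Set("Scan hive default.t1", "Execute InsertIntoHiveTable", "Expand")
    val deduplicated = unsupportedExecs.filterNot { x =>
      x.toLowerCase.startsWith("scan hive") || x.contains("InsertIntoHiveTable")
    }
    // Hive scan/insert entries drop out; they are already covered by the
    // read-format and write-format reports.
    assert(deduplicated == Set("Expand"))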