Add support for HiveTableScan and InsertIntoHive text-format #723

Merged: 2 commits, Jan 11, 2024
7 changes: 7 additions & 0 deletions core/pom.xml
@@ -524,6 +524,13 @@
<version>${delta.core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- add hive jars to test hive Ops -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
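The new test-scoped spark-hive dependency is what lets the unit tests build Hive-backed plans. As a rough illustration (the session and table names below are made up for this sketch, not taken from the PR's tests), a Hive-enabled session produces the two node types this PR parses:

// Sketch only: requires the spark-hive_${scala.binary.version} test dependency above.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("HiveOpsSketch")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS hive_text_tbl (id INT, name STRING) STORED AS TEXTFILE")
spark.sql("INSERT INTO hive_text_tbl VALUES (1, 'a')")  // plans an InsertIntoHiveTable command
spark.sql("SELECT * FROM hive_text_tbl").show()         // plans a HiveTableScan node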
DataWritingCommandExecParser.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -68,10 +68,12 @@ object DataWritingCommandExecParser {
// - InsertIntoHadoopFsRelationCommand is a logical command that does not have an entry in the
// supported execs file
// - SaveIntoDataSourceCommand is generated by DeltaLake Table writes
// - InsertIntoHiveTable is generated by the Hive catalog implementation
private val logicalWriteCommands = Set(
dataWriteCMD,
insertIntoHadoopCMD,
saveIntoDataSrcCMD
saveIntoDataSrcCMD,
HiveParseHelper.INSERT_INTO_HIVE_LABEL
)

// Note: Defines a list of the execs that include formatted data.
@@ -82,7 +84,8 @@
// have speedup entry for the deltaLake write operation
private val logicalToPhysicalCmdMap = Map(
insertIntoHadoopCMD -> defaultPhysicalCMD,
saveIntoDataSrcCMD -> defaultPhysicalCMD
saveIntoDataSrcCMD -> defaultPhysicalCMD,
HiveParseHelper.INSERT_INTO_HIVE_LABEL -> defaultPhysicalCMD
)

// Map to hold the relation between writeExecCmd and the format.
@@ -105,7 +108,15 @@
// Otherwise, fallback to the string parser
val dataFormat = specialWriteFormatMap.get(wCmd) match {
case Some(f) => f
case None => getWriteFormatString(node.desc)
case None =>
if (HiveParseHelper.isHiveTableInsertNode(node.name)) {
// Use the Hive utils to extract the format from InsertIntoHiveTable based on the SerDe
// class.
HiveParseHelper.getWriteFormat(node)
} else {
// Use the default parser to extract the write format
getWriteFormatString(node.desc)
}
}
val physicalCmd = logicalToPhysicalCmdMap(wCmd)
Some(DataWritingCmdWrapper(wCmd, physicalCmd, dataFormat))
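In short, the write-command handling above resolves the format in two steps: consult specialWriteFormatMap first, then fall back to either the Hive SerDe lookup or the generic description parser. A simplified sketch of that dispatch (not a verbatim copy of the method) is:

// Simplified sketch of the format resolution for a write command node.
val dataFormat = specialWriteFormatMap.get(wCmd) match {
  case Some(f) => f                                    // command with a fixed, known format
  case None if HiveParseHelper.isHiveTableInsertNode(node.name) =>
    HiveParseHelper.getWriteFormat(node)               // derive the format from the SerDe class
  case None =>
    getWriteFormatString(node.desc)                    // parse the format out of the node description
}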
FileSourceScanExecParser.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,12 +37,17 @@ case class FileSourceScanExecParser(
val nodeName = node.name.trim
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)

val (execName, readInfo) = if (HiveParseHelper.isHiveTableScanNode(node)) {
  // Use the hive parser
  (HiveParseHelper.SCAN_HIVE_EXEC_NAME, HiveParseHelper.parseReadNode(node))
} else {
  // Use the default parser
  (fullExecName, ReadParser.parseReadNode(node))
}
val speedupFactor = checker.getSpeedupFactor(execName)
// don't use the isExecSupported because we have finer grain.
val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
val overallSpeedup = Math.max(speedupFactor * score, 1.0)

// TODO - add in parsing expressions - average speedup across?
new ExecInfo(sqlID, nodeName, "", overallSpeedup, maxDuration, node.id, score > 0, None)
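The scoring above multiplies the exec's speedup factor by the read-score ratio and floors the result at 1.0, so a poorly supported format cannot drag the estimate below neutral. With illustrative numbers (not real speedup factors):

// Illustrative numbers only, to show the floor at 1.0.
val speedupFactor = 3.0                                     // hypothetical factor for HiveTableScanExec
val overallSpeedupHalf = Math.max(speedupFactor * 0.5, 1.0) // 1.5 when half the read types are supported
val overallSpeedupLow  = Math.max(speedupFactor * 0.2, 1.0) // 0.6 is floored to 1.0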
HiveParseHelper.scala (new file)
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.util.EventUtils

// A wrapper class to map a Hive SerDe class name to its corresponding format.
case class HiveScanSerdeClasses(className: String, format: String) extends Logging {
def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
logDebug(s"Parsing node as ScanHiveTable: ${node.desc}")
// Schema and pushedFilters are left empty for now as we cannot yet extract them from eventlogs
ReadMetaData("", "HiveTableRelation", "unknown", format)
}
}

// Utilities used to handle Hive Ops.
object HiveParseHelper extends Logging {
val SCAN_HIVE_LABEL = "scan hive"
val SCAN_HIVE_EXEC_NAME = "HiveTableScanExec"
val INSERT_INTO_HIVE_LABEL = "InsertIntoHiveTable"

// The following is a list of SerDe classes to look for in the node description.
// We should maintain this table with custom classes as needed.
// Note that we map each SerDe to a "Hive*" format because the Hive formats are still different
// compared to the native ones according to the documentation. For example, this is why
// "supportedDataSource.csv" has a "HiveText" entry.
private val LOADED_SERDE_CLASSES = Seq(
HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "HiveText"),
HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
"HiveParquet"),
HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.avro.AvroSerDe", "HiveAvro"),
HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.OpenCSVSerde", "HiveCSV"),
HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.orc.OrcSerde", "HiveORC")
)

def isHiveTableInsertNode(nodeName: String): Boolean = {
nodeName.contains(INSERT_INTO_HIVE_LABEL)
}

def isHiveTableScanNode(nodeName: String): Boolean = {
nodeName.toLowerCase.startsWith(SCAN_HIVE_LABEL)
}

def isHiveTableScanNode(node: SparkPlanGraphNode): Boolean = {
isHiveTableScanNode(node.name)
}

// Given a "scan hive" graph node, construct the ReadMetaData based on the SerDe class.
// If the SerDe class does not match any known entry, it returns an "unknown" format.
def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
LOADED_SERDE_CLASSES.find(k => node.desc.contains(k.className)).map(
_.parseReadNode(node)).getOrElse(ReadMetaData("", "HiveTableRelation", "unknown", "unknown"))
}

// Given an "InsertIntoHiveTable" graph node, extract the write format based on the SerDe class.
// If the SerDe class does not match any known entry, it returns an "unknown" format.
def getWriteFormat(node: SparkPlanGraphNode): String = {
val readMetaData = parseReadNode(node)
readMetaData.format
}

def isHiveEnabled(properties: collection.Map[String, String]): Boolean = {
EventUtils.isPropertyMatch(properties, "spark.sql.catalogImplementation", "", "hive")
}

// Keep for future improvement as we can pass this information to the AutoTuner/user to suggest
// recommendations regarding ORC optimizations.
def isORCNativeEnabled(properties: collection.Map[String, String]): Boolean = {
EventUtils.isPropertyMatch(properties, "spark.sql.orc.impl", "native", "native") ||
EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreOrc", "true", "true")
}

// Keep for future improvement as we can pass this information to the AutoTuner/user to suggest
// recommendations regarding Parquet optimizations.
def isConvertParquetEnabled(properties: collection.Map[String, String]): Boolean = {
EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreParquet", "true", "true")
}

// Keep for future improvement as we can pass this information to the AutoTuner/user to suggest
// recommendations regarding Text optimizations for GPU.
def isRAPIDSTextHiveEnabled(properties: collection.Map[String, String]): Boolean = {
EventUtils.isPropertyMatch(properties,
"spark.rapids.sql.format.hive.text.enabled", "true", "true")
}
}
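The lookup in parseReadNode simply scans the node description for one of the known SerDe class names. A self-contained sketch of that idea follows; the description string is a trimmed, hypothetical example of what a "Scan hive" node carries in an eventlog:

// Self-contained sketch of the SerDe-to-format lookup used above.
case class SerdeFormat(className: String, format: String)

val knownSerdes = Seq(
  SerdeFormat("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "HiveText"),
  SerdeFormat("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", "HiveParquet"))

val desc = "HiveTableRelation [..., org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ...]"
val format = knownSerdes.find(s => desc.contains(s.className)).map(_.format).getOrElse("unknown")
// format == "HiveText"; an unrecognized SerDe falls back to "unknown"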
ReadParser.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,29 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
case class ReadMetaData(schema: String, location: String, filters: String, format: String)

object ReadParser extends Logging {
// It was found that some eventlogs could have "NativeScan" instead of "Scan"
val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan")
// DatasourceV2 node names that exactly match the following labels
val DATASOURCE_V2_NODE_EXACT_PREF = Set(
"BatchScan")
// DatasourceV2 node names that partially match the following labels
val DATASOURCE_V2_NODE_PREF = Set(
"GpuScan",
"GpuBatchScan",
"JDBCRelation")

def isScanNode(nodeName: String): Boolean = {
SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_))
}

def isScanNode(node: SparkPlanGraphNode): Boolean = {
isScanNode(node.name)
}

def isDataSourceV2Node(node: SparkPlanGraphNode): Boolean = {
DATASOURCE_V2_NODE_EXACT_PREF.exists(node.name.equals(_)) ||
DATASOURCE_V2_NODE_PREF.exists(node.name.contains(_))
}

// strip off the struct<> part that Spark adds to the ReadSchema
def formatSchemaStr(schema: String): String = {
@@ -48,60 +71,65 @@ object ReadParser extends Logging {
}

def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = {
if (HiveParseHelper.isHiveTableScanNode(node)) {
  HiveParseHelper.parseReadNode(node)
} else {
  val schemaTag = "ReadSchema: "
  val schema = if (node.desc.contains(schemaTag)) {
    formatSchemaStr(getFieldWithoutTag(node.desc, schemaTag))
  } else {
    ""
  }
  val locationTag = "Location: "
  val location = if (node.desc.contains(locationTag)) {
    val stringWithBrackets = getFieldWithoutTag(node.desc, locationTag)
    // Remove prepended InMemoryFileIndex[ or PreparedDeltaFileIndex[
    // and return only location
    // Ex: InMemoryFileIndex[hdfs://bdbl-rpm-1106-57451/numbers.parquet,
    //     PreparedDeltaFileIndex[file:/tmp/deltatable/delta-table1]
    if (stringWithBrackets.contains("[")) {
      stringWithBrackets.split("\\[", 2).last.replace("]", "")
    } else {
      stringWithBrackets
    }
  } else if (node.name.contains("JDBCRelation")) {
    // see if we can report table or query
    val JDBCPattern = raw".*JDBCRelation\((.*)\).*".r
    node.name match {
      case JDBCPattern(tableName) => tableName
      case _ => "unknown"
    }
  } else {
    "unknown"
  }
  val pushedFilterTag = "PushedFilters: "
  val pushedFilters = if (node.desc.contains(pushedFilterTag)) {
    val stringWithBrackets = getFieldWithoutTag(node.desc, pushedFilterTag)
    // Remove prepended [ from the string if exists
    if (stringWithBrackets.contains("[")) {
      stringWithBrackets.split("\\[", 2).last.replace("]", "")
    } else {
      stringWithBrackets
    }
  } else {
    "unknown"
  }
  val formatTag = "Format: "
  val fileFormat = if (node.desc.contains(formatTag)) {
    val format = getFieldWithoutTag(node.desc, formatTag)
    if (node.name.startsWith("Gpu")) {
      s"${format}(GPU)"
    } else {
      format
    }
  } else if (node.name.contains("JDBCRelation")) {
    "JDBC"
  } else {
    "unknown"
  }
  ReadMetaData(schema, location, pushedFilters, fileFormat)
}
}

// For the read score we look at the read format and datatypes for each
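The new isScanNode helper generalizes the old startsWith("Scan") check so that "NativeScan" nodes are also recognized. A few illustrative node names (made up for this sketch):

// Illustrative node names only; real names come from the Spark plan graph.
ReadParser.isScanNode("Scan parquet db.tbl")     // true
ReadParser.isScanNode("NativeScan orc db.tbl")   // true, the "NativeScan" variant is now covered
ReadParser.isScanNode("Scan hive db.hive_tbl")   // true, later routed to HiveParseHelper
ReadParser.isScanNode("Exchange")                // false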
SQLPlanParser.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -257,7 +257,7 @@ object SQLPlanParser extends Logging {
ShuffledHashJoinExecParser(node, checker, sqlID, app).parse
case "Sort" =>
SortExecParser(node, checker, sqlID).parse
case s if (s.startsWith("Scan")) =>
case s if ReadParser.isScanNode(s) =>
FileSourceScanExecParser(node, checker, sqlID, app).parse
case "SortAggregate" =>
SortAggregateExecParser(node, checker, sqlID).parse
QualOutputWriter.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.qualification
import scala.collection.mutable.{ArrayBuffer, Buffer, LinkedHashMap, ListBuffer}

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo}
import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, HiveParseHelper, PlanInfo}
import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER}
import org.apache.hadoop.conf.Configuration
@@ -1015,11 +1015,17 @@ object QualOutputWriter {

// Unsupported Execs and Execs that are not supported due to unsupported expressions, or if
// the operation is from a dataset, or if the operation contains a UDF.
// Note that we remove "scan hive" and "insertIntoHive" execs because they are already reported
// by the readFormatTypes and writeFormatTypes. Otherwise, we would end up reporting the same
// exec twice.
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet
val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains)
val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
  || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x)
  || HiveParseHelper.isHiveTableScanNode(x)
  || HiveParseHelper.isHiveTableInsertNode(x)
)
val unsupportedExecRows = actualunsupportedExecs.map { exec =>
// If the exec is in the ignore list, then set the ignore operator to true.
if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) {
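To illustrate the de-duplication described in the comment above (the exec strings here are hypothetical), the two HiveParseHelper predicates drop the hive scan/insert entries so they are only counted by the read/write format reports:

// Hypothetical input set, showing only the new Hive-related predicates.
val unsupported = Set("Scan hive default.tbl", "InsertIntoHiveTable", "SomeOtherExec")
val filtered = unsupported.filterNot(x =>
  HiveParseHelper.isHiveTableScanNode(x) || HiveParseHelper.isHiveTableInsertNode(x))
// filtered == Set("SomeOtherExec")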