From 30ac07a101759ea35801a05ba3a0a0732fc05019 Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Thu, 28 Dec 2023 18:18:06 -0600 Subject: [PATCH 1/2] Add support of HiveTableScan and InsertIntoHive text-format Fixes #681 - Add unit-test to support scan-hive-text - The check for the SerDe class to determine if the Hive format is supported or not - Only text-hive is supported for now. - Updated the read-format to include the ScanHive operations. - Added support to NativeScan as an alternative to the "Node Scan" Signed-off-by: Ahmed Hussein (amahussein) --- core/pom.xml | 7 + .../DataWritingCommandExecParser.scala | 19 +- .../planparser/FileSourceScanExecParser.scala | 15 +- .../tool/planparser/HiveParseHelper.scala | 101 ++++ .../rapids/tool/planparser/ReadParser.scala | 120 +++-- .../tool/planparser/SQLPlanParser.scala | 4 +- .../tool/qualification/QualOutputWriter.scala | 12 +- .../spark/sql/rapids/tool/AppBase.scala | 64 ++- .../sql/rapids/tool/EventProcessorBase.scala | 3 +- .../spark/sql/rapids/tool/FilterAppInfo.scala | 1 + .../tool/profiling/EventsProcessor.scala | 3 +- .../QualificationEventProcessor.scala | 3 +- .../sql/rapids/tool/util/EventUtils.scala | 9 +- core/src/test/resources/key-value-pairs.txt | 499 ++++++++++++++++++ .../spark/rapids/tool/ToolTestUtils.scala | 10 +- .../qualification/QualificationSuite.scala | 66 ++- 16 files changed, 859 insertions(+), 77 deletions(-) create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala create mode 100644 core/src/test/resources/key-value-pairs.txt diff --git a/core/pom.xml b/core/pom.xml index c71551a21..5de4986c2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -524,6 +524,13 @@ ${delta.core.version} test + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test + diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala index 446eb389c..a75975a3a 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,10 +68,12 @@ object DataWritingCommandExecParser { // - InsertIntoHadoopFsRelationCommand is a logical command that does not have an entry in the // supported execs file // - SaveIntoDataSourceCommand is generated by DeltaLake Table writes + // - InsertIntoHiveTable is generated by Hive catalogue implementation private val logicalWriteCommands = Set( dataWriteCMD, insertIntoHadoopCMD, - saveIntoDataSrcCMD + saveIntoDataSrcCMD, + HiveParseHelper.INSERT_INTO_HIVE_LABEL ) // Note: Defines a list of the execs that include formatted data. @@ -82,7 +84,8 @@ object DataWritingCommandExecParser { // have speedup entry for the deltaLake write operation private val logicalToPhysicalCmdMap = Map( insertIntoHadoopCMD -> defaultPhysicalCMD, - saveIntoDataSrcCMD -> defaultPhysicalCMD + saveIntoDataSrcCMD -> defaultPhysicalCMD, + HiveParseHelper.INSERT_INTO_HIVE_LABEL-> defaultPhysicalCMD ) // Map to hold the relation between writeExecCmd and the format. @@ -105,7 +108,15 @@ object DataWritingCommandExecParser { // Otherwise, fallback to the string parser val dataFormat = specialWriteFormatMap.get(wCmd) match { case Some(f) => f - case None => getWriteFormatString(node.desc) + case None => + if (HiveParseHelper.isHiveTableInsertNode(node.name)) { + // Use Hive Utils to extract the format from insertIntoHiveTable based on the SerDe + // class. + HiveParseHelper.getWriteFormat(node) + } else { + // USe the default parser to extract the write-format + getWriteFormatString(node.desc) + } } val physicalCmd = logicalToPhysicalCmdMap(wCmd) Some(DataWritingCmdWrapper(wCmd, physicalCmd, dataFormat)) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala index f1b70b9c3..eb6fb98a3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,12 +37,17 @@ case class FileSourceScanExecParser( val nodeName = node.name.trim val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId) val maxDuration = SQLPlanParser.getTotalDuration(accumId, app) - - val readInfo = ReadParser.parseReadNode(node) + val (execName, readInfo) = if (HiveParseHelper.isHiveTableScanNode(node)) { + // Use the hive parser + (HiveParseHelper.SCAN_HIVE_EXEC_NAME, HiveParseHelper.parseReadNode(node)) + } else { + // Use the default parser + (fullExecName, ReadParser.parseReadNode(node)) + } + val speedupFactor = checker.getSpeedupFactor(execName) // don't use the isExecSupported because we have finer grain. val score = ReadParser.calculateReadScoreRatio(readInfo, checker) - val speedupFactor = checker.getSpeedupFactor(fullExecName) - val overallSpeedup = Math.max((speedupFactor * score), 1.0) + val overallSpeedup = Math.max(speedupFactor * score, 1.0) // TODO - add in parsing expressions - average speedup across? new ExecInfo(sqlID, nodeName, "", overallSpeedup, maxDuration, node.id, score > 0, None) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala new file mode 100644 index 000000000..2826bfe5e --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HiveParseHelper.scala @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.ui.SparkPlanGraphNode +import org.apache.spark.sql.rapids.tool.util.EventUtils + +// A wrapper class to map between +case class HiveScanSerdeClasses(className: String, format: String) extends Logging { + def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = { + logDebug(s"Parsing node as ScanHiveTable: ${node.desc}") + // Schema, pushedFilters empty for now as we cannot extract them yet from eventlogs + ReadMetaData("", "HiveTableRelation", "unknown", format) + } +} + +// Utilities used to handle Hive Ops. +object HiveParseHelper extends Logging { + val SCAN_HIVE_LABEL = "scan hive" + val SCAN_HIVE_EXEC_NAME = "HiveTableScanExec" + val INSERT_INTO_HIVE_LABEL = "InsertIntoHiveTable" + + // The following is a list of Classes we can look for is SerDe. + // We should maintain this table with custom classes as needed. + // Note that we map each SerDe to a format "Hive*" because the hive formats are still different + // compared to the native onces according to the documentation. For example, this is why the + // the "supportedDataSource.csv" has a "HiveText" entry. + private val LOADED_SERDE_CLASSES = Seq( + HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "HiveText"), + HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + "HiveParquet"), + HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.avro.AvroSerDe", "HiveAvro"), + HiveScanSerdeClasses("org.apache.hadoop.hive.serde2.OpenCSVSerde", "HiveCSV"), + HiveScanSerdeClasses("org.apache.hadoop.hive.ql.io.orc.OrcSerde", "HiveORC") + ) + + def isHiveTableInsertNode(nodeName: String): Boolean = { + nodeName.contains(INSERT_INTO_HIVE_LABEL) + } + + def isHiveTableScanNode(nodeName: String): Boolean = { + nodeName.toLowerCase.startsWith(SCAN_HIVE_LABEL) + } + + def isHiveTableScanNode(node: SparkPlanGraphNode): Boolean = { + isHiveTableScanNode(node.name) + } + + // Given a "scan hive" NodeGraph, construct the MetaData based on the SerDe class. + // If the SerDe class does not match the lookups, it returns an "unknown" format. + def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = { + LOADED_SERDE_CLASSES.find(k => node.desc.contains(k.className)).map( + _.parseReadNode(node)).getOrElse(ReadMetaData("", "HiveTableRelation", "unknown", "unknown")) + } + + // Given a "scan hive" NodeGraph, construct the MetaData of the write operation based on the + // SerDe class. If the SerDe class does not match the lookups, it returns an "unknown" format. + def getWriteFormat(node: SparkPlanGraphNode): String = { + val readMetaData = parseReadNode(node) + readMetaData.format + } + + def isHiveEnabled(properties: collection.Map[String, String]): Boolean = { + EventUtils.isPropertyMatch(properties, "spark.sql.catalogImplementation", "", "hive") + } + + // Keep for future improvement as we can pass this information to the AutoTuner/user to suggest + // recommendations regarding ORC optimizations. + def isORCNativeEnabled(properties: collection.Map[String, String]): Boolean = { + EventUtils.isPropertyMatch(properties, "spark.sql.orc.impl", "native", "native") || + EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreOrc", "true", "true") + } + + // Keep for future improvement as we can pass this information to the AutoTuner/user to suggest + // recommendations regarding Parquet optimizations. + def isConvertParquetEnabled(properties: collection.Map[String, String]): Boolean = { + EventUtils.isPropertyMatch(properties, "spark.sql.hive.convertMetastoreParquet", "true", "true") + } + + // Keep for future improvement as we can pass this information to the AutoTuner/user to suggest + // recommendations regarding Text optimizations for GPU. + def isRAPIDSTextHiveEnabled(properties: collection.Map[String, String]): Boolean = { + EventUtils.isPropertyMatch(properties, + "spark.rapids.sql.format.hive.text.enabled", "true", "true") + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala index 3778718fd..eb15bea7c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ReadParser.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,29 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraphNode case class ReadMetaData(schema: String, location: String, filters: String, format: String) object ReadParser extends Logging { + // It was found that some eventlogs could have "NativeScan" instead of "Scan" + val SCAN_NODE_PREFIXES = Seq("Scan", "NativeScan") + // DatasourceV2 node names that exactly match the following labels + val DATASOURCE_V2_NODE_EXACT_PREF = Set( + "BatchScan") + // DatasourceV2 node names that match partially on the following labels + val DATASOURCE_V2_NODE_PREF = Set( + "GpuScan", + "GpuBatchScan", + "JDBCRelation") + + def isScanNode(nodeName: String): Boolean = { + SCAN_NODE_PREFIXES.exists(nodeName.startsWith(_)) + } + + def isScanNode(node: SparkPlanGraphNode): Boolean = { + isScanNode(node.name) + } + + def isDataSourceV2Node(node: SparkPlanGraphNode): Boolean = { + DATASOURCE_V2_NODE_EXACT_PREF.exists(node.name.equals(_)) || + DATASOURCE_V2_NODE_PREF.exists(node.name.contains(_)) + } // strip off the struct<> part that Spark adds to the ReadSchema def formatSchemaStr(schema: String): String = { @@ -48,60 +71,65 @@ object ReadParser extends Logging { } def parseReadNode(node: SparkPlanGraphNode): ReadMetaData = { - val schemaTag = "ReadSchema: " - val schema = if (node.desc.contains(schemaTag)) { - formatSchemaStr(getFieldWithoutTag(node.desc, schemaTag)) + if (HiveParseHelper.isHiveTableScanNode(node)) { + HiveParseHelper.parseReadNode(node) } else { - "" - } - val locationTag = "Location: " - val location = if (node.desc.contains(locationTag)) { - val stringWithBrackets = getFieldWithoutTag(node.desc, locationTag) - // Remove prepended InMemoryFileIndex[ or PreparedDeltaFileIndex[ - // and return only location - // Ex: InMemoryFileIndex[hdfs://bdbl-rpm-1106-57451/numbers.parquet, - // PreparedDeltaFileIndex[file:/tmp/deltatable/delta-table1] - if (stringWithBrackets.contains("[")) { - stringWithBrackets.split("\\[", 2).last.replace("]", "") + val schemaTag = "ReadSchema: " + val schema = if (node.desc.contains(schemaTag)) { + formatSchemaStr(getFieldWithoutTag(node.desc, schemaTag)) } else { - stringWithBrackets + "" } - } else if (node.name.contains("JDBCRelation")) { - // see if we can report table or query - val JDBCPattern = raw".*JDBCRelation\((.*)\).*".r - node.name match { - case JDBCPattern(tableName) => tableName - case _ => "unknown" + val locationTag = "Location: " + val location = if (node.desc.contains(locationTag)) { + val stringWithBrackets = getFieldWithoutTag(node.desc, locationTag) + // Remove prepended InMemoryFileIndex[ or PreparedDeltaFileIndex[ + // and return only location + // Ex: InMemoryFileIndex[hdfs://bdbl-rpm-1106-57451/numbers.parquet, + // PreparedDeltaFileIndex[file:/tmp/deltatable/delta-table1] + if (stringWithBrackets.contains("[")) { + stringWithBrackets.split("\\[", 2).last.replace("]", "") + } else { + stringWithBrackets + } + } else if (node.name.contains("JDBCRelation")) { + // see if we can report table or query + val JDBCPattern = raw".*JDBCRelation\((.*)\).*".r + node.name match { + case JDBCPattern(tableName) => tableName + case _ => "unknown" + } + } else { + "unknown" } - } else { - "unknown" - } - val pushedFilterTag = "PushedFilters: " - val pushedFilters = if (node.desc.contains(pushedFilterTag)) { - val stringWithBrackets = getFieldWithoutTag(node.desc, pushedFilterTag) - // Remove prepended [ from the string if exists - if (stringWithBrackets.contains("[")) { - stringWithBrackets.split("\\[", 2).last.replace("]", "") + val pushedFilterTag = "PushedFilters: " + val pushedFilters = if (node.desc.contains(pushedFilterTag)) { + val stringWithBrackets = getFieldWithoutTag(node.desc, pushedFilterTag) + // Remove prepended [ from the string if exists + if (stringWithBrackets.contains("[")) { + stringWithBrackets.split("\\[", 2).last.replace("]", "") + } else { + stringWithBrackets + } } else { - stringWithBrackets + "unknown" } - } else { - "unknown" - } - val formatTag = "Format: " - val fileFormat = if (node.desc.contains(formatTag)) { - val format = getFieldWithoutTag(node.desc, formatTag) - if (node.name.startsWith("Gpu")) { - s"${format}(GPU)" + val formatTag = "Format: " + val fileFormat = if (node.desc.contains(formatTag)) { + val format = getFieldWithoutTag(node.desc, formatTag) + if (node.name.startsWith("Gpu")) { + s"${format}(GPU)" + } else { + format + } + } else if (node.name.contains("JDBCRelation")) { + "JDBC" } else { - format + "unknown" } - } else if (node.name.contains("JDBCRelation")) { - "JDBC" - } else { - "unknown" + ReadMetaData(schema, location, pushedFilters, fileFormat) } - ReadMetaData(schema, location, pushedFilters, fileFormat) + } // For the read score we look at the read format and datatypes for each diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 47b38d847..ce556b8a3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -257,7 +257,7 @@ object SQLPlanParser extends Logging { ShuffledHashJoinExecParser(node, checker, sqlID, app).parse case "Sort" => SortExecParser(node, checker, sqlID).parse - case s if (s.startsWith("Scan")) => + case s if ReadParser.isScanNode(s) => FileSourceScanExecParser(node, checker, sqlID, app).parse case "SortAggregate" => SortAggregateExecParser(node, checker, sqlID).parse diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala index 43ced9605..3a4423bdc 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.qualification import scala.collection.mutable.{ArrayBuffer, Buffer, LinkedHashMap, ListBuffer} import com.nvidia.spark.rapids.tool.ToolTextFileWriter -import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo} +import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, HiveParseHelper, PlanInfo} import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER} import org.apache.hadoop.conf.Configuration @@ -1015,11 +1015,17 @@ object QualOutputWriter { // Unsupported Execs and Execs that are not supported due to unsupported expressions, or if // the operation is from a dataset, or if the operation contains a UDF. + // Note that we remove "scan hive" and "insertIntoHive" execs because it is already reported by + // the readFormatTypes and writeFormatTypes. Otherwise, we end up reporting the same exec + // twice. val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains) val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x) - || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x)) + || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x) + || HiveParseHelper.isHiveTableScanNode(x) + || HiveParseHelper.isHiveTableInsertNode(x) + ) val unsupportedExecRows = actualunsupportedExecs.map { exec => // If the exec is in the ignore list, then set the ignore operator to true. if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index 7ffea6673..4bc026396 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,27 +19,49 @@ package org.apache.spark.sql.rapids.tool import java.io.InputStream import java.util.zip.GZIPInputStream +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.io.{Codec, Source} import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo} -import com.nvidia.spark.rapids.tool.planparser.ReadParser +import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser} +import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, DriverAccumCase, JobInfoClass, SQLExecutionInfoClass, StageInfoClass, TaskStageAccumCase} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.deploy.history.{EventLogFileReader, EventLogFileWriter} import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.{SparkListenerEvent, StageInfo} +import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerJobStart, StageInfo} import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphNode} import org.apache.spark.sql.rapids.tool.qualification.MLFunctions import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil import org.apache.spark.util.Utils +// Handles updating and caching Spark Properties for a Spark application. +// Properties stored in this container can be accessed to make decision about +// certain analysis that depends on the context of the Spark properties. +// TODO: we need to migrate SparkProperties, GpuMode to this trait. +trait CashableProps { + // A flag wether hive is enabled or not. Note that we assume that the + // property is global to the entire application once it is set. a.k.a, it cannot be disabled + // once it is was set to true. + var hiveEnabled = false + def handleEnvUpdateForCachedProps(event: SparkListenerEnvironmentUpdate): Unit = { + val sparkProperties = event.environmentDetails("Spark Properties").toMap + hiveEnabled ||= HiveParseHelper.isHiveEnabled(sparkProperties) + } + + def handleJobStartForCachedProps(event: SparkListenerJobStart): Unit = { + // TODO: we need to improve this in order to support per-job-level + hiveEnabled ||= HiveParseHelper.isHiveEnabled(event.properties.asScala) + } +} + abstract class AppBase( val eventLogInfo: Option[EventLogInfo], - val hadoopConf: Option[Configuration]) extends Logging { + val hadoopConf: Option[Configuration]) extends Logging with CashableProps { var sparkVersion: String = "" var appEndTime: Option[Long] = None @@ -272,6 +294,16 @@ abstract class AppBase( } } + // Finds all the nodes that scan a hive table + def getPlanInfoWithHiveScan(planInfo: SparkPlanInfo): Seq[SparkPlanInfo] = { + val childRes = planInfo.children.flatMap(getPlanInfoWithHiveScan(_)) + if (isHiveTableScanNode(planInfo.nodeName)) { + childRes :+ planInfo + } else { + childRes + } + } + private def trimSchema(str: String): String = { val index = str.lastIndexOf(",") if (index != -1 && str.contains("...")) { @@ -295,7 +327,7 @@ abstract class AppBase( // Get ReadSchema of each Node and sanitize it for comparison val trimmedNode = trimSchema(ReadParser.parseReadNode(node).schema) readSchema.contains(trimmedNode) - }).filter(x => x.name.startsWith("Scan")).head + }).filter(ReadParser.isScanNode(_)).head dataSourceInfo += DataSourceCase(sqlID, scanNode.id, @@ -305,15 +337,29 @@ abstract class AppBase( readSchema ) } + // scan hive has no "ReaSchema" defined. So, we need to look explicitly for nodes + // that are scan hive and add them one by one to the dataSource + if (hiveEnabled) { // only scan for hive when the CatalogImplementation is using hive + val allPlanWithHiveScan = getPlanInfoWithHiveScan(planInfo) + allPlanWithHiveScan.foreach { hiveReadPlan => + val sqlGraph = SparkPlanGraph(hiveReadPlan) + val hiveScanNode = sqlGraph.allNodes.head + val scanHiveMeta = HiveParseHelper.parseReadNode(hiveScanNode) + dataSourceInfo += DataSourceCase(sqlID, + hiveScanNode.id, + scanHiveMeta.format, + scanHiveMeta.location, + scanHiveMeta.filters, + scanHiveMeta.schema + ) + } + } } // This will find scans for DataSource V2, if the schema is very large it // will likely be incomplete and have ... at the end. protected def checkGraphNodeForReads(sqlID: Long, node: SparkPlanGraphNode): Unit = { - if (node.name.equals("BatchScan") || - node.name.contains("GpuScan") || - node.name.contains("GpuBatchScan") || - node.name.contains("JDBCRelation")) { + if (ReadParser.isDataSourceV2Node(node)) { val res = ReadParser.parseReadNode(node) dataSourceInfo += DataSourceCase(sqlID, diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala index 15c1b30ef..8bad42407 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -304,6 +304,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi app: T, event: SparkListenerJobStart): Unit = { logDebug("Processing event: " + event.getClass) + app.handleJobStartForCachedProps(event) val sqlIDString = event.properties.getProperty("spark.sql.execution.id") val sqlID = ProfileUtils.stringToLong(sqlIDString) sqlID.foreach(app.jobIdToSqlID(event.jobId) = _) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala index b275a8c6a..36dc4081f 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala @@ -46,6 +46,7 @@ class FilterAppInfo( def doSparkListenerEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { logDebug("Processing event: " + event.getClass) val envSparkProperties = event.environmentDetails("Spark Properties").toMap + handleEnvUpdateForCachedProps(event) sparkProperties = Some(EnvironmentInfo(envSparkProperties)) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala index 270303f40..6ebdb93cc 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -139,6 +139,7 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati app: ApplicationInfo, event: SparkListenerEnvironmentUpdate): Unit = { logDebug("Processing event: " + event.getClass) + app.handleEnvUpdateForCachedProps(event) app.sparkProperties = event.environmentDetails("Spark Properties").toMap app.classpathEntries = event.environmentDetails("Classpath Entries").toMap diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala index 390251177..3752fadac 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean app: QualificationAppInfo, event: SparkListenerEnvironmentUpdate): Unit = { logDebug("Processing event: " + event.getClass) + app.handleEnvUpdateForCachedProps(event) val sparkProperties = event.environmentDetails("Spark Properties").toMap if (ToolUtils.isPluginEnabled(sparkProperties)) { throw GpuEventLogException(s"Cannot parse event logs from GPU run") diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala index bdcae9aa5..4674d363b 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,4 +81,11 @@ object EventUtils extends Logging { taskId, accuInfo.id, accuInfo.name, value, update, accuInfo.internal)) } } + + // A utility function used to read Spark properties and compare it to a given target. + // Note that it takes a default argument as well in case the property is not available. + def isPropertyMatch(properties: collection.Map[String, String], propKey: String, + defValue: String, targetValue: String): Boolean = { + properties.getOrElse(propKey, defValue).equals(targetValue) + } } diff --git a/core/src/test/resources/key-value-pairs.txt b/core/src/test/resources/key-value-pairs.txt new file mode 100644 index 000000000..157527249 --- /dev/null +++ b/core/src/test/resources/key-value-pairs.txt @@ -0,0 +1,499 @@ +203val_203 +491val_491 +325val_325 +401val_401 +281val_281 +498val_498 +498val_498 +430val_430 +321val_321 +72val_72 +325val_325 +114val_114 +84val_84 +395val_395 +289val_289 +466val_466 +431val_431 +76val_76 +401val_401 +439val_439 +477val_477 +462val_462 +26val_26 +177val_177 +382val_382 +309val_309 +399val_399 +404val_404 +396val_396 +77val_77 +194val_194 +229val_229 +262val_262 +401val_401 +468val_468 +199val_199 +345val_345 +176val_176 +469val_469 +65val_65 +54val_54 +223val_223 +87val_87 +282val_282 +230val_230 +342val_342 +169val_169 +216val_216 +167val_167 +478val_478 +469val_469 +19val_19 +260val_260 +492val_492 +67val_67 +208val_208 +403val_403 +316val_316 +348val_348 +197val_197 +409val_409 +411val_411 +160val_160 +311val_311 +356val_356 +229val_229 +181val_181 +369val_369 +18val_18 +164val_164 +90val_90 +317val_317 +217val_217 +454val_454 +285val_285 +137val_137 +226val_226 +296val_296 +397val_397 +137val_137 +100val_100 +138val_138 +51val_51 +256val_256 +413val_413 +103val_103 +217val_217 +138val_138 +485val_485 +318val_318 +316val_316 +34val_34 +183val_183 +165val_165 +310val_310 +126val_126 +466val_466 +97val_97 +247val_247 +424val_424 +323val_323 +454val_454 +5val_5 +316val_316 +113val_113 +265val_265 +69val_69 +205val_205 +189val_189 +453val_453 +306val_306 +384val_384 +11val_11 +51val_51 +195val_195 +242val_242 +317val_317 +167val_167 +258val_258 +278val_278 +9val_9 +237val_237 +475val_475 +209val_209 +280val_280 +197val_197 +205val_205 +134val_134 +193val_193 +20val_20 +64val_64 +233val_233 +493val_493 +360val_360 +463val_463 +277val_277 +213val_213 +342val_342 +362val_362 +406val_406 +367val_367 +348val_348 +311val_311 +418val_418 +149val_149 +24val_24 +28val_28 +336val_336 +172val_172 +469val_469 +318val_318 +407val_407 +284val_284 +155val_155 +274val_274 +333val_333 +33val_33 +327val_327 +86val_86 +199val_199 +466val_466 +431val_431 +286val_286 +242val_242 +400val_400 +244val_244 +438val_438 +469val_469 +248val_248 +397val_397 +273val_273 +457val_457 +275val_275 +235val_235 +103val_103 +43val_43 +406val_406 +273val_273 +481val_481 +129val_129 +223val_223 +42val_42 +168val_168 +131val_131 +186val_186 +53val_53 +364val_364 +208val_208 +460val_460 +95val_95 +280val_280 +437val_437 +394val_394 +15val_15 +111val_111 +478val_478 +167val_167 +292val_292 +5val_5 +230val_230 +175val_175 +42val_42 +133val_133 +384val_384 +70val_70 +348val_348 +35val_35 +480val_480 +82val_82 +341val_341 +436val_436 +392val_392 +26val_26 +153val_153 +417val_417 +201val_201 +369val_369 +382val_382 +146val_146 +165val_165 +344val_344 +150val_150 +70val_70 +459val_459 +446val_446 +339val_339 +439val_439 +118val_118 +120val_120 +170val_170 +66val_66 +8val_8 +468val_468 +230val_230 +175val_175 +256val_256 +467val_467 +221val_221 +282val_282 +489val_489 +333val_333 +404val_404 +174val_174 +479val_479 +84val_84 +298val_298 +216val_216 +18val_18 +72val_72 +191val_191 +375val_375 +395val_395 +187val_187 +489val_489 +322val_322 +207val_207 +489val_489 +230val_230 +406val_406 +118val_118 +353val_353 +393val_393 +327val_327 +470val_470 +365val_365 +100val_100 +105val_105 +172val_172 +98val_98 +199val_199 +145val_145 +309val_309 +219val_219 +374val_374 +76val_76 +237val_237 +152val_152 +228val_228 +427val_427 +218val_218 +421val_421 +176val_176 +318val_318 +409val_409 +15val_15 +128val_128 +331val_331 +496val_496 +0val_0 +401val_401 +462val_462 +353val_353 +492val_492 +480val_480 +438val_438 +138val_138 +12val_12 +272val_272 +468val_468 +119val_119 +119val_119 +298val_298 +96val_96 +419val_419 +414val_414 +288val_288 +413val_413 +35val_35 +463val_463 +47val_47 +146val_146 +70val_70 +449val_449 +468val_468 +78val_78 +200val_200 +44val_44 +35val_35 +495val_495 +338val_338 +30val_30 +255val_255 +58val_58 +366val_366 +369val_369 +0val_0 +386val_386 +224val_224 +239val_239 +308val_308 +483val_483 +116val_116 +12val_12 +265val_265 +379val_379 +443val_443 +169val_169 +452val_452 +351val_351 +344val_344 +497val_497 +179val_179 +287val_287 +90val_90 +458val_458 +5val_5 +373val_373 +134val_134 +213val_213 +190val_190 +119val_119 +307val_307 +331val_331 +409val_409 +192val_192 +435val_435 +193val_193 +152val_152 +396val_396 +401val_401 +219val_219 +97val_97 +459val_459 +92val_92 +10val_10 +283val_283 +482val_482 +128val_128 +384val_384 +288val_288 +307val_307 +469val_469 +277val_277 +406val_406 +157val_157 +444val_444 +377val_377 +95val_95 +266val_266 +17val_17 +311val_311 +193val_193 +429val_429 +169val_169 +399val_399 +252val_252 +129val_129 +432val_432 +179val_179 +277val_277 +291val_291 +195val_195 +396val_396 +321val_321 +57val_57 +273val_273 +348val_348 +368val_368 +83val_83 +480val_480 +4val_4 +85val_85 +2val_2 +414val_414 +67val_67 +322val_322 +104val_104 +221val_221 +255val_255 +489val_489 +378val_378 +424val_424 +174val_174 +490val_490 +196val_196 +163val_163 +277val_277 +429val_429 +37val_37 +403val_403 +484val_484 +454val_454 +315val_315 +431val_431 +438val_438 +90val_90 +149val_149 +222val_222 +113val_113 +200val_200 +367val_367 +187val_187 +430val_430 +187val_187 +143val_143 +257val_257 +169val_169 +472val_472 +80val_80 +0val_0 +104val_104 +27val_27 +402val_402 +417val_417 +180val_180 +162val_162 +207val_207 +403val_403 +191val_191 +327val_327 +41val_41 +498val_498 +458val_458 +136val_136 +348val_348 +209val_209 +494val_494 +166val_166 +417val_417 +448val_448 +202val_202 +74val_74 +430val_430 +138val_138 +214val_214 +305val_305 +164val_164 +120val_120 +158val_158 +272val_272 +208val_208 +335val_335 +203val_203 +230val_230 +302val_302 +128val_128 +58val_58 +263val_263 +281val_281 +178val_178 +239val_239 +332val_332 +249val_249 +278val_278 +24val_24 +389val_389 +98val_98 +455val_455 +37val_37 +238val_238 +83val_83 +241val_241 +298val_298 +233val_233 +224val_224 +125val_125 +156val_156 +487val_487 +125val_125 \ No newline at end of file diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala index 8214b3de6..dc63f27cd 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -74,7 +74,8 @@ object ToolTestUtils extends Logging { } def generateEventLog(eventLogDir: File, appName: String, - confs: Option[Map[String, String]] = None) + confs: Option[Map[String, String]] = None, + enableHive: Boolean = false) (fun: SparkSession => DataFrame): (String, String) = { // we need to close any existing sessions to ensure that we can @@ -87,8 +88,11 @@ object ToolTestUtils extends Logging { .appName(appName) .config("spark.eventLog.enabled", "true") .config("spark.eventLog.dir", eventLogDir.getAbsolutePath) + if (enableHive) { + sparkBuilder.enableHiveSupport() + } confs.foreach(_.foreach {case (k, v) => sparkBuilder.config(k, v)}) - lazy val spark = sparkBuilder.getOrCreate() + lazy val spark = sparkBuilder.getOrCreate() // execute the query and generate events val df = fun(spark) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index ca13bd7e4..df43104ee 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1479,6 +1479,70 @@ class QualificationSuite extends BaseTestSuite { } } } + + test("scan hive text-format is supported") { + // The unit test loads text file into Hive table. Then it runs SQL hive query that generates + // "Scan hive". If the Qualification fails to support the "Scan hive", then the format would + // appear in the unsupportedOperators.csv file or the "non-supported read format" column + TrampolineUtil.withTempDir { warehouseDir => + // text file is pair-key-value "key: val_$key" + val textFilePath = ToolTestUtils.getTestResourcePath("key-value-pairs.txt") + // set the directory where the store is kept + TrampolineUtil.withTempDir { outpath => + val derbyDir = s"${outpath.getAbsolutePath}/derby" + System.setProperty("derby.system.home", s"$derbyDir") + val sparkConfs = Map( + "spark.sql.warehouse.dir" -> warehouseDir.getAbsolutePath, + "spark.driver.extraJavaOptions" -> s"-Dderby.system.home='$derbyDir'") + val allArgs = Array( + "--report-read-schema", // enable report read schema + "--output-directory", + outpath.getAbsolutePath()) + + TrampolineUtil.withTempDir { eventLogDir => + // set the name to "hiv3" on purpose to avoid any matches on "hive". + val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "scanHiv3App", + Some(sparkConfs), enableHive = true) { spark => + // scalastyle:off line.size.limit + // the following set of queries will generate the following physical plan: + // [{"nodeName":"Scan hive default.src","simpleString":"Scan hive default.src [key#6, value#7], + // HiveTableRelation [`default`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, + // Data Cols: [key#6, value#7], Partition Cols: []]","children":[],"metadata":{}, + // "metrics":[{"name":"number of output rows","accumulatorId":12,"metricType":"sum"}]}] + // scalastyle:on line.size.limit + spark.sql("DROP TABLE IF EXISTS src") + spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") + spark.sql(s"LOAD DATA LOCAL INPATH '$textFilePath' INTO TABLE src") + spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") + } + + val appArgs = new QualificationArgs(allArgs ++ Array(eventLog)) + val (exit, appSum) = QualificationMain.mainInternal(appArgs) + + assert(exit == 0) + // Verify that results contain a single app + assert(appSum.nonEmpty) + // Verify that the texthive is listed in the readFileFormats column + // The "readSchema" column should be "hivetext[]:hivetext[]:hivetext[]:hivetext[]" + assert(appSum.head.readFileFormats.exists(_.contains("hivetext"))) + // Verify that the texthive is not listed in the unsupported formats + // The "Unsupported Read File Formats and Types" column should be empty + assert(appSum.head.readFileFormatAndTypesNotSupported.isEmpty) + // Next, we check that the content of the unsupportedOps has no entry for "hive". + val unsupportedOpsCSV = s"$outpath/rapids_4_spark_qualification_output/" + + s"rapids_4_spark_qualification_output_unsupportedOperators.csv" + val inputSource = Source.fromFile(unsupportedOpsCSV) + try { + val unsupportedRows = inputSource.getLines.toSeq + assert(unsupportedRows.head.contains("App ID,Unsupported Type,")) + assert(!unsupportedRows.exists(_.contains("hive"))) + } finally { + inputSource.close() + } + } + } + } + } } class ToolTestListener extends SparkListener { From 30e33af9dd3ec0caf8221608d430ed90ae0f61a2 Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Thu, 11 Jan 2024 08:37:57 -0600 Subject: [PATCH 2/2] fix typos Signed-off-by: Ahmed Hussein (amahussein) --- core/pom.xml | 2 +- .../scala/org/apache/spark/sql/rapids/tool/AppBase.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 5de4986c2..94614472c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -525,7 +525,7 @@ test - + org.apache.spark spark-hive_${scala.binary.version} ${spark.version} diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index 4bc026396..e645bb763 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -43,8 +43,8 @@ import org.apache.spark.util.Utils // Properties stored in this container can be accessed to make decision about // certain analysis that depends on the context of the Spark properties. // TODO: we need to migrate SparkProperties, GpuMode to this trait. -trait CashableProps { - // A flag wether hive is enabled or not. Note that we assume that the +trait CacheableProps { + // A flag whether hive is enabled or not. Note that we assume that the // property is global to the entire application once it is set. a.k.a, it cannot be disabled // once it is was set to true. var hiveEnabled = false @@ -61,7 +61,7 @@ trait CashableProps { abstract class AppBase( val eventLogInfo: Option[EventLogInfo], - val hadoopConf: Option[Configuration]) extends Logging with CashableProps { + val hadoopConf: Option[Configuration]) extends Logging with CacheableProps { var sparkVersion: String = "" var appEndTime: Option[Long] = None @@ -337,7 +337,7 @@ abstract class AppBase( readSchema ) } - // scan hive has no "ReaSchema" defined. So, we need to look explicitly for nodes + // "scan hive" has no "ReadSchema" defined. So, we need to look explicitly for nodes // that are scan hive and add them one by one to the dataSource if (hiveEnabled) { // only scan for hive when the CatalogImplementation is using hive val allPlanWithHiveScan = getPlanInfoWithHiveScan(planInfo)