From b54ae842f058f4fd27efcd0ff47d39f0bbf456bb Mon Sep 17 00:00:00 2001 From: Niranjan Artal <50492963+nartal1@users.noreply.github.com> Date: Fri, 15 Dec 2023 07:04:11 -0800 Subject: [PATCH] Qualification tool: Add more information for unsupported operators (#680) * Qualification tool: Add more information for unsupported operators Signed-off-by: Niranjan Artal * update test * addressed review comments * add some more expressions in ignore list, update output format * update output format * update test results * update ignore exec name * add comment for ignore operator --------- Signed-off-by: Niranjan Artal --- .../tool/planparser/SQLPlanParser.scala | 10 +- .../tool/qualification/QualOutputWriter.scala | 234 ++++++++++++------ .../tool/qualification/Qualification.scala | 1 + .../spark/sql/rapids/tool/ToolUtils.scala | 21 +- .../qualification/QualificationAppInfo.scala | 41 +-- .../complex_dec_expectation.csv | 2 +- .../jdbc_expectation.csv | 2 +- .../nds_q86_fail_test_expectation.csv | 2 +- .../nds_q86_test_expectation.csv | 2 +- .../qual_test_simple_expectation.csv | 6 +- .../read_dsv1_expectation.csv | 2 +- .../read_dsv2_expectation.csv | 2 +- .../spark2_expectation.csv | 2 +- .../qualification/QualificationSuite.scala | 14 +- 14 files changed, 234 insertions(+), 107 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 96829df86..b809d9521 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -39,7 +39,9 @@ class ExecInfo( val children: Option[Seq[ExecInfo]], // only one level deep val stages: Set[Int] = Set.empty, val shouldRemove: Boolean = false, - val unsupportedExprs: Array[String] = Array.empty) { + val unsupportedExprs: Array[String] = Array.empty, + val dataSet: Boolean = false, + val udf: Boolean = false) { private def childrenToString = { val str = children.map { c => c.map(" " + _.toString).mkString("\n") @@ -76,7 +78,8 @@ object SQLPlanParser extends Logging { val windowFunctionPattern = """(\w+)\(""".r - val ignoreExpressions = Array("any", "cast", "decimal", "decimaltype", "every", "some", + val ignoreExpressions = Array("any", "cast", "ansi_cast", "decimal", "decimaltype", "every", + "some", "merge_max", "merge_min", "merge_sum", "merge_count", "merge_avg", "merge_first", "list", // current_database does not cause any CPU fallbacks "current_database", @@ -301,13 +304,12 @@ object SQLPlanParser extends Logging { } val stagesInNode = getStagesInSQLNode(node, app) val supported = execInfos.isSupported && !ds && !containsUDF - // shouldRemove is set to true if the exec is a member of "execsToBeRemoved" or if the node // is a duplicate val removeFlag = execInfos.shouldRemove || isDupNode || execsToBeRemoved.contains(node.name) Seq(new ExecInfo(execInfos.sqlID, execInfos.exec, execInfos.expr, execInfos.speedupFactor, execInfos.duration, execInfos.nodeId, supported, execInfos.children, - stagesInNode, removeFlag, execInfos.unsupportedExprs)) + stagesInNode, removeFlag, execInfos.unsupportedExprs, ds, containsUDF)) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala index d4eff43fc..bd1b91654 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala @@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER} import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.rapids.tool.ToolUtils +import org.apache.spark.sql.rapids.tool.{IgnoreExecs, ToolUtils} import org.apache.spark.sql.rapids.tool.qualification.{EstimatedPerSQLSummaryInfo, EstimatedSummaryInfo, QualificationAppInfo, QualificationSummaryInfo, StatusSummaryInfo} import org.apache.spark.sql.rapids.tool.util._ @@ -151,13 +151,38 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean, val csvFileWriter = new ToolTextFileWriter(outputDir, s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperators.csv", "Unsupported Operators CSV Report", hadoopConf) - val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums) - csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes, - QualOutputWriter.CSV_DELIMITER, false)) - sums.foreach { sum => - val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes, - QualOutputWriter.CSV_DELIMITER, false) - rows.foreach(row => csvFileWriter.write(row)) + try { + val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums) + csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes, + QualOutputWriter.CSV_DELIMITER, false)) + sums.foreach { sum => + val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes, + QualOutputWriter.CSV_DELIMITER, false) + rows.foreach(row => csvFileWriter.write(row)) + } + } finally { + csvFileWriter.close() + } + } + + def writeUnsupportedOperatorsDetailedStageCSVReport( + sums: Seq[QualificationSummaryInfo], + order: String): Unit = { + val csvFileWriter = new ToolTextFileWriter(outputDir, + s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperatorsStageDuration.csv", + "Unsupported Operators StageDuration CSV Report", hadoopConf) + try { + val headersAndSizes = + QualOutputWriter.getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(sums) + csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes, + QualOutputWriter.CSV_DELIMITER, false)) + sums.foreach { sum => + val rows = QualOutputWriter.constructUnsupportedStagesDurationInfo(sum, headersAndSizes, + QualOutputWriter.CSV_DELIMITER, false) + rows.foreach(row => csvFileWriter.write(row)) + } + } finally { + csvFileWriter.close() } } @@ -361,6 +386,7 @@ object QualOutputWriter { val SQL_DUR_STR = "SQL DF Duration" val TASK_DUR_STR = "SQL Dataframe Task Duration" val STAGE_DUR_STR = "Stage Task Duration" + val STAGE_WALLCLOCK_DUR_STR = "Stage Duration" val POT_PROBLEM_STR = "Potential Problems" val EXEC_CPU_PERCENT_STR = "Executor CPU Time Percent" val APP_DUR_ESTIMATED_STR = "App Duration Estimated" @@ -400,6 +426,7 @@ object QualOutputWriter { val UNSUPPORTED_TYPE = "Unsupported Type" val DETAILS = "Details" val NOTES = "Notes" + val IGNORE_OPERATOR = "Ignore Operator" val RUN_NAME = "RunName" val ESTIMATED_FREQUENCY = "Estimated Job Frequency (monthly)" val ML_FUNCTIONS = "ML Functions" @@ -540,11 +567,25 @@ object QualOutputWriter { APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos), UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size, DETAILS -> DETAILS.size, - NOTES -> NOTES.size + NOTES -> NOTES.size, + IGNORE_OPERATOR -> IGNORE_OPERATOR.size ) detailedHeaderAndFields } + def getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes( + appInfos: Seq[QualificationSummaryInfo]): LinkedHashMap[String, Int] = { + val detailedHeaderAndFields = LinkedHashMap[String, Int]( + APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos), + UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size, + STAGE_ID_STR -> STAGE_ID_STR.size, + STAGE_WALLCLOCK_DUR_STR -> STAGE_WALLCLOCK_DUR_STR.size, + APP_DUR_STR -> APP_DUR_STR.size, + SPEEDUP_BUCKET_STR -> SPEEDUP_BUCKET_STR_SIZE, + IGNORE_OPERATOR -> IGNORE_OPERATOR.size + ) + detailedHeaderAndFields + } def getDetailedHeaderStringsAndSizes(appInfos: Seq[QualificationSummaryInfo], reportReadSchema: Boolean): LinkedHashMap[String, Int] = { @@ -886,7 +927,7 @@ object QualOutputWriter { } } - def constructUnsupportedOperatorsInfo( + def constructUnsupportedStagesDurationInfo( sumInfo: QualificationSummaryInfo, headersAndSizes: LinkedHashMap[String, Int], delimiter: String = TEXT_DELIMITER, @@ -895,81 +936,130 @@ object QualOutputWriter { val reformatCSVFunc: String => String = if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str) val appId = sumInfo.appId - val readFormat = sumInfo.readFileFormatAndTypesNotSupported - val writeFormat = sumInfo.writeDataFormat - val unsupportedExecs = sumInfo.unSupportedExecs - val unsupportedExprs = sumInfo.unSupportedExprs - val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap - val unsupportedOperatorsOutputRows = new ArrayBuffer[String]() + val appDuration = sumInfo.sparkSqlDFWallClockDuration + val recommendation = sumInfo.estimatedInfo.recommendation - if (readFormat.nonEmpty) { - val unsupportedReadFormatRows = readFormat.map { format => - val readFormatAndType = format.split("\\[") - val readFormat = readFormatAndType(0) - val readType = if (readFormatAndType.size > 1) { - s"Types not supported - ${readFormatAndType(1).replace("]", "")}" - } else { + sumInfo.stageInfo.collect { + case info if info.unsupportedExecs.nonEmpty => + val stageAppDuration = info.stageWallclockDuration + val allUnsupportedExecs = info.unsupportedExecs + if (allUnsupportedExecs.nonEmpty) { + allUnsupportedExecs.map { unsupportedExecsStr => + // Ignore operator is a boolean value which indicates if the operator should be + // considered for GPU acceleration or not. If the value is true, the operator will + // be ignored. + val ignoreUnsupportedExec = if ( + IgnoreExecs.getAllIgnoreExecs.contains(unsupportedExecsStr)) { + IgnoreExecs.True + } else { + IgnoreExecs.False + } + val data = ListBuffer[(String, Int)]( + reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR), + reformatCSVFunc(unsupportedExecsStr) -> headersAndSizes(UNSUPPORTED_TYPE), + info.stageId.toString -> headersAndSizes(STAGE_ID_STR), + stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR), + appDuration.toString -> headersAndSizes(APP_DUR_STR), + recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR), + ignoreUnsupportedExec -> headersAndSizes(IGNORE_OPERATOR) + ) + constructOutputRow(data, delimiter, prettyPrint) + }.mkString + } + else { "" } - val data = ListBuffer( - reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR), - reformatCSVFunc("Read")-> headersAndSizes(UNSUPPORTED_TYPE), - reformatCSVFunc(readFormat) -> headersAndSizes(DETAILS), - reformatCSVFunc(readType) -> headersAndSizes(NOTES) - ) - constructOutputRow(data, delimiter, prettyPrint) + } + } + + def constructUnsupportedOperatorsInfo( + sumInfo: QualificationSummaryInfo, + headersAndSizes: LinkedHashMap[String, Int], + delimiter: String = TEXT_DELIMITER, + prettyPrint: Boolean, + reformatCSV: Boolean = true): Seq[String] = { + + val reformatCSVFunc: String => String = + if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str) + + val appId = reformatCSVFunc(sumInfo.appId) + val unsupportedOperatorsOutputRows = new ArrayBuffer[String]() + val unsupportedExprs = sumInfo.unSupportedExprs + val allExecs = getAllExecsFromPlan(sumInfo.planInfo) + val dataSetExecs = allExecs.collect { case x if x.dataSet => x.exec } + val udfExecs = allExecs.collect { case x if x.udf => x.exec } + + def createUnsupportedRow(exec: String, execType: String, notes: String, + ignoreOperator: String = IgnoreExecs.False): String = { + val data = ListBuffer( + appId -> headersAndSizes(APP_ID_STR), + reformatCSVFunc(execType) -> headersAndSizes(UNSUPPORTED_TYPE), + reformatCSVFunc(exec) -> headersAndSizes(DETAILS), + reformatCSVFunc(notes) -> headersAndSizes(NOTES), + reformatCSVFunc(ignoreOperator) -> headersAndSizes(IGNORE_OPERATOR) + ) + constructOutputRow(data, delimiter, prettyPrint) + } + + val readFormatRows = sumInfo.readFileFormatAndTypesNotSupported.map { format => + val readFormatAndType = format.split("\\[") + val readFormat = readFormatAndType(0) + val readType = if (readFormatAndType.size > 1) { + s"Types not supported - ${readFormatAndType(1).replace("]", "")}" + } else { + "" } - unsupportedOperatorsOutputRows ++= unsupportedReadFormatRows + createUnsupportedRow(readFormat,"Read", readType) } - if (unsupportedExecs.nonEmpty) { - val unsupportedExecRows = unsupportedExecs.split(";").map { exec => - val data = ListBuffer( - reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR), - reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE), - reformatCSVFunc(exec) -> headersAndSizes(DETAILS), - reformatCSVFunc("") -> headersAndSizes(NOTES) - ) - constructOutputRow(data, delimiter, prettyPrint) + unsupportedOperatorsOutputRows ++= readFormatRows + + // Unsupported Execs and Execs that are not supported due to unsupported expressions, or if + // the operation is from a dataset, or if the operation contains a UDF. + val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap + val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet + val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains) + val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x) + || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x)) + val unsupportedExecRows = actualunsupportedExecs.map { exec => + // If the exec is in the ignore list, then set the ignore operator to true. + if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) { + createUnsupportedRow(exec, "Exec", "", IgnoreExecs.True) + } else { + createUnsupportedRow(exec, "Exec", "", IgnoreExecs.False) } - unsupportedOperatorsOutputRows ++= unsupportedExecRows } - if (unsupportedExecExprsMap.nonEmpty) { - val unsupportedExecExprMapRows = unsupportedExecExprsMap.map { case (exec, exprs) => - val data = ListBuffer( - reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR), - reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE), - reformatCSVFunc(exec) -> headersAndSizes(DETAILS), - reformatCSVFunc(s"$exec Exec is not supported as expressions are " + - s"not supported - `$exprs`") -> headersAndSizes(NOTES) - ) - constructOutputRow(data, delimiter, prettyPrint) - }.toArray - unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows + unsupportedOperatorsOutputRows ++= unsupportedExecRows + + val unsupportedDatasetExecRows = dataSetExecs.map { exec => + createUnsupportedRow(exec, "Exec", s"$exec Exec is not supported as " + + s"this operation is from dataset which is not supported") + } + unsupportedOperatorsOutputRows ++= unsupportedDatasetExecRows + + val unsupportedUdfExecRows = udfExecs.map { exec => + createUnsupportedRow(exec, "Exec", s"$exec Exec is " + + s"not supported as it contains UDF which is not supported") } + unsupportedOperatorsOutputRows ++= unsupportedUdfExecRows + + val unsupportedExecExprMapRows = sumInfo.unsupportedExecstoExprsMap.map { case (exec, exprs) => + createUnsupportedRow(exec, "Exec", s"$exec Exec is not" + + s" supported as expressions are not supported - `$exprs`") + }.toArray + unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows + if (unsupportedExprs.nonEmpty) { - val unsupportedExprRows = unsupportedExprs.split(";").map { expr => - val data = ListBuffer( - reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR), - reformatCSVFunc("Expression") -> headersAndSizes(UNSUPPORTED_TYPE), - reformatCSVFunc(expr) -> headersAndSizes(DETAILS), - reformatCSVFunc("") -> headersAndSizes(NOTES) - ) - constructOutputRow(data, delimiter, prettyPrint) + val unsupportedExprRows = sumInfo.unSupportedExprs.split(";").map { expr => + createUnsupportedRow(expr, "Expression", "") } unsupportedOperatorsOutputRows ++= unsupportedExprRows } - if (writeFormat.nonEmpty) { - val unsupportedwriteFormatRows = writeFormat.map { format => - val data = ListBuffer( - reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR), - reformatCSVFunc("Write") -> headersAndSizes(UNSUPPORTED_TYPE), - reformatCSVFunc(format) -> headersAndSizes(DETAILS), - reformatCSVFunc("") -> headersAndSizes(NOTES) - ) - constructOutputRow(data, delimiter, prettyPrint) - } - unsupportedOperatorsOutputRows ++= unsupportedwriteFormatRows + + val unsupportedWriteFormatRows = sumInfo.writeDataFormat.map { format => + createUnsupportedRow(format, "Write", "") } + unsupportedOperatorsOutputRows ++= unsupportedWriteFormatRows + unsupportedOperatorsOutputRows } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index e733d73c2..0286cd582 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -93,6 +93,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration, qWriter.writeExecReport(allAppsSum, order) qWriter.writeStageReport(allAppsSum, order) qWriter.writeUnsupportedOperatorsCSVReport(allAppsSum, order) + qWriter.writeUnsupportedOperatorsDetailedStageCSVReport(allAppsSum, order) val appStatusResult = generateStatusSummary(appStatusReporter.asScala.values.toSeq) qWriter.writeStatusReport(appStatusResult, order) if (mlOpsEnabled) { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index 0bddd520a..faeaf78c2 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -332,12 +332,29 @@ object IgnoreExecs { // Collect Limit replacement can be slower on the GPU. Disabled by default. private val CollectLimit = "CollectLimit" private val ScanExistingRDD = "Scan ExistingRDD" - private val ExecuteCreateViewCommand = "Execute CreateViewCommand" private val ExistingRDD = "ExistingRDD" + // Some DDL's and table commands which can be ignored + private val ExecuteCreateViewCommand = "Execute CreateViewCommand" private val LocalTableScan = "LocalTableScan" + private val ExecuteCreateDatabaseCommand = "Execute CreateDatabaseCommand" + private val ExecuteDropDatabaseCommand = "Execute DropDatabaseCommand" + private val ExecuteCreateTableAsSelectCommand = "Execute CreateTableAsSelectCommand" + private val ExecuteCreateTableCommand = "Execute CreateTableCommand" + private val ExecuteDropTableCommand = "Execute DropTableCommand" + private val ExecuteCreateDataSourceTableAsSelectCommand = "Execute " + + "CreateDataSourceTableAsSelectCommand" + private val SetCatalogAndNamespace = "SetCatalogAndNamespace" + private val ExecuteSetCommand = "Execute SetCommand" + + + val True = "true" + val False = "false" def getAllIgnoreExecs: Set[String] = Set(AdaptiveSparkPlan, CollectLimit, ScanExistingRDD, - ExecuteCreateViewCommand, ExistingRDD, LocalTableScan) + ExecuteCreateViewCommand, ExistingRDD, LocalTableScan, ExecuteCreateTableCommand, + ExecuteDropTableCommand, ExecuteCreateDatabaseCommand, ExecuteDropDatabaseCommand, + ExecuteCreateTableAsSelectCommand, ExecuteCreateDataSourceTableAsSelectCommand, + SetCatalogAndNamespace, ExecuteSetCommand) } object MlOps { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 6847da083..a58d1e646 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -296,6 +296,7 @@ class QualificationAppInfo( val allSpeedupFactorAvg = SQLPlanParser.averageSpeedup(execInfos.map(_.speedupFactor)) val allFlattenedExecs = flattenedExecs(execInfos) val numUnsupported = allFlattenedExecs.filterNot(_.isSupported) + val unsupportedExecs = numUnsupported.map(_.exec) // if we have unsupported try to guess at how much time. For now divide // time by number of execs and give each one equal weight val eachExecTime = allStageTaskTime / allFlattenedExecs.size @@ -339,8 +340,13 @@ class QualificationAppInfo( eachStageUnsupported } + // Get stage info for the given stageId. + val stageInfos = stageIdToInfo.filterKeys { case (id, _) => id == stageId } + val wallclockStageDuration = stageInfos.values.map(x => x.duration.getOrElse(0L)).sum + StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime, - finalEachStageUnsupported, numTransitions, transitionsTime, estimated) + finalEachStageUnsupported, numTransitions, transitionsTime, estimated, + wallclockStageDuration, unsupportedExecs) }.toSet } @@ -459,7 +465,7 @@ class QualificationAppInfo( c.filterNot(_.shouldRemove) } new ExecInfo(e.sqlID, e.exec, e.expr, e.speedupFactor, e.duration, - e.nodeId, e.isSupported, filteredChildren, e.stages, e.shouldRemove) + e.nodeId, e.isSupported, filteredChildren, e.stages, e.shouldRemove, e.unsupportedExprs) } val filteredPlanInfos = execFilteredChildren.filterNot(_.shouldRemove) p.copy(execInfo = filteredPlanInfos) @@ -586,10 +592,7 @@ class QualificationAppInfo( e.children.map(x => x.filterNot(_.isSupported)) }.flatten topLevelExecs ++ childrenExecs - }.collect { - case x if !IgnoreExecs.getAllIgnoreExecs.contains(x.exec) => x.exec - }.toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":") - + }.map(_.exec).toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":") // Get all the unsupported Expressions from the plan val unSupportedExprs = origPlanInfos.map(_.execInfo.flatMap( @@ -639,6 +642,9 @@ class QualificationAppInfo( 1 } + val wallClockSqlDFToUse = QualificationAppInfo.wallClockSqlDataFrameToUse( + sparkSQLDFWallClockDuration, appDuration) + val estimatedInfo = QualificationAppInfo.calculateEstimatedInfoSummary(estimatedGPURatio, sparkSQLDFWallClockDuration, appDuration, taskSpeedupFactor, appName, appId, sqlIdsWithFailures.nonEmpty, mlSpeedup, unSupportedExecs, unSupportedExprs, @@ -649,8 +655,8 @@ class QualificationAppInfo( notSupportFormatAndTypesString, getAllReadFileFormats, writeFormat, allComplexTypes, nestedComplexTypes, longestSQLDuration, sqlDataframeTaskDuration, nonSQLTaskDuration, unsupportedSQLTaskDuration, supportedSQLTaskDuration, - taskSpeedupFactor, info.sparkUser, info.startTime, origPlanInfos, - perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos, + taskSpeedupFactor, info.sparkUser, info.startTime, wallClockSqlDFToUse, + origPlanInfos, perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos, unSupportedExecs, unSupportedExprs, clusterTags, allClusterTagsMap, mlFunctions, mlTotalStageDuration, unsupportedExecExprsMap) } @@ -864,6 +870,7 @@ case class QualificationSummaryInfo( taskSpeedupFactor: Double, user: String, startTime: Long, + sparkSqlDFWallClockDuration: Long, planInfo: Seq[PlanInfo], stageInfo: Seq[StageQualSummaryInfo], estimatedInfo: EstimatedAppInfo, @@ -884,7 +891,9 @@ case class StageQualSummaryInfo( unsupportedTaskDur: Long, numTransitions: Int, transitionTime: Long, - estimated: Boolean = false) + estimated: Boolean = false, + stageWallclockDuration: Long = 0, + unsupportedExecs: Seq[String] = Seq.empty) object QualificationAppInfo extends Logging { // define recommendation constants @@ -929,19 +938,19 @@ object QualificationAppInfo extends Logging { } } + def wallClockSqlDataFrameToUse(sqlDataFrameDuration: Long, appDuration: Long): Long = { + // If our app duration is shorter than our sql duration, estimate the sql duration down + // to app duration + math.min(sqlDataFrameDuration, appDuration) + } + // Summarize and estimate based on wall clock times def calculateEstimatedInfoSummary(estimatedRatio: Double, sqlDataFrameDuration: Long, appDuration: Long, sqlSpeedupFactor: Double, appName: String, appId: String, hasFailures: Boolean, mlSpeedupFactor: Option[MLFuncsSpeedupAndDuration] = None, unsupportedExecs: String = "", unsupportedExprs: String = "", allClusterTagsMap: Map[String, String] = Map.empty[String, String]): EstimatedAppInfo = { - val sqlDataFrameDurationToUse = if (sqlDataFrameDuration > appDuration) { - // our app duration is shorter then our sql duration, estimate the sql duration down - // to app duration - appDuration - } else { - sqlDataFrameDuration - } + val sqlDataFrameDurationToUse = wallClockSqlDataFrameToUse(sqlDataFrameDuration, appDuration) // get the average speedup and duration for ML funcs supported on GPU val (mlSpeedup, mlDuration) = if (mlSpeedupFactor.isDefined) { diff --git a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv index a80462cbc..753bdcacf 100644 --- a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"","",30 +"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"CollectLimit","",30 diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv index c4f57a9c2..7dbea470a 100644 --- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569439.65,2527.34,3627,19894,571967,3470,28.41,"","JDBC[*]","","","","",1812,544575,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1]","",30 +"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569439.65,2527.34,3627,19894,571967,3470,28.41,"","JDBC[*]","","","","",1812,544575,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv index 6943e0418..84fe8ae97 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19230.84,6940.15,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.64,false,"","",30 +"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19230.84,6940.15,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30 diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv index 5637f2123..3cd646abf 100644 --- a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19230.84,6940.15,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.64,false,"","",30 +"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19230.84,6940.15,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30 diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv index d629f3206..e41039a69 100644 --- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv @@ -1,5 +1,5 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) "Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8488.68,7830.31,12434,132257,16319,10577,37.7,"","","JSON","","","",7143,4717,19744,112513,3.85,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1 -"Spark shell","local-1651187225439","Not Recommended",1.0,355550.33,86.66,760,180,355637,253,87.88,"","JSON[string:bigint:int]","","","","",498,343411,120,60,1.52,false,"SerializeFromObject;DeserializeToObject;Scan json;Filter;MapElements","",1 -"Spark shell","local-1651188809790","Not Recommended",1.0,166205.19,9.8,911,283,166215,38,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,271,12,1.34,false,"Scan json;Project","UDF",1 -"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,2,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4661,5,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;Scan json;Project","UDF",1 +"Spark shell","local-1651187225439","Not Recommended",1.0,355550.33,86.66,760,180,355637,253,87.88,"","JSON[string:bigint:int]","","","","",498,343411,120,60,1.52,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1 +"Spark shell","local-1651188809790","Not Recommended",1.0,166205.19,9.8,911,283,166215,38,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,271,12,1.34,false,"CollectLimit;Scan json;Project","UDF",1 +"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,2,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4661,5,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv index fa1f44f49..318ea6b28 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371544219","Not Recommended",1.0,174006.51,1286.48,6695,20421,175293,2268,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13469,6952,2.31,false,"Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 +"Spark shell","local-1624371544219","Not Recommended",1.0,174006.51,1286.48,6695,20421,175293,2268,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13469,6952,2.31,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv index 78a44302b..17c155356 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371906627","Not Recommended",1.01,82304.74,1433.25,6760,21802,83738,2388,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,14064,7738,2.5,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 +"Spark shell","local-1624371906627","Not Recommended",1.01,82304.74,1433.25,6760,21802,83738,2388,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,14064,7738,2.5,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv index 7fe283958..f300a45a5 100644 --- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1634253215009","Not Recommended",1.01,46542.98,520.01,1520,359,47063,817,67.64,"","Text[*]","","","","",1068,44935,166,193,2.75,false,"Scan text","",30 +"Spark shell","local-1634253215009","Not Recommended",1.01,46542.98,520.01,1520,359,47063,817,67.64,"","Text[*]","","","","",1068,44935,166,193,2.75,false,"CollectLimit;Scan text","",30 diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index a1b441a96..447c203c9 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -1211,22 +1211,30 @@ class QualificationSuite extends BaseTestSuite { val filename = s"$outpath/rapids_4_spark_qualification_output/" + s"rapids_4_spark_qualification_output_unsupportedOperators.csv" + val stageDurationFile = s"$outpath/rapids_4_spark_qualification_output/" + + s"rapids_4_spark_qualification_output_unsupportedOperatorsStageDuration.csv" val inputSource = Source.fromFile(filename) + val unsupportedStageDuration = Source.fromFile(stageDurationFile) try { val lines = inputSource.getLines.toSeq // 1 for header, 1 for values val expLinesSize = if (ToolUtils.isSpark340OrLater()) { - 6 + 8 } else if (!ToolUtils.isSpark320OrLater()) { - 5 + 6 } else { - 5 + 7 } assert(lines.size == expLinesSize) assert(lines.head.contains("App ID,Unsupported Type,")) assert(lines(1).contains("\"Read\",\"JSON\",\"Types not supported - bigint:int\"")) + + val stageDurationLines = unsupportedStageDuration.getLines.toSeq + assert(stageDurationLines.head.contains("" + + "Stage Duration,App Duration,Recommendation")) + assert(stageDurationLines(1).contains("Not Recommended")) } finally { inputSource.close() }