From 319e25efd8b5f254706ef0572ccf1f122e281805 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Wed, 6 Dec 2023 14:59:53 -0800
Subject: [PATCH 1/8] Qualification tool: Add more information for unsupported
 operators

Signed-off-by: Niranjan Artal
---
 .../tool/planparser/SQLPlanParser.scala       |   7 +-
 .../tool/qualification/QualOutputWriter.scala | 181 ++++++++++++------
 .../tool/qualification/Qualification.scala    |   1 +
 .../qualification/QualificationAppInfo.scala  |  41 ++--
 4 files changed, 151 insertions(+), 79 deletions(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
index 96829df86..595ed7783 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
@@ -39,7 +39,9 @@ class ExecInfo(
     val children: Option[Seq[ExecInfo]], // only one level deep
     val stages: Set[Int] = Set.empty,
     val shouldRemove: Boolean = false,
-    val unsupportedExprs: Array[String] = Array.empty) {
+    val unsupportedExprs: Array[String] = Array.empty,
+    val dataSet: Boolean = false,
+    val udf: Boolean = false) {
   private def childrenToString = {
     val str = children.map { c =>
       c.map("  " + _.toString).mkString("\n")
@@ -301,13 +303,12 @@ object SQLPlanParser extends Logging {
     }
     val stagesInNode = getStagesInSQLNode(node, app)
     val supported = execInfos.isSupported && !ds && !containsUDF
-
     // shouldRemove is set to true if the exec is a member of "execsToBeRemoved" or if the node
     // is a duplicate
     val removeFlag = execInfos.shouldRemove || isDupNode || execsToBeRemoved.contains(node.name)
     Seq(new ExecInfo(execInfos.sqlID, execInfos.exec, execInfos.expr, execInfos.speedupFactor,
       execInfos.duration, execInfos.nodeId, supported, execInfos.children,
-      stagesInNode, removeFlag, execInfos.unsupportedExprs))
+      stagesInNode, removeFlag, execInfos.unsupportedExprs, ds, containsUDF))
   }
 }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index d4eff43fc..52b74e9bc 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -161,6 +161,23 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
     }
   }
 
+  def writeUnsupportedOperatorsDetailedStageCSVReport(
+      sums: Seq[QualificationSummaryInfo],
+      order: String): Unit = {
+    val csvFileWriter = new ToolTextFileWriter(outputDir,
+      s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperatorsStageDuration.csv",
+      "Unsupported Operators StageDuration CSV Report", hadoopConf)
+    val headersAndSizes =
+      QualOutputWriter.getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(sums)
+    csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
+      QualOutputWriter.CSV_DELIMITER, false))
+    sums.foreach { sum =>
+      val rows = QualOutputWriter.constructUnsupportedStagesDurationInfo(sum, headersAndSizes,
+        QualOutputWriter.CSV_DELIMITER, false)
+      rows.foreach(row => csvFileWriter.write(row))
+    }
+  }
+
   def writePerSqlCSVReport(sums: Seq[QualificationSummaryInfo], maxSQLDescLength: Int): Unit = {
     val csvFileWriter = new ToolTextFileWriter(outputDir,
       s"${QualOutputWriter.LOGFILE_NAME}_persql.csv",
@@ -361,6 +378,7 @@ object QualOutputWriter {
   val SQL_DUR_STR = "SQL DF Duration"
   val TASK_DUR_STR = "SQL Dataframe Task Duration"
   val STAGE_DUR_STR = "Stage Task Duration"
+  val STAGE_WALLCLOCK_DUR_STR = "Stage Wall Clock Duration"
   val POT_PROBLEM_STR = "Potential Problems"
   val EXEC_CPU_PERCENT_STR = "Executor CPU Time Percent"
   val APP_DUR_ESTIMATED_STR = "App Duration Estimated"
@@ -545,6 +563,18 @@ object QualOutputWriter {
     detailedHeaderAndFields
   }
 
+  def getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(
+      appInfos: Seq[QualificationSummaryInfo]): LinkedHashMap[String, Int] = {
+    val detailedHeaderAndFields = LinkedHashMap[String, Int](
+      APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
+      UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
+      STAGE_ID_STR -> STAGE_ID_STR.size,
+      STAGE_WALLCLOCK_DUR_STR -> STAGE_WALLCLOCK_DUR_STR.size,
+      APP_DUR_STR -> APP_DUR_STR.size,
+      SPEEDUP_BUCKET_STR -> SPEEDUP_BUCKET_STR_SIZE
+    )
+    detailedHeaderAndFields
+  }
 
   def getDetailedHeaderStringsAndSizes(appInfos: Seq[QualificationSummaryInfo],
       reportReadSchema: Boolean): LinkedHashMap[String, Int] = {
@@ -886,7 +916,7 @@ object QualOutputWriter {
     }
   }
 
-  def constructUnsupportedOperatorsInfo(
+  def constructUnsupportedStagesDurationInfo(
       sumInfo: QualificationSummaryInfo,
       headersAndSizes: LinkedHashMap[String, Int],
       delimiter: String = TEXT_DELIMITER,
@@ -895,81 +925,106 @@ object QualOutputWriter {
     val reformatCSVFunc: String => String =
       if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
     val appId = sumInfo.appId
-    val readFormat = sumInfo.readFileFormatAndTypesNotSupported
-    val writeFormat = sumInfo.writeDataFormat
-    val unsupportedExecs = sumInfo.unSupportedExecs
-    val unsupportedExprs = sumInfo.unSupportedExprs
-    val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
-    val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
+    val appDuration = sumInfo.sparkSqlDFWallClockDuration
+    val recommendation = sumInfo.estimatedInfo.recommendation
 
-    if (readFormat.nonEmpty) {
-      val unsupportedReadFormatRows = readFormat.map { format =>
-        val readFormatAndType = format.split("\\[")
-        val readFormat = readFormatAndType(0)
-        val readType = if (readFormatAndType.size > 1) {
-          s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
-        } else {
-          ""
-        }
-        val data = ListBuffer(
+    sumInfo.stageInfo.collect {
+      case info if info.stageWallclockDuration > 0 && info.unsupportedExecs.nonEmpty =>
+        val stageAppDuration = info.stageWallclockDuration
+        val unsupportedExecs = info.unsupportedExecs.mkString(";")
+        val data = ListBuffer[(String, Int)](
           reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
-          reformatCSVFunc("Read")-> headersAndSizes(UNSUPPORTED_TYPE),
-          reformatCSVFunc(readFormat) -> headersAndSizes(DETAILS),
-          reformatCSVFunc(readType) -> headersAndSizes(NOTES)
+          reformatCSVFunc(unsupportedExecs) -> headersAndSizes(UNSUPPORTED_TYPE),
+          info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
+          stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
+          appDuration.toString -> headersAndSizes(APP_DUR_STR),
+          recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR)
        )
        constructOutputRow(data, delimiter, prettyPrint)
-      }
-      unsupportedOperatorsOutputRows ++= unsupportedReadFormatRows
    }
-    if (unsupportedExecs.nonEmpty) {
-      val unsupportedExecRows = unsupportedExecs.split(";").map { exec =>
-        val data = ListBuffer(
-          reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
-          reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
-          reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
-          reformatCSVFunc("") -> headersAndSizes(NOTES)
-        )
-        constructOutputRow(data, delimiter, prettyPrint)
+  }
+
+  def constructUnsupportedOperatorsInfo(
+      sumInfo: QualificationSummaryInfo,
+      headersAndSizes: LinkedHashMap[String, Int],
+      delimiter: String = TEXT_DELIMITER,
+      prettyPrint: Boolean,
+      reformatCSV: Boolean = true): Seq[String] = {
+
+    val reformatCSVFunc: String => String =
+      if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
+
+    val appId = reformatCSVFunc(sumInfo.appId)
+    val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
+    val unsupportedExprs = sumInfo.unSupportedExprs
+    val allExecs = getAllExecsFromPlan(sumInfo.planInfo)
+    val dataSetExecs = allExecs.collect { case x if x.dataSet => x.exec }
+    val udfExecs = allExecs.collect { case x if x.udf => x.exec }
+
+    def createUnsupportedRow(exec: String, execType: String, notes: String): String = {
+      val data = ListBuffer(
+        appId -> headersAndSizes(APP_ID_STR),
+        reformatCSVFunc(execType) -> headersAndSizes(UNSUPPORTED_TYPE),
+        reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
+        reformatCSVFunc(notes) -> headersAndSizes(NOTES)
+      )
+      constructOutputRow(data, delimiter, prettyPrint)
+    }
+
+    val readFormatRows = sumInfo.readFileFormatAndTypesNotSupported.map { format =>
+      val readFormatAndType = format.split("\\[")
+      val readFormat = readFormatAndType(0)
+      val readType = if (readFormatAndType.size > 1) {
+        s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
+      } else {
+        ""
      }
-      unsupportedOperatorsOutputRows ++= unsupportedExecRows
+      createUnsupportedRow(readFormat, "Read", readType)
    }
-    if (unsupportedExecExprsMap.nonEmpty) {
-      val unsupportedExecExprMapRows = unsupportedExecExprsMap.map { case (exec, exprs) =>
-        val data = ListBuffer(
-          reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
-          reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
-          reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
-          reformatCSVFunc(s"$exec Exec is not supported as expressions are " +
-            s"not supported - `$exprs`") -> headersAndSizes(NOTES)
-        )
-        constructOutputRow(data, delimiter, prettyPrint)
-      }.toArray
-      unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows
+    unsupportedOperatorsOutputRows ++= readFormatRows
+
+    // Unsupported Execs and Execs that are not supported due to unsupported expressions, or if
+    // the operation is from a dataset, or if the operation contains a UDF.
+    val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
+    val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet
+    val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains)
+    val actualUnsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
+      || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x))
+    val unsupportedExecRows = actualUnsupportedExecs.map { exec =>
+      createUnsupportedRow(exec, "Exec", "")
+    }
+    unsupportedOperatorsOutputRows ++= unsupportedExecRows
+
+    val unsupportedDatasetExecRows = dataSetExecs.map { exec =>
+      createUnsupportedRow(exec, "Exec", s"$exec Exec is not supported as " +
+        s"this operation is from a dataset which is not supported")
+    }
+    unsupportedOperatorsOutputRows ++= unsupportedDatasetExecRows
+
+    val unsupportedUdfExecRows = udfExecs.map { exec =>
+      createUnsupportedRow(exec, "Exec", s"$exec Exec is " +
+        s"not supported as it contains a UDF which is not supported")
+    }
+    unsupportedOperatorsOutputRows ++= unsupportedUdfExecRows
+
+    val unsupportedExecExprMapRows = sumInfo.unsupportedExecstoExprsMap.map { case (exec, exprs) =>
+      createUnsupportedRow(exec, "Exec", s"$exec Exec is not" +
+        s" supported as expressions are not supported - `$exprs`")
+    }.toArray
+    unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows
+
     if (unsupportedExprs.nonEmpty) {
-      val unsupportedExprRows = unsupportedExprs.split(";").map { expr =>
-        val data = ListBuffer(
-          reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
-          reformatCSVFunc("Expression") -> headersAndSizes(UNSUPPORTED_TYPE),
-          reformatCSVFunc(expr) -> headersAndSizes(DETAILS),
-          reformatCSVFunc("") -> headersAndSizes(NOTES)
-        )
-        constructOutputRow(data, delimiter, prettyPrint)
+      val unsupportedExprRows = sumInfo.unSupportedExprs.split(";").map { expr =>
+        createUnsupportedRow(expr, "Expression", "")
      }
      unsupportedOperatorsOutputRows ++= unsupportedExprRows
    }
-    if (writeFormat.nonEmpty) {
-      val unsupportedwriteFormatRows = writeFormat.map { format =>
-        val data = ListBuffer(
-          reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
-          reformatCSVFunc("Write") -> headersAndSizes(UNSUPPORTED_TYPE),
-          reformatCSVFunc(format) -> headersAndSizes(DETAILS),
-          reformatCSVFunc("") -> headersAndSizes(NOTES)
-        )
-        constructOutputRow(data, delimiter, prettyPrint)
-      }
-      unsupportedOperatorsOutputRows ++= unsupportedwriteFormatRows
+
+    val unsupportedWriteFormatRows = sumInfo.writeDataFormat.map { format =>
+      createUnsupportedRow(format, "Write", "")
    }
+    unsupportedOperatorsOutputRows ++= unsupportedWriteFormatRows
+
    unsupportedOperatorsOutputRows
  }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
index e733d73c2..0286cd582 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
@@ -93,6 +93,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
     qWriter.writeExecReport(allAppsSum, order)
     qWriter.writeStageReport(allAppsSum, order)
     qWriter.writeUnsupportedOperatorsCSVReport(allAppsSum, order)
+    qWriter.writeUnsupportedOperatorsDetailedStageCSVReport(allAppsSum, order)
     val appStatusResult = generateStatusSummary(appStatusReporter.asScala.values.toSeq)
     qWriter.writeStatusReport(appStatusResult, order)
     if (mlOpsEnabled) {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index 6847da083..9a237931f 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -296,6 +296,7 @@ class QualificationAppInfo(
       val allSpeedupFactorAvg = SQLPlanParser.averageSpeedup(execInfos.map(_.speedupFactor))
       val allFlattenedExecs = flattenedExecs(execInfos)
       val numUnsupported = allFlattenedExecs.filterNot(_.isSupported)
+      val unsupportedExecs = numUnsupported.map(_.exec)
       // if we have unsupported try to guess at how much time. For now divide
       // time by number of execs and give each one equal weight
       val eachExecTime = allStageTaskTime / allFlattenedExecs.size
@@ -339,8 +340,13 @@ class QualificationAppInfo(
         eachStageUnsupported
       }
 
+      // Get stage info for the given stageId.
+      val stageInfos = stageIdToInfo.filterKeys { case (id, _) => id == stageId }
+      val wallclockStageDuration = stageInfos.values.map(x => x.duration.getOrElse(0L)).sum
+
       StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
-        finalEachStageUnsupported, numTransitions, transitionsTime, estimated)
+        finalEachStageUnsupported, numTransitions, transitionsTime, estimated,
+        wallclockStageDuration, unsupportedExecs)
     }.toSet
   }
@@ -459,7 +465,7 @@ class QualificationAppInfo(
           c.filterNot(_.shouldRemove)
         }
         new ExecInfo(e.sqlID, e.exec, e.expr, e.speedupFactor, e.duration,
-          e.nodeId, e.isSupported, filteredChildren, e.stages, e.shouldRemove)
+          e.nodeId, e.isSupported, filteredChildren, e.stages, e.shouldRemove, e.unsupportedExprs)
       }
       val filteredPlanInfos = execFilteredChildren.filterNot(_.shouldRemove)
       p.copy(execInfo = filteredPlanInfos)
@@ -590,7 +596,6 @@ class QualificationAppInfo(
       case x if !IgnoreExecs.getAllIgnoreExecs.contains(x.exec) => x.exec
     }.toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":")
-
     // Get all the unsupported Expressions from the plan
     val unSupportedExprs = origPlanInfos.map(_.execInfo.flatMap(
       _.unsupportedExprs)).flatten.filter(_.nonEmpty).toSet.mkString(";")
@@ -639,6 +644,9 @@ class QualificationAppInfo(
         1
       }
 
+      val wallClockSqlDFToUse = QualificationAppInfo.wallClocksqlDataFrameToUse(
+        sparkSQLDFWallClockDuration, appDuration)
+
       val estimatedInfo = QualificationAppInfo.calculateEstimatedInfoSummary(estimatedGPURatio,
         sparkSQLDFWallClockDuration, appDuration, taskSpeedupFactor, appName, appId,
         sqlIdsWithFailures.nonEmpty, mlSpeedup, unSupportedExecs, unSupportedExprs,
@@ -649,8 +657,8 @@ class QualificationAppInfo(
         notSupportFormatAndTypesString, getAllReadFileFormats, writeFormat,
         allComplexTypes, nestedComplexTypes, longestSQLDuration, sqlDataframeTaskDuration,
         nonSQLTaskDuration, unsupportedSQLTaskDuration, supportedSQLTaskDuration,
-        taskSpeedupFactor, info.sparkUser, info.startTime, origPlanInfos,
-        perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos,
+        taskSpeedupFactor, info.sparkUser, info.startTime, wallClockSqlDFToUse,
+        origPlanInfos, perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos,
         unSupportedExecs, unSupportedExprs, clusterTags, allClusterTagsMap, mlFunctions,
         mlTotalStageDuration, unsupportedExecExprsMap)
   }
@@ -864,6 +872,7 @@ case class QualificationSummaryInfo(
     taskSpeedupFactor: Double,
     user: String,
     startTime: Long,
+    sparkSqlDFWallClockDuration: Long,
     planInfo: Seq[PlanInfo],
     stageInfo: Seq[StageQualSummaryInfo],
     estimatedInfo: EstimatedAppInfo,
@@ -884,7 +893,9 @@ case class StageQualSummaryInfo(
     unsupportedTaskDur: Long,
     numTransitions: Int,
     transitionTime: Long,
-    estimated: Boolean = false)
+    estimated: Boolean = false,
+    stageWallclockDuration: Long = 0,
+    unsupportedExecs: Seq[String] = Seq.empty)
 
 object QualificationAppInfo extends Logging {
   // define recommendation constants
@@ -929,19 +940,23 @@ object QualificationAppInfo extends Logging {
     }
   }
 
+  def wallClocksqlDataFrameToUse(sqlDataFrameDuration: Long, appDuration: Long): Long = {
+    if (sqlDataFrameDuration > appDuration) {
+      // our app duration is shorter than our sql duration, estimate the sql duration down
+      // to app duration
+      appDuration
+    } else {
+      sqlDataFrameDuration
+    }
+  }
+
   // Summarize and estimate based on wall clock times
   def calculateEstimatedInfoSummary(estimatedRatio: Double, sqlDataFrameDuration: Long,
       appDuration: Long, sqlSpeedupFactor: Double, appName: String, appId: String,
       hasFailures: Boolean, mlSpeedupFactor: Option[MLFuncsSpeedupAndDuration] = None,
       unsupportedExecs: String = "", unsupportedExprs: String = "",
       allClusterTagsMap: Map[String, String] = Map.empty[String, String]): EstimatedAppInfo = {
-    val sqlDataFrameDurationToUse = if (sqlDataFrameDuration > appDuration) {
-      // our app duration is shorter then our sql duration, estimate the sql duration down
-      // to app duration
-      appDuration
-    } else {
-      sqlDataFrameDuration
-    }
+    val sqlDataFrameDurationToUse = wallClocksqlDataFrameToUse(sqlDataFrameDuration, appDuration)
 
     // get the average speedup and duration for ML funcs supported on GPU
     val (mlSpeedup, mlDuration) = if (mlSpeedupFactor.isDefined) {
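A quick aside on the helper patch 1 introduces: wallClocksqlDataFrameToUse caps the SQL DataFrame wall clock time at the overall app duration. A minimal standalone sketch of that rule (the object name and sample values here are invented for illustration):

    object WallClockCapExample extends App {
      // Cap the SQL DataFrame wall clock time at the app duration, so the
      // estimated SQL time can never exceed the lifetime of the application.
      def cap(sqlDataFrameDuration: Long, appDuration: Long): Long =
        if (sqlDataFrameDuration > appDuration) appDuration else sqlDataFrameDuration

      assert(cap(4000L, 3000L) == 3000L) // SQL duration exceeds app duration: capped
      assert(cap(2500L, 3000L) == 2500L) // otherwise the SQL duration is used as-is
    }

Patch 3 below folds the same logic into a single math.min call.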
From de8111f9d7effb50d934e5d2fcda7afd4df8a7e4 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Wed, 6 Dec 2023 16:13:21 -0800
Subject: [PATCH 2/8] Update test

---
 .../rapids/tool/qualification/QualificationSuite.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index 08c0fead2..1bd9f605f 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -1150,7 +1150,10 @@ class QualificationSuite extends BaseTestSuite {
 
       val filename = s"$outpath/rapids_4_spark_qualification_output/" +
        s"rapids_4_spark_qualification_output_unsupportedOperators.csv"
+      val stageDurationFile = s"$outpath/rapids_4_spark_qualification_output/" +
+        s"rapids_4_spark_qualification_output_unsupportedOperatorsStageDuration.csv"
      val inputSource = Source.fromFile(filename)
+      val unsupportedStageDuration = Source.fromFile(stageDurationFile)
      try {
        val lines = inputSource.getLines.toSeq
        // 1 for header, 1 for values
@@ -1166,6 +1169,11 @@ class QualificationSuite extends BaseTestSuite {
        assert(lines.size == expLinesSize)
        assert(lines.head.contains("App ID,Unsupported Type,"))
        assert(lines(1).contains("\"Read\",\"JSON\",\"Types not supported - bigint:int\""))
+
+        val stageDurationLines = unsupportedStageDuration.getLines.toSeq
+        assert(stageDurationLines.head.contains("" +
+          "Stage Wall Clock Duration,App Duration,Recommendation"))
+        assert(stageDurationLines(1).contains("Not Recommended"))
      } finally {
        inputSource.close()
      }

From 246fe4c3487deea553e53055b346a1506a5f79b2 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Thu, 7 Dec 2023 18:22:28 -0800
Subject: [PATCH 3/8] Addressed review comments

---
 .../tool/qualification/QualOutputWriter.scala | 40 +++++++++++--------
 .../qualification/QualificationAppInfo.scala  | 16 +++-----
 2 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index 52b74e9bc..bdd97cfa6 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -151,13 +151,17 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
     val csvFileWriter = new ToolTextFileWriter(outputDir,
       s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperators.csv",
       "Unsupported Operators CSV Report", hadoopConf)
-    val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums)
-    csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
-      QualOutputWriter.CSV_DELIMITER, false))
-    sums.foreach { sum =>
-      val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes,
-        QualOutputWriter.CSV_DELIMITER, false)
-      rows.foreach(row => csvFileWriter.write(row))
+    try {
+      val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums)
+      csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
+        QualOutputWriter.CSV_DELIMITER, false))
+      sums.foreach { sum =>
+        val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes,
+          QualOutputWriter.CSV_DELIMITER, false)
+        rows.foreach(row => csvFileWriter.write(row))
+      }
+    } finally {
+      csvFileWriter.close()
     }
   }
 
@@ -167,14 +171,18 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
     val csvFileWriter = new ToolTextFileWriter(outputDir,
       s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperatorsStageDuration.csv",
       "Unsupported Operators StageDuration CSV Report", hadoopConf)
-    val headersAndSizes =
-      QualOutputWriter.getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(sums)
-    csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
-      QualOutputWriter.CSV_DELIMITER, false))
-    sums.foreach { sum =>
-      val rows = QualOutputWriter.constructUnsupportedStagesDurationInfo(sum, headersAndSizes,
-        QualOutputWriter.CSV_DELIMITER, false)
-      rows.foreach(row => csvFileWriter.write(row))
+    try {
+      val headersAndSizes =
+        QualOutputWriter.getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(sums)
+      csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
+        QualOutputWriter.CSV_DELIMITER, false))
+      sums.foreach { sum =>
+        val rows = QualOutputWriter.constructUnsupportedStagesDurationInfo(sum, headersAndSizes,
+          QualOutputWriter.CSV_DELIMITER, false)
+        rows.foreach(row => csvFileWriter.write(row))
+      }
+    } finally {
+      csvFileWriter.close()
     }
   }
 
@@ -939,7 +947,7 @@ object QualOutputWriter {
     val recommendation = sumInfo.estimatedInfo.recommendation
 
     sumInfo.stageInfo.collect {
-      case info if info.stageWallclockDuration > 0 && info.unsupportedExecs.nonEmpty =>
+      case info if info.unsupportedExecs.nonEmpty =>
         val stageAppDuration = info.stageWallclockDuration
         val unsupportedExecs = info.unsupportedExecs.mkString(";")
         val data = ListBuffer[(String, Int)](
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index 9a237931f..a691ba3b9 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -644,7 +644,7 @@ class QualificationAppInfo(
         1
       }
 
-      val wallClockSqlDFToUse = QualificationAppInfo.wallClocksqlDataFrameToUse(
+      val wallClockSqlDFToUse = QualificationAppInfo.wallClockSqlDataFrameToUse(
         sparkSQLDFWallClockDuration, appDuration)
 
       val estimatedInfo = QualificationAppInfo.calculateEstimatedInfoSummary(estimatedGPURatio,
@@ -940,14 +940,10 @@ object QualificationAppInfo extends Logging {
     }
   }
 
-  def wallClocksqlDataFrameToUse(sqlDataFrameDuration: Long, appDuration: Long): Long = {
-    if (sqlDataFrameDuration > appDuration) {
-      // our app duration is shorter than our sql duration, estimate the sql duration down
-      // to app duration
-      appDuration
-    } else {
-      sqlDataFrameDuration
-    }
+  def wallClockSqlDataFrameToUse(sqlDataFrameDuration: Long, appDuration: Long): Long = {
+    // If our app duration is shorter than our sql duration, estimate the sql duration down
+    // to app duration
+    math.min(sqlDataFrameDuration, appDuration)
   }
 
   // Summarize and estimate based on wall clock times
@@ -956,7 +952,7 @@ object QualificationAppInfo extends Logging {
       hasFailures: Boolean, mlSpeedupFactor: Option[MLFuncsSpeedupAndDuration] = None,
       unsupportedExecs: String = "", unsupportedExprs: String = "",
       allClusterTagsMap: Map[String, String] = Map.empty[String, String]): EstimatedAppInfo = {
-    val sqlDataFrameDurationToUse = wallClocksqlDataFrameToUse(sqlDataFrameDuration, appDuration)
+    val sqlDataFrameDurationToUse = wallClockSqlDataFrameToUse(sqlDataFrameDuration, appDuration)
 
     // get the average speedup and duration for ML funcs supported on GPU
     val (mlSpeedup, mlDuration) = if (mlSpeedupFactor.isDefined) {
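Patch 3 above wraps both CSV report writers in try/finally so each file is closed even when a write fails. The same close-on-all-paths idiom in isolation (plain java.io is used to keep the sketch self-contained; the file name is invented):

    import java.io.{BufferedWriter, FileWriter}

    object WriterCloseExample extends App {
      // Open the writer, perform every write inside try, and close in finally,
      // so an exception during a write cannot leak the underlying file handle.
      val writer = new BufferedWriter(new FileWriter("example_report.csv"))
      try {
        writer.write("App ID,Unsupported Type\n")
      } finally {
        writer.close()
      }
    }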
From 9a649c0b0cdb1c2c913c11b273f0e2355d17bbd7 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Tue, 12 Dec 2023 12:14:55 -0800
Subject: [PATCH 4/8] Add some more expressions in ignore list, update output
 format

---
 .../tool/planparser/SQLPlanParser.scala       |  3 +-
 .../tool/qualification/QualOutputWriter.scala | 29 ++++++++++++-------
 2 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
index 595ed7783..b809d9521 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
@@ -78,7 +78,8 @@ object SQLPlanParser extends Logging {
 
   val windowFunctionPattern = """(\w+)\(""".r
 
-  val ignoreExpressions = Array("any", "cast", "decimal", "decimaltype", "every", "some",
+  val ignoreExpressions = Array("any", "cast", "ansi_cast", "decimal", "decimaltype", "every",
+    "some",
     "merge_max", "merge_min", "merge_sum", "merge_count", "merge_avg", "merge_first", "list",
     // current_database does not cause any CPU fallbacks
     "current_database",
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index bdd97cfa6..fd6dabb42 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
 import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER}
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.sql.rapids.tool.ToolUtils
+import org.apache.spark.sql.rapids.tool.{IgnoreExecs, ToolUtils}
 import org.apache.spark.sql.rapids.tool.qualification.{EstimatedPerSQLSummaryInfo, EstimatedSummaryInfo, QualificationAppInfo, QualificationSummaryInfo, StatusSummaryInfo}
 import org.apache.spark.sql.rapids.tool.util._
 
@@ -939,19 +939,26 @@ object QualOutputWriter {
     sumInfo.stageInfo.collect {
       case info if info.unsupportedExecs.nonEmpty =>
         val stageAppDuration = info.stageWallclockDuration
-        val unsupportedExecs = info.unsupportedExecs.mkString(";")
-        val data = ListBuffer[(String, Int)](
-          reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
-          reformatCSVFunc(unsupportedExecs) -> headersAndSizes(UNSUPPORTED_TYPE),
-          info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
-          stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
-          appDuration.toString -> headersAndSizes(APP_DUR_STR),
-          recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR)
-        )
-        constructOutputRow(data, delimiter, prettyPrint)
+        val filteredUnsupportedExecs = info.unsupportedExecs.filterNot(
+          x => IgnoreExecs.getAllIgnoreExecs.contains(x))
+        val unsupportedExecsStr = filteredUnsupportedExecs.mkString(";")
+        if (unsupportedExecsStr.nonEmpty) {
+          val data = ListBuffer[(String, Int)](
+            reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
+            reformatCSVFunc(unsupportedExecsStr) -> headersAndSizes(UNSUPPORTED_TYPE),
+            info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
+            stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
+            appDuration.toString -> headersAndSizes(APP_DUR_STR),
+            recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR)
+          )
+          constructOutputRow(data, delimiter, prettyPrint)
+        } else {
+          ""
+        }
     }
   }
+
   def constructUnsupportedOperatorsInfo(
       sumInfo: QualificationSummaryInfo,
       headersAndSizes: LinkedHashMap[String, Int],

From ef42dc4545201abb2f987ceadfa829ee57f71bf6 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Tue, 12 Dec 2023 16:25:35 -0800
Subject: [PATCH 5/8] Update output format

---
 .../tool/qualification/QualOutputWriter.scala | 59 ++++++++++++-------
 .../spark/sql/rapids/tool/ToolUtils.scala     | 21 ++++++-
 .../qualification/QualificationAppInfo.scala  |  4 +-
 3 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index fd6dabb42..1661c9039 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -386,7 +386,7 @@ object QualOutputWriter {
   val SQL_DUR_STR = "SQL DF Duration"
   val TASK_DUR_STR = "SQL Dataframe Task Duration"
   val STAGE_DUR_STR = "Stage Task Duration"
-  val STAGE_WALLCLOCK_DUR_STR = "Stage Wall Clock Duration"
+  val STAGE_WALLCLOCK_DUR_STR = "Stage Duration"
   val POT_PROBLEM_STR = "Potential Problems"
   val EXEC_CPU_PERCENT_STR = "Executor CPU Time Percent"
   val APP_DUR_ESTIMATED_STR = "App Duration Estimated"
@@ -426,6 +426,7 @@ object QualOutputWriter {
   val UNSUPPORTED_TYPE = "Unsupported Type"
   val DETAILS = "Details"
   val NOTES = "Notes"
+  val IGNORE_OPERATOR = "Ignore Operator"
   val RUN_NAME = "RunName"
   val ESTIMATED_FREQUENCY = "Estimated Job Frequency (monthly)"
   val ML_FUNCTIONS = "ML Functions"
@@ -566,7 +567,8 @@ object QualOutputWriter {
       APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
       UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
       DETAILS -> DETAILS.size,
-      NOTES -> NOTES.size
+      NOTES -> NOTES.size,
+      IGNORE_OPERATOR -> IGNORE_OPERATOR.size
     )
     detailedHeaderAndFields
   }
@@ -579,7 +581,8 @@ object QualOutputWriter {
       STAGE_ID_STR -> STAGE_ID_STR.size,
       STAGE_WALLCLOCK_DUR_STR -> STAGE_WALLCLOCK_DUR_STR.size,
       APP_DUR_STR -> APP_DUR_STR.size,
-      SPEEDUP_BUCKET_STR -> SPEEDUP_BUCKET_STR_SIZE
+      SPEEDUP_BUCKET_STR -> SPEEDUP_BUCKET_STR_SIZE,
+      IGNORE_OPERATOR -> IGNORE_OPERATOR.size
     )
     detailedHeaderAndFields
   }
@@ -939,26 +942,33 @@ object QualOutputWriter {
     sumInfo.stageInfo.collect {
       case info if info.unsupportedExecs.nonEmpty =>
         val stageAppDuration = info.stageWallclockDuration
-        val filteredUnsupportedExecs = info.unsupportedExecs.filterNot(
-          x => IgnoreExecs.getAllIgnoreExecs.contains(x))
-        val unsupportedExecsStr = filteredUnsupportedExecs.mkString(";")
-        if (unsupportedExecsStr.nonEmpty) {
-          val data = ListBuffer[(String, Int)](
-            reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
-            reformatCSVFunc(unsupportedExecsStr) -> headersAndSizes(UNSUPPORTED_TYPE),
-            info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
-            stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
-            appDuration.toString -> headersAndSizes(APP_DUR_STR),
-            recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR)
-          )
-          constructOutputRow(data, delimiter, prettyPrint)
-        } else {
+        val allUnsupportedExecs = info.unsupportedExecs
+        if (allUnsupportedExecs.nonEmpty) {
+          allUnsupportedExecs.map { unsupportedExecsStr =>
+            val ignoreUnsupportedExec = if (
+              IgnoreExecs.getAllIgnoreExecs.contains(unsupportedExecsStr)) {
+              IgnoreExecs.True
+            } else {
+              IgnoreExecs.False
+            }
+            val data = ListBuffer[(String, Int)](
+              reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
+              reformatCSVFunc(unsupportedExecsStr) -> headersAndSizes(UNSUPPORTED_TYPE),
+              info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
+              stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
+              appDuration.toString -> headersAndSizes(APP_DUR_STR),
+              recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR),
+              ignoreUnsupportedExec -> headersAndSizes(IGNORE_OPERATOR)
+            )
+            constructOutputRow(data, delimiter, prettyPrint)
+          }.mkString
+        } else {
           ""
         }
     }
   }
-
   def constructUnsupportedOperatorsInfo(
       sumInfo: QualificationSummaryInfo,
       headersAndSizes: LinkedHashMap[String, Int],
@@ -976,12 +986,14 @@ object QualOutputWriter {
     val dataSetExecs = allExecs.collect { case x if x.dataSet => x.exec }
     val udfExecs = allExecs.collect { case x if x.udf => x.exec }
 
-    def createUnsupportedRow(exec: String, execType: String, notes: String): String = {
+    def createUnsupportedRow(exec: String, execType: String, notes: String,
+        ignoreOperator: String = IgnoreExecs.False): String = {
       val data = ListBuffer(
         appId -> headersAndSizes(APP_ID_STR),
         reformatCSVFunc(execType) -> headersAndSizes(UNSUPPORTED_TYPE),
         reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
-        reformatCSVFunc(notes) -> headersAndSizes(NOTES)
+        reformatCSVFunc(notes) -> headersAndSizes(NOTES),
+        reformatCSVFunc(ignoreOperator) -> headersAndSizes(IGNORE_OPERATOR)
       )
       constructOutputRow(data, delimiter, prettyPrint)
     }
@@ -1006,7 +1018,12 @@ object QualOutputWriter {
     val actualUnsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
       || udfExecs.contains(x) || unsupportedExecExprsMap.contains(x))
     val unsupportedExecRows = actualUnsupportedExecs.map { exec =>
-      createUnsupportedRow(exec, "Exec", "")
+      // If the exec is in the ignore list, then set the ignore operator to true.
+      if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) {
+        createUnsupportedRow(exec, "Exec", "", IgnoreExecs.True)
+      } else {
+        createUnsupportedRow(exec, "Exec", "", IgnoreExecs.False)
+      }
     }
     unsupportedOperatorsOutputRows ++= unsupportedExecRows
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
index c84261c18..8d7c0b88c 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
@@ -323,12 +323,29 @@ object IgnoreExecs {
   // Collect Limit replacement can be slower on the GPU. Disabled by default.
   private val CollectLimit = "CollectLimit"
   private val ScanExistingRDD = "Scan ExistingRDD"
-  private val ExecuteCreateViewCommand = "Execute CreateViewCommand"
   private val ExistingRDD = "ExistingRDD"
+  // Some DDLs and table commands which can be ignored
+  private val ExecuteCreateViewCommand = "Execute CreateViewCommand"
   private val LocalTableScan = "LocalTableScan"
+  private val ExecuteCreateDatabaseCommand = "Execute CreateDatabaseCommand"
+  private val ExecuteDropDatabaseCommand = "Execute DropDatabaseCommand"
+  private val ExecuteCreateTableAsSelectCommand = "Execute CreateTableAsSelectCommand"
+  private val ExecuteCreateTableCommand = "Execute CreateTableCommand"
+  private val ExecuteDropTableCommand = "Execute DropTableCommand"
+  private val ExecuteCreateDataSourceTableAsSelectCommand = "Execute " +
+    "CreateDataSourceTableAsSelectCommand"
+  private val SetCatalogAndNamespace = "SetCatalogAndNamespace"
+  private val SetCommand = "Set Command"
+
+  val True = "true"
+  val False = "false"
 
   def getAllIgnoreExecs: Set[String] = Set(AdaptiveSparkPlan, CollectLimit, ScanExistingRDD,
-    ExecuteCreateViewCommand, ExistingRDD, LocalTableScan)
+    ExecuteCreateViewCommand, ExistingRDD, LocalTableScan, ExecuteCreateTableCommand,
+    ExecuteDropTableCommand, ExecuteCreateDatabaseCommand, ExecuteDropDatabaseCommand,
+    ExecuteCreateTableAsSelectCommand, ExecuteCreateDataSourceTableAsSelectCommand,
+    SetCatalogAndNamespace)
 }
 
 object MlOps {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index a691ba3b9..a58d1e646 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -592,9 +592,7 @@ class QualificationAppInfo(
         e.children.map(x => x.filterNot(_.isSupported))
       }.flatten
       topLevelExecs ++ childrenExecs
-    }.collect {
-      case x if !IgnoreExecs.getAllIgnoreExecs.contains(x.exec) => x.exec
-    }.toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":")
+    }.map(_.exec).toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":")
 
     // Get all the unsupported Expressions from the plan
     val unSupportedExprs = origPlanInfos.map(_.execInfo.flatMap(
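To make the new report shape concrete: after patches 4 and 5, the rapids_4_spark_qualification_output_unsupportedOperatorsStageDuration.csv report carries one row per unsupported exec per stage, with the Ignore Operator column marking execs from the IgnoreExecs list. Hypothetical sample rows (the app ID, stage IDs, and durations are invented; only the App ID and Unsupported Type cells are quoted, matching the reformatCSVFunc usage above):

    App ID,Unsupported Type,Stage ID,Stage Duration,App Duration,Recommendation,Ignore Operator
    "local-1651187225439","CollectLimit",3,1220,355637,Not Recommended,true
    "local-1651187225439","Scan json",5,840,355637,Not Recommended,false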
From b65ce914bbc64b488aac5be2243dae0545f0e1e6 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Tue, 12 Dec 2023 18:02:56 -0800
Subject: [PATCH 6/8] Update test results

---
 .../QualificationExpectations/complex_dec_expectation.csv   | 2 +-
 .../QualificationExpectations/jdbc_expectation.csv          | 2 +-
 .../nds_q86_fail_test_expectation.csv                       | 2 +-
 .../nds_q86_test_expectation.csv                            | 2 +-
 .../qual_test_simple_expectation.csv                        | 6 +++---
 .../QualificationExpectations/read_dsv1_expectation.csv     | 2 +-
 .../QualificationExpectations/read_dsv2_expectation.csv     | 2 +-
 .../QualificationExpectations/spark2_expectation.csv        | 2 +-
 .../rapids/tool/qualification/QualificationSuite.scala      | 8 ++++----
 9 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv
index a80462cbc..753bdcacf 100644
--- a/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/complex_dec_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"","",30
+"Spark shell","local-1626104300434","Not Recommended",1.0,129898.52,1205.47,2429,1469,131104,1923,88.35,"","","","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map;map>;map>;array>;array","struct,lastname:string>;struct,previous:struct,city:string>>;array>;map>;map>;array>","NESTED COMPLEX TYPE",1260,128847,306,1163,2.68,false,"CollectLimit","",30
diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
index c4f57a9c2..7dbea470a 100644
--- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569439.65,2527.34,3627,19894,571967,3470,28.41,"","JDBC[*]","","","","",1812,544575,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1]","",30
+"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569439.65,2527.34,3627,19894,571967,3470,28.41,"","JDBC[*]","","","","",1812,544575,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv
index 6943e0418..84fe8ae97 100644
--- a/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19230.84,6940.15,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.64,false,"","",30
+"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19230.84,6940.15,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30
diff --git a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv
index 5637f2123..3cd646abf 100644
--- a/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19230.84,6940.15,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.64,false,"","",30
+"TPC-DS Like Bench q86","app-20210319163812-1778","Recommended",1.36,19230.84,6940.15,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,3.64,false,"Execute CreateViewCommand","",30
diff --git a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
index d629f3206..e41039a69 100644
--- a/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
@@ -1,5 +1,5 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
 "Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8488.68,7830.31,12434,132257,16319,10577,37.7,"","","JSON","","","",7143,4717,19744,112513,3.85,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1
-"Spark shell","local-1651187225439","Not Recommended",1.0,355550.33,86.66,760,180,355637,253,87.88,"","JSON[string:bigint:int]","","","","",498,343411,120,60,1.52,false,"SerializeFromObject;DeserializeToObject;Scan json;Filter;MapElements","",1
-"Spark shell","local-1651188809790","Not Recommended",1.0,166205.19,9.8,911,283,166215,38,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,271,12,1.34,false,"Scan json;Project","UDF",1
-"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,2,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4661,5,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;Scan json;Project","UDF",1
+"Spark shell","local-1651187225439","Not Recommended",1.0,355550.33,86.66,760,180,355637,253,87.88,"","JSON[string:bigint:int]","","","","",498,343411,120,60,1.52,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1
+"Spark shell","local-1651188809790","Not Recommended",1.0,166205.19,9.8,911,283,166215,38,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,271,12,1.34,false,"CollectLimit;Scan json;Project","UDF",1
+"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,2,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4661,5,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1
diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
index fa1f44f49..318ea6b28 100644
--- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1624371544219","Not Recommended",1.0,174006.51,1286.48,6695,20421,175293,2268,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13469,6952,2.31,false,"Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30
+"Spark shell","local-1624371544219","Not Recommended",1.0,174006.51,1286.48,6695,20421,175293,2268,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,175953,13469,6952,2.31,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30
diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
index 78a44302b..17c155356 100644
--- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1624371906627","Not Recommended",1.01,82304.74,1433.25,6760,21802,83738,2388,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,14064,7738,2.5,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30
+"Spark shell","local-1624371906627","Not Recommended",1.01,82304.74,1433.25,6760,21802,83738,2388,71.3,"","Text[*];json[double]","JSON","","","",1984,82601,14064,7738,2.5,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30
diff --git a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv
index 7fe283958..f300a45a5 100644
--- a/core/src/test/resources/QualificationExpectations/spark2_expectation.csv
+++ b/core/src/test/resources/QualificationExpectations/spark2_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Spark shell","local-1634253215009","Not Recommended",1.01,46542.98,520.01,1520,359,47063,817,67.64,"","Text[*]","","","","",1068,44935,166,193,2.75,false,"Scan text","",30
+"Spark shell","local-1634253215009","Not Recommended",1.01,46542.98,520.01,1520,359,47063,817,67.64,"","Text[*]","","","","",1068,44935,166,193,2.75,false,"CollectLimit;Scan text","",30
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index 1bd9f605f..fd0efa25b 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -1160,11 +1160,11 @@ class QualificationSuite extends BaseTestSuite {
         val expLinesSize = if (ToolUtils.isSpark340OrLater()) {
-          6
+          8
         } else if (!ToolUtils.isSpark320OrLater()) {
-          5
+          6
         } else {
-          5
+          7
         }
         assert(lines.size == expLinesSize)
         assert(lines.head.contains("App ID,Unsupported Type,"))
@@ -1172,7 +1172,7 @@ class QualificationSuite extends BaseTestSuite {
 
         val stageDurationLines = unsupportedStageDuration.getLines.toSeq
         assert(stageDurationLines.head.contains("" +
-          "Stage Wall Clock Duration,App Duration,Recommendation"))
+          "Stage Duration,App Duration,Recommendation"))
         assert(stageDurationLines(1).contains("Not Recommended"))
       } finally {
         inputSource.close()

From d31d714979369659c33239bda111c5f17c2b315b Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Wed, 13 Dec 2023 10:56:02 -0800
Subject: [PATCH 7/8] Update ignore exec name

---
 .../scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
index 8d7c0b88c..4fa388ec4 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
@@ -335,7 +335,7 @@ object IgnoreExecs {
   private val ExecuteCreateDataSourceTableAsSelectCommand = "Execute " +
     "CreateDataSourceTableAsSelectCommand"
   private val SetCatalogAndNamespace = "SetCatalogAndNamespace"
-  private val SetCommand = "Set Command"
+  private val ExecuteSetCommand = "Execute SetCommand"
 
   val True = "true"
@@ -345,7 +345,7 @@ object IgnoreExecs {
     ExecuteCreateViewCommand, ExistingRDD, LocalTableScan, ExecuteCreateTableCommand,
     ExecuteDropTableCommand, ExecuteCreateDatabaseCommand, ExecuteDropDatabaseCommand,
     ExecuteCreateTableAsSelectCommand, ExecuteCreateDataSourceTableAsSelectCommand,
-    SetCatalogAndNamespace)
+    SetCatalogAndNamespace, ExecuteSetCommand)
 }
 
 object MlOps {

From 8946b07e5829c357463be768ce93016d7f461ebe Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Thu, 14 Dec 2023 14:23:00 -0800
Subject: [PATCH 8/8] Add comment for ignore operator

---
 .../spark/rapids/tool/qualification/QualOutputWriter.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index 1661c9039..bd1b91654 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -945,6 +945,9 @@ object QualOutputWriter {
         val allUnsupportedExecs = info.unsupportedExecs
         if (allUnsupportedExecs.nonEmpty) {
           allUnsupportedExecs.map { unsupportedExecsStr =>
+            // Ignore Operator is a boolean value that indicates whether the operator should
+            // be considered for GPU acceleration or not. If the value is true, the operator
+            // will be ignored.
             val ignoreUnsupportedExec = if (
               IgnoreExecs.getAllIgnoreExecs.contains(unsupportedExecsStr)) {
               IgnoreExecs.True