Skip to content

Commit

Permalink
Qualification tool: Add more information for unsupported operators (#680
Browse files Browse the repository at this point in the history
)

* Qualification tool: Add more information for unsupported operators

Signed-off-by: Niranjan Artal <[email protected]>

* update test

* addressed review comments

* add some more expressions in ignore list, update output format

* update output format

* update test results

* update ignore exec name

* add comment for ignore operator

---------

Signed-off-by: Niranjan Artal <[email protected]>
  • Loading branch information
nartal1 authored Dec 15, 2023
1 parent 96300cd commit b54ae84
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class ExecInfo(
val children: Option[Seq[ExecInfo]], // only one level deep
val stages: Set[Int] = Set.empty,
val shouldRemove: Boolean = false,
val unsupportedExprs: Array[String] = Array.empty) {
val unsupportedExprs: Array[String] = Array.empty,
val dataSet: Boolean = false,
val udf: Boolean = false) {
private def childrenToString = {
val str = children.map { c =>
c.map(" " + _.toString).mkString("\n")
Expand Down Expand Up @@ -76,7 +78,8 @@ object SQLPlanParser extends Logging {

val windowFunctionPattern = """(\w+)\(""".r

val ignoreExpressions = Array("any", "cast", "decimal", "decimaltype", "every", "some",
val ignoreExpressions = Array("any", "cast", "ansi_cast", "decimal", "decimaltype", "every",
"some", "merge_max", "merge_min", "merge_sum", "merge_count", "merge_avg", "merge_first",
"list",
// current_database does not cause any CPU fallbacks
"current_database",
Expand Down Expand Up @@ -301,13 +304,12 @@ object SQLPlanParser extends Logging {
}
val stagesInNode = getStagesInSQLNode(node, app)
val supported = execInfos.isSupported && !ds && !containsUDF

// shouldRemove is set to true if the exec is a member of "execsToBeRemoved" or if the node
// is a duplicate
val removeFlag = execInfos.shouldRemove || isDupNode || execsToBeRemoved.contains(node.name)
Seq(new ExecInfo(execInfos.sqlID, execInfos.exec, execInfos.expr, execInfos.speedupFactor,
execInfos.duration, execInfos.nodeId, supported, execInfos.children,
stagesInNode, removeFlag, execInfos.unsupportedExprs))
stagesInNode, removeFlag, execInfos.unsupportedExprs, ds, containsUDF))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.{IgnoreExecs, ToolUtils}
import org.apache.spark.sql.rapids.tool.qualification.{EstimatedPerSQLSummaryInfo, EstimatedSummaryInfo, QualificationAppInfo, QualificationSummaryInfo, StatusSummaryInfo}
import org.apache.spark.sql.rapids.tool.util._

Expand Down Expand Up @@ -151,13 +151,38 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperators.csv",
"Unsupported Operators CSV Report", hadoopConf)
val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
try {
val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
}
} finally {
csvFileWriter.close()
}
}

def writeUnsupportedOperatorsDetailedStageCSVReport(
sums: Seq[QualificationSummaryInfo],
order: String): Unit = {
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperatorsStageDuration.csv",
"Unsupported Operators StageDuration CSV Report", hadoopConf)
try {
val headersAndSizes =
QualOutputWriter.getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedStagesDurationInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
}
} finally {
csvFileWriter.close()
}
}

Expand Down Expand Up @@ -361,6 +386,7 @@ object QualOutputWriter {
val SQL_DUR_STR = "SQL DF Duration"
val TASK_DUR_STR = "SQL Dataframe Task Duration"
val STAGE_DUR_STR = "Stage Task Duration"
val STAGE_WALLCLOCK_DUR_STR = "Stage Duration"
val POT_PROBLEM_STR = "Potential Problems"
val EXEC_CPU_PERCENT_STR = "Executor CPU Time Percent"
val APP_DUR_ESTIMATED_STR = "App Duration Estimated"
Expand Down Expand Up @@ -400,6 +426,7 @@ object QualOutputWriter {
val UNSUPPORTED_TYPE = "Unsupported Type"
val DETAILS = "Details"
val NOTES = "Notes"
val IGNORE_OPERATOR = "Ignore Operator"
val RUN_NAME = "RunName"
val ESTIMATED_FREQUENCY = "Estimated Job Frequency (monthly)"
val ML_FUNCTIONS = "ML Functions"
Expand Down Expand Up @@ -540,11 +567,25 @@ object QualOutputWriter {
APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
DETAILS -> DETAILS.size,
NOTES -> NOTES.size
NOTES -> NOTES.size,
IGNORE_OPERATOR -> IGNORE_OPERATOR.size
)
detailedHeaderAndFields
}

def getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(
appInfos: Seq[QualificationSummaryInfo]): LinkedHashMap[String, Int] = {
val detailedHeaderAndFields = LinkedHashMap[String, Int](
APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
STAGE_ID_STR -> STAGE_ID_STR.size,
STAGE_WALLCLOCK_DUR_STR -> STAGE_WALLCLOCK_DUR_STR.size,
APP_DUR_STR -> APP_DUR_STR.size,
SPEEDUP_BUCKET_STR -> SPEEDUP_BUCKET_STR_SIZE,
IGNORE_OPERATOR -> IGNORE_OPERATOR.size
)
detailedHeaderAndFields
}

def getDetailedHeaderStringsAndSizes(appInfos: Seq[QualificationSummaryInfo],
reportReadSchema: Boolean): LinkedHashMap[String, Int] = {
Expand Down Expand Up @@ -886,7 +927,7 @@ object QualOutputWriter {
}
}

def constructUnsupportedOperatorsInfo(
def constructUnsupportedStagesDurationInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = TEXT_DELIMITER,
Expand All @@ -895,81 +936,130 @@ object QualOutputWriter {
val reformatCSVFunc: String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
val appId = sumInfo.appId
val readFormat = sumInfo.readFileFormatAndTypesNotSupported
val writeFormat = sumInfo.writeDataFormat
val unsupportedExecs = sumInfo.unSupportedExecs
val unsupportedExprs = sumInfo.unSupportedExprs
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
val appDuration = sumInfo.sparkSqlDFWallClockDuration
val recommendation = sumInfo.estimatedInfo.recommendation

if (readFormat.nonEmpty) {
val unsupportedReadFormatRows = readFormat.map { format =>
val readFormatAndType = format.split("\\[")
val readFormat = readFormatAndType(0)
val readType = if (readFormatAndType.size > 1) {
s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
} else {
sumInfo.stageInfo.collect {
case info if info.unsupportedExecs.nonEmpty =>
val stageAppDuration = info.stageWallclockDuration
val allUnsupportedExecs = info.unsupportedExecs
if (allUnsupportedExecs.nonEmpty) {
allUnsupportedExecs.map { unsupportedExecsStr =>
// Ignore operator is a boolean value which indicates if the operator should be
// considered for GPU acceleration or not. If the value is true, the operator will
// be ignored.
val ignoreUnsupportedExec = if (
IgnoreExecs.getAllIgnoreExecs.contains(unsupportedExecsStr)) {
IgnoreExecs.True
} else {
IgnoreExecs.False
}
val data = ListBuffer[(String, Int)](
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc(unsupportedExecsStr) -> headersAndSizes(UNSUPPORTED_TYPE),
info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
appDuration.toString -> headersAndSizes(APP_DUR_STR),
recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR),
ignoreUnsupportedExec -> headersAndSizes(IGNORE_OPERATOR)
)
constructOutputRow(data, delimiter, prettyPrint)
}.mkString
}
else {
""
}
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Read")-> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(readFormat) -> headersAndSizes(DETAILS),
reformatCSVFunc(readType) -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
}

def constructUnsupportedOperatorsInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = TEXT_DELIMITER,
prettyPrint: Boolean,
reformatCSV: Boolean = true): Seq[String] = {

val reformatCSVFunc: String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)

val appId = reformatCSVFunc(sumInfo.appId)
val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
val unsupportedExprs = sumInfo.unSupportedExprs
val allExecs = getAllExecsFromPlan(sumInfo.planInfo)
val dataSetExecs = allExecs.collect { case x if x.dataSet => x.exec }
val udfExecs = allExecs.collect { case x if x.udf => x.exec }

def createUnsupportedRow(exec: String, execType: String, notes: String,
ignoreOperator: String = IgnoreExecs.False): String = {
val data = ListBuffer(
appId -> headersAndSizes(APP_ID_STR),
reformatCSVFunc(execType) -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc(notes) -> headersAndSizes(NOTES),
reformatCSVFunc(ignoreOperator) -> headersAndSizes(IGNORE_OPERATOR)
)
constructOutputRow(data, delimiter, prettyPrint)
}

val readFormatRows = sumInfo.readFileFormatAndTypesNotSupported.map { format =>
val readFormatAndType = format.split("\\[")
val readFormat = readFormatAndType(0)
val readType = if (readFormatAndType.size > 1) {
s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
} else {
""
}
unsupportedOperatorsOutputRows ++= unsupportedReadFormatRows
createUnsupportedRow(readFormat,"Read", readType)
}
if (unsupportedExecs.nonEmpty) {
val unsupportedExecRows = unsupportedExecs.split(";").map { exec =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
unsupportedOperatorsOutputRows ++= readFormatRows

// Unsupported Execs and Execs that are not supported due to unsupported expressions, or if
// the operation is from a dataset, or if the operation contains a UDF.
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet
val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains)
val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
|| udfExecs.contains(x) || unsupportedExecExprsMap.contains(x))
val unsupportedExecRows = actualunsupportedExecs.map { exec =>
// If the exec is in the ignore list, then set the ignore operator to true.
if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) {
createUnsupportedRow(exec, "Exec", "", IgnoreExecs.True)
} else {
createUnsupportedRow(exec, "Exec", "", IgnoreExecs.False)
}
unsupportedOperatorsOutputRows ++= unsupportedExecRows
}
if (unsupportedExecExprsMap.nonEmpty) {
val unsupportedExecExprMapRows = unsupportedExecExprsMap.map { case (exec, exprs) =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc(s"$exec Exec is not supported as expressions are " +
s"not supported - `$exprs`") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}.toArray
unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows
unsupportedOperatorsOutputRows ++= unsupportedExecRows

val unsupportedDatasetExecRows = dataSetExecs.map { exec =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is not supported as " +
s"this operation is from dataset which is not supported")
}
unsupportedOperatorsOutputRows ++= unsupportedDatasetExecRows

val unsupportedUdfExecRows = udfExecs.map { exec =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is " +
s"not supported as it contains UDF which is not supported")
}
unsupportedOperatorsOutputRows ++= unsupportedUdfExecRows

val unsupportedExecExprMapRows = sumInfo.unsupportedExecstoExprsMap.map { case (exec, exprs) =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is not" +
s" supported as expressions are not supported - `$exprs`")
}.toArray
unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows

if (unsupportedExprs.nonEmpty) {
val unsupportedExprRows = unsupportedExprs.split(";").map { expr =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Expression") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(expr) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
val unsupportedExprRows = sumInfo.unSupportedExprs.split(";").map { expr =>
createUnsupportedRow(expr, "Expression", "")
}
unsupportedOperatorsOutputRows ++= unsupportedExprRows
}
if (writeFormat.nonEmpty) {
val unsupportedwriteFormatRows = writeFormat.map { format =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Write") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(format) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedwriteFormatRows

val unsupportedWriteFormatRows = sumInfo.writeDataFormat.map { format =>
createUnsupportedRow(format, "Write", "")
}
unsupportedOperatorsOutputRows ++= unsupportedWriteFormatRows

unsupportedOperatorsOutputRows
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
qWriter.writeExecReport(allAppsSum, order)
qWriter.writeStageReport(allAppsSum, order)
qWriter.writeUnsupportedOperatorsCSVReport(allAppsSum, order)
qWriter.writeUnsupportedOperatorsDetailedStageCSVReport(allAppsSum, order)
val appStatusResult = generateStatusSummary(appStatusReporter.asScala.values.toSeq)
qWriter.writeStatusReport(appStatusResult, order)
if (mlOpsEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,29 @@ object IgnoreExecs {
// Collect Limit replacement can be slower on the GPU. Disabled by default.
private val CollectLimit = "CollectLimit"
private val ScanExistingRDD = "Scan ExistingRDD"
private val ExecuteCreateViewCommand = "Execute CreateViewCommand"
private val ExistingRDD = "ExistingRDD"
// Some DDL's and table commands which can be ignored
private val ExecuteCreateViewCommand = "Execute CreateViewCommand"
private val LocalTableScan = "LocalTableScan"
private val ExecuteCreateDatabaseCommand = "Execute CreateDatabaseCommand"
private val ExecuteDropDatabaseCommand = "Execute DropDatabaseCommand"
private val ExecuteCreateTableAsSelectCommand = "Execute CreateTableAsSelectCommand"
private val ExecuteCreateTableCommand = "Execute CreateTableCommand"
private val ExecuteDropTableCommand = "Execute DropTableCommand"
private val ExecuteCreateDataSourceTableAsSelectCommand = "Execute " +
"CreateDataSourceTableAsSelectCommand"
private val SetCatalogAndNamespace = "SetCatalogAndNamespace"
private val ExecuteSetCommand = "Execute SetCommand"


val True = "true"
val False = "false"

def getAllIgnoreExecs: Set[String] = Set(AdaptiveSparkPlan, CollectLimit, ScanExistingRDD,
ExecuteCreateViewCommand, ExistingRDD, LocalTableScan)
ExecuteCreateViewCommand, ExistingRDD, LocalTableScan, ExecuteCreateTableCommand,
ExecuteDropTableCommand, ExecuteCreateDatabaseCommand, ExecuteDropDatabaseCommand,
ExecuteCreateTableAsSelectCommand, ExecuteCreateDataSourceTableAsSelectCommand,
SetCatalogAndNamespace, ExecuteSetCommand)
}

object MlOps {
Expand Down
Loading

0 comments on commit b54ae84

Please sign in to comment.