Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qualification tool: Add more information for unsupported operators #680

Merged
merged 10 commits into from
Dec 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class ExecInfo(
val children: Option[Seq[ExecInfo]], // only one level deep
val stages: Set[Int] = Set.empty,
val shouldRemove: Boolean = false,
val unsupportedExprs: Array[String] = Array.empty) {
val unsupportedExprs: Array[String] = Array.empty,
val dataSet: Boolean = false,
val udf: Boolean = false) {
private def childrenToString = {
val str = children.map { c =>
c.map(" " + _.toString).mkString("\n")
Expand Down Expand Up @@ -76,7 +78,8 @@ object SQLPlanParser extends Logging {

val windowFunctionPattern = """(\w+)\(""".r

val ignoreExpressions = Array("any", "cast", "decimal", "decimaltype", "every", "some",
val ignoreExpressions = Array("any", "cast", "ansi_cast", "decimal", "decimaltype", "every",
"some", "merge_max", "merge_min", "merge_sum", "merge_count", "merge_avg", "merge_first",
"list",
// current_database does not cause any CPU fallbacks
"current_database",
Expand Down Expand Up @@ -301,13 +304,12 @@ object SQLPlanParser extends Logging {
}
val stagesInNode = getStagesInSQLNode(node, app)
val supported = execInfos.isSupported && !ds && !containsUDF

// shouldRemove is set to true if the exec is a member of "execsToBeRemoved" or if the node
// is a duplicate
val removeFlag = execInfos.shouldRemove || isDupNode || execsToBeRemoved.contains(node.name)
Seq(new ExecInfo(execInfos.sqlID, execInfos.exec, execInfos.expr, execInfos.speedupFactor,
execInfos.duration, execInfos.nodeId, supported, execInfos.children,
stagesInNode, removeFlag, execInfos.unsupportedExprs))
stagesInNode, removeFlag, execInfos.unsupportedExprs, ds, containsUDF))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.{CLUSTER_ID, CLUSTER_ID_STR_SIZE, JOB_ID, JOB_ID_STR_SIZE, RUN_NAME, RUN_NAME_STR_SIZE, TEXT_DELIMITER}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.{IgnoreExecs, ToolUtils}
import org.apache.spark.sql.rapids.tool.qualification.{EstimatedPerSQLSummaryInfo, EstimatedSummaryInfo, QualificationAppInfo, QualificationSummaryInfo, StatusSummaryInfo}
import org.apache.spark.sql.rapids.tool.util._

Expand Down Expand Up @@ -151,13 +151,38 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperators.csv",
"Unsupported Operators CSV Report", hadoopConf)
val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
try {
val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
}
} finally {
csvFileWriter.close()
}
}

def writeUnsupportedOperatorsDetailedStageCSVReport(
sums: Seq[QualificationSummaryInfo],
order: String): Unit = {
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperatorsStageDuration.csv",
"Unsupported Operators StageDuration CSV Report", hadoopConf)
try {
val headersAndSizes =
QualOutputWriter.getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedStagesDurationInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
}
} finally {
csvFileWriter.close()
}
parthosa marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -361,6 +386,7 @@ object QualOutputWriter {
val SQL_DUR_STR = "SQL DF Duration"
val TASK_DUR_STR = "SQL Dataframe Task Duration"
val STAGE_DUR_STR = "Stage Task Duration"
val STAGE_WALLCLOCK_DUR_STR = "Stage Duration"
val POT_PROBLEM_STR = "Potential Problems"
val EXEC_CPU_PERCENT_STR = "Executor CPU Time Percent"
val APP_DUR_ESTIMATED_STR = "App Duration Estimated"
Expand Down Expand Up @@ -400,6 +426,7 @@ object QualOutputWriter {
val UNSUPPORTED_TYPE = "Unsupported Type"
val DETAILS = "Details"
val NOTES = "Notes"
val IGNORE_OPERATOR = "Ignore Operator"
val RUN_NAME = "RunName"
val ESTIMATED_FREQUENCY = "Estimated Job Frequency (monthly)"
val ML_FUNCTIONS = "ML Functions"
Expand Down Expand Up @@ -540,11 +567,25 @@ object QualOutputWriter {
APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
DETAILS -> DETAILS.size,
NOTES -> NOTES.size
NOTES -> NOTES.size,
IGNORE_OPERATOR -> IGNORE_OPERATOR.size
)
detailedHeaderAndFields
}

def getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(
appInfos: Seq[QualificationSummaryInfo]): LinkedHashMap[String, Int] = {
val detailedHeaderAndFields = LinkedHashMap[String, Int](
APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
STAGE_ID_STR -> STAGE_ID_STR.size,
STAGE_WALLCLOCK_DUR_STR -> STAGE_WALLCLOCK_DUR_STR.size,
APP_DUR_STR -> APP_DUR_STR.size,
SPEEDUP_BUCKET_STR -> SPEEDUP_BUCKET_STR_SIZE,
IGNORE_OPERATOR -> IGNORE_OPERATOR.size
)
detailedHeaderAndFields
}

def getDetailedHeaderStringsAndSizes(appInfos: Seq[QualificationSummaryInfo],
reportReadSchema: Boolean): LinkedHashMap[String, Int] = {
Expand Down Expand Up @@ -886,7 +927,7 @@ object QualOutputWriter {
}
}

def constructUnsupportedOperatorsInfo(
def constructUnsupportedStagesDurationInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = TEXT_DELIMITER,
Expand All @@ -895,81 +936,130 @@ object QualOutputWriter {
val reformatCSVFunc: String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
val appId = sumInfo.appId
val readFormat = sumInfo.readFileFormatAndTypesNotSupported
val writeFormat = sumInfo.writeDataFormat
val unsupportedExecs = sumInfo.unSupportedExecs
val unsupportedExprs = sumInfo.unSupportedExprs
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
val appDuration = sumInfo.sparkSqlDFWallClockDuration
val recommendation = sumInfo.estimatedInfo.recommendation

if (readFormat.nonEmpty) {
val unsupportedReadFormatRows = readFormat.map { format =>
val readFormatAndType = format.split("\\[")
val readFormat = readFormatAndType(0)
val readType = if (readFormatAndType.size > 1) {
s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
} else {
sumInfo.stageInfo.collect {
case info if info.unsupportedExecs.nonEmpty =>
val stageAppDuration = info.stageWallclockDuration
val allUnsupportedExecs = info.unsupportedExecs
if (allUnsupportedExecs.nonEmpty) {
allUnsupportedExecs.map { unsupportedExecsStr =>
// Ignore operator is a boolean value which indicates if the operator should be
// considered for GPU acceleration or not. If the value is true, the operator will
// be ignored.
val ignoreUnsupportedExec = if (
IgnoreExecs.getAllIgnoreExecs.contains(unsupportedExecsStr)) {
IgnoreExecs.True
} else {
IgnoreExecs.False
}
val data = ListBuffer[(String, Int)](
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc(unsupportedExecsStr) -> headersAndSizes(UNSUPPORTED_TYPE),
info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
appDuration.toString -> headersAndSizes(APP_DUR_STR),
recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR),
ignoreUnsupportedExec -> headersAndSizes(IGNORE_OPERATOR)
)
constructOutputRow(data, delimiter, prettyPrint)
}.mkString
}
else {
""
}
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Read")-> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(readFormat) -> headersAndSizes(DETAILS),
reformatCSVFunc(readType) -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
}

def constructUnsupportedOperatorsInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = TEXT_DELIMITER,
prettyPrint: Boolean,
reformatCSV: Boolean = true): Seq[String] = {

val reformatCSVFunc: String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)

val appId = reformatCSVFunc(sumInfo.appId)
val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
val unsupportedExprs = sumInfo.unSupportedExprs
val allExecs = getAllExecsFromPlan(sumInfo.planInfo)
val dataSetExecs = allExecs.collect { case x if x.dataSet => x.exec }
val udfExecs = allExecs.collect { case x if x.udf => x.exec }

def createUnsupportedRow(exec: String, execType: String, notes: String,
ignoreOperator: String = IgnoreExecs.False): String = {
val data = ListBuffer(
appId -> headersAndSizes(APP_ID_STR),
reformatCSVFunc(execType) -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc(notes) -> headersAndSizes(NOTES),
reformatCSVFunc(ignoreOperator) -> headersAndSizes(IGNORE_OPERATOR)
)
constructOutputRow(data, delimiter, prettyPrint)
}

val readFormatRows = sumInfo.readFileFormatAndTypesNotSupported.map { format =>
val readFormatAndType = format.split("\\[")
val readFormat = readFormatAndType(0)
val readType = if (readFormatAndType.size > 1) {
s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
} else {
""
}
unsupportedOperatorsOutputRows ++= unsupportedReadFormatRows
createUnsupportedRow(readFormat,"Read", readType)
}
if (unsupportedExecs.nonEmpty) {
val unsupportedExecRows = unsupportedExecs.split(";").map { exec =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
unsupportedOperatorsOutputRows ++= readFormatRows

// Unsupported Execs and Execs that are not supported due to unsupported expressions, or if
// the operation is from a dataset, or if the operation contains a UDF.
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet
val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains)
val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
|| udfExecs.contains(x) || unsupportedExecExprsMap.contains(x))
val unsupportedExecRows = actualunsupportedExecs.map { exec =>
// If the exec is in the ignore list, then set the ignore operator to true.
if (IgnoreExecs.getAllIgnoreExecs.contains(exec)) {
createUnsupportedRow(exec, "Exec", "", IgnoreExecs.True)
} else {
createUnsupportedRow(exec, "Exec", "", IgnoreExecs.False)
}
unsupportedOperatorsOutputRows ++= unsupportedExecRows
}
if (unsupportedExecExprsMap.nonEmpty) {
val unsupportedExecExprMapRows = unsupportedExecExprsMap.map { case (exec, exprs) =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc(s"$exec Exec is not supported as expressions are " +
s"not supported - `$exprs`") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}.toArray
unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows
unsupportedOperatorsOutputRows ++= unsupportedExecRows

val unsupportedDatasetExecRows = dataSetExecs.map { exec =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is not supported as " +
s"this operation is from dataset which is not supported")
}
unsupportedOperatorsOutputRows ++= unsupportedDatasetExecRows

val unsupportedUdfExecRows = udfExecs.map { exec =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is " +
s"not supported as it contains UDF which is not supported")
}
unsupportedOperatorsOutputRows ++= unsupportedUdfExecRows

val unsupportedExecExprMapRows = sumInfo.unsupportedExecstoExprsMap.map { case (exec, exprs) =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is not" +
s" supported as expressions are not supported - `$exprs`")
}.toArray
unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows

if (unsupportedExprs.nonEmpty) {
val unsupportedExprRows = unsupportedExprs.split(";").map { expr =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Expression") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(expr) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
val unsupportedExprRows = sumInfo.unSupportedExprs.split(";").map { expr =>
createUnsupportedRow(expr, "Expression", "")
}
unsupportedOperatorsOutputRows ++= unsupportedExprRows
}
if (writeFormat.nonEmpty) {
val unsupportedwriteFormatRows = writeFormat.map { format =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Write") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(format) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedwriteFormatRows

val unsupportedWriteFormatRows = sumInfo.writeDataFormat.map { format =>
createUnsupportedRow(format, "Write", "")
}
unsupportedOperatorsOutputRows ++= unsupportedWriteFormatRows

unsupportedOperatorsOutputRows
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
qWriter.writeExecReport(allAppsSum, order)
qWriter.writeStageReport(allAppsSum, order)
qWriter.writeUnsupportedOperatorsCSVReport(allAppsSum, order)
qWriter.writeUnsupportedOperatorsDetailedStageCSVReport(allAppsSum, order)
val appStatusResult = generateStatusSummary(appStatusReporter.asScala.values.toSeq)
qWriter.writeStatusReport(appStatusResult, order)
if (mlOpsEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,29 @@ object IgnoreExecs {
// Collect Limit replacement can be slower on the GPU. Disabled by default.
private val CollectLimit = "CollectLimit"
private val ScanExistingRDD = "Scan ExistingRDD"
private val ExecuteCreateViewCommand = "Execute CreateViewCommand"
private val ExistingRDD = "ExistingRDD"
// Some DDL's and table commands which can be ignored
private val ExecuteCreateViewCommand = "Execute CreateViewCommand"
private val LocalTableScan = "LocalTableScan"
private val ExecuteCreateDatabaseCommand = "Execute CreateDatabaseCommand"
private val ExecuteDropDatabaseCommand = "Execute DropDatabaseCommand"
private val ExecuteCreateTableAsSelectCommand = "Execute CreateTableAsSelectCommand"
private val ExecuteCreateTableCommand = "Execute CreateTableCommand"
private val ExecuteDropTableCommand = "Execute DropTableCommand"
private val ExecuteCreateDataSourceTableAsSelectCommand = "Execute " +
"CreateDataSourceTableAsSelectCommand"
private val SetCatalogAndNamespace = "SetCatalogAndNamespace"
private val ExecuteSetCommand = "Execute SetCommand"


val True = "true"
val False = "false"

def getAllIgnoreExecs: Set[String] = Set(AdaptiveSparkPlan, CollectLimit, ScanExistingRDD,
ExecuteCreateViewCommand, ExistingRDD, LocalTableScan)
ExecuteCreateViewCommand, ExistingRDD, LocalTableScan, ExecuteCreateTableCommand,
ExecuteDropTableCommand, ExecuteCreateDatabaseCommand, ExecuteDropDatabaseCommand,
ExecuteCreateTableAsSelectCommand, ExecuteCreateDataSourceTableAsSelectCommand,
SetCatalogAndNamespace, ExecuteSetCommand)
}

object MlOps {
Expand Down
Loading
Loading