Qualification tool: Add more information for unsupported operators #680

Merged · 10 commits · Dec 15, 2023
@@ -39,7 +39,9 @@ class ExecInfo(
val children: Option[Seq[ExecInfo]], // only one level deep
val stages: Set[Int] = Set.empty,
val shouldRemove: Boolean = false,
val unsupportedExprs: Array[String] = Array.empty) {
val unsupportedExprs: Array[String] = Array.empty,
val dataSet: Boolean = false,
val udf: Boolean = false) {
private def childrenToString = {
val str = children.map { c =>
c.map(" " + _.toString).mkString("\n")
@@ -301,13 +303,12 @@ object SQLPlanParser extends Logging {
}
val stagesInNode = getStagesInSQLNode(node, app)
val supported = execInfos.isSupported && !ds && !containsUDF

// shouldRemove is set to true if the exec is a member of "execsToBeRemoved" or if the node
// is a duplicate
val removeFlag = execInfos.shouldRemove || isDupNode || execsToBeRemoved.contains(node.name)
Seq(new ExecInfo(execInfos.sqlID, execInfos.exec, execInfos.expr, execInfos.speedupFactor,
execInfos.duration, execInfos.nodeId, supported, execInfos.children,
stagesInNode, removeFlag, execInfos.unsupportedExprs))
stagesInNode, removeFlag, execInfos.unsupportedExprs, ds, containsUDF))
}
}
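
For orientation, a minimal sketch (values hypothetical) of how the two new ExecInfo flags travel with each parsed node: the parser records `ds` and `containsUDF` per node, folds them into `supported`, and now also passes them through so the report writer can tell dataset- and UDF-caused unsupported execs apart from ordinary ones.

// Hypothetical exec node flagged as Dataset-based. Arguments are positional,
// following the constructor order visible in this diff; types simplified.
val info = new ExecInfo(1, "ProjectExec", "", 1.0, None, 4,
  /* isSupported = */ false, None, Set(2), /* shouldRemove = */ false,
  Array.empty, /* dataSet = */ true, /* udf = */ false)
// Supported status folds in both flags, as in the hunk above:
// supported = execInfos.isSupported && !ds && !containsUDF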

@@ -161,6 +161,23 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
}
}

def writeUnsupportedOperatorsDetailedStageCSVReport(
sums: Seq[QualificationSummaryInfo],
order: String): Unit = {
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperatorsStageDuration.csv",
"Unsupported Operators StageDuration CSV Report", hadoopConf)
val headersAndSizes =
QualOutputWriter.getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedStagesDurationInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
}
}
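
For reference, the new per-stage file lands next to the existing reports as `rapids_4_spark_qualification_output_unsupportedOperatorsStageDuration.csv`. Going by the header constants added below and the test assertions at the end of this diff, its header row should read roughly as follows (column order taken from getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes; the "Stage ID" label is an assumption, and any data row here would be hypothetical):

App ID,Unsupported Type,Stage ID,Stage Wall Clock Duration,App Duration,Recommendation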

def writePerSqlCSVReport(sums: Seq[QualificationSummaryInfo], maxSQLDescLength: Int): Unit = {
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_persql.csv",
@@ -361,6 +378,7 @@ object QualOutputWriter {
val SQL_DUR_STR = "SQL DF Duration"
val TASK_DUR_STR = "SQL Dataframe Task Duration"
val STAGE_DUR_STR = "Stage Task Duration"
val STAGE_WALLCLOCK_DUR_STR = "Stage Wall Clock Duration"
val POT_PROBLEM_STR = "Potential Problems"
val EXEC_CPU_PERCENT_STR = "Executor CPU Time Percent"
val APP_DUR_ESTIMATED_STR = "App Duration Estimated"
@@ -545,6 +563,18 @@ object QualOutputWriter {
detailedHeaderAndFields
}

def getUnsupportedOperatorsStageDurationsHeaderStringsAndSizes(
appInfos: Seq[QualificationSummaryInfo]): LinkedHashMap[String, Int] = {
val detailedHeaderAndFields = LinkedHashMap[String, Int](
APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
STAGE_ID_STR -> STAGE_ID_STR.size,
STAGE_WALLCLOCK_DUR_STR -> STAGE_WALLCLOCK_DUR_STR.size,
APP_DUR_STR -> APP_DUR_STR.size,
SPEEDUP_BUCKET_STR -> SPEEDUP_BUCKET_STR_SIZE
)
detailedHeaderAndFields
}
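
A note on the headersAndSizes pattern used throughout this writer: the LinkedHashMap preserves insertion order, which fixes the column order, and the Int values appear to be display widths (the header's own length, or the longest expected value such as the app ID) used when pretty-printing the text report; for CSV output the widths are effectively ignored. A small hedged illustration:

import scala.collection.mutable.LinkedHashMap
// Hypothetical widths: "App ID" padded to the longest app ID seen,
// "Stage ID" to its own header length.
val headers = LinkedHashMap("App ID" -> 24, "Stage ID" -> "Stage ID".size)
headers.keys.mkString(",")  // "App ID,Stage ID" — insertion order is column order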

def getDetailedHeaderStringsAndSizes(appInfos: Seq[QualificationSummaryInfo],
reportReadSchema: Boolean): LinkedHashMap[String, Int] = {
@@ -886,7 +916,7 @@ object QualOutputWriter {
}
}

def constructUnsupportedOperatorsInfo(
def constructUnsupportedStagesDurationInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = TEXT_DELIMITER,
@@ -895,81 +925,106 @@
val reformatCSVFunc: String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
val appId = sumInfo.appId
val readFormat = sumInfo.readFileFormatAndTypesNotSupported
val writeFormat = sumInfo.writeDataFormat
val unsupportedExecs = sumInfo.unSupportedExecs
val unsupportedExprs = sumInfo.unSupportedExprs
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
val appDuration = sumInfo.sparkSqlDFWallClockDuration
val recommendation = sumInfo.estimatedInfo.recommendation

if (readFormat.nonEmpty) {
val unsupportedReadFormatRows = readFormat.map { format =>
val readFormatAndType = format.split("\\[")
val readFormat = readFormatAndType(0)
val readType = if (readFormatAndType.size > 1) {
s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
} else {
""
}
val data = ListBuffer(
sumInfo.stageInfo.collect {
case info if info.stageWallclockDuration > 0 && info.unsupportedExecs.nonEmpty =>
val stageAppDuration = info.stageWallclockDuration
val unsupportedExecs = info.unsupportedExecs.mkString(";")
val data = ListBuffer[(String, Int)](
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Read")-> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(readFormat) -> headersAndSizes(DETAILS),
reformatCSVFunc(readType) -> headersAndSizes(NOTES)
reformatCSVFunc(unsupportedExecs) -> headersAndSizes(UNSUPPORTED_TYPE),
info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
stageAppDuration.toString -> headersAndSizes(STAGE_WALLCLOCK_DUR_STR),
appDuration.toString -> headersAndSizes(APP_DUR_STR),
recommendation -> headersAndSizes(SPEEDUP_BUCKET_STR)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedReadFormatRows
}
if (unsupportedExecs.nonEmpty) {
val unsupportedExecRows = unsupportedExecs.split(";").map { exec =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}

def constructUnsupportedOperatorsInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = TEXT_DELIMITER,
prettyPrint: Boolean,
reformatCSV: Boolean = true): Seq[String] = {

val reformatCSVFunc: String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)

val appId = reformatCSVFunc(sumInfo.appId)
val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()
val unsupportedExprs = sumInfo.unSupportedExprs
val allExecs = getAllExecsFromPlan(sumInfo.planInfo)
val dataSetExecs = allExecs.collect { case x if x.dataSet => x.exec }
val udfExecs = allExecs.collect { case x if x.udf => x.exec }

def createUnsupportedRow(exec: String, execType: String, notes: String): String = {
val data = ListBuffer(
appId -> headersAndSizes(APP_ID_STR),
reformatCSVFunc(execType) -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc(notes) -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}

val readFormatRows = sumInfo.readFileFormatAndTypesNotSupported.map { format =>
val readFormatAndType = format.split("\\[")
val readFormat = readFormatAndType(0)
val readType = if (readFormatAndType.size > 1) {
s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
} else {
""
}
unsupportedOperatorsOutputRows ++= unsupportedExecRows
createUnsupportedRow(readFormat,"Read", readType)
}
if (unsupportedExecExprsMap.nonEmpty) {
val unsupportedExecExprMapRows = unsupportedExecExprsMap.map { case (exec, exprs) =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc(s"$exec Exec is not supported as expressions are " +
s"not supported - `$exprs`") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}.toArray
unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows
unsupportedOperatorsOutputRows ++= readFormatRows

// Unsupported Execs and Execs that are not supported due to unsupported expressions, or if
// the operation is from a dataset, or if the operation contains a UDF.
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedExecsSet = sumInfo.unSupportedExecs.split(";").toSet
val unsupportedExecsFiltered = unsupportedExecsSet.filterNot(unsupportedExecExprsMap.contains)
val actualunsupportedExecs = unsupportedExecsFiltered.filterNot(x => dataSetExecs.contains(x)
|| udfExecs.contains(x) || unsupportedExecExprsMap.contains(x))
val unsupportedExecRows = actualunsupportedExecs.map { exec =>
createUnsupportedRow(exec, "Exec", "")
}
unsupportedOperatorsOutputRows ++= unsupportedExecRows

val unsupportedDatasetExecRows = dataSetExecs.map { exec =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is not supported as " +
s"this operation is from dataset which is not supported")
}
unsupportedOperatorsOutputRows ++= unsupportedDatasetExecRows

val unsupportedUdfExecRows = udfExecs.map { exec =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is " +
s"not supported as it contains UDF which is not supported")
}
unsupportedOperatorsOutputRows ++= unsupportedUdfExecRows

val unsupportedExecExprMapRows = sumInfo.unsupportedExecstoExprsMap.map { case (exec, exprs) =>
createUnsupportedRow(exec, "Exec", s"$exec Exec is not" +
s" supported as expressions are not supported - `$exprs`")
}.toArray
unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows

if (unsupportedExprs.nonEmpty) {
val unsupportedExprRows = unsupportedExprs.split(";").map { expr =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Expression") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(expr) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
val unsupportedExprRows = sumInfo.unSupportedExprs.split(";").map { expr =>
createUnsupportedRow(expr, "Expression", "")
}
unsupportedOperatorsOutputRows ++= unsupportedExprRows
}
if (writeFormat.nonEmpty) {
val unsupportedwriteFormatRows = writeFormat.map { format =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Write") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(format) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedwriteFormatRows

val unsupportedWriteFormatRows = sumInfo.writeDataFormat.map { format =>
createUnsupportedRow(format, "Write", "")
}
unsupportedOperatorsOutputRows ++= unsupportedWriteFormatRows

unsupportedOperatorsOutputRows
}
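
The exec-bucketing logic above attributes each unsupported exec to exactly one cause (dataset, UDF, unsupported expressions, or plain unsupported), so each exec is reported once with the most specific note. A minimal self-contained sketch with hypothetical exec names:

val unsupportedExecs = Set("ProjectExec", "FilterExec", "SampleExec")
val dataSetExecs     = Set("FilterExec")          // execs with ExecInfo.dataSet set
val udfExecs         = Set("SampleExec")          // execs with ExecInfo.udf set
val execsToExprs     = Map.empty[String, String]  // unsupportedExecstoExprsMap
val plainUnsupported = unsupportedExecs
  .filterNot(dataSetExecs)
  .filterNot(udfExecs)
  .filterNot(execsToExprs.contains)
// plainUnsupported == Set("ProjectExec"); FilterExec and SampleExec instead
// get rows whose Notes explain the dataset / UDF cause.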

@@ -93,6 +93,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
qWriter.writeExecReport(allAppsSum, order)
qWriter.writeStageReport(allAppsSum, order)
qWriter.writeUnsupportedOperatorsCSVReport(allAppsSum, order)
qWriter.writeUnsupportedOperatorsDetailedStageCSVReport(allAppsSum, order)
val appStatusResult = generateStatusSummary(appStatusReporter.asScala.values.toSeq)
qWriter.writeStatusReport(appStatusResult, order)
if (mlOpsEnabled) {
@@ -296,6 +296,7 @@ class QualificationAppInfo(
val allSpeedupFactorAvg = SQLPlanParser.averageSpeedup(execInfos.map(_.speedupFactor))
val allFlattenedExecs = flattenedExecs(execInfos)
val numUnsupported = allFlattenedExecs.filterNot(_.isSupported)
val unsupportedExecs = numUnsupported.map(_.exec)
// if we have unsupported try to guess at how much time. For now divide
// time by number of execs and give each one equal weight
val eachExecTime = allStageTaskTime / allFlattenedExecs.size
@@ -339,8 +340,13 @@
eachStageUnsupported
}

// Get stage info for the given stageId.
val stageInfos = stageIdToInfo.filterKeys { case (id, _) => id == stageId }
val wallclockStageDuration = stageInfos.values.map(x => x.duration.getOrElse(0L)).sum

StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
finalEachStageUnsupported, numTransitions, transitionsTime, estimated)
finalEachStageUnsupported, numTransitions, transitionsTime, estimated,
wallclockStageDuration, unsupportedExecs)
}.toSet
}
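
A hedged sketch of the wall-clock lookup added above. The diff only shows that stageIdToInfo is keyed by a pair whose first element is the stage ID; assuming the second element is an attempt ID, the sum spans all attempts of the stage (values simplified here to Option[Long] durations):

val stageIdToInfo = Map((3, 0) -> Some(1200L), (3, 1) -> Some(800L), (4, 0) -> Some(500L))
val stageId = 3
val wallclockStageDuration = stageIdToInfo.collect {
  case ((id, _), duration) if id == stageId => duration.getOrElse(0L)
}.sum  // 2000L: both attempts of stage 3, stage 4 excluded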

@@ -459,7 +465,7 @@ class QualificationAppInfo(
c.filterNot(_.shouldRemove)
}
new ExecInfo(e.sqlID, e.exec, e.expr, e.speedupFactor, e.duration,
e.nodeId, e.isSupported, filteredChildren, e.stages, e.shouldRemove)
e.nodeId, e.isSupported, filteredChildren, e.stages, e.shouldRemove, e.unsupportedExprs)
}
val filteredPlanInfos = execFilteredChildren.filterNot(_.shouldRemove)
p.copy(execInfo = filteredPlanInfos)
@@ -590,7 +596,6 @@ class QualificationAppInfo(
case x if !IgnoreExecs.getAllIgnoreExecs.contains(x.exec) => x.exec
}.toSet.mkString(";").trim.replaceAll("\n", "").replace(",", ":")


// Get all the unsupported Expressions from the plan
val unSupportedExprs = origPlanInfos.map(_.execInfo.flatMap(
_.unsupportedExprs)).flatten.filter(_.nonEmpty).toSet.mkString(";")
@@ -639,6 +644,9 @@
1
}

val wallClockSqlDFToUse = QualificationAppInfo.wallClocksqlDataFrameToUse(
sparkSQLDFWallClockDuration, appDuration)

val estimatedInfo = QualificationAppInfo.calculateEstimatedInfoSummary(estimatedGPURatio,
sparkSQLDFWallClockDuration, appDuration, taskSpeedupFactor, appName, appId,
sqlIdsWithFailures.nonEmpty, mlSpeedup, unSupportedExecs, unSupportedExprs,
@@ -649,8 +657,8 @@
notSupportFormatAndTypesString, getAllReadFileFormats, writeFormat,
allComplexTypes, nestedComplexTypes, longestSQLDuration, sqlDataframeTaskDuration,
nonSQLTaskDuration, unsupportedSQLTaskDuration, supportedSQLTaskDuration,
taskSpeedupFactor, info.sparkUser, info.startTime, origPlanInfos,
perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos,
taskSpeedupFactor, info.sparkUser, info.startTime, wallClockSqlDFToUse,
origPlanInfos, perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos,
unSupportedExecs, unSupportedExprs, clusterTags, allClusterTagsMap, mlFunctions,
mlTotalStageDuration, unsupportedExecExprsMap)
}
@@ -864,6 +872,7 @@ case class QualificationSummaryInfo(
taskSpeedupFactor: Double,
user: String,
startTime: Long,
sparkSqlDFWallClockDuration: Long,
planInfo: Seq[PlanInfo],
stageInfo: Seq[StageQualSummaryInfo],
estimatedInfo: EstimatedAppInfo,
@@ -884,7 +893,9 @@ case class StageQualSummaryInfo(
unsupportedTaskDur: Long,
numTransitions: Int,
transitionTime: Long,
estimated: Boolean = false)
estimated: Boolean = false,
stageWallclockDuration: Long = 0,
unsupportedExecs: Seq[String] = Seq.empty)

object QualificationAppInfo extends Logging {
// define recommendation constants
@@ -929,19 +940,23 @@ object QualificationAppInfo extends Logging {
}
}

def wallClocksqlDataFrameToUse(sqlDataFrameDuration: Long, appDuration: Long): Long = {
if (sqlDataFrameDuration > appDuration) {
// our app duration is shorter than our sql duration, estimate the sql duration down
// to app duration
appDuration
} else {
sqlDataFrameDuration
}
}
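
A quick hedged usage sketch of the extracted helper: it simply clamps the reported SQL wall-clock duration so it can never exceed the application duration.

wallClocksqlDataFrameToUse(sqlDataFrameDuration = 90000L, appDuration = 60000L) // 60000L (clamped)
wallClocksqlDataFrameToUse(sqlDataFrameDuration = 30000L, appDuration = 60000L) // 30000L (unchanged)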

// Summarize and estimate based on wall clock times
def calculateEstimatedInfoSummary(estimatedRatio: Double, sqlDataFrameDuration: Long,
appDuration: Long, sqlSpeedupFactor: Double, appName: String, appId: String,
hasFailures: Boolean, mlSpeedupFactor: Option[MLFuncsSpeedupAndDuration] = None,
unsupportedExecs: String = "", unsupportedExprs: String = "",
allClusterTagsMap: Map[String, String] = Map.empty[String, String]): EstimatedAppInfo = {
val sqlDataFrameDurationToUse = if (sqlDataFrameDuration > appDuration) {
// our app duration is shorter then our sql duration, estimate the sql duration down
// to app duration
appDuration
} else {
sqlDataFrameDuration
}
val sqlDataFrameDurationToUse = wallClocksqlDataFrameToUse(sqlDataFrameDuration, appDuration)

// get the average speedup and duration for ML funcs supported on GPU
val (mlSpeedup, mlDuration) = if (mlSpeedupFactor.isDefined) {
@@ -1150,7 +1150,10 @@ class QualificationSuite extends BaseTestSuite {

val filename = s"$outpath/rapids_4_spark_qualification_output/" +
s"rapids_4_spark_qualification_output_unsupportedOperators.csv"
val stageDurationFile = s"$outpath/rapids_4_spark_qualification_output/" +
s"rapids_4_spark_qualification_output_unsupportedOperatorsStageDuration.csv"
val inputSource = Source.fromFile(filename)
val unsupportedStageDuration = Source.fromFile(stageDurationFile)
try {
val lines = inputSource.getLines.toSeq
// 1 for header, 1 for values
Expand All @@ -1166,6 +1169,11 @@ class QualificationSuite extends BaseTestSuite {
assert(lines.size == expLinesSize)
assert(lines.head.contains("App ID,Unsupported Type,"))
assert(lines(1).contains("\"Read\",\"JSON\",\"Types not supported - bigint:int\""))

val stageDurationLines = unsupportedStageDuration.getLines.toSeq
assert(stageDurationLines.head.contains("" +
"Stage Wall Clock Duration,App Duration,Recommendation"))
assert(stageDurationLines(1).contains("Not Recommended"))
} finally {
inputSource.close()
}