Qualification should mark WriteFiles as supported (#784)
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Fixes #783

- WriteFiles exec is added in Spark 3.4.0+ (see the plan sketch below).
- In fact, the WriteFiles exec will fall back to the CPU if the child
  operator is not supported on the GPU.
- For simplicity of implementation, the Q tool considers WriteFilesExec
  as supported all the time. This should be fine because the child
  (actual write exec) is evaluated independently.
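
For context, a minimal sketch (assuming a Spark 3.4.0+ session bound to the variable spark; the output path is illustrative, not part of this commit) of a V1 write whose physical plan places a WriteFiles node between the write command and the query:

// Sketch only — not part of this commit.
// Resulting plan shape (abridged):
//   Execute InsertIntoHadoopFsRelationCommand ...
//   +- WriteFiles
//      +- ... (the query being written)
import spark.implicits._

val df = spark.sparkContext.makeRDD(1 to 100, 2).toDF
df.write.parquet("/tmp/writefiles-demo")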
amahussein authored Feb 13, 2024
1 parent 0e51d5f commit 649a356
Showing 14 changed files with 118 additions and 1 deletion.
@@ -260,6 +260,7 @@ WindowExpression,2.45
WindowSpecDefinition,2.45
XxHash64,2.45
Year,2.45
WriteFilesExec,2.45
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
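Each row in these operatorsScore CSVs maps an operator name to a platform-specific speedup factor, which is what checker.getSpeedupFactor returns for WriteFilesExec later in this diff. A minimal loading sketch (a plain two-column lookup assumed for illustration; not the tool's actual loader):

import scala.io.Source

// Resolve "Operator,Score" rows (like the WriteFilesExec additions here)
// into a name -> factor map; non-numeric rows such as a header are skipped.
def loadScores(path: String): Map[String, Double] =
  Source.fromFile(path).getLines()
    .map(_.trim.split(','))
    .collect { case Array(op, score) if score.matches("""\d+(\.\d+)?""") =>
      op -> score.toDouble
    }
    .toMap

// loadScores(pathToCsv)("WriteFilesExec") would yield 2.45 for the file above.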
@@ -260,6 +260,7 @@ WindowExpression,2.73
WindowSpecDefinition,2.73
XxHash64,2.73
Year,2.73
WriteFilesExec,2.73
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-l4.csv
@@ -253,6 +253,7 @@ WindowExpression,3.74
WindowSpecDefinition,3.74
XxHash64,3.74
Year,3.74
WriteFilesExec,3.74
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
@@ -253,6 +253,7 @@ WindowExpression,3.65
WindowSpecDefinition,3.65
XxHash64,3.65
Year,3.65
WriteFilesExec,3.65
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-l4.csv
@@ -260,6 +260,7 @@ WindowExpression,4.16
WindowSpecDefinition,4.16
XxHash64,4.16
Year,4.16
WriteFilesExec,4.16
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
@@ -253,6 +253,7 @@ WindowExpression,4.25
WindowSpecDefinition,4.25
XxHash64,4.25
Year,4.25
WriteFilesExec,4.25
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-t4.csv
@@ -260,6 +260,7 @@ WindowExpression,4.88
WindowSpecDefinition,4.88
XxHash64,4.88
Year,4.88
WriteFilesExec,4.88
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-a10.csv
@@ -260,6 +260,7 @@ WindowExpression,2.59
WindowSpecDefinition,2.59
XxHash64,2.59
Year,2.59
WriteFilesExec,2.59
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-t4.csv
@@ -260,6 +260,7 @@ WindowExpression,2.07
WindowSpecDefinition,2.07
XxHash64,2.07
Year,2.07
WriteFilesExec,2.07
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-onprem-a100.csv
@@ -265,6 +265,7 @@ WindowExpression,4
WindowSpecDefinition,4
XxHash64,4
Year,4
WriteFilesExec,4
KMeans-pyspark,8.86
KMeans-scala,1
PCA-pyspark,2.24
@@ -496,6 +496,8 @@ object SQLPlanParser extends Logging {
WindowExecParser(node, checker, sqlID).parse
case "WindowInPandas" =>
WindowInPandasExecParser(node, checker, sqlID).parse
case wfe if WriteFilesExecParser.accepts(wfe) =>
WriteFilesExecParser(node, checker, sqlID).parse
case _ =>
// Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as
// supported but with shouldRemove flag set to True.
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

case class WriteFilesExecParser(
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long) extends ExecParser {
// WriteFiles was added in Spark 3.4.0+.
// The operator was introduced to carry information from the V1 write operators.
// Basically, it is supported IFF its child is supported: the GPU plan will fall back to the CPU
// if the child is not supported.
// For the Q tool, we treat WriteFilesExec as supported regardless of the child.
// The child is then evaluated on its own. This results in WriteFilesExec being incorrectly
// marked as supported in that case, but the error should not be a big deal since the operator
// has no duration associated with it.
override val fullExecName: String = WriteFilesExecParser.execName + "Exec"

override def parse: ExecInfo = {
// the WriteFiles operator does not have a duration
val duration = None
val speedupFactor = checker.getSpeedupFactor(fullExecName)
ExecInfo.createExecNoNode(
sqlID,
WriteFilesExecParser.execName,
"",
speedupFactor,
duration,
node.id, opType = OpTypes.WriteExec, true, None)
}
}

object WriteFilesExecParser {
val execName = "WriteFiles"
def accepts(nodeName: String): Boolean = {
nodeName.contains(execName)
}
}
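
As a self-contained illustration (not part of the diff) of the name-based dispatch above — accepts matches any graph-node name that contains "WriteFiles":

object WriteFilesAcceptsDemo extends App {
  // Mirrors WriteFilesExecParser.accepts: a substring match on the node name.
  val execName = "WriteFiles"
  def accepts(nodeName: String): Boolean = nodeName.contains(execName)

  println(accepts("WriteFiles")) // true  -> routed to WriteFilesExecParser
  println(accepts("Project"))    // false -> falls through to the other case arms
}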
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -73,6 +73,11 @@ class BaseTestSuite extends FunSuite with BeforeAndAfterEach with Logging {
"Spark340+ does not support the expression")
}

protected def execsSupportedSparkGTE340(): (Boolean, String) = {
(ToolUtils.isSpark340OrLater(),
"Spark340+ supports the Exec/Expression")
}

def runConditionalTest(testName: String, assumeCondition: () => (Boolean, String))
(fun: => Unit): Unit = {
val (isAllowed, ignoreMessage) = assumeCondition()
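The diff truncates the body of runConditionalTest; a plausible completion (assuming standard ScalaTest FunSuite semantics, not necessarily the suite's exact code) of how the (Boolean, String) tuple selects between running and ignoring the test:

def runConditionalTest(testName: String, assumeCondition: () => (Boolean, String))
    (fun: => Unit): Unit = {
  val (isAllowed, ignoreMessage) = assumeCondition()
  if (isAllowed) {
    test(testName) { fun }
  } else {
    // register as ignored so the skip reason stays visible in the test report
    ignore(s"$testName: $ignoreMessage") { fun }
  }
}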
@@ -366,6 +366,50 @@ class SQLPlanParserSuite extends BaseTestSuite {
}
}

runConditionalTest("WriteFilesExec is marked as Supported",
execsSupportedSparkGTE340) {
val dataWriteCMD = DataWritingCommandExecParser.insertIntoHadoopCMD
TrampolineUtil.withTempDir { outputLoc =>
TrampolineUtil.withTempDir { eventLogDir =>
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, dataWriteCMD) { spark =>
import spark.implicits._
val df = spark.sparkContext.makeRDD(1 to 10000, 6).toDF
val dfWithStrings = df.select(col("value").cast("string"))
dfWithStrings.write.text(s"$outputLoc/testtext")
df.write.parquet(s"$outputLoc/testparquet")
df.write.orc(s"$outputLoc/testorc")
df.write.json(s"$outputLoc/testjson")
df.write.csv(s"$outputLoc/testcsv")
df
}
val pluginTypeChecker = new PluginTypeChecker()
val app = createAppFromEventlog(eventLog)
assert(app.sqlPlans.size == 6)
val parsedPlans = app.sqlPlans.map { case (sqlID, plan) =>
SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app)
}
val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq)
val writeExecs = allExecInfo.filter(_.exec.contains(dataWriteCMD))
val text = writeExecs.filter(_.expr.contains("text"))
val json = writeExecs.filter(_.expr.contains("json"))
val orc = writeExecs.filter(_.expr.contains("orc"))
val parquet = writeExecs.filter(_.expr.contains("parquet"))
val csv = writeExecs.filter(_.expr.contains("csv"))
for (t <- Seq(json, csv, text)) {
assertSizeAndNotSupported(1, t)
}
for (t <- Seq(orc, parquet)) {
assertSizeAndSupported(1, t)
}
// For Spark 3.4.0+, the new operator WriteFilesExec is added.
// The Q tool treats this operator as supported regardless of the type of the write operation.
val writeFileExecs = allExecInfo.filter(_.exec.contains(WriteFilesExecParser.execName))
// We have 5 write operations, so we should see 5 WriteFilesExec entries.
assertSizeAndSupported(5, writeFileExecs)
}
}
}

test("CreateDataSourceTableAsSelectCommand") {
// using event log to not deal with enabling hive support
val eventLog = s"$qualLogDir/createdatasourcetable_eventlog.zstd"
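For readers following the test above, a hypothetical sketch of the two assertion helpers it relies on (the real implementations live in the suite's shared utilities; the isSupported field name is an assumption):

// Hypothetical sketch — not the suite's actual helpers.
def assertSizeAndSupported(size: Int, execs: Seq[ExecInfo]): Unit = {
  assert(execs.size == size, s"expected $size execs, found ${execs.size}")
  assert(execs.forall(_.isSupported), "all execs should be marked supported")
}

def assertSizeAndNotSupported(size: Int, execs: Seq[ExecInfo]): Unit = {
  assert(execs.size == size, s"expected $size execs, found ${execs.size}")
  assert(execs.forall(!_.isSupported), "all execs should be marked unsupported")
}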
