diff --git a/core/src/main/resources/operatorsScore-databricks-aws-t4.csv b/core/src/main/resources/operatorsScore-databricks-aws-t4.csv index afd6848ce..4ce7087e7 100644 --- a/core/src/main/resources/operatorsScore-databricks-aws-t4.csv +++ b/core/src/main/resources/operatorsScore-databricks-aws-t4.csv @@ -260,6 +260,7 @@ WindowExpression,2.45 WindowSpecDefinition,2.45 XxHash64,2.45 Year,2.45 +WriteFilesExec,2.45 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-databricks-azure-t4.csv b/core/src/main/resources/operatorsScore-databricks-azure-t4.csv index 6b7396c0d..0c7c65f51 100644 --- a/core/src/main/resources/operatorsScore-databricks-azure-t4.csv +++ b/core/src/main/resources/operatorsScore-databricks-azure-t4.csv @@ -260,6 +260,7 @@ WindowExpression,2.73 WindowSpecDefinition,2.73 XxHash64,2.73 Year,2.73 +WriteFilesExec,2.73 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv b/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv index 204866e8a..dbbfec22f 100644 --- a/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv +++ b/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv @@ -253,6 +253,7 @@ WindowExpression,3.74 WindowSpecDefinition,3.74 XxHash64,3.74 Year,3.74 +WriteFilesExec,3.74 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv b/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv index bf6395056..cd4bdb55e 100644 --- a/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv +++ b/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv @@ -253,6 +253,7 @@ WindowExpression,3.65 WindowSpecDefinition,3.65 XxHash64,3.65 Year,3.65 +WriteFilesExec,3.65 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-dataproc-l4.csv b/core/src/main/resources/operatorsScore-dataproc-l4.csv index 251c67c9f..9c13792fe 100644 --- a/core/src/main/resources/operatorsScore-dataproc-l4.csv +++ b/core/src/main/resources/operatorsScore-dataproc-l4.csv @@ -260,6 +260,7 @@ WindowExpression,4.16 WindowSpecDefinition,4.16 XxHash64,4.16 Year,4.16 +WriteFilesExec,4.16 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv b/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv index eb22ba760..6874994eb 100644 --- a/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv +++ b/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv @@ -253,6 +253,7 @@ WindowExpression,4.25 WindowSpecDefinition,4.25 XxHash64,4.25 Year,4.25 +WriteFilesExec,4.25 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-dataproc-t4.csv b/core/src/main/resources/operatorsScore-dataproc-t4.csv index 789829e69..d0234e2fd 100644 --- a/core/src/main/resources/operatorsScore-dataproc-t4.csv +++ b/core/src/main/resources/operatorsScore-dataproc-t4.csv @@ -260,6 +260,7 @@ WindowExpression,4.88 WindowSpecDefinition,4.88 XxHash64,4.88 Year,4.88 +WriteFilesExec,4.88 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-emr-a10.csv b/core/src/main/resources/operatorsScore-emr-a10.csv index 600614084..9c48cc9a4 100644 --- a/core/src/main/resources/operatorsScore-emr-a10.csv +++ b/core/src/main/resources/operatorsScore-emr-a10.csv @@ -260,6 +260,7 @@ WindowExpression,2.59 WindowSpecDefinition,2.59 XxHash64,2.59 Year,2.59 +WriteFilesExec,2.59 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-emr-t4.csv b/core/src/main/resources/operatorsScore-emr-t4.csv index bf4c818ae..503b05994 100644 --- a/core/src/main/resources/operatorsScore-emr-t4.csv +++ b/core/src/main/resources/operatorsScore-emr-t4.csv @@ -260,6 +260,7 @@ WindowExpression,2.07 WindowSpecDefinition,2.07 XxHash64,2.07 Year,2.07 +WriteFilesExec,2.07 AggregateInPandasExec,1.2 ArrowEvalPythonExec,1.2 FlatMapGroupsInPandasExec,1.2 diff --git a/core/src/main/resources/operatorsScore-onprem-a100.csv b/core/src/main/resources/operatorsScore-onprem-a100.csv index d5638c96e..f4367f388 100644 --- a/core/src/main/resources/operatorsScore-onprem-a100.csv +++ b/core/src/main/resources/operatorsScore-onprem-a100.csv @@ -265,6 +265,7 @@ WindowExpression,4 WindowSpecDefinition,4 XxHash64,4 Year,4 +WriteFilesExec,4 KMeans-pyspark,8.86 KMeans-scala,1 PCA-pyspark,2.24 diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index e8adb1ff9..683ed18c1 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -496,6 +496,8 @@ object SQLPlanParser extends Logging { WindowExecParser(node, checker, sqlID).parse case "WindowInPandas" => WindowInPandasExecParser(node, checker, sqlID).parse + case wfe if WriteFilesExecParser.accepts(wfe) => + WriteFilesExecParser(node, checker, sqlID).parse case _ => // Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as // supported but with shouldRemove flag set to True. diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala new file mode 100644 index 000000000..bcd2272e0 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WriteFilesExecParser.scala @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker + +import org.apache.spark.sql.execution.ui.SparkPlanGraphNode + +case class WriteFilesExecParser( + node: SparkPlanGraphNode, + checker: PluginTypeChecker, + sqlID: Long) extends ExecParser { + // WriteFiles was added in Spark3.4+. + // The purpose was to create that operator to contain information from V1 write operators. + // Basically, it is supported IFF its child is supported. The GPU plan will fallBack to the CPU + // if the child is not supported. + // For Q tool, we will treat the WriteFilesExec as supported regardless of the child. + // Then the child is evaluated on its own . This results in the WriteFilesExec being incorrectly + // marked as supported, but the error is should not a big deal since the operator has no + // duration associated with it. + override val fullExecName: String = WriteFilesExecParser.execName + "Exec" + + override def parse: ExecInfo = { + // the WriteFiles does not have duration + val duration = None + val speedupFactor = checker.getSpeedupFactor(fullExecName) + ExecInfo.createExecNoNode( + sqlID, + WriteFilesExecParser.execName, + "", + speedupFactor, + duration, + node.id, opType = OpTypes.WriteExec, true, None) + } +} + +object WriteFilesExecParser { + val execName = "WriteFiles" + def accepts(nodeName: String): Boolean = { + nodeName.contains(execName) + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/BaseTestSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/BaseTestSuite.scala index 6f18946f3..ef7192a97 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/BaseTestSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/BaseTestSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,6 +73,11 @@ class BaseTestSuite extends FunSuite with BeforeAndAfterEach with Logging { "Spark340+ does not support the expression") } + protected def execsSupportedSparkGTE340(): (Boolean, String) = { + (ToolUtils.isSpark340OrLater(), + "Spark340+ supports the Exec/Expression") + } + def runConditionalTest(testName: String, assumeCondition: () => (Boolean, String)) (fun: => Unit): Unit = { val (isAllowed, ignoreMessage) = assumeCondition() diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index dfba232f4..3a451973f 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -366,6 +366,50 @@ class SQLPlanParserSuite extends BaseTestSuite { } } + runConditionalTest("WriteFilesExec is marked as Supported", + execsSupportedSparkGTE340) { + val dataWriteCMD = DataWritingCommandExecParser.insertIntoHadoopCMD + TrampolineUtil.withTempDir { outputLoc => + TrampolineUtil.withTempDir { eventLogDir => + val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, dataWriteCMD) { spark => + import spark.implicits._ + val df = spark.sparkContext.makeRDD(1 to 10000, 6).toDF + val dfWithStrings = df.select(col("value").cast("string")) + dfWithStrings.write.text(s"$outputLoc/testtext") + df.write.parquet(s"$outputLoc/testparquet") + df.write.orc(s"$outputLoc/testorc") + df.write.json(s"$outputLoc/testjson") + df.write.csv(s"$outputLoc/testcsv") + df + } + val pluginTypeChecker = new PluginTypeChecker() + val app = createAppFromEventlog(eventLog) + assert(app.sqlPlans.size == 6) + val parsedPlans = app.sqlPlans.map { case (sqlID, plan) => + SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app) + } + val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq) + val writeExecs = allExecInfo.filter(_.exec.contains(s"$dataWriteCMD")) + val text = writeExecs.filter(_.expr.contains("text")) + val json = writeExecs.filter(_.expr.contains("json")) + val orc = writeExecs.filter(_.expr.contains("orc")) + val parquet = writeExecs.filter(_.expr.contains("parquet")) + val csv = writeExecs.filter(_.expr.contains("csv")) + for (t <- Seq(json, csv, text)) { + assertSizeAndNotSupported(1, t) + } + for (t <- Seq(orc, parquet)) { + assertSizeAndSupported(1, t) + } + // For Spark.3.4.0+, the new operator WriteFilesExec is added. + // Q tool handles this operator as supported regardless of the type of the exec operation. + val writeFileExecs = allExecInfo.filter(_.exec.contains(WriteFilesExecParser.execName)) + // we have 5 write operations, so we should have 5 WriteFilesExec. + assertSizeAndSupported(5, writeFileExecs) + } + } + } + test("CreateDataSourceTableAsSelectCommand") { // using event log to not deal with enabling hive support val eventLog = s"$qualLogDir/createdatasourcetable_eventlog.zstd"