From 0f42635035c4af1593855eb0362590cc439386c1 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 15 Nov 2024 04:43:02 +0000 Subject: [PATCH 1/3] Fix aqe_test failures on [databricks] 14.3. Fixes #11643. This commit fixes the AQE/DPP tests that were reported in #11643 to be failing on Databricks 14.3. This is the result of a deficient shim for GpuSubqueryBroadcastMeta being active for Databricks 14.3. The deficient shim errantly extended the non-Databricks base shim. This commit moves the commonality in Databricks shims to a common base class that is then customized for the changes in Databricks 14.3. Signed-off-by: MithunR --- integration_tests/src/main/python/aqe_test.py | 6 +- .../execution/GpuSubqueryBroadcastExec.scala | 6 +- .../execution/GpuSubqueryBroadcastMeta.scala | 93 +------------- .../GpuSubqueryBroadcastMeta330DBBase.scala | 121 ++++++++++++++++++ .../execution/GpuSubqueryBroadcastMeta.scala | 2 +- 5 files changed, 134 insertions(+), 94 deletions(-) create mode 100644 sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index f9dddfae038..5b3b04efdfb 100755 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -338,10 +338,10 @@ def do_it(spark): # this should be fixed by https://github.com/NVIDIA/spark-rapids/issues/11120 aqe_join_with_dpp_fallback=["FilterExec"] if (is_databricks_runtime() or is_before_spark_330()) else [] +if is_databricks_version_or_later(14, 3): + aqe_join_with_dpp_fallback.append("CollectLimitExec") # Verify that DPP and AQE can coexist in even some odd cases involving multiple tables -@pytest.mark.skipif(condition=is_databricks_version_or_later(14, 3), - reason="https://github.com/NVIDIA/spark-rapids/issues/11643") @ignore_order(local=True) @allow_non_gpu(*aqe_join_with_dpp_fallback) def test_aqe_join_with_dpp(spark_tmp_path): @@ -395,8 +395,6 @@ def run_test(spark): assert_gpu_and_cpu_are_equal_collect(run_test, conf=_adaptive_conf) # Verify that DPP and AQE can coexist in even some odd cases involving 2 tables with multiple columns -@pytest.mark.skipif(condition=is_databricks_version_or_later(14, 3), - reason="https://github.com/NVIDIA/spark-rapids/issues/11643") @ignore_order(local=True) @allow_non_gpu(*aqe_join_with_dpp_fallback) def test_aqe_join_with_dpp_multi_columns(spark_tmp_path): diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala index 72ed0e79504..e529e268f3f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala @@ -126,8 +126,10 @@ abstract class GpuSubqueryBroadcastMetaBase( } else { willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.") } - case _ => - throw new AssertionError("should not reach here") + + case unexpected => + throw new AssertionError("Unexpected child exec in AdaptiveSparkPlan: " + + s"${unexpected.getClass.getName}") } case _ => diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala index 76255b3e5a6..f0ce5754f6d 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala @@ -30,96 +30,15 @@ import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode -class GpuSubqueryBroadcastMeta( - s: SubqueryBroadcastExec, - conf: RapidsConf, - p: Option[RapidsMeta[_, _, _]], - r: DataFromReplacementRule) extends - SparkPlanMeta[SubqueryBroadcastExec](s, conf, p, r) { - private var broadcastBuilder: () => SparkPlan = _ - - override val childExprs: Seq[BaseExprMeta[_]] = Nil - - override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = Nil - - override def tagPlanForGpu(): Unit = s.child match { - // DPP: For AQE off, in this case, we handle DPP by converting the underlying - // BroadcastExchangeExec to GpuBroadcastExchangeExec. - // This is slightly different from the Apache Spark case, because Spark - // sends the underlying plan into the plugin in advance via the PlanSubqueries rule. - // Here, we have the full non-GPU subquery plan, so we convert the whole - // thing. - case ex @ BroadcastExchangeExec(_, child) => - val exMeta = new GpuBroadcastMeta(ex.copy(child = child), conf, p, r) - exMeta.tagForGpu() - if (exMeta.canThisBeReplaced) { - broadcastBuilder = () => exMeta.convertToGpu() - } else { - willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.") - } - // DPP: For AQE on, we have an almost completely different scenario then before, - // Databricks uses a BroadcastQueryStageExec and either: - // 1) provide an underlying BroadcastExchangeExec that we will have to convert - // somehow - // 2) might already do the reuse work for us. The ReusedExchange is now a - // part of the SubqueryBroadcast, so we send it back here as underlying the - // GpuSubqueryBroadcastExchangeExec - case bqse: BroadcastQueryStageExec => - bqse.plan match { - case ex: BroadcastExchangeExec => - val exMeta = new GpuBroadcastMeta(ex, conf, p, r) - exMeta.tagForGpu() - if (exMeta.canThisBeReplaced) { - broadcastBuilder = () => exMeta.convertToGpu() - } else { - willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.") - } - case reuse: ReusedExchangeExec => - reuse.child match { - case _: GpuBroadcastExchangeExec => - // A BroadcastExchange has already been replaced, so it can run on the GPU - broadcastBuilder = () => reuse - case _ => - willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.") - } - } - case _ => - willNotWorkOnGpu("the subquery to broadcast can not entirely run in the GPU.") - } - /** - * Simply returns the original plan. Because its only child, BroadcastExchange, doesn't - * need to change if SubqueryBroadcastExec falls back to the CPU. - */ - override def convertToCpu(): SparkPlan = s +class GpuSubqueryBroadcastMeta(s: SubqueryBroadcastExec, + conf: RapidsConf, + p: Option[RapidsMeta[_, _, _]], + r: DataFromReplacementRule) + extends GpuSubqueryBroadcastMeta330DBBase(s, conf, p, r) { override def convertToGpu(): GpuExec = { GpuSubqueryBroadcastExec(s.name, Seq(s.index), s.buildKeys, broadcastBuilder())( getBroadcastModeKeyExprs) } - /** Extract the broadcast mode key expressions if there are any. */ - private def getBroadcastModeKeyExprs: Option[Seq[Expression]] = { - val broadcastMode = s.child match { - case b: BroadcastExchangeExec => - b.mode - case bqse: BroadcastQueryStageExec => - bqse.plan match { - case b: BroadcastExchangeExec => - b.mode - case reuse: ReusedExchangeExec => - reuse.child match { - case g: GpuBroadcastExchangeExec => - g.mode - } - case _ => - throw new AssertionError("should not reach here") - } - } - - broadcastMode match { - case HashedRelationBroadcastMode(keys, _) => Some(keys) - case IdentityBroadcastMode => None - case m => throw new UnsupportedOperationException(s"Unknown broadcast mode $m") - } - } -} +} \ No newline at end of file diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala new file mode 100644 index 00000000000..a6248127bad --- /dev/null +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330db"} +{"spark": "332db"} +{"spark": "341db"} +{"spark": "350db143"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution + +import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, RapidsConf, RapidsMeta, SparkPlanMeta} + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode +import org.apache.spark.sql.execution.{SparkPlan, SubqueryBroadcastExec} +import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec} +import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode + +abstract class GpuSubqueryBroadcastMeta330DBBase(s: SubqueryBroadcastExec, + conf: RapidsConf, + p: Option[RapidsMeta[_, _, _]], + r: DataFromReplacementRule) extends + SparkPlanMeta[SubqueryBroadcastExec](s, conf, p, r) { + protected var broadcastBuilder: () => SparkPlan = _ + + override val childExprs: Seq[BaseExprMeta[_]] = Nil + + override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = Nil + + override def tagPlanForGpu(): Unit = s.child match { + // DPP: For AQE off, in this case, we handle DPP by converting the underlying + // BroadcastExchangeExec to GpuBroadcastExchangeExec. + // This is slightly different from the Apache Spark case, because Spark + // sends the underlying plan into the plugin in advance via the PlanSubqueries rule. + // Here, we have the full non-GPU subquery plan, so we convert the whole + // thing. + case ex @ BroadcastExchangeExec(_, child) => + val exMeta = new GpuBroadcastMeta(ex.copy(child = child), conf, p, r) + exMeta.tagForGpu() + if (exMeta.canThisBeReplaced) { + broadcastBuilder = () => exMeta.convertToGpu() + } else { + willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.") + } + // DPP: For AQE on, we have an almost completely different scenario then before, + // Databricks uses a BroadcastQueryStageExec and either: + // 1) provide an underlying BroadcastExchangeExec that we will have to convert + // somehow + // 2) might already do the reuse work for us. The ReusedExchange is now a + // part of the SubqueryBroadcast, so we send it back here as underlying the + // GpuSubqueryBroadcastExchangeExec + case bqse: BroadcastQueryStageExec => + bqse.plan match { + case ex: BroadcastExchangeExec => + val exMeta = new GpuBroadcastMeta(ex, conf, p, r) + exMeta.tagForGpu() + if (exMeta.canThisBeReplaced) { + broadcastBuilder = () => exMeta.convertToGpu() + } else { + willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.") + } + case reuse: ReusedExchangeExec => + reuse.child match { + case _: GpuBroadcastExchangeExec => + // A BroadcastExchange has already been replaced, so it can run on the GPU + broadcastBuilder = () => reuse + case _ => + willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.") + } + } + case _ => + willNotWorkOnGpu("the subquery to broadcast can not entirely run in the GPU.") + } + /** + * Simply returns the original plan. Because its only child, BroadcastExchange, doesn't + * need to change if SubqueryBroadcastExec falls back to the CPU. + */ + override def convertToCpu(): SparkPlan = s + + /** Extract the broadcast mode key expressions if there are any. */ + protected def getBroadcastModeKeyExprs: Option[Seq[Expression]] = { + val broadcastMode = s.child match { + case b: BroadcastExchangeExec => + b.mode + case bqse: BroadcastQueryStageExec => + bqse.plan match { + case b: BroadcastExchangeExec => + b.mode + case reuse: ReusedExchangeExec => + reuse.child match { + case g: GpuBroadcastExchangeExec => + g.mode + } + case _ => + throw new AssertionError("should not reach here") + } + } + + broadcastMode match { + case HashedRelationBroadcastMode(keys, _) => Some(keys) + case IdentityBroadcastMode => None + case m => throw new UnsupportedOperationException(s"Unknown broadcast mode $m") + } + } +} + diff --git a/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala b/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala index 2f362531646..f2781c06340 100644 --- a/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala +++ b/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala @@ -28,7 +28,7 @@ class GpuSubqueryBroadcastMeta( conf: RapidsConf, p: Option[RapidsMeta[_, _, _]], r: DataFromReplacementRule) extends - GpuSubqueryBroadcastMetaBase(s, conf, p, r) { + GpuSubqueryBroadcastMeta330DBBase(s, conf, p, r) { override def convertToGpu(): GpuExec = { GpuSubqueryBroadcastExec(s.name, s.indices, s.buildKeys, broadcastBuilder())( getBroadcastModeKeyExprs) From 6596a358a54672173d8138ce4a270beb118e6d6a Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 22 Nov 2024 05:06:52 +0000 Subject: [PATCH 2/3] Fix shim400 break. --- .../rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala index a6248127bad..87098966486 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala @@ -19,6 +19,7 @@ {"spark": "332db"} {"spark": "341db"} {"spark": "350db143"} +{"spark": "400"} spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution From 6e9b09e89edb50a644e6c886076fd7a5e61c514c Mon Sep 17 00:00:00 2001 From: MithunR Date: Sat, 23 Nov 2024 01:08:26 +0000 Subject: [PATCH 3/3] Fixed 400 shim extension, imports. --- .../execution/GpuSubqueryBroadcastMeta.scala | 9 ++--- .../GpuSubqueryBroadcastMeta330DBBase.scala | 1 - .../execution/GpuSubqueryBroadcastMeta.scala | 1 - .../execution/GpuSubqueryBroadcastMeta.scala | 35 +++++++++++++++++++ 4 files changed, 37 insertions(+), 9 deletions(-) create mode 100644 sql-plugin/src/main/spark400/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala index f0ce5754f6d..ae32800e77a 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta.scala @@ -21,14 +21,9 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution -import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, GpuExec, RapidsConf, RapidsMeta, SparkPlanMeta} +import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuExec, RapidsConf, RapidsMeta} -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode -import org.apache.spark.sql.execution.{SparkPlan, SubqueryBroadcastExec} -import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec} -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec} -import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode +import org.apache.spark.sql.execution.SubqueryBroadcastExec class GpuSubqueryBroadcastMeta(s: SubqueryBroadcastExec, conf: RapidsConf, diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala index 87098966486..a6248127bad 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/rapids/execution/GpuSubqueryBroadcastMeta330DBBase.scala @@ -19,7 +19,6 @@ {"spark": "332db"} {"spark": "341db"} {"spark": "350db143"} -{"spark": "400"} spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution diff --git a/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala b/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala index f2781c06340..10e3fa68b76 100644 --- a/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala +++ b/sql-plugin/src/main/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala @@ -15,7 +15,6 @@ */ /*** spark-rapids-shim-json-lines {"spark": "350db143"} -{"spark": "400"} spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution diff --git a/sql-plugin/src/main/spark400/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala b/sql-plugin/src/main/spark400/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala new file mode 100644 index 00000000000..c16564f523e --- /dev/null +++ b/sql-plugin/src/main/spark400/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/*** spark-rapids-shim-json-lines +{"spark": "400"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution + +import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuExec, RapidsConf, RapidsMeta} + +import org.apache.spark.sql.execution.SubqueryBroadcastExec + +class GpuSubqueryBroadcastMeta( + s: SubqueryBroadcastExec, + conf: RapidsConf, + p: Option[RapidsMeta[_, _, _]], + r: DataFromReplacementRule) extends + GpuSubqueryBroadcastMetaBase(s, conf, p, r) { + override def convertToGpu(): GpuExec = { + GpuSubqueryBroadcastExec(s.name, s.indices, s.buildKeys, broadcastBuilder())( + getBroadcastModeKeyExprs) + } +}