Fix aqe_test failures on [databricks] 14.3.
Fixes #11643.

This commit fixes the AQE/DPP test failures on Databricks 14.3 that were
reported in #11643.

The failures were the result of a deficient shim for GpuSubqueryBroadcastMeta
being active on Databricks 14.3: the shim erroneously extended the
non-Databricks base shim.

This commit moves the logic common to the Databricks shims into a shared
base class, which is then customized for the changes in Databricks 14.3.

Signed-off-by: MithunR <[email protected]>
mythrocks committed Nov 22, 2024
1 parent 9b06ae3 commit 538532f
Showing 5 changed files with 137 additions and 96 deletions.
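
For orientation before the diffs, here is a minimal Scala sketch of the class shape before and after the fix. Only the inheritance structure is taken from this commit; every name suffixed with Sketch and every member body is a hypothetical placeholder, not the plugin's real API.

// Non-Databricks base shim: its tagging targets Apache Spark's plan shape.
// Before this commit, the Databricks 14.3 shim erroneously extended it.
abstract class GpuSubqueryBroadcastMetaBaseSketch {
  def tagPlanForGpu(): Unit = ()
}

// New Databricks-common base (330db, 332db, 341db, 350db143): tags the
// BroadcastQueryStageExec plans Databricks produces and extracts the
// broadcast-mode key expressions.
abstract class GpuSubqueryBroadcastMeta330DBBaseSketch {
  protected var broadcastBuilder: () => AnyRef = _
  def tagPlanForGpu(): Unit = ()
  protected def getBroadcastModeKeyExprs: Option[Seq[AnyRef]] = None
}

// Per-version Databricks shims now override only the conversion step, e.g.
// whether the exec carries a single broadcast key index or a sequence.
class Shim330dbSketch extends GpuSubqueryBroadcastMeta330DBBaseSketch {
  def convertToGpu(): Unit = () // the 330db shim passes Seq(s.index)
}
class Shim341dbSketch extends GpuSubqueryBroadcastMeta330DBBaseSketch {
  def convertToGpu(): Unit = () // later shims pass s.indices
}

With that shape in mind, the fix amounts to pointing the Databricks 14.3 shim at the Databricks-common base, which the new file below introduces.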
6 changes: 2 additions & 4 deletions integration_tests/src/main/python/aqe_test.py
@@ -338,10 +338,10 @@ def do_it(spark):

# this should be fixed by https://github.com/NVIDIA/spark-rapids/issues/11120
aqe_join_with_dpp_fallback=["FilterExec"] if (is_databricks_runtime() or is_before_spark_330()) else []
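# on Databricks 14.3, these plans additionally produce a CollectLimitExec on the CPU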
if is_databricks_version_or_later(14, 3):
aqe_join_with_dpp_fallback.append("CollectLimitExec")

# Verify that DPP and AQE can coexist in even some odd cases involving multiple tables
@pytest.mark.skipif(condition=is_databricks_version_or_later(14, 3),
reason="https://github.com/NVIDIA/spark-rapids/issues/11643")
@ignore_order(local=True)
@allow_non_gpu(*aqe_join_with_dpp_fallback)
def test_aqe_join_with_dpp(spark_tmp_path):
@@ -395,8 +395,6 @@ def run_test(spark):
assert_gpu_and_cpu_are_equal_collect(run_test, conf=_adaptive_conf)

# Verify that DPP and AQE can coexist in even some odd cases involving 2 tables with multiple columns
@pytest.mark.skipif(condition=is_databricks_version_or_later(14, 3),
reason="https://github.com/NVIDIA/spark-rapids/issues/11643")
@ignore_order(local=True)
@allow_non_gpu(*aqe_join_with_dpp_fallback)
def test_aqe_join_with_dpp_multi_columns(spark_tmp_path):
@@ -54,7 +54,9 @@ abstract class GpuSubqueryBroadcastMetaBase(

override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = Nil

override def tagPlanForGpu(): Unit = s.child match {
override def tagPlanForGpu(): Unit = {
s.child match {


// For AQE off:
//
@@ -126,13 +128,14 @@ abstract class GpuSubqueryBroadcastMetaBase(
} else {
willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.")
}
case _ =>
throw new AssertionError("should not reach here")

case unexpected =>
throw new AssertionError(s"Unexpected child exec in AdaptiveSparkPlan: ${unexpected.getClass.getName}")
}

case _ =>
willNotWorkOnGpu("the subquery to broadcast can not entirely run in the GPU.")
}
}
}

/**
* Simply returns the original plan. Because its only child, BroadcastExchange, doesn't
@@ -30,96 +30,15 @@ import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode

class GpuSubqueryBroadcastMeta(
s: SubqueryBroadcastExec,
conf: RapidsConf,
p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule) extends
SparkPlanMeta[SubqueryBroadcastExec](s, conf, p, r) {
private var broadcastBuilder: () => SparkPlan = _

override val childExprs: Seq[BaseExprMeta[_]] = Nil

override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = Nil

override def tagPlanForGpu(): Unit = s.child match {
// DPP: For AQE off, in this case, we handle DPP by converting the underlying
// BroadcastExchangeExec to GpuBroadcastExchangeExec.
// This is slightly different from the Apache Spark case, because Spark
// sends the underlying plan into the plugin in advance via the PlanSubqueries rule.
// Here, we have the full non-GPU subquery plan, so we convert the whole
// thing.
case ex @ BroadcastExchangeExec(_, child) =>
val exMeta = new GpuBroadcastMeta(ex.copy(child = child), conf, p, r)
exMeta.tagForGpu()
if (exMeta.canThisBeReplaced) {
broadcastBuilder = () => exMeta.convertToGpu()
} else {
willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.")
}
// DPP: For AQE on, we have an almost completely different scenario than before:
// Databricks uses a BroadcastQueryStageExec, which either:
// 1) provides an underlying BroadcastExchangeExec that we will have to convert
// somehow, or
// 2) might already have done the reuse work for us. The ReusedExchange is now a
// part of the SubqueryBroadcast, so we send it back here as underlying the
// GpuSubqueryBroadcastExchangeExec
case bqse: BroadcastQueryStageExec =>
bqse.plan match {
case ex: BroadcastExchangeExec =>
val exMeta = new GpuBroadcastMeta(ex, conf, p, r)
exMeta.tagForGpu()
if (exMeta.canThisBeReplaced) {
broadcastBuilder = () => exMeta.convertToGpu()
} else {
willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.")
}
case reuse: ReusedExchangeExec =>
reuse.child match {
case _: GpuBroadcastExchangeExec =>
// A BroadcastExchange has already been replaced, so it can run on the GPU
broadcastBuilder = () => reuse
case _ =>
willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.")
}
}
case _ =>
willNotWorkOnGpu("the subquery to broadcast can not entirely run in the GPU.")
}
/**
* Simply returns the original plan. Because its only child, BroadcastExchange, doesn't
* need to change if SubqueryBroadcastExec falls back to the CPU.
*/
override def convertToCpu(): SparkPlan = s
class GpuSubqueryBroadcastMeta(s: SubqueryBroadcastExec,
conf: RapidsConf,
p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule)
extends GpuSubqueryBroadcastMeta330DBBase(s, conf, p, r) {

override def convertToGpu(): GpuExec = {
GpuSubqueryBroadcastExec(s.name, Seq(s.index), s.buildKeys, broadcastBuilder())(
getBroadcastModeKeyExprs)
}

/** Extract the broadcast mode key expressions if there are any. */
private def getBroadcastModeKeyExprs: Option[Seq[Expression]] = {
val broadcastMode = s.child match {
case b: BroadcastExchangeExec =>
b.mode
case bqse: BroadcastQueryStageExec =>
bqse.plan match {
case b: BroadcastExchangeExec =>
b.mode
case reuse: ReusedExchangeExec =>
reuse.child match {
case g: GpuBroadcastExchangeExec =>
g.mode
}
case _ =>
throw new AssertionError("should not reach here")
}
}

broadcastMode match {
case HashedRelationBroadcastMode(keys, _) => Some(keys)
case IdentityBroadcastMode => None
case m => throw new UnsupportedOperationException(s"Unknown broadcast mode $m")
}
}
}
}
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "330db"}
{"spark": "332db"}
{"spark": "341db"}
{"spark": "350db143"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.execution

import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, RapidsConf, RapidsMeta, SparkPlanMeta}

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode
import org.apache.spark.sql.execution.{SparkPlan, SubqueryBroadcastExec}
import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode

abstract class GpuSubqueryBroadcastMeta330DBBase(s: SubqueryBroadcastExec,
conf: RapidsConf,
p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule) extends
SparkPlanMeta[SubqueryBroadcastExec](s, conf, p, r) {
protected var broadcastBuilder: () => SparkPlan = _

override val childExprs: Seq[BaseExprMeta[_]] = Nil

override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = Nil

override def tagPlanForGpu(): Unit = s.child match {
// DPP: For AQE off, in this case, we handle DPP by converting the underlying
// BroadcastExchangeExec to GpuBroadcastExchangeExec.
// This is slightly different from the Apache Spark case, because Spark
// sends the underlying plan into the plugin in advance via the PlanSubqueries rule.
// Here, we have the full non-GPU subquery plan, so we convert the whole
// thing.
case ex @ BroadcastExchangeExec(_, child) =>
val exMeta = new GpuBroadcastMeta(ex.copy(child = child), conf, p, r)
exMeta.tagForGpu()
if (exMeta.canThisBeReplaced) {
broadcastBuilder = () => exMeta.convertToGpu()
} else {
willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.")
}
// DPP: For AQE on, we have an almost completely different scenario than before:
// Databricks uses a BroadcastQueryStageExec, which either:
// 1) provides an underlying BroadcastExchangeExec that we will have to convert
// somehow, or
// 2) might already have done the reuse work for us. The ReusedExchange is now a
// part of the SubqueryBroadcast, so we send it back here as underlying the
// GpuSubqueryBroadcastExchangeExec
case bqse: BroadcastQueryStageExec =>
bqse.plan match {
case ex: BroadcastExchangeExec =>
val exMeta = new GpuBroadcastMeta(ex, conf, p, r)
exMeta.tagForGpu()
if (exMeta.canThisBeReplaced) {
broadcastBuilder = () => exMeta.convertToGpu()
} else {
willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.")
}
case reuse: ReusedExchangeExec =>
reuse.child match {
case _: GpuBroadcastExchangeExec =>
// A BroadcastExchange has already been replaced, so it can run on the GPU
broadcastBuilder = () => reuse
case _ =>
willNotWorkOnGpu("underlying BroadcastExchange can not run in the GPU.")
}
}
case _ =>
willNotWorkOnGpu("the subquery to broadcast can not entirely run in the GPU.")
}
/**
* Simply returns the original plan. Because its only child, BroadcastExchange, doesn't
* need to change if SubqueryBroadcastExec falls back to the CPU.
*/
override def convertToCpu(): SparkPlan = s

/** Extract the broadcast mode key expressions if there are any. */
protected def getBroadcastModeKeyExprs: Option[Seq[Expression]] = {
val broadcastMode = s.child match {
case b: BroadcastExchangeExec =>
b.mode
case bqse: BroadcastQueryStageExec =>
bqse.plan match {
case b: BroadcastExchangeExec =>
b.mode
case reuse: ReusedExchangeExec =>
reuse.child match {
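// tagPlanForGpu only accepts a ReusedExchangeExec whose child is already a
// GpuBroadcastExchangeExec, so this single case is the only one reachable here.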
case g: GpuBroadcastExchangeExec =>
g.mode
}
case _ =>
throw new AssertionError("should not reach here")
}
}

broadcastMode match {
case HashedRelationBroadcastMode(keys, _) => Some(keys)
case IdentityBroadcastMode => None
case m => throw new UnsupportedOperationException(s"Unknown broadcast mode $m")
}
}
}

@@ -28,7 +28,7 @@ class GpuSubqueryBroadcastMeta(
conf: RapidsConf,
p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule) extends
GpuSubqueryBroadcastMetaBase(s, conf, p, r) {
GpuSubqueryBroadcastMeta330DBBase(s, conf, p, r) {
override def convertToGpu(): GpuExec = {
GpuSubqueryBroadcastExec(s.name, s.indices, s.buildKeys, broadcastBuilder())(
getBroadcastModeKeyExprs)
