Fix aqe_test failures on [databricks] 14.3. #11750
Conversation
@@ -338,10 +338,10 @@ def do_it(spark):

# this should be fixed by https://github.com/NVIDIA/spark-rapids/issues/11120
aqe_join_with_dpp_fallback=["FilterExec"] if (is_databricks_runtime() or is_before_spark_330()) else []
if is_databricks_version_or_later(14, 3):
    aqe_join_with_dpp_fallback.append("CollectLimitExec")
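For readers outside the project, an allowlist like this is what keeps a test green when the named operators fall back from GPU to CPU. A minimal self-contained sketch of the idea (heuristic and hypothetical, not the plugin's actual test harness, which has its own capture/assert helpers):

import re

def assert_cpu_fallbacks_allowed(plan_str, allowed_execs):
    # Plan dumps print node names after the tree markers '+-' / ':-'.
    # Treat any node without a 'Gpu' prefix as a CPU fallback, and require
    # it to appear in the allowlist (allowlist entries carry an 'Exec'
    # suffix that the rendered plan omits).
    for node in re.findall(r"[+:]-\s+(\w+)", plan_str):
        if not node.startswith(("Gpu", "Reused", "Adaptive", "Shuffle")):
            assert node + "Exec" in allowed_execs, f"unexpected CPU fallback: {node}"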
Why does a collect limit show up in these queries? This scares me a bit because CollectLimitExec implies a non-deterministic result.
I'm not sure. DB-14.3 seems to be inserting this into the plan, albeit with a large collection limit:
+- Project [toprettystring(site_id#6, Some(UTC)) AS toprettystring(site_id)#42, toprettystring(day#5, Some(UTC)) AS toprettystring(day)#43, toprettystring(test_id#7, Some(UTC)) AS toprettystring(test_id)#44, toprettystring(test_id#0, Some(UTC)) AS toprettystring(test_id)#45, toprettystring(site_id#1, Some(UTC)) AS toprettystring(site_id)#46]
+- BroadcastHashJoin [test_id#7, site_id#6], [test_id#0, site_id#1], Inner, BuildRight, false, true
:- Union
: :- Project [site_id#6, day#5, test_id#7]
: : +- FileScan parquet [day#5,site_id#6,test_id#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/infoA], PartitionFilters: [isnotnull(day#5), (day#5 = 1990-01-01), site_id#6 IN (site_0,site_1), isnotnull(test_id#7), isno..., PushedFilters: [], ReadSchema: struct<>
: : :- SubqueryBroadcast dynamicpruning#53, [0], [test_id#0], true, [id=#322]
: : : +- AdaptiveSparkPlan isFinalPlan=false
: : : +- CollectLimit 3000001
: : : +- HashAggregate(keys=[test_id#0], functions=[], output=[test_id#0])
: : : +- ShuffleQueryStage 2, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
: : : +- ReusedExchange [test_id#0, site_id#1], GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=238], [loreId=17]
: : +- SubqueryBroadcast dynamicpruning#54, [0], [site_id#1], true, [id=#335]
: : +- AdaptiveSparkPlan isFinalPlan=false
: : +- CollectLimit 3000001
: : +- HashAggregate(keys=[site_id#1], functions=[], output=[site_id#1])
: : +- ShuffleQueryStage 4, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
: : +- ReusedExchange [test_id#0, site_id#1], GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=238], [loreId=17]
: +- Project [CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 WHEN (site_id#15 = LONG_SITE_NAME_1) THEN site_1 ELSE site_id#15 END AS site_id#30, day#14, test_spec#12 AS test_id#31]
: +- Filter isnotnull(test_spec#12)
: +- FileScan parquet [test_spec#12,day#14,site_id#15] Batched: true, DataFilters: [isnotnull(test_spec#12), dynamicpruningexpression(test_spec#12 IN dynamicpruning#53)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/infoB], PartitionFilters: [isnotnull(day#14), (day#14 = 1990-01-01), CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 ..., PushedFilters: [IsNotNull(test_spec)], ReadSchema: struct<test_spec:string>
: :- ReusedSubquery SubqueryBroadcast dynamicpruning#54, [0], [site_id#1], true, [id=#335]
: +- ReusedSubquery SubqueryBroadcast dynamicpruning#53, [0], [test_id#0], true, [id=#322]
+- ShuffleQueryStage 0, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
+- GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=238], [loreId=17]
+- GpuCoalesceBatches targetsize(1073741824), [loreId=16]
+- GpuFilter ((gpuisnotnull(test_id#0) AND gpuisnotnull(site_id#1)) AND site_id#1 INSET site_0, site_1), [loreId=15]
+- GpuFileGpuScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>
I don't think it's spark-rapids inserting this, but I'm checking.
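One quick way to check is to compare physical plans with the plugin off and on; if the CollectLimit shows up in both, Databricks is inserting it. A sketch, where query stands for the SQL below and spark.rapids.sql.enabled is the plugin's enable switch:

spark.conf.set("spark.rapids.sql.enabled", "false")
spark.sql(query).explain(mode="formatted")   # CPU-only plan
spark.conf.set("spark.rapids.sql.enabled", "true")
spark.sql(query).explain(mode="formatted")   # plan with the plugin active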
The above is for the query:
with tmp as
(SELECT
site_id, day, test_id
FROM infoA
UNION ALL
SELECT
CASE
WHEN site_id = 'LONG_SITE_NAME_0' then 'site_0'
WHEN site_id = 'LONG_SITE_NAME_1' then 'site_1'
ELSE site_id
END AS site_id, day, test_spec AS test_id
FROM infoB)
SELECT *
FROM tmp a JOIN tests b ON a.test_id = b.test_id AND a.site_id = b.site_id
WHERE day = '1990-01-01'
AND a.site_id IN ('site_0', 'site_1')
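A repro sketch, assuming the three Parquet inputs sit at the paths shown in the plan and the query text above is bound to query (AQE and DPP are on by default in Databricks 14.3):

# Register the inputs as temp views, then run the query via show(),
# which is what produces the toprettystring projections in the plan.
for name in ("infoA", "infoB", "tests"):
    spark.read.parquet(f"/tmp/myth/PARQUET_DATA/{name}").createOrReplaceTempView(name)
spark.sql(query).show()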
That is a really interesting optimization. Can you file an issue for us to explore this more closely? It looks like they are doing dynamic pruning. Not sure why they insert the CollectLimit. We might want to be a bit smarter about how we deal with that operator in the future.
I've filed #11764 to explore this further.
...ain/spark350db143/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastMeta.scala
I really want to understand what is happening with the collect limit, especially if we end up with more values than the limit. What happens in Databricks? Do they have special code to skip the filtering? We are not doing that, so I am worried that we have a missing feature we need to deal with.
Fixes #11536.
This commit fixes the tests in `dpp_test.py` that were failing on Databricks 14.3. The failures were largely the result of an erroneous shim implementation, which was fixed as part of #11750.
This commit accounts for the remaining failures, which result from a `CollectLimitExec` appearing in certain DPP query plans (those that include broadcast joins, for example). The tests have been made more permissive, allowing the `CollectLimitExec` to run on the CPU. The `CollectLimitExec`-based plans will be explored further as part of #11764.
Signed-off-by: MithunR <[email protected]>
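The shape of that relaxation, as a hypothetical sketch (the test name and wiring are illustrative; allow_non_gpu is the integration tests' marker for execs permitted on the CPU, and is_databricks_version_or_later is the helper seen in the diff above):

from marks import allow_non_gpu
from spark_session import is_databricks_version_or_later

# Permit the injected CollectLimitExec on the CPU only where it appears.
_dpp_fallback = ["CollectLimitExec"] if is_databricks_version_or_later(14, 3) else []

@allow_non_gpu(*_dpp_fallback)
def test_dpp_with_broadcast_join():
    pass  # the real tests in dpp_test.py run the query and compare CPU/GPU results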
Fixes #11643.
This commit fixes the AQE/DPP tests that were reported in #11643 as failing on Databricks 14.3.
The failures are the result of a deficient shim for `GpuSubqueryBroadcastMeta` being active on Databricks 14.3: the deficient shim errantly extended the non-Databricks base shim.
This commit moves the commonality among the Databricks shims into a common base class, which is then customized for the changes in Databricks 14.3.
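The refactor's shape, sketched in Python purely for illustration (the real classes are Scala, e.g. the GpuSubqueryBroadcastMeta.scala under spark350db143 referenced above; all names below other than GpuSubqueryBroadcastMeta are hypothetical):

class GpuSubqueryBroadcastMetaBase:
    """Non-Databricks base shim; this is what the 14.3 shim wrongly extended."""

class GpuSubqueryBroadcastMetaDBBase(GpuSubqueryBroadcastMetaBase):
    """Common Databricks base: behavior shared by the Databricks shims
    now lives here instead of being duplicated per version."""

class GpuSubqueryBroadcastMeta350DB143(GpuSubqueryBroadcastMetaDBBase):
    """Databricks 14.3 shim: extends the Databricks base and overrides
    only what changed in 14.3."""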