
Fix aqe_test failures on [databricks] 14.3. #11750

Merged: 3 commits merged into NVIDIA:branch-24.12 on Nov 25, 2024

Conversation

mythrocks (Collaborator)

Fixes #11643.

This commit fixes the AQE/DPP tests reported in #11643 as failing on Databricks 14.3.

The failures were the result of a deficient shim for GpuSubqueryBroadcastMeta being active on Databricks 14.3; the deficient shim errantly extended the non-Databricks base shim.

This commit moves the commonality in the Databricks shims into a common base class, which is then customized for the changes in Databricks 14.3.

@mythrocks force-pushed the databricks-14.3-aqe-dpp branch from 538532f to aa92d57 on November 22, 2024 at 04:38
@mythrocks force-pushed the databricks-14.3-aqe-dpp branch from aa92d57 to 0f42635 on November 22, 2024 at 04:45
@mythrocks (Collaborator, Author)

Build

@sameerz added the bug label (Something isn't working) on Nov 22, 2024
@@ -338,10 +338,10 @@ def do_it(spark):

# this should be fixed by https://github.com/NVIDIA/spark-rapids/issues/11120
aqe_join_with_dpp_fallback=["FilterExec"] if (is_databricks_runtime() or is_before_spark_330()) else []
if is_databricks_version_or_later(14, 3):
    aqe_join_with_dpp_fallback.append("CollectLimitExec")
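
For context, here is a minimal sketch of how a fallback allow-list like aqe_join_with_dpp_fallback is typically consumed in the integration tests: any exec class named in the list is permitted to fall back to the CPU without failing the test. The allow_non_gpu marker, the module paths, and the test body are assumptions modelled on the existing spark-rapids test utilities, not the actual aqe_test.py code.

# Hypothetical sketch, not the real test; it only illustrates how the allow-list is used.
from marks import allow_non_gpu                       # assumed integration-test marker module
from spark_session import (is_databricks_runtime,     # assumed helper module
                           is_before_spark_330,
                           is_databricks_version_or_later)

# Execs named here may run on the CPU without the test reporting a GPU-fallback failure.
aqe_join_with_dpp_fallback = ["FilterExec"] if (is_databricks_runtime() or is_before_spark_330()) else []
if is_databricks_version_or_later(14, 3):
    # Databricks 14.3 plans a CollectLimit into the DPP subquery broadcast (see the plan below),
    # so CollectLimitExec must also be allowed to stay on the CPU.
    aqe_join_with_dpp_fallback.append("CollectLimitExec")

@allow_non_gpu(*aqe_join_with_dpp_fallback)
def test_aqe_join_with_dpp(spark_tmp_path):
    # The real test builds partitioned tables and runs an AQE + DPP join; elided here.
    pass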
Collaborator

Why does a collect limit show up in these queries? This scares me a bit because CollectLimitExec implies a non-deterministic result.

Collaborator Author

I'm not sure. DB-14.3 seems to be inserting this into the plan, albeit with a large collection limit:

+- Project [toprettystring(site_id#6, Some(UTC)) AS toprettystring(site_id)#42, toprettystring(day#5, Some(UTC)) AS toprettystring(day)#43, toprettystring(test_id#7, Some(UTC)) AS toprettystring(test_id)#44, toprettystring(test_id#0, Some(UTC)) AS toprettystring(test_id)#45, toprettystring(site_id#1, Some(UTC)) AS toprettystring(site_id)#46]
   +- BroadcastHashJoin [test_id#7, site_id#6], [test_id#0, site_id#1], Inner, BuildRight, false, true
      :- Union
      :  :- Project [site_id#6, day#5, test_id#7]
      :  :  +- FileScan parquet [day#5,site_id#6,test_id#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/infoA], PartitionFilters: [isnotnull(day#5), (day#5 = 1990-01-01), site_id#6 IN (site_0,site_1), isnotnull(test_id#7), isno..., PushedFilters: [], ReadSchema: struct<>
      :  :        :- SubqueryBroadcast dynamicpruning#53, [0], [test_id#0], true, [id=#322]
      :  :        :  +- AdaptiveSparkPlan isFinalPlan=false
      :  :        :     +- CollectLimit 3000001
      :  :        :        +- HashAggregate(keys=[test_id#0], functions=[], output=[test_id#0])
      :  :        :           +- ShuffleQueryStage 2, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
      :  :        :              +- ReusedExchange [test_id#0, site_id#1], GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=238], [loreId=17]
      :  :        +- SubqueryBroadcast dynamicpruning#54, [0], [site_id#1], true, [id=#335]
      :  :           +- AdaptiveSparkPlan isFinalPlan=false
      :  :              +- CollectLimit 3000001
      :  :                 +- HashAggregate(keys=[site_id#1], functions=[], output=[site_id#1])
      :  :                    +- ShuffleQueryStage 4, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
      :  :                       +- ReusedExchange [test_id#0, site_id#1], GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=238], [loreId=17]
      :  +- Project [CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 WHEN (site_id#15 = LONG_SITE_NAME_1) THEN site_1 ELSE site_id#15 END AS site_id#30, day#14, test_spec#12 AS test_id#31]
      :     +- Filter isnotnull(test_spec#12)
      :        +- FileScan parquet [test_spec#12,day#14,site_id#15] Batched: true, DataFilters: [isnotnull(test_spec#12), dynamicpruningexpression(test_spec#12 IN dynamicpruning#53)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/infoB], PartitionFilters: [isnotnull(day#14), (day#14 = 1990-01-01), CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 ..., PushedFilters: [IsNotNull(test_spec)], ReadSchema: struct<test_spec:string>
      :              :- ReusedSubquery SubqueryBroadcast dynamicpruning#54, [0], [site_id#1], true, [id=#335]
      :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#53, [0], [test_id#0], true, [id=#322]
      +- ShuffleQueryStage 0, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
         +- GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=238], [loreId=17]
            +- GpuCoalesceBatches targetsize(1073741824), [loreId=16]
               +- GpuFilter ((gpuisnotnull(test_id#0) AND gpuisnotnull(site_id#1)) AND site_id#1 INSET site_0, site_1), [loreId=15]
                  +- GpuFileGpuScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>

I don't think it's spark-rapids inserting this, but I'm checking.
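
One rough way to confirm that (a sketch, not necessarily what was run here) is to render the same query with the RAPIDS plugin disabled and compare plans. Below, spark is an active session on the 14.3 cluster and query stands in for the SQL shown in the next comment.

# Illustrative sketch for comparing plans with and without the plugin.
spark.conf.set("spark.rapids.sql.enabled", "false")   # disable GPU planning
spark.sql(query).explain(mode="formatted")            # CPU-only plan

spark.conf.set("spark.rapids.sql.enabled", "true")    # re-enable the plugin
spark.sql(query).explain(mode="formatted")            # plan comparable to the dump above

# If "CollectLimit 3000001" shows up in both plans, it is inserted by the Databricks
# planner rather than by spark-rapids.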

@mythrocks (Collaborator, Author), Nov 23, 2024

The above is for the query:

with tmp as 
(SELECT
   site_id, day, test_id
 FROM infoA
 UNION ALL
 SELECT
   CASE
     WHEN site_id = 'LONG_SITE_NAME_0' then 'site_0'
     WHEN site_id = 'LONG_SITE_NAME_1' then 'site_1'
     ELSE site_id
   END AS site_id, day, test_spec AS test_id
 FROM infoB)
 SELECT *
 FROM tmp a JOIN tests b ON a.test_id = b.test_id AND a.site_id = b.site_id
 WHERE day = '1990-01-01'
 AND a.site_id IN ('site_0', 'site_1')

Collaborator

That is a really interesting optimization. Can you file an issue for us to explore this more closely? It looks like they are doing dynamic pruning. I'm not sure why they insert the CollectLimit. We might want to be a bit smarter about how we deal with that operator in the future.

Collaborator Author

I've filed #11764 to explore this further.

@mythrocks self-assigned this on Nov 23, 2024
@mythrocks (Collaborator, Author)

Build

@mythrocks requested a review from revans2 on November 25, 2024 at 17:00
@revans2 (Collaborator) left a comment

I really want to understand what is happening with the collect limit, especially if we end up with more values than the limit. What happens in Databricks? Do they have special code to skip the filtering? We are not doing that, so I am scared that we have a missing feature that we need to deal with.

@mythrocks (Collaborator, Author)

Thank you for the review, @revans2. I've filed #11764 to explore the optimization further.

@mythrocks merged commit 938db21 into NVIDIA:branch-24.12 on Nov 25, 2024
49 checks passed
mythrocks added a commit to mythrocks/spark-rapids that referenced this pull request Nov 26, 2024
mythrocks added a commit that referenced this pull request Nov 26, 2024
Fixes #11536.

This commit fixes the tests in `dpp_test.py` that were failing on
Databricks 14.3.

The failures were largely the result of an erroneous shim implementation
that was fixed as part of #11750.

This commit accounts for the remaining failures, which result from a
`CollectLimitExec` appearing in certain DPP query plans (those involving
broadcast joins, for example).  The tests have been made more permissive,
allowing the `CollectLimitExec` to run on the CPU.

The `CollectLimitExec`-based plans will be explored further as part of
#11764.

Signed-off-by: MithunR <[email protected]>
Labels: bug (Something isn't working)

Successfully merging this pull request may close these issues:
[BUG] Support AQE with Broadcast Hash Join and DPP on Databricks 14.3

3 participants