[FEA] Explore CollectLimit use in DPP queries on Databricks #11764
I really want to understand a few things about this feature. The main one is that it looks like we might produce an incorrect answer if the aggregation returns more results than the limit allows for.

Essentially what is happening is that we have a table small enough that it is being turned into a broadcast join. That broadcast is then turned into a single-partition task through the AQE transition code. It goes through a hash aggregate to get distinct values; those distinct values are then limited and finally sent to the DPP filter. If that CollectLimit removed some values before it was turned into a new broadcast, then DPP would produce the wrong answer, because we are broadcasting an allow list: if we are missing something that should be allowed, we will not read everything we should.

I suspect that the code that reads the data from the collect is looking at how many rows are returned, and if there are too many rows it might not do a DPP filter at all. (Note that they are probably asking for a limit of 3,000,000 rows, and CollectLimit itself is adding the 1 extra row so that it can tell whether the rows were cut off.) It does not appear that AQE is looking at the number of rows in the broadcast, which is only 34; otherwise it would know that the limit is not needed and cannot be violated. I also don't think the code knows the number of partitions in the table and is limiting things that way (there are nowhere near 3 million partitions in this test). But who knows. I just want to understand it so I know whether we need to fix some bugs somewhere to deal with it.

The other question I have is around performance, and whether it is worth us trying to optimize `CollectLimitExec`. We would need to convert data back to rows and send it to the CPU, so I don't want to write our own `CollectLimitExec`, but we could insert a `GpuLocalLimit` in front of it.
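The "limit + 1" truncation check described above can be sketched roughly as follows. This is a hypothetical illustration of the suspected behavior, not Spark's or Databricks' actual implementation; the function and parameter names are invented:

```python
def collect_partition_filter(distinct_keys, limit=3_000_000):
    """Collect up to limit + 1 distinct join keys for a DPP allow list.

    CollectLimit asks for one extra row beyond the limit so the driver can
    tell whether the result was truncated. If more than `limit` rows come
    back, the allow list is incomplete, and broadcasting it would silently
    drop partitions that should be read; return None to signal that the
    DPP filter must be skipped (fall back to scanning everything).
    """
    rows = list(distinct_keys)[: limit + 1]  # the "+ 1" sentinel row
    if len(rows) > limit:
        return None  # truncated: a partial allow list is unsafe
    return set(rows)


# With few distinct keys (e.g. the 34 rows mentioned above), the filter is safe:
print(collect_partition_filter(range(34)))
# If the distinct count exceeds the limit, DPP should be skipped entirely:
print(collect_partition_filter(range(11), limit=10))
```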
Fixes NVIDIA#11536. This commit fixes the tests in `dpp_test.py` that were failing on Databricks 14.3. The failures were largely a result of an erroneous shim implementation, that was fixed as part of NVIDIA#11750. This commit accounts for the remaining failures that result from there being a `CollectLimitExec` in certain DPP query plans (that include broadcast joins, for example). The tests have been made more permissive, in allowing the `CollectLimitExec` to run on the CPU. The `CollectLimitExec` based plans will be further explored as part of NVIDIA#11764. Signed-off-by: MithunR <[email protected]>
This is a follow-on issue from the work in #11750, which fixes the test failures in `aqe_test.py` pertaining to DPP on Databricks 14.3. We found a condition where Databricks seems to insert a `CollectLimit` (with a very large collection limit) on a join query with DPP enabled. Here's the query (paraphrased from `aqe_test.py::test_aqe_join_with_dpp`). The resultant plan looks thus:

Note the `+- CollectLimit 3000001`. It appears to be the result of pruning (sub)partitions as part of DPP. It would be good to understand this optimization better, and the expected behaviour if there were more records/partitions than the limit.