[FEA] Scalable and optimal dask_cudf based drop duplicates #14439
Comments
There are kind of two aspects here:
What exactly breaks when you have skewed data? I can see that a reduction-based approach will be painful with many unique values (because the pairwise combine step barely shrinks the data in that case). For a shuffle-based approach, if you can complete the shuffle and your hash is good, you shouldn't have skewed output partition sizes. So is the problem that you don't have a good hash (so that a single partition ends up with too many unique values), or is it that you don't have a large scalable shuffle? I guess skew due to repeated input values can at most blow up a single output partition by a bounded factor.

If you have skew that is somehow only coming from a few unique values (though again, I'm not sure how this is problematic), you could imagine a two-pass approach where you first compute the top-k most frequent values and produce a small dataframe of those, do a broadcast and de-dup those fully, followed by the existing approaches.

We also discussed a data-movement-suboptimal "systolic loop" approach where you compute on a partitionwise-deduplicated dataframe and pass partitions around in rounds. This strictly moves more data than a shuffle (the more so the fewer duplicates the input has). However, it might be more amenable to running, since the individual sends are large and the pattern is neighbour-wise (you do it in rounds and only send to your neighbours in each round) rather than all-to-all.

For the shuffle, there might be ways of improving the local memory footprint by fusing the drop-duplicates call into the receive phase. I think @rjzamora did some experimentation in this regard.
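A minimal, hypothetical sketch of the two-pass idea above. Nothing here is an existing dask_cudf API: `two_pass_dedup`, `col`, and `k` are made-up names, and the broadcast step is approximated by collapsing the small hot-key subset onto a single partition.

```python
import dask.dataframe as dd


def two_pass_dedup(ddf, col, k=100):
    """Two-pass de-dup sketch for a dask_cudf DataFrame `ddf` keyed on `col`."""
    # Pass 1: find the k most frequent values of `col`; with heavy skew these
    # account for most of the duplicated rows.
    hot_keys = (
        ddf[col].value_counts().nlargest(k).compute().index.to_pandas().tolist()
    )

    # Fully de-duplicate the (small) hot subset on one partition; this stands
    # in for the broadcast + full de-dup step described above.
    hot = ddf[ddf[col].isin(hot_keys)].drop_duplicates(subset=[col], split_out=1)

    # Pass 2: de-duplicate the remaining rows with the existing approaches,
    # keeping the result distributed.
    cold = ddf[~ddf[col].isin(hot_keys)].drop_duplicates(
        subset=[col], split_out=ddf.npartitions
    )

    return dd.concat([hot, cold])
```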
Thanks @VibhuJawa for starting this discussion, and @wence- for the nice breakdown.

@VibhuJawa - Just to confirm: your summary implies that both the tree-reduction and shuffle-based drop-duplicates algorithms available in dask are not sufficient, correct? I just want to make sure it is understood that curator doesn't actually need its own version of drop_duplicates anymore, because dask offers the same functionality now.

I agree with @wence- that there are other algorithms we can consider, but also that there are two parts to this problem. Reducing the memory footprint at the cudf/libcudf level makes the distributed algorithm much easier to scale. If the bottleneck is shuffling, then this may just be further motivation for us to push on scalable (no OOM) shuffling in dask-cudf (already a high priority for 24.02/04).
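A minimal sketch of what using dask's built-in functionality could look like, assuming a dask_cudf DataFrame and the `subset`/`split_out` keywords of dask's `drop_duplicates`; which algorithm dask picks under the hood (tree reduction or shuffle) depends on the dask version and configuration, and the data below is made up for illustration.

```python
import cudf
import dask_cudf

# Hypothetical toy data; the "id" column is the de-duplication key.
df = cudf.DataFrame({"id": [1, 2, 2, 3, 3, 3], "val": list("abcdef")})
ddf = dask_cudf.from_cudf(df, npartitions=2)

# split_out > 1 keeps the de-duplicated result distributed across partitions
# instead of collapsing it onto a single one, which is what lets it scale.
deduped = ddf.drop_duplicates(subset=["id"], split_out=ddf.npartitions)
print(deduped.compute())
```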
No, sorry for the confusion: shuffle-based drop duplicates is sufficient for curator, as we don't have big skew there. For the cugraph use case, cugraph is currently pushing on its own custom solution in the C++ layer, so we are not blocked there either.

I was not aware of dask/dask#10542; that solution implies we can directly use it. One of the main goals of starting this issue was pushing towards a solution that looks exactly like that, while we figure out a way around the skew problem (which is not blocking yet).
We shuffle on the subset columns.
Let's say we have a data frame like below:

Partition 0
(frame elided)

Partition 1
(frame elided)

And we shuffle on the id column: will duplicated ids that live in different input partitions end up together?
Okay, I think I understand your question/concern. When we use a hash-based shuffle in dask/dask-cudf, we will use the subset columns to choose the output partition, so rows with matching subset values are guaranteed to land in the same output partition (and can then be de-duplicated locally). In contrast, a sort-based shuffle in dask partitions by sorted key ranges rather than by hash.
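A hypothetical illustration of that point (the data and column names are made up, and it assumes the `DataFrame.shuffle(on=...)` API): hash-partitioning on the subset column co-locates rows that share an id even when their other columns differ, so a per-partition drop_duplicates afterwards is globally correct.

```python
import cudf
import dask_cudf

# Two made-up "input partitions" that both contain id == 2.
part0 = cudf.DataFrame({"id": [1, 2], "val": ["a", "b"]})
part1 = cudf.DataFrame({"id": [2, 3], "val": ["c", "d"]})
ddf = dask_cudf.from_cudf(
    cudf.concat([part0, part1], ignore_index=True), npartitions=2
)

# Hash-shuffle on the subset column only: both id == 2 rows co-locate.
shuffled = ddf.shuffle(on="id")

# A partition-wise dedup on the subset is now globally correct.
deduped = shuffled.map_partitions(lambda df: df.drop_duplicates(subset=["id"]))
print(deduped.compute())
```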
Not unless …
Thanks, in that case I think …
Given this, is the initial request still required? If so, could you speak to what (if any) ordering requirements you have on the output, either global or per-partition? I imagine that you can't have any, but I could be wrong.
No, we don't require it. For awareness, we have also moved … I think the only push we require now is to reduce the overhead of the underlying cudf drop-duplicates call (see #13157).
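For context, a minimal illustration of the single-GPU building block whose overhead is being discussed, using hypothetical data: each worker ultimately calls cudf's drop_duplicates on its local (post-shuffle) partition, and #13157 tracks reducing the memory footprint of that step at the libcudf level.

```python
import cudf

# Hypothetical local partition after the shuffle; real inputs are much larger.
df = cudf.DataFrame({"id": [1, 2, 2, 3], "val": ["a", "b", "c", "d"]})

# The per-partition de-duplication step. Its peak GPU memory use, not its
# correctness, is the concern raised above.
out = df.drop_duplicates(subset=["id"], keep="first")
print(out)
```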
I'm going to close this issue since it seems from the above discussion that the only open task is now captured in #13157.
`dask_cudf` currently lacks a scalable solution for dropping duplicates, especially in large datasets with skewed distributions. This limitation poses significant challenges in our workflows, particularly in projects like curator and cugraph, which commonly run into this issue.

For the time being, in curator we use a shuffle-based deduplication approach, and in cugraph we are moving towards doing this in the C++ cugraph layer. (See PR)

Related cudf issue: `cudf::distinct` with cuco reduction map #13157

Internal discussion about this: https://nvidia.slack.com/archives/CENC8QZ63/p1700146964773109
CC: @ayushdg, @rjzamora, @wence-