
[FEA] Scalable and optimal dask_cudf based drop duplicates #14439

Closed
VibhuJawa opened this issue Nov 17, 2023 · 10 comments
Labels
feature request New feature or request no-oom Reducing memory footprint of cudf algorithms

Comments

@VibhuJawa
Member

VibhuJawa commented Nov 17, 2023

dask_cudf currently lacks a scalable solution for dropping duplicates, especially in large datasets with skewed distributions. This limitation poses significant challenges in our workflows, particularly in projects like curator and cugraph, which commonly run into this issue.

For the time being, curator uses a shuffle-based deduplication approach, and cugraph is moving towards doing this in the C++ cugraph layer. (See PR)

Related cudf issue:

Internal Discussion about this: https://nvidia.slack.com/archives/CENC8QZ63/p1700146964773109

CC: @ayushdg , @rjzamora , @wence-

@VibhuJawa VibhuJawa added feature request New feature or request Needs Triage Need team to review and classify labels Nov 17, 2023
@wence-
Contributor

wence- commented Nov 17, 2023

There are kind of two aspects here:

  1. Reducing the footprint of any individual drop_duplicates call on a single data frame. If you're keeping unique values and don't care about which unique value you get, we can make cudf's memory footprint a bit lower by exposing the keep=ANY parameter to Python (I think [FEA] Improve cudf::distinct with cuco reduction map #13157 would have a similar effect in terms of reducing the footprint).
  2. Providing a distributed approach that scales.

What exactly breaks when you have skewed data? I can see that a reduction-based approach will be painful with many unique values (because the binary op $A \oplus B$ that combines two partitions, namely concat([A, B]).drop_duplicates(), doesn't necessarily decrease the size, and may increase it), so there needs to be rebalancing at every level of the tree to keep the size in check.
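A minimal sketch of that combine step, assuming cudf is available (the function name is illustrative):

```python
import cudf

def combine(a: cudf.DataFrame, b: cudf.DataFrame) -> cudf.DataFrame:
    # The binary op used at each node of the reduction tree: concatenate
    # two already-deduplicated partitions and deduplicate again. With many
    # unique values the result can approach len(a) + len(b) rows, which is
    # why rebalancing is needed to keep partition sizes in check.
    return cudf.concat([a, b]).drop_duplicates()
```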

For a shuffle-based approach, if you can complete the shuffle and your hash is good, you shouldn't have skewed output partition sizes. So is the problem that you don't have a good hash (so that a single partition ends up with too many unique values), or is it that you don't have a large scalable shuffle? I guess skew due to repeated input values can at most blow up a single partition by a factor of npartitions (you'd have to be really unlucky for this to be terrible, though).

If you have skew that is somehow coming from only a few unique values (though again, I'm not sure how this is problematic), you could imagine a two-pass approach where you first compute the top-k most frequent values and produce a small dataframe of those, broadcast and fully de-dup those, and then use the existing approaches.
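A rough sketch of that two-pass idea, simplified to a single key column; the function name and the cut-off k are illustrative, and this is only one of several possible ways to handle the heavy hitters:

```python
import dask.dataframe as dd
import dask_cudf

def two_pass_drop_duplicates(ddf, key: str, k: int = 1000):
    # Pass 1: the k most frequent key values form a small, broadcastable list.
    hot = ddf[key].value_counts().nlargest(k).index.compute().to_arrow().to_pylist()
    # Rows with hot keys collapse to at most k rows after dedup, so they can
    # safely be concentrated into a single partition.
    hot_part = ddf[ddf[key].isin(hot)].drop_duplicates(subset=[key]).repartition(npartitions=1)
    # The remainder is no longer skewed; use the existing shuffle/tree approach.
    rest = ddf[~ddf[key].isin(hot)].drop_duplicates(subset=[key])
    return dd.concat([hot_part, rest])
```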

We also discussed a data-movement suboptimal "systolic loop" approach where you compute

$$
\begin{bmatrix}
A_0 \otimes A_1 \otimes \dots \otimes A_n \\
A_1 \otimes A_2 \otimes \dots \otimes A_n \otimes A_0 \\
\vdots \\
A_n \otimes A_0 \otimes \dots \otimes A_1
\end{bmatrix}
$$

on a partitionwise deduplicated dataframe $[A_0, \dots, A_n]$, where $A \otimes B$ is A.merge(B, on=..., how="leftanti").

This strictly moves more data than a shuffle. If the input has $P$ partitions and $N$ total rows (after local deduplication), then everyone sends $P \cdot \frac{N}{P} = N$ data, for a total of $N P$ data movement. In contrast, in a shuffle, everyone sends $\frac{N}{P}$ data for a total of $N$ data movement.

However it might be more amenable to running since the individual sends are large and the pattern is neighbour-wise (you do it in rounds and only send to your neighbours in each round) rather than all-to-all.
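A sequential illustration of that idea, assuming cudf is available; the tie-break used here (keep the surviving copy in the lowest-indexed partition) is just one way to make the scheme concrete, not necessarily the ordering a distributed implementation would use:

```python
import cudf

def systolic_drop_duplicates(partitions, on):
    # Locally deduplicate each partition first.
    local = [p.drop_duplicates(subset=on) for p in partitions]
    out = []
    for i, part in enumerate(local):
        # Partition i drops any row whose key already appears in an earlier
        # partition (the $A \otimes B$ leftanti merge), so exactly one copy
        # of each key survives globally. A real implementation would circulate
        # partitions neighbour-to-neighbour in rounds instead of looping here.
        for j in range(i):
            part = part.merge(local[j][on], on=on, how="leftanti")
        out.append(part)
    return out
```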

For the shuffle, there might be ways of improving the local footprint by fusing the drop-dup call into the receive phase. I think @rjzamora did some experimentation in this regard.

@rjzamora
Member

Thanks @VibhuJawa for starting this discussion, and @wence- for the nice breakdown.

@VibhuJawa - Just to confirm: Your summary is implying that both the tree-reduction and shuffle-based drop-duplicates algorithms available in dask are not sufficient, correct? I just want to make sure it is understood that curator doesn't actually need its own version of drop_duplicates anymore, because dask offers the same functionality now.

I agree with @wence- that there are other algorithms we can consider, but that there are also two parts to this problem. Reducing the memory footprint at the cudf/libcudf level makes the distributed algorithm much easier to scale. If the bottleneck is shuffling, then this may just be further motivation for us to push on scalable (no OOM) shuffling in dask-cudf (already a high priority for 24.02/04).

@VibhuJawa
Member Author

> @VibhuJawa - Just to confirm: Your summary is implying that both the tree-reduction and shuffle-based drop-duplicates algorithms available in dask are not sufficient, correct?

No, sorry for the confusion: shuffle-based drop duplicates is sufficient for curator, as we don't have big skew there.

For the cugraph use case I will have to find an MRE. The graph input is edge lists (src, dst), and the graph can often have super nodes (nodes with very large degrees). Given that shuffle currently only shuffles on a single column, I don't think shuffle can always work there, as we cannot repartition on both columns. Please confirm that behavior (@rjzamora).

Currently, cugraph is pushing on its own custom solution in the C++ layer, so we are not blocked there either.

I was not aware of dask/dask#10542; that solution implies we can use it directly. One of my main goals in starting this issue was pushing towards a solution that looks exactly like that, while we figure out a way around the skew problem (which is not blocking yet).

@rjzamora
Member

> Given that shuffle currently only shuffles on a single column, I don't think shuffle can always work there, as we cannot repartition on both columns. Please confirm that behavior (@rjzamora).

We shuffle on the subset columns if they are specified, otherwise we just shuffle on all columns (https://github.com/rjzamora/dask/blob/da04f15e48ceadcf4893550de7b9c38126011a19/dask/dataframe/core.py#L905). Is that what you are asking?
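For illustration, a shuffle-based dedup on the two key columns might look roughly like this (the file path and column names are made up for the example):

```python
import dask_cudf

# Read an edge list; the path is purely illustrative.
ddf = dask_cudf.read_parquet("edges/*.parquet")

# Hash-shuffle on both key columns so identical (src, dst) rows end up in
# the same output partition, then dedup within each partition.
shuffled = ddf.shuffle(on=["src", "dst"])
deduped = shuffled.map_partitions(lambda df: df.drop_duplicates(subset=["src", "dst"]))
```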

@VibhuJawa
Member Author

VibhuJawa commented Nov 17, 2023

> We shuffle on the subset columns if they are specified, otherwise we just shuffle on all columns (https://github.com/rjzamora/dask/blob/da04f15e48ceadcf4893550de7b9c38126011a19/dask/dataframe/core.py#L905). Is that what you are asking?

Let's say we have a dataframe like the one below:

Partition 0

| src | dst |
| --- | --- |
| 1 | 1 |
| 1 | 2 |
| 2 | 1 |
| 2 | 2 |

Partition 1

| src | dst |
| --- | --- |
| 1 | 2 |
| 1 | 3 |
| 1 | 3 |
| 2 | 2 |

If we shuffle on [src, dst], will we end up with all the rows with src == 1 in a single output partition, or can they be spread across partitions?

@rjzamora
Member

rjzamora commented Nov 17, 2023

> If we shuffle on [src, dst], will we end up with all the rows with src == 1 in a single output partition, or can they be spread across partitions?

Okay, I think I understand your question/concern. When we use a hash-based shuffle in dask/dask-cudf, we will use both the src and dst columns to calculate the hash for each row. Therefore, a row with (1, 2) will not hash to the same value as a row with (1, 3).

In contrast, a sort-based shuffle in dask.dataframe will only perform the global part of the sort on the first column. (Note that dask-cudf has a custom sort_values implementation that will allow (1, 2) and (1, 3) to live in distinct output partitions.)

> will we end up with all the rows with src == 1 in a single output partition

Not unless hash(df[["src", "dst"]]) % npartitions produces the same integer for all rows where "src" is 1.
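A small check of that point, using cudf's hash_values on the example rows (npartitions=2 is arbitrary, and this mirrors rather than reproduces the exact hash dask's shuffle uses):

```python
import cudf

# Rows sharing src == 1 but differing in dst hash differently, so they
# generally land in different output partitions.
df = cudf.DataFrame({"src": [1, 1, 1, 2], "dst": [1, 2, 3, 2]})
npartitions = 2
print(df[["src", "dst"]].hash_values() % npartitions)
```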

@VibhuJawa
Member Author

> If we shuffle on [src, dst], will we end up with all the rows with src == 1 in a single output partition, or can they be spread across partitions?

> Okay, I think I understand your question/concern. When we use a hash-based shuffle in dask/dask-cudf, we will use both the src and dst columns to calculate the hash for each row. Therefore, a row with (1, 2) will not hash to the same value as a row with (1, 3).

> In contrast, a sort-based shuffle in dask.dataframe will only perform the global part of the sort on the first column. (Note that dask-cudf has a custom sort_values implementation that will allow (1, 2) and (1, 3) to live in distinct output partitions.)

Thanks, in that case I think shuffle should probably work.

@wence- wence- added no-oom Reducing memory footprint of cudf algorithms and removed Needs Triage Need team to review and classify labels Nov 22, 2023
@wence-
Contributor

wence- commented Nov 28, 2023

> Thanks, in that case I think shuffle should probably work.

Given this, is the initial request still required? If so, could you speak to what (if any) ordering requirements you have on the output, either global or per-partition? I imagine that you can't have any, but I could be wrong.

@VibhuJawa
Member Author

> Given this, is the initial request still required? If so, could you speak to what (if any) ordering requirements you have on the output, either global or per-partition? I imagine that you can't have any, but I could be wrong.

No, we don't require it.

For awareness, we have also moved cugraph to follow the new shuffle-based drop_duplicates (while the C++ changes come through).

See below:
https://github.com/rapidsai/cugraph/blob/5eaae7d6543de708a21545f8fc236cedfb05f018/python/cugraph/cugraph/structure/symmetrize.py#L297-L306
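For reference, the shuffle-based path boils down to something like the following (toy data; the split_out value and column names are illustrative, not a transcription of the linked file):

```python
import cudf
import dask_cudf

# Toy edge list standing in for a much larger distributed one.
edges = cudf.DataFrame({"src": [1, 1, 1, 2], "dst": [2, 2, 3, 2]})
ddf = dask_cudf.from_cudf(edges, npartitions=2)

# split_out > 1 keeps the deduplicated result distributed; dask shuffles on
# the subset columns under the hood rather than collapsing to one partition.
deduped = ddf.drop_duplicates(subset=["src", "dst"], split_out=ddf.npartitions)
print(deduped.compute())
```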

I think the only push we require now is to reduce the overhead of cudf's drop_duplicates itself, which is tracked in #13157.

@vyasr
Contributor

vyasr commented Dec 12, 2023

I'm going to close this issue since it seems from the above discussion that the only open task is now captured in #13157.

@vyasr vyasr closed this as completed Dec 12, 2023