Commit 3332717 by rjzamora, Sep 18, 2024 (1 parent: 1e028ea)
Showing 1 changed file with 33 additions and 19 deletions: docs/dask_cudf/source/best_practices.rst

JIT unspilling is likely to produce better protection from out-of-memory
errors. See the `Dask-CUDA spilling documentation
<https://docs.rapids.ai/api/dask-cuda/24.10/spilling/>`__ for further details
and guidance.
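
If JIT unspilling fits your workflow, enabling it is a matter of cluster
configuration. The following is a minimal sketch, assuming a single-node
cluster of Dask-CUDA workers with all other options left at their defaults:

.. code-block:: python

    from dask_cuda import LocalCUDACluster
    from distributed import Client

    # Enable JIT unspilling on every Dask-CUDA worker (illustrative sketch)
    cluster = LocalCUDACluster(jit_unspill=True)
    client = Client(cluster)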

Use RMM
~~~~~~~

Memory allocations in cuDF are significantly faster and more efficient when
the `RAPIDS Memory Manager (RMM) <https://docs.rapids.ai/api/rmm/stable/>`__
library is used on worker processes. In most cases, the best way to manage
memory is by initializing an RMM pool on each worker before executing a
workflow. When using :func:`LocalCUDACluster`, this is easily accomplished
by setting ``rmm_pool_size`` to a large fraction (e.g. ``0.9``).
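
For example, a minimal sketch of this setup on a single node might look like
the following (the ``0.9`` pool fraction is illustrative):

.. code-block:: python

    from dask_cuda import LocalCUDACluster
    from distributed import Client

    # Carve out ~90% of each GPU's memory as an RMM pool when workers start
    cluster = LocalCUDACluster(rmm_pool_size=0.9)
    client = Client(cluster)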

See the `Dask-CUDA memory-management documentation
<https://docs.rapids.ai/api/dask-cuda/nightly/examples/best-practices/#gpu-memory-management>`__
for more details.

Use the Dask DataFrame API
~~~~~~~~~~~~~~~~~~~~~~~~~~

for each task. However, if the partitions are too large, the risk of OOM
errors can become significant.

.. note::
  As a general rule of thumb, aim for 1/16 in shuffle-intensive workflows
  (e.g. large-scale sorting and joining), and 1/8 otherwise. For pathologically
  skewed data distributions, it may be necessary to target 1/32 or smaller.

Partition size is easiest to tune when the DataFrame collection is first
created by a function like :func:`read_parquet`, :func:`read_csv`, or
:func:`from_map`. For example, both :func:`read_parquet` and :func:`read_csv`
expose a ``blocksize`` argument for adjusting the maximum partition size.

If the partition size cannot be tuned effectively at creation time, the
`repartition <https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.repartition.html>`__
method can be used as a last resort.
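
A short sketch of both approaches is shown below; the dataset path and size
values are illustrative assumptions, not recommendations:

.. code-block:: python

    import dask_cudf

    # Preferred: choose the partition size when the collection is created
    df = dask_cudf.read_parquet("/path/to/dataset/", blocksize="1GiB")

    # Last resort: repartition an existing collection to a target size
    df = df.repartition(partition_size="1GiB")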


Use Parquet
~~~~~~~~~~~

`Parquet <https://parquet.apache.org/docs/file-format/>`__ is the recommended
file format for Dask cuDF. It provides efficient columnar storage and enables
The most important arguments to :func:`read_parquet` are ``blocksize`` and
``aggregate_files``::

  ``blocksize``: Use this argument to specify the maximum partition size.
  The default is ``"256 MiB"``, but larger values are usually more performant
  on GPUs with more than 8 GiB of memory. Dask will use the ``blocksize``
  value to map a discrete number of Parquet row-groups (or files) to each
  output partition. This mapping will only account for the uncompressed
  storage size of each row group, which is usually smaller than the
  corresponding ``cudf.DataFrame``.

  ``aggregate_files``: Use this argument to specify whether Dask is allowed
  to map multiple files to the same DataFrame partition. The default is
  ``False``, but ``aggregate_files=True`` is usually more performant when
  the dataset contains many files that are smaller than half of ``blocksize``.
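
For example, a sketch of reading a dataset made up of many small files might
look like the following (the path and ``blocksize`` value are assumptions for
illustration):

.. code-block:: python

    import dask_cudf

    # Combine small files into fewer, larger partitions capped near ``blocksize``
    df = dask_cudf.read_parquet(
        "/path/to/many-small-files/",
        blocksize="1GiB",
        aggregate_files=True,
    )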

.. note::
  Metadata collection can be extremely slow when reading from remote

bottleneck is typically device-to-host memory spilling.

Although every workflow is different, the following guidelines
are often recommended:

* `Use a distributed cluster with Dask-CUDA workers <Use Dask-CUDA>`_
* `Use native cuDF spilling whenever possible <Enable cuDF Spilling>`_
* Avoid shuffling whenever possible
* Use ``split_out=1`` for low-cardinality groupby aggregations
* Use ``broadcast=True`` for joins when at least one collection comprises a small number of partitions (e.g. ``<=5``); see the sketch below
* `Use UCX <https://docs.rapids.ai/api/dask-cuda/nightly/examples/ucx/>`__ if communication is a bottleneck
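
A brief sketch of the ``split_out`` and ``broadcast`` suggestions above; the
data, column names, and partition counts are illustrative only:

.. code-block:: python

    import cudf
    import dask_cudf

    # Tiny illustrative collections (real workloads would be much larger)
    df = dask_cudf.from_cudf(
        cudf.DataFrame({"key": [1, 2, 1, 2], "x": [1.0, 2.0, 3.0, 4.0]}),
        npartitions=2,
    )
    small = dask_cudf.from_cudf(
        cudf.DataFrame({"key": [1, 2], "label": [10, 20]}), npartitions=1
    )

    # Low-cardinality groupby: keep the aggregated result in one partition
    result = df.groupby("key").mean(split_out=1)

    # Broadcast join: ``small`` spans only a few partitions
    joined = df.merge(small, on="key", broadcast=True)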


