Migrate dask-cudf README improvements to dask-cudf sphinx docs (#16765)
Follow up to #16671

- Moves most of the information recently added to the dask-cudf README into the dask-cudf Sphinx documentation
- Adds a "Quick-start" example to the simplified dask-cudf README

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Benjamin Zaitlen (https://github.com/quasiben)
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #16765
rjzamora authored Sep 16, 2024
1 parent 3dbc33a commit 124d3e3
Showing 3 changed files with 213 additions and 176 deletions.
31 changes: 16 additions & 15 deletions docs/cudf/source/user_guide/10min.ipynb
@@ -5,9 +5,9 @@
"id": "4c6c548b",
"metadata": {},
"source": [
"# 10 Minutes to cuDF and Dask-cuDF\n",
"# 10 Minutes to cuDF and Dask cuDF\n",
"\n",
"Modelled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly towards new users.\n",
"Modelled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask cuDF, geared mainly towards new users.\n",
"\n",
"## What are these Libraries?\n",
"\n",
@@ -18,13 +18,14 @@
"[Dask cuDF](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) extends Dask where necessary to allow its DataFrame partitions to be processed using cuDF GPU DataFrames instead of Pandas DataFrames. For instance, when you call `dask_cudf.read_csv(...)`, your cluster's GPUs do the work of parsing the CSV file(s) by calling [`cudf.read_csv()`](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.read_csv.html).\n",
"\n",
"\n",
"> [!NOTE] \n",
"> This notebook uses the explicit Dask cuDF API (`dask_cudf`) for clarity. However, we strongly recommend that you use Dask's [configuration infrastructure](https://docs.dask.org/en/latest/configuration.html) to set the `\"dataframe.backend\"` to `\"cudf\"`, and work with the `dask.dataframe` API directly. Please see the [Dask cuDF documentation](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) for more information.\n",
"<div class=\"alert alert-block alert-info\">\n",
"<b>Note:</b> This notebook uses the explicit Dask cuDF API (dask_cudf) for clarity. However, we strongly recommend that you use Dask's <a href=\"https://docs.dask.org/en/latest/configuration.html\">configuration infrastructure</a> to set the \"dataframe.backend\" option to \"cudf\", and work with the Dask DataFrame API directly. Please see the <a href=\"https://github.com/rapidsai/cudf/tree/main/python/dask_cudf\">Dask cuDF documentation</a> for more information.\n",
"</div>\n",
"\n",
"\n",
"## When to use cuDF and Dask-cuDF\n",
"## When to use cuDF and Dask cuDF\n",
"\n",
"If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask-cuDF."
"If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask cuDF."
]
},
{
@@ -115,7 +116,7 @@
"source": [
"ds = dask_cudf.from_cudf(s, npartitions=2)\n",
"# Note the call to head here to show the first few entries, unlike\n",
"# cuDF objects, dask-cuDF objects do not have a printing\n",
"# cuDF objects, Dask-cuDF objects do not have a printing\n",
"# representation that shows values since they may not be in local\n",
"# memory.\n",
"ds.head(n=3)"
@@ -331,11 +332,11 @@
"id": "b17db919",
"metadata": {},
"source": [
"Now we will convert our cuDF dataframe into a dask-cuDF equivalent. Here we call out a key difference: to inspect the data we must call a method (here `.head()` to look at the first few values). In the general case (see the end of this notebook), the data in `ddf` will be distributed across multiple GPUs.\n",
"Now we will convert our cuDF dataframe into a Dask-cuDF equivalent. Here we call out a key difference: to inspect the data we must call a method (here `.head()` to look at the first few values). In the general case (see the end of this notebook), the data in `ddf` will be distributed across multiple GPUs.\n",
"\n",
"In this small case, we could call `ddf.compute()` to obtain a cuDF object from the dask-cuDF object. In general, we should avoid calling `.compute()` on large dataframes, and restrict ourselves to using it when we have some (relatively) small postprocessed result that we wish to inspect. Hence, throughout this notebook we will generally call `.head()` to inspect the first few values of a dask-cuDF dataframe, occasionally calling out places where we use `.compute()` and why.\n",
"In this small case, we could call `ddf.compute()` to obtain a cuDF object from the Dask-cuDF object. In general, we should avoid calling `.compute()` on large dataframes, and restrict ourselves to using it when we have some (relatively) small postprocessed result that we wish to inspect. Hence, throughout this notebook we will generally call `.head()` to inspect the first few values of a Dask-cuDF dataframe, occasionally calling out places where we use `.compute()` and why.\n",
"\n",
"*To understand more of the differences between how cuDF and dask-cuDF behave here, visit the [10 Minutes to Dask](https://docs.dask.org/en/stable/10-minutes-to-dask.html) tutorial after this one.*"
"*To understand more of the differences between how cuDF and Dask cuDF behave here, visit the [10 Minutes to Dask](https://docs.dask.org/en/stable/10-minutes-to-dask.html) tutorial after this one.*"
]
},
{
@@ -1680,7 +1681,7 @@
"id": "7aa0089f",
"metadata": {},
"source": [
"Note here we call `compute()` rather than `head()` on the dask-cuDF dataframe since we are happy that the number of matching rows will be small (and hence it is reasonable to bring the entire result back)."
"Note here we call `compute()` rather than `head()` on the Dask-cuDF dataframe since we are happy that the number of matching rows will be small (and hence it is reasonable to bring the entire result back)."
]
},
{
@@ -2393,7 +2394,7 @@
"id": "f6094cbe",
"metadata": {},
"source": [
"Applying functions to a `Series`. Note that applying user defined functions directly with Dask-cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to apply a function to each partition of the distributed dataframe."
"Applying functions to a `Series`. Note that applying user defined functions directly with Dask cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to apply a function to each partition of the distributed dataframe."
]
},
{
@@ -3492,7 +3493,7 @@
"id": "5ac3b004",
"metadata": {},
"source": [
"Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF."
"Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask cuDF."
]
},
{
@@ -4181,7 +4182,7 @@
"id": "aa8a445b",
"metadata": {},
"source": [
"To convert the first few entries to pandas, we similarly call `.head()` on the dask-cuDF dataframe to obtain a local cuDF dataframe, which we can then convert."
"To convert the first few entries to pandas, we similarly call `.head()` on the Dask-cuDF dataframe to obtain a local cuDF dataframe, which we can then convert."
]
},
{
@@ -4899,7 +4900,7 @@
"id": "787eae14",
"metadata": {},
"source": [
"Note that for the dask-cuDF case, we use `dask_cudf.read_csv` in preference to `dask_cudf.from_cudf(cudf.read_csv)` since the former can parallelize across multiple GPUs and handle larger CSV files that would fit in memory on a single GPU."
"Note that for the Dask-cuDF case, we use `dask_cudf.read_csv` in preference to `dask_cudf.from_cudf(cudf.read_csv)` since the former can parallelize across multiple GPUs and handle larger CSV files that would fit in memory on a single GPU."
]
},
{
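
The notebook text above mentions using `map_partitions` to apply a user-defined function to each partition of a Dask-cuDF dataframe. A minimal sketch of that pattern (the `add_one` helper and the column name are only illustrative)::

    import cudf
    import dask_cudf

    # Build a small Dask-cuDF dataframe from a single cuDF dataframe
    df = cudf.DataFrame({"a": range(20)})
    ddf = dask_cudf.from_cudf(df, npartitions=2)

    # map_partitions applies the function to each cuDF partition
    def add_one(partition):
        return partition + 1

    ddf.map_partitions(add_one).head()
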
210 changes: 159 additions & 51 deletions docs/dask_cudf/source/index.rst
@@ -3,39 +3,42 @@
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to dask-cudf's documentation!
Welcome to Dask cuDF's documentation!
=====================================

**Dask-cuDF** (pronounced "DASK KOO-dee-eff") is an extension
**Dask cuDF** (pronounced "DASK KOO-dee-eff") is an extension
library for the `Dask <https://dask.org>`__ parallel computing
framework that provides a `cuDF
<https://docs.rapids.ai/api/cudf/stable/>`__-backed distributed
dataframe with the same API as `Dask dataframes
<https://docs.dask.org/en/stable/dataframe.html>`__.
framework. When installed, Dask cuDF is automatically registered
as the ``"cudf"`` dataframe backend for
`Dask DataFrame <https://docs.dask.org/en/stable/dataframe.html>`__.

.. note::
Neither Dask cuDF nor Dask DataFrame provides support for multi-GPU
or multi-node execution on their own. You must also deploy a
`dask.distributed <https://distributed.dask.org/en/stable/>`__ cluster
to leverage multiple GPUs. We strongly recommend using `Dask-CUDA
<https://docs.rapids.ai/api/dask-cuda/stable/>`__ to simplify the
setup of the cluster, taking advantage of all features of the GPU
and networking hardware.

If you are familiar with Dask and `pandas <pandas.pydata.org>`__ or
`cuDF <https://docs.rapids.ai/api/cudf/stable/>`__, then Dask-cuDF
`cuDF <https://docs.rapids.ai/api/cudf/stable/>`__, then Dask cuDF
should feel familiar to you. If not, we recommend starting with `10
minutes to Dask
<https://docs.dask.org/en/stable/10-minutes-to-dask.html>`__ followed
by `10 minutes to cuDF and Dask-cuDF
by `10 minutes to cuDF and Dask cuDF
<https://docs.rapids.ai/api/cudf/stable/user_guide/10min.html>`__.

When running on multi-GPU systems, `Dask-CUDA
<https://docs.rapids.ai/api/dask-cuda/stable/>`__ is recommended to
simplify the setup of the cluster, taking advantage of all features of
the GPU and networking hardware.

Using Dask-cuDF
Using Dask cuDF
---------------

When installed, Dask-cuDF registers itself as a dataframe backend for
Dask. This means that in many cases, using cuDF-backed dataframes requires
only small changes to an existing workflow. The minimal change is to
select cuDF as the dataframe backend in :doc:`Dask's
configuration <dask:configuration>`. To do so, we must set the option
``dataframe.backend`` to ``cudf``. From Python, this can be achieved
like so::
The Dask DataFrame API (Recommended)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Simply use the :doc:`Dask configuration <dask:configuration>` system to
set the ``"dataframe.backend"`` option to ``"cudf"``. From Python,
this can be achieved like so::

import dask

@@ -44,60 +47,165 @@ like so::
Alternatively, you can set ``DASK_DATAFRAME__BACKEND=cudf`` in the
environment before running your code.
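
In full, the Python form is a single configuration call (the same call appears in the ``read_parquet`` example below), and the environment variable is equivalent::

    import dask

    # Equivalent to setting DASK_DATAFRAME__BACKEND=cudf in the environment
    dask.config.set({"dataframe.backend": "cudf"})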

Dataframe creation from on-disk formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If your workflow creates Dask dataframes from on-disk formats
(for example using :func:`dask.dataframe.read_parquet`), then setting
the backend may well be enough to migrate your workflow.

For example, consider reading a dataframe from parquet::
Once this is done, the public Dask DataFrame API will leverage
``cudf`` automatically when a new DataFrame collection is created
from an on-disk format using any of the following ``dask.dataframe``
functions:

import dask.dataframe as dd
* :func:`dask.dataframe.read_parquet`
* :func:`dask.dataframe.read_json`
* :func:`dask.dataframe.read_csv`
* :func:`dask.dataframe.read_orc`
* :func:`dask.dataframe.read_hdf`
* :func:`dask.dataframe.from_dict`

# By default, we obtain a pandas-backed dataframe
df = dd.read_parquet("data.parquet", ...)
For example::

import dask.dataframe as dd

To obtain a cuDF-backed dataframe, we must set the
``dataframe.backend`` configuration option::
# By default, we obtain a pandas-backed dataframe
df = dd.read_parquet("data.parquet", ...)

import dask
import dask.dataframe as dd

dask.config.set({"dataframe.backend": "cudf"})
# This gives us a cuDF-backed dataframe
# This now gives us a cuDF-backed dataframe
df = dd.read_parquet("data.parquet", ...)

This code will use cuDF's GPU-accelerated :func:`parquet reader
<cudf.read_parquet>` to read partitions of the data.
When other functions are used to create a new collection
(e.g. :func:`from_map`, :func:`from_pandas`, :func:`from_delayed`,
and :func:`from_array`), the backend of the new collection will
depend on the inputs to those functions. For example::

import pandas as pd
import cudf

# This gives us a pandas-backed dataframe
dd.from_pandas(pd.DataFrame({"a": range(10)}))

# This gives us a cuDF-backed dataframe
dd.from_pandas(cudf.DataFrame({"a": range(10)}))
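
The same input-dependent rule applies to :func:`from_map`: the backend follows
whatever the mapped function returns. A minimal sketch (``make_partition`` is a
hypothetical helper)::

    import cudf
    import dask.dataframe as dd

    # Hypothetical helper returning one cuDF partition per input value
    def make_partition(i):
        return cudf.DataFrame({"a": range(i * 10, (i + 1) * 10)})

    # This gives us a cuDF-backed dataframe, because make_partition returns cudf.DataFrame
    ddf = dd.from_map(make_partition, range(4))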

An existing collection can always be moved to a specific backend
using the :func:`dask.dataframe.DataFrame.to_backend` API::

# This ensures that we have a cuDF-backed dataframe
df = df.to_backend("cudf")

# This ensures that we have a pandas-backed dataframe
df = df.to_backend("pandas")

The explicit Dask cuDF API
~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to providing the ``"cudf"`` backend for Dask DataFrame,
Dask cuDF also provides an explicit ``dask_cudf`` API::

import dask_cudf

# This always gives us a cuDF-backed dataframe
df = dask_cudf.read_parquet("data.parquet", ...)

This API is used implicitly by the Dask DataFrame API when the ``"cudf"``
backend is enabled. Therefore, using it directly will not provide any
performance benefit over the CPU/GPU-portable ``dask.dataframe`` API.
Also, some parts of the explicit API are incompatible with
automatic query planning (see the next section).

Query Planning
~~~~~~~~~~~~~~

Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+).
As long as the ``"dataframe.query-planning"`` configuration is set to
``True`` (the default) when ``dask.dataframe`` is first imported, `Dask
Expressions <https://github.com/dask/dask-expr>`__ will be used under the hood.

For example, the following code will automatically benefit from
column-projection pushdown when the result is computed::

df = dd.read_parquet("/my/parquet/dataset/")
result = df.sort_values('B')['A']

Unoptimized expression graph (``result.pprint()``)::

    Projection: columns='A'
      SortValues: by=['B'] shuffle_method='tasks' options={}
        ReadParquetFSSpec: path='/my/parquet/dataset/' ...

Simplified expression graph (``result.simplify().pprint()``)::

    Projection: columns='A'
      SortValues: by=['B'] shuffle_method='tasks' options={}
        ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ...

.. note::
Dask will automatically simplify the expression graph (within
:func:`optimize`) when the result is converted to a task graph
(via :func:`compute` or :func:`persist`). You do not need to call
:func:`simplify` yourself.
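
If you want to pin query planning on explicitly, the configuration mentioned
above can be set before ``dask.dataframe`` is first imported; a minimal sketch
(``True`` is already the default in RAPIDS 24.06+)::

    import dask

    # Must be set before dask.dataframe is first imported
    dask.config.set({"dataframe.query-planning": True})

    import dask.dataframe as dd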


Using Multiple GPUs and Multiple Nodes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Whenever possible, Dask cuDF (i.e. Dask DataFrame) will automatically
partition your data into chunks that are small enough to fit comfortably
in the memory of a single GPU. This means the tasks needed to compute a
query can often be streamed to a single GPU process for out-of-core
computing, and it also means those tasks can be executed in parallel
across a multi-GPU cluster.

In order to execute your Dask workflow on multiple GPUs, you will
typically need to use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/>`__
to deploy a distributed Dask cluster, and
`Distributed <https://distributed.dask.org/en/stable/client.html>`__
to define a client object. For example::

from dask_cuda import LocalCUDACluster
from distributed import Client
import dask.dataframe as dd  # assumes the "cudf" backend is enabled (see above)

if __name__ == "__main__":

    client = Client(
        LocalCUDACluster(
            CUDA_VISIBLE_DEVICES="0,1",  # Use two workers (on devices 0 and 1)
            rmm_pool_size=0.9,  # Use 90% of GPU memory as a pool for faster allocations
            enable_cudf_spill=True,  # Improve device memory stability
            local_directory="/fast/scratch/",  # Use fast local storage for spilling
        )
    )

    df = dd.read_parquet("/my/parquet/dataset/")
    agg = df.groupby('B').sum()
    agg.compute()  # This will use the cluster defined above

.. note::
This example uses :func:`compute` to materialize a concrete
``cudf.DataFrame`` object in local memory. Never call :func:`compute`
on a large collection that cannot fit comfortably in the memory of a
single GPU! See Dask's `documentation on managing computation
<https://distributed.dask.org/en/stable/manage-computation.html>`__
for more details.
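
For large results, a common alternative to ``compute`` is to write the output
directly to disk, so that nothing large is collected on a single GPU; a minimal
sketch with a hypothetical output path::

    # Writes each partition in parallel instead of materializing `agg` locally
    agg.to_parquet("/my/output/dataset/")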

Dataframe creation from in-memory formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Please see the `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/>`__
documentation for more information about deploying GPU-aware clusters
(including `best practices
<https://docs.rapids.ai/api/dask-cuda/stable/examples/best-practices/>`__).

If you already have a dataframe in memory and want to convert it to a
cuDF-backend one, there are two options depending on whether the
dataframe is already a Dask one or not. If you have a Dask dataframe,
then you can call :func:`dask.dataframe.to_backend` passing ``"cudf"``
as the backend; if you have a pandas dataframe then you can either
call :func:`dask.dataframe.from_pandas` followed by
:func:`~dask.dataframe.to_backend` or first convert the dataframe with
:func:`cudf.from_pandas` and then parallelise this with
:func:`dask_cudf.from_cudf`.

API Reference
-------------

Generally speaking, Dask-cuDF tries to offer exactly the same API as
Dask itself. There are, however, some minor differences mostly because
Generally speaking, Dask cuDF tries to offer exactly the same API as
Dask DataFrame. There are, however, some minor differences mostly because
cuDF does not :doc:`perfectly mirror <cudf:user_guide/PandasCompat>`
the pandas API, or because cuDF provides additional configuration
flags (these mostly occur in data reading and writing interfaces).

As a result, straightforward workflows can be migrated without too
much trouble, but more complex ones that utilise more features may
need a bit of tweaking. The API documentation describes details of the
differences and all functionality that Dask-cuDF supports.
differences and all functionality that Dask cuDF supports.

.. toctree::
:maxdepth: 2
