Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polars: DataFrame Serialization #17062

Merged
merged 42 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7ad6492
Impl. Dataframe serialization
madsbk Oct 11, 2024
1ea98ab
clean up
madsbk Oct 12, 2024
ab04157
Merge branch 'branch-24.12' of github.com:rapidsai/cudf into polars-s…
madsbk Oct 22, 2024
cd4907e
experimental
madsbk Oct 22, 2024
64bf977
clean up
madsbk Oct 22, 2024
4c0f11a
uncomment the empty table test
madsbk Oct 22, 2024
78f3318
Merge branch 'branch-24.12' of github.com:rapidsai/cudf into polars-s…
madsbk Oct 22, 2024
02e9ef6
CI: test_cudf_polars
madsbk Oct 23, 2024
20ddba5
Merge branch 'branch-24.12' of github.com:rapidsai/cudf into polars-s…
madsbk Oct 23, 2024
14691a6
test/experimental
madsbk Oct 23, 2024
06262f1
Update python/cudf_polars/cudf_polars/containers/dataframe.py
madsbk Oct 23, 2024
0a60d6f
Merge branch 'polars-serialize' of github.com:madsbk/cudf into polars…
madsbk Oct 23, 2024
cde93f0
Move to a tuple instead of list of frames
madsbk Oct 23, 2024
5642204
fix two-tuple frames
madsbk Oct 23, 2024
62e15e2
avoid using cupy
madsbk Oct 23, 2024
bf82c8b
typo
madsbk Oct 23, 2024
eab3eb9
Apply suggestions from code review
madsbk Oct 25, 2024
9aa4e54
clean up
madsbk Oct 25, 2024
616a2ee
Merge branch 'branch-24.12' of github.com:rapidsai/cudf into polars-s…
madsbk Oct 25, 2024
e864dff
more typing
madsbk Oct 25, 2024
8f7b0a7
Merge remote-tracking branch 'upstream/branch-24.12' into HEAD
wence- Nov 4, 2024
43b05cd
Perhaps split dependencies
wence- Nov 4, 2024
b5672c3
Explicitly depend on numpy as well for cudf-polars tests
wence- Nov 4, 2024
0d553c3
Fix isort issues
wence- Nov 4, 2024
be48e7e
Merge branch 'branch-24.12' into polars-serialize
madsbk Nov 5, 2024
2dfb419
Filter out a warning
wence- Nov 5, 2024
6606d89
Merge remote-tracking branch 'upstream/branch-24.12' into HEAD
wence- Nov 5, 2024
5ed1301
Shouldn't be necessary
wence- Nov 5, 2024
852bd76
Perhaps this?
wence- Nov 5, 2024
7d0671a
Require explicit call to register serialization routines
wence- Nov 5, 2024
269bd5d
Merge branch 'branch-24.12' into polars-serialize
wence- Nov 6, 2024
62d92aa
Merge branch 'branch-24.12' into polars-serialize
madsbk Nov 7, 2024
1db4ab4
Merge branch 'branch-24.12' into polars-serialize
wence- Nov 7, 2024
83f30c2
Apply suggestions from code review
madsbk Nov 8, 2024
943d753
Merge branch 'branch-24.12' into polars-serialize
madsbk Nov 8, 2024
00565b5
Update python/cudf_polars/cudf_polars/experimental/__init__.py
madsbk Nov 12, 2024
75e1691
Merge branch 'branch-24.12' into polars-serialize
madsbk Nov 12, 2024
26c0a52
Put rapids-dask-dependency in experimental optional extra
wence- Nov 13, 2024
993bda8
Merge remote-tracking branch 'upstream/branch-24.12' into HEAD
wence- Nov 13, 2024
1cdec15
Merge branch 'branch-24.12' into polars-serialize
rjzamora Nov 13, 2024
a37d6ae
Merge branch 'branch-24.12' into polars-serialize
vyasr Nov 14, 2024
732a1dd
Merge branch 'branch-24.12' into polars-serialize
pentschev Nov 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/test_wheel_cudf_polars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ rapids-logger "Installing cudf_polars and its dependencies"
# generate constraints (possibly pinning to oldest support versions of dependencies)
rapids-generate-pip-constraints py_test_cudf_polars ./constraints.txt

# echo to expand wildcard before adding `[test]` requires for pip
# echo to expand wildcard before adding `[test,experimental]` requires for pip
python -m pip install \
-v \
--constraint ./constraints.txt \
"$(echo ./dist/cudf_polars_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test]" \
"$(echo ./dist/cudf_polars_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test,experimental]" \
"$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \
"$(echo ./dist/pylibcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)"

Expand Down
30 changes: 30 additions & 0 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ files:
- test_cpp
- test_python_common
- test_python_cudf
- test_python_cudf_common
- test_python_dask_cudf
- test_python_pylibcudf
- test_python_cudf_pandas
- test_python_cudf_polars
test_static_build:
output: none
includes:
Expand All @@ -59,6 +61,7 @@ files:
- cuda_version
- py_version
- test_python_common
- test_python_cudf_common
- test_python_cudf
- test_python_cudf_pandas
test_python_cudf:
Expand All @@ -67,13 +70,15 @@ files:
- cuda_version
- py_version
- test_python_common
- test_python_cudf_common
- test_python_cudf
test_python_other:
output: none
includes:
- cuda_version
- py_version
- test_python_common
- test_python_cudf_common
- test_python_dask_cudf
test_java:
output: none
Expand Down Expand Up @@ -152,6 +157,7 @@ files:
key: test
includes:
- test_python_common
- test_python_cudf_common
- test_python_cudf
py_build_libcudf:
output: pyproject
Expand Down Expand Up @@ -216,6 +222,7 @@ files:
key: test
includes:
- test_python_common
- test_python_cudf_common
- test_python_pylibcudf
py_test_pandas_cudf:
output: pyproject
Expand Down Expand Up @@ -248,6 +255,14 @@ files:
includes:
- run_cudf_polars
- depends_on_pylibcudf
py_run_cudf_polars_experimental:
output: pyproject
pyproject_dir: python/cudf_polars
extras:
table: project.optional-dependencies
key: experimental
includes:
- run_cudf_polars_experimental
py_test_cudf_polars:
output: pyproject
pyproject_dir: python/cudf_polars
Expand All @@ -256,6 +271,7 @@ files:
key: test
includes:
- test_python_common
- test_python_cudf_polars
py_build_dask_cudf:
output: pyproject
pyproject_dir: python/dask_cudf
Expand All @@ -281,6 +297,7 @@ files:
key: test
includes:
- test_python_common
- test_python_cudf_common
- test_python_dask_cudf
py_build_cudf_kafka:
output: pyproject
Expand Down Expand Up @@ -313,6 +330,7 @@ files:
key: test
includes:
- test_python_common
- test_python_cudf_common
py_build_custreamz:
output: pyproject
pyproject_dir: python/custreamz
Expand All @@ -337,6 +355,7 @@ files:
key: test
includes:
- test_python_common
- test_python_cudf_common
channels:
- rapidsai
- rapidsai-nightly
Expand Down Expand Up @@ -730,6 +749,11 @@ dependencies:
- output_types: [conda, requirements, pyproject]
packages:
- polars>=1.11,<1.14
run_cudf_polars_experimental:
common:
- output_types: [conda, requirements, pyproject]
packages:
- rapids-dask-dependency==24.12.*,>=0.0.0a0
run_dask_cudf:
common:
- output_types: [conda, requirements, pyproject]
Expand Down Expand Up @@ -779,6 +803,7 @@ dependencies:
- pytest<8
- pytest-cov
- pytest-xdist
test_python_cudf_common:
specific:
# Define additional constraints for testing with oldest dependencies.
- output_types: [conda, requirements]
Expand Down Expand Up @@ -884,6 +909,11 @@ dependencies:
- pyarrow==14.0.1
- matrix:
packages:
test_python_cudf_polars:
common:
- output_types: [conda, requirements, pyproject]
packages:
- *numpy
depends_on_libcudf:
common:
- output_types: conda
Expand Down
72 changes: 71 additions & 1 deletion python/cudf_polars/cudf_polars/containers/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

from __future__ import annotations

import pickle
from functools import cached_property
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast

import pyarrow as pa

Expand Down Expand Up @@ -147,6 +148,75 @@ def from_table(cls, table: plc.Table, names: Sequence[str]) -> Self:
Column(c, name=name) for c, name in zip(table.columns(), names, strict=True)
)

@classmethod
def deserialize(
cls, header: Mapping[str, Any], frames: tuple[memoryview, plc.gpumemoryview]
) -> Self:
"""
Create a DataFrame from a serialized representation returned by `.serialize()`.

Parameters
----------
header
The (unpickled) metadata required to reconstruct the object.
frames
Two-tuple of frames (a memoryview and a gpumemoryview).

Returns
-------
DataFrame
The deserialized DataFrame.
"""
packed_metadata, packed_gpu_data = frames
madsbk marked this conversation as resolved.
Show resolved Hide resolved
table = plc.contiguous_split.unpack_from_memoryviews(
packed_metadata, packed_gpu_data
)
return cls(
Column(c, **kw)
for c, kw in zip(table.columns(), header["columns_kwargs"], strict=True)
)

def serialize(
self,
) -> tuple[Mapping[str, Any], tuple[memoryview, plc.gpumemoryview]]:
"""
Serialize the table into header and frames.

Follows the Dask serialization scheme with a picklable header (dict) and
a tuple of frames (in this case a contiguous host and device buffer).

To enable dask support, dask serializers must be registered

>>> from cudf_polars.experimental.dask_serialize import register
>>> register()

Returns
-------
header
A dict containing any picklable metadata required to reconstruct the object.
frames
Two-tuple of frames suitable for passing to `unpack_from_memoryviews`
"""
packed = plc.contiguous_split.pack(self.table)

# Keyword arguments for `Column.__init__`.
columns_kwargs = [
{
"is_sorted": col.is_sorted,
"order": col.order,
"null_order": col.null_order,
"name": col.name,
madsbk marked this conversation as resolved.
Show resolved Hide resolved
}
for col in self.columns
]
header = {
"columns_kwargs": columns_kwargs,
# Dask Distributed uses "type-serialized" to dispatch deserialization
"type-serialized": pickle.dumps(type(self)),
"frame_count": 2,
}
return header, packed.release()

def sorted_like(
self, like: DataFrame, /, *, subset: Set[str] | None = None
) -> Self:
Expand Down
8 changes: 8 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/__init__.py
madsbk marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Experimental features, which can change without any deprecation period."""

from __future__ import annotations

__all__: list[str] = []
59 changes: 59 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/dask_serialize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Dask serialization."""

from __future__ import annotations

from distributed.protocol import dask_deserialize, dask_serialize
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.utils import log_errors

import pylibcudf as plc
import rmm

from cudf_polars.containers import DataFrame

__all__ = ["register"]


def register() -> None:
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
"""Register dask serialization routines for DataFrames."""

@cuda_serialize.register(DataFrame)
def _(x: DataFrame):
with log_errors():
header, frames = x.serialize()
return header, list(frames) # Dask expect a list of frames

@cuda_deserialize.register(DataFrame)
def _(header, frames):
with log_errors():
assert len(frames) == 2
return DataFrame.deserialize(header, tuple(frames))

@dask_serialize.register(DataFrame)
def _(x: DataFrame):
with log_errors():
header, (metadata, gpudata) = x.serialize()

# For robustness, we check that the gpu data is contiguous
cai = gpudata.__cuda_array_interface__
assert len(cai["shape"]) == 1
assert cai["strides"] is None or cai["strides"] == (1,)
assert cai["typestr"] == "|u1"
nbytes = cai["shape"][0]

# Copy the gpudata to host memory
gpudata_on_host = memoryview(
rmm.DeviceBuffer(ptr=gpudata.ptr, size=nbytes).copy_to_host()
)
return header, (metadata, gpudata_on_host)

@dask_deserialize.register(DataFrame)
def _(header, frames) -> DataFrame:
with log_errors():
assert len(frames) == 2
# Copy the second frame (the gpudata in host memory) back to the gpu
frames = frames[0], plc.gpumemoryview(rmm.DeviceBuffer.to_device(frames[1]))
return DataFrame.deserialize(header, frames)
4 changes: 4 additions & 0 deletions python/cudf_polars/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ classifiers = [

[project.optional-dependencies]
test = [
"numpy>=1.23,<3.0a0",
"pytest-cov",
"pytest-xdist",
"pytest<8",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
experimental = [
"rapids-dask-dependency==24.12.*,>=0.0.0a0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.

[project.urls]
Homepage = "https://github.com/rapidsai/cudf"
Expand Down
23 changes: 23 additions & 0 deletions python/cudf_polars/tests/containers/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

from __future__ import annotations

import pyarrow as pa
import pytest

import polars as pl
from polars.testing.asserts import assert_frame_equal

import pylibcudf as plc

Expand Down Expand Up @@ -161,3 +163,24 @@ def test_empty_name_roundtrips_overlap():
def test_empty_name_roundtrips_no_overlap():
df = pl.LazyFrame({"": [1, 2, 3], "b": [4, 5, 6]})
assert_gpu_result_equal(df)


@pytest.mark.parametrize(
"arrow_tbl",
[
pa.table([]),
pa.table({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}),
pa.table({"a": [1, 2, 3]}),
pa.table({"a": [1], "b": [2], "c": [3]}),
pa.table({"a": ["a", "bb", "ccc"]}),
pa.table({"a": [1, 2, None], "b": [None, 3, 4]}),
],
)
def test_serialization_roundtrip(arrow_tbl):
plc_tbl = plc.interop.from_arrow(arrow_tbl)
df = DataFrame.from_table(plc_tbl, names=arrow_tbl.column_names)

header, frames = df.serialize()
res = DataFrame.deserialize(header, frames)

assert_frame_equal(df.to_polars(), res.to_polars())
8 changes: 8 additions & 0 deletions python/cudf_polars/tests/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Testing experimental features."""

from __future__ import annotations

__all__: list[str] = []
Loading
Loading