Merge branch 'branch-25.02' into memcheck-norm-nans-test
davidwendt committed Dec 17, 2024
2 parents 0e0fa5b + e5753e3 commit 2e80d8f
Showing 8 changed files with 388 additions and 84 deletions.
17 changes: 12 additions & 5 deletions python/cudf/cudf/io/avro.py
@@ -33,11 +33,18 @@ def read_avro(
     if not isinstance(skip_rows, int) or skip_rows < 0:
         raise TypeError("skip_rows must be an int >= 0")

-    plc_result = plc.io.avro.read_avro(
-        plc.io.types.SourceInfo([filepath_or_buffer]),
-        columns,
-        skip_rows,
-        num_rows,
+    options = (
+        plc.io.avro.AvroReaderOptions.builder(
+            plc.io.types.SourceInfo([filepath_or_buffer])
+        )
+        .skip_rows(skip_rows)
+        .num_rows(num_rows)
+        .build()
     )
+
+    if columns is not None and len(columns) > 0:
+        options.set_columns(columns)
+
+    plc_result = plc.io.avro.read_avro(options)

     return cudf.DataFrame._from_data(*data_from_pylibcudf_io(plc_result))
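For reference, the options-based pylibcudf path that read_avro now goes through can also be driven directly. A minimal sketch of the same builder chain as the hunk above; the file path and column names are placeholders:

import pylibcudf as plc

# Placeholder source; skip_rows/num_rows mirror the cudf reader's defaults.
options = (
    plc.io.avro.AvroReaderOptions.builder(
        plc.io.types.SourceInfo(["example.avro"])
    )
    .skip_rows(0)
    .num_rows(-1)
    .build()
)
options.set_columns(["col_a", "col_b"])  # optional column projection

result = plc.io.avro.read_avro(options)  # returns a TableWithMetadata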
36 changes: 25 additions & 11 deletions python/dask_cudf/dask_cudf/backends.py
@@ -714,21 +714,35 @@ def read_csv(
         storage_options=None,
         **kwargs,
     ):
-        import dask_expr as dx
-        from fsspec.utils import stringify_path
+        try:
+            # TODO: Remove when cudf is pinned to dask>2024.12.0
+            import dask_expr as dx
+            from dask_expr.io.csv import ReadCSV
+            from fsspec.utils import stringify_path

+            if not isinstance(path, str):
+                path = stringify_path(path)
+            return dx.new_collection(
+                ReadCSV(
+                    path,
+                    dtype_backend=dtype_backend,
+                    storage_options=storage_options,
+                    kwargs=kwargs,
+                    header=header,
+                    dataframe_backend="cudf",
+                )
+            )
+        except ImportError:
+            # Requires dask>2024.12.0
+            from dask_cudf.io.csv import read_csv
+
-        if not isinstance(path, str):
-            path = stringify_path(path)
-        return dx.new_collection(
-            dx.io.csv.ReadCSV(
+            return read_csv(
                 path,
-                dtype_backend=dtype_backend,
-                storage_options=storage_options,
-                kwargs=kwargs,
+                *args,
                 header=header,
-                dataframe_backend="cudf",
+                storage_options=storage_options,
+                **kwargs,
             )
-        )

     @staticmethod
     def read_json(*args, **kwargs):
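Either branch above serves the same user-facing call. A small usage sketch of the dispatch path, assuming the cudf backend is selected through dask's configuration; the file pattern is hypothetical:

import dask
import dask.dataframe as dd

# With the cudf backend active, dd.read_csv routes through the entrypoint above.
dask.config.set({"dataframe.backend": "cudf"})

ddf = dd.read_csv("data_*.csv", blocksize="256 MiB")
print(ddf.npartitions)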
195 changes: 190 additions & 5 deletions python/dask_cudf/dask_cudf/io/csv.py
@@ -1,8 +1,193 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.

-from dask_cudf import _deprecated_api
+import os
+from glob import glob
+from warnings import warn

-read_csv = _deprecated_api(
-    "dask_cudf.io.csv.read_csv",
-    new_api="dask_cudf.read_csv",
-)
+from fsspec.utils import infer_compression
+
+from dask import dataframe as dd
+from dask.dataframe.io.csv import make_reader
+from dask.utils import parse_bytes
+
+import cudf
+
+
+def read_csv(path, blocksize="default", **kwargs):
+    """
+    Read CSV files into a :class:`.DataFrame`.
+
+    This API parallelizes the :func:`cudf:cudf.read_csv` function in
+    the following ways:
+
+    It supports loading many files at once using globstrings:
+
+    >>> import dask_cudf
+    >>> df = dask_cudf.read_csv("myfiles.*.csv")
+
+    In some cases it can break up large files:
+
+    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")
+
+    It can read CSV files from external resources (e.g. S3, HTTP, FTP)
+
+    >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv")
+    >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv")
+
+    Internally ``read_csv`` uses :func:`cudf:cudf.read_csv` and
+    supports many of the same keyword arguments with the same
+    performance guarantees. See the docstring for
+    :func:`cudf:cudf.read_csv` for more information on available
+    keyword arguments.
+
+    Parameters
+    ----------
+    path : str, path object, or file-like object
+        Either a path to a file (a str, :py:class:`pathlib.Path`, or
+        py._path.local.LocalPath), URL (including http, ftp, and S3
+        locations), or any object with a read() method (such as
+        builtin :py:func:`open` file handler function or
+        :py:class:`~io.StringIO`).
+    blocksize : int or str, default "256 MiB"
+        The target task partition size. If ``None``, a single block
+        is used for each file.
+    **kwargs : dict
+        Passthrough key-word arguments that are sent to
+        :func:`cudf:cudf.read_csv`.
+
+    Notes
+    -----
+    If any of `skipfooter`/`skiprows`/`nrows` are passed,
+    `blocksize` will default to None.
+
+    Examples
+    --------
+    >>> import dask_cudf
+    >>> ddf = dask_cudf.read_csv("sample.csv", usecols=["a", "b"])
+    >>> ddf.compute()
+       a      b
+    0  1     hi
+    1  2  hello
+    2  3     ai
+    """
+    # Set default `blocksize`
+    if blocksize == "default":
+        if (
+            kwargs.get("skipfooter", 0) != 0
+            or kwargs.get("skiprows", 0) != 0
+            or kwargs.get("nrows", None) is not None
+        ):
+            # Cannot read in blocks if skipfooter,
+            # skiprows or nrows is passed.
+            blocksize = None
+        else:
+            blocksize = "256 MiB"
+
+    if "://" in str(path):
+        func = make_reader(cudf.read_csv, "read_csv", "CSV")
+        return func(path, blocksize=blocksize, **kwargs)
+    else:
+        return _internal_read_csv(path=path, blocksize=blocksize, **kwargs)
+
+
+def _internal_read_csv(path, blocksize="256 MiB", **kwargs):
+    if isinstance(blocksize, str):
+        blocksize = parse_bytes(blocksize)
+
+    if isinstance(path, list):
+        filenames = path
+    elif isinstance(path, str):
+        filenames = sorted(glob(path))
+    elif hasattr(path, "__fspath__"):
+        filenames = sorted(glob(path.__fspath__()))
+    else:
+        raise TypeError(f"Path type not understood:{type(path)}")
+
+    if not filenames:
+        msg = f"A file in: {filenames} does not exist."
+        raise FileNotFoundError(msg)
+
+    compression = kwargs.get("compression", "infer")
+
+    if compression == "infer":
+        # Infer compression from first path by default
+        compression = infer_compression(filenames[0])
+
+    if compression and blocksize:
+        # compressed CSVs reading must read the entire file
+        kwargs.pop("byte_range", None)
+        warn(
+            "Warning %s compression does not support breaking apart files\n"
+            "Please ensure that each individual file can fit in memory and\n"
+            "use the keyword ``blocksize=None to remove this message``\n"
+            "Setting ``blocksize=(size of file)``" % compression
+        )
+        blocksize = None
+
+    if blocksize is None:
+        return read_csv_without_blocksize(path, **kwargs)
+
+    # Let dask.dataframe generate meta
+    dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV")
+    kwargs1 = kwargs.copy()
+    usecols = kwargs1.pop("usecols", None)
+    dtype = kwargs1.pop("dtype", None)
+    meta = dask_reader(filenames[0], **kwargs1)._meta
+    names = meta.columns
+    if usecols or dtype:
+        # Regenerate meta with original kwargs if
+        # `usecols` or `dtype` was specified
+        meta = dask_reader(filenames[0], **kwargs)._meta
+
+    i = 0
+    path_list = []
+    kwargs_list = []
+    for fn in filenames:
+        size = os.path.getsize(fn)
+        for start in range(0, size, blocksize):
+            kwargs2 = kwargs.copy()
+            kwargs2["byte_range"] = (
+                start,
+                blocksize,
+            )  # specify which chunk of the file we care about
+            if start != 0:
+                kwargs2["names"] = names  # no header in the middle of the file
+                kwargs2["header"] = None
+            path_list.append(fn)
+            kwargs_list.append(kwargs2)
+            i += 1
+
+    return dd.from_map(_read_csv, path_list, kwargs_list, meta=meta)
+
+
+def _read_csv(fn, kwargs):
+    return cudf.read_csv(fn, **kwargs)
+
+
+def read_csv_without_blocksize(path, **kwargs):
+    """Read entire CSV with optional compression (gzip/zip)
+
+    Parameters
+    ----------
+    path : str
+        path to files (support for glob)
+    """
+    if isinstance(path, list):
+        filenames = path
+    elif isinstance(path, str):
+        filenames = sorted(glob(path))
+    elif hasattr(path, "__fspath__"):
+        filenames = sorted(glob(path.__fspath__()))
+    else:
+        raise TypeError(f"Path type not understood:{type(path)}")
+
+    meta_kwargs = kwargs.copy()
+    if "skipfooter" in meta_kwargs:
+        meta_kwargs.pop("skipfooter")
+    if "nrows" in meta_kwargs:
+        meta_kwargs.pop("nrows")
+    # Read "head" of first file (first 5 rows).
+    # Convert to empty df for metadata.
+    meta = cudf.read_csv(filenames[0], nrows=5, **meta_kwargs).iloc[:0]
+    return dd.from_map(cudf.read_csv, filenames, meta=meta, **kwargs)
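To make the partitioning in _internal_read_csv concrete, the sketch below reproduces its byte_range bookkeeping for one hypothetical file size: each blocksize-wide window becomes a separate task, and every window after the first reuses the header-derived column names with header=None.

from dask.utils import parse_bytes

size = parse_bytes("600 MiB")      # hypothetical file size
blocksize = parse_bytes("256 MiB")
names = ["a", "b"]                 # column names taken from the first block's meta

tasks = []
for start in range(0, size, blocksize):
    kwargs = {"byte_range": (start, blocksize)}
    if start != 0:
        # Blocks past the first carry no header row.
        kwargs.update(names=names, header=None)
    tasks.append(kwargs)

# Three windows: (0, 256 MiB), (256 MiB, 256 MiB), (512 MiB, 256 MiB)
print(len(tasks), [t["byte_range"] for t in tasks])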
9 changes: 0 additions & 9 deletions python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -185,11 +185,6 @@ def test_read_csv_blocksize_none(tmp_path, compression, size):
     df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
     dd.assert_eq(df, df2)

-    # Test chunksize deprecation
-    with pytest.warns(FutureWarning, match="deprecated"):
-        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
-        dd.assert_eq(df, df3)
-

 @pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
 def test_csv_reader_usecols(tmp_path, dtype):
@@ -275,7 +270,3 @@ def test_deprecated_api_paths(tmp_path):
     with pytest.warns(match="dask_cudf.io.read_csv is now deprecated"):
         df2 = dask_cudf.io.read_csv(csv_path)
         dd.assert_eq(df, df2, check_divisions=False)
-
-    with pytest.warns(match="dask_cudf.io.csv.read_csv is now deprecated"):
-        df2 = dask_cudf.io.csv.read_csv(csv_path)
-        dd.assert_eq(df, df2, check_divisions=False)
25 changes: 18 additions & 7 deletions python/pylibcudf/pylibcudf/io/avro.pxd
@@ -1,12 +1,23 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
 from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
-from pylibcudf.libcudf.io.avro cimport avro_reader_options
+from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder
 from pylibcudf.libcudf.types cimport size_type


-cpdef TableWithMetadata read_avro(
-    SourceInfo source_info,
-    list columns = *,
-    size_type skip_rows = *,
-    size_type num_rows = *
-)
+from pylibcudf.libcudf.types cimport size_type
+
+cdef class AvroReaderOptions:
+    cdef avro_reader_options c_obj
+    cdef SourceInfo source
+    cpdef void set_columns(self, list col_names)
+
+
+cdef class AvroReaderOptionsBuilder:
+    cdef avro_reader_options_builder c_obj
+    cdef SourceInfo source
+    cpdef AvroReaderOptionsBuilder columns(self, list col_names)
+    cpdef AvroReaderOptionsBuilder skip_rows(self, size_type skip_rows)
+    cpdef AvroReaderOptionsBuilder num_rows(self, size_type num_rows)
+    cpdef AvroReaderOptions build(self)
+
+cpdef TableWithMetadata read_avro(AvroReaderOptions options)
21 changes: 13 additions & 8 deletions python/pylibcudf/pylibcudf/io/avro.pyi
@@ -1,11 +1,16 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
 from pylibcudf.io.types import SourceInfo, TableWithMetadata

-__all__ = ["read_avro"]
-
-def read_avro(
-    source_info: SourceInfo,
-    columns: list[str] | None = None,
-    skip_rows: int = 0,
-    num_rows: int = -1,
-) -> TableWithMetadata: ...
+__all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"]
+
+class AvroReaderOptions:
+    @staticmethod
+    def builder(source: SourceInfo) -> AvroReaderOptionsBuilder: ...
+
+class AvroReaderOptionsBuilder:
+    def columns(col_names: list[str]) -> AvroReaderOptionsBuilder: ...
+    def skip_rows(skip_rows: int) -> AvroReaderOptionsBuilder: ...
+    def num_rows(num_rows: int) -> AvroReaderOptionsBuilder: ...
+    def build(self) -> AvroReaderOptions: ...
+
+def read_avro(options: AvroReaderOptions) -> TableWithMetadata: ...
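Per these stubs, column projection can be requested either on the builder or on the built options object. A minimal sketch; the file path and column name are placeholders:

import pylibcudf as plc

source = plc.io.types.SourceInfo(["example.avro"])

# Project columns while building ...
opts = plc.io.avro.AvroReaderOptions.builder(source).columns(["col_a"]).build()

# ... or build first and set them afterwards, as cudf's read_avro does.
opts2 = plc.io.avro.AvroReaderOptions.builder(source).build()
opts2.set_columns(["col_a"])

tbl = plc.io.avro.read_avro(opts)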