From 206ff897a836660689d1e5624098a01383015923 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 25 Nov 2024 11:04:01 -0800 Subject: [PATCH 1/7] update dask-cudf for dask>2024.11.2 --- .../dask_cudf/dask_cudf/_expr/collection.py | 6 +-- python/dask_cudf/dask_cudf/_legacy/io/orc.py | 31 ++++++-------- python/dask_cudf/dask_cudf/_legacy/io/text.py | 40 ++++++++++--------- python/dask_cudf/dask_cudf/backends.py | 5 +-- python/dask_cudf/dask_cudf/io/parquet.py | 9 ++++- 5 files changed, 44 insertions(+), 47 deletions(-) diff --git a/python/dask_cudf/dask_cudf/_expr/collection.py b/python/dask_cudf/dask_cudf/_expr/collection.py index fdf7d8630e9..89c0d108743 100644 --- a/python/dask_cudf/dask_cudf/_expr/collection.py +++ b/python/dask_cudf/dask_cudf/_expr/collection.py @@ -156,16 +156,12 @@ def to_orc(self, *args, **kwargs): from dask_cudf._legacy.io import to_orc return to_orc(self, *args, **kwargs) - # return self.to_legacy_dataframe().to_orc(*args, **kwargs) @staticmethod def read_text(*args, **kwargs): - from dask_expr import from_legacy_dataframe - from dask_cudf._legacy.io.text import read_text as legacy_read_text - ddf = legacy_read_text(*args, **kwargs) - return from_legacy_dataframe(ddf) + return legacy_read_text(*args, **kwargs) class Series(DXSeries, CudfFrameBase): diff --git a/python/dask_cudf/dask_cudf/_legacy/io/orc.py b/python/dask_cudf/dask_cudf/_legacy/io/orc.py index bed69f038b0..9326f41f5a7 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/orc.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/orc.py @@ -7,14 +7,14 @@ from pyarrow import orc as orc from dask import dataframe as dd -from dask.base import tokenize from dask.dataframe.io.utils import _get_pyarrow_dtypes import cudf -def _read_orc_stripe(fs, path, stripe, columns, kwargs=None): +def _read_orc_stripe(input, columns=None, kwargs=None): """Pull out specific columns from specific stripe""" + fs, path, stripe = input if kwargs is None: kwargs = {} with fs.open(path, "rb") as f: @@ -67,7 +67,7 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs): """ storage_options = storage_options or {} - fs, fs_token, paths = get_fs_token_paths( + fs, _, paths = get_fs_token_paths( path, mode="rb", storage_options=storage_options ) schema = None @@ -100,27 +100,22 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs): **kwargs, ) - name = "read-orc-" + tokenize(fs_token, path, columns, filters, **kwargs) - dsk = {} - N = 0 + inputs = [] for path, n in zip(paths, nstripes_per_file): for stripe in ( range(n) if filters is None else cudf.io.orc._filter_stripes(filters, path) ): - dsk[(name, N)] = ( - _read_orc_stripe, - fs, - path, - stripe, - columns, - kwargs, - ) - N += 1 - - divisions = [None] * (len(dsk) + 1) - return dd.core.new_dd_object(dsk, name, meta, divisions) + inputs.append((fs, path, stripe)) + + return dd.from_map( + _read_orc_stripe, + inputs, + columns=columns, + kwargs=kwargs, + meta=meta, + ) def write_orc_partition(df, path, fs, filename, compression="snappy"): diff --git a/python/dask_cudf/dask_cudf/_legacy/io/text.py b/python/dask_cudf/dask_cudf/_legacy/io/text.py index 9cdb7c5220b..d0154411df2 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/text.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/text.py @@ -4,13 +4,18 @@ from glob import glob import dask.dataframe as dd -from dask.base import tokenize -from dask.utils import apply, parse_bytes +from dask.utils import parse_bytes import cudf -def read_text(path, chunksize="256 MiB", **kwargs): +def _read_text(input, **kwargs): + # Wrapper for cudf.read_text operation + fn, byte_range = input + return cudf.read_text(fn, byte_range=byte_range, **kwargs) + + +def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs): if isinstance(chunksize, str): chunksize = parse_bytes(chunksize) @@ -27,28 +32,25 @@ def read_text(path, chunksize="256 MiB", **kwargs): msg = f"A file in: {filenames} does not exist." raise FileNotFoundError(msg) - name = "read-text-" + tokenize(path, tokenize, **kwargs) + if chunksize and byte_range: + raise ValueError("Cannot specify both chunksize and byte_range.") if chunksize: - dsk = {} - i = 0 + inputs = [] for fn in filenames: size = os.path.getsize(fn) for start in range(0, size, chunksize): - kwargs1 = kwargs.copy() - kwargs1["byte_range"] = ( + byte_range = ( start, chunksize, ) # specify which chunk of the file we care about - - dsk[(name, i)] = (apply, cudf.read_text, [fn], kwargs1) - i += 1 + inputs.append((fn, byte_range)) else: - dsk = { - (name, i): (apply, cudf.read_text, [fn], kwargs) - for i, fn in enumerate(filenames) - } - - meta = cudf.Series([], dtype="O") - divisions = [None] * (len(dsk) + 1) - return dd.core.new_dd_object(dsk, name, meta, divisions) + inputs = [(fn, byte_range) for fn in filenames] + + return dd.from_map( + _read_text, + inputs, + meta=cudf.Series([], dtype="O"), + **kwargs, + ) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 9c5d5523019..962a229a839 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -738,9 +738,6 @@ def read_json(*args, **kwargs): @staticmethod def read_orc(*args, **kwargs): - from dask_expr import from_legacy_dataframe - from dask_cudf._legacy.io.orc import read_orc as legacy_read_orc - ddf = legacy_read_orc(*args, **kwargs) - return from_legacy_dataframe(ddf) + return legacy_read_orc(*args, **kwargs) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index bbedd046760..9763fb0dfb6 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -26,6 +26,12 @@ from dask.tokenize import tokenize from dask.utils import parse_bytes +try: + # TODO: Remove when dask>2024.11.2 + from dask._task_spec import List as TaskList +except ImportError: + TaskList = list + import cudf from dask_cudf import QUERY_PLANNING_ON, _deprecated_api @@ -447,7 +453,8 @@ def _task(self, name, index: int): return Task( name, cudf.concat, - [expr._filtered_task(name, i) for i in bucket], + # [expr._filtered_task(name, i) for i in bucket], + TaskList(*(expr._filtered_task(name, i) for i in bucket)), ) pieces = [] From 7c01e560d073a8692fa7184c67ffcc28a5d51936 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 25 Nov 2024 11:38:01 -0800 Subject: [PATCH 2/7] fix Series.dtypes --- python/cudf/cudf/core/series.py | 9 +++++++++ python/cudf/cudf/tests/test_series.py | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 9b60424c924..38a789b00f6 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -1598,6 +1598,15 @@ def dtype(self): """The dtype of the Series.""" return self._column.dtype + @property # type: ignore + @_performance_tracking + def dtypes(self): + """The dtype of the Series. + + This is an alias for `Series.dtype`. + """ + return self.dtype + @classmethod @_performance_tracking def _concat(cls, objs, axis=0, index: bool = True): diff --git a/python/cudf/cudf/tests/test_series.py b/python/cudf/cudf/tests/test_series.py index 7f0a4902ed1..adc5f1713e4 100644 --- a/python/cudf/cudf/tests/test_series.py +++ b/python/cudf/cudf/tests/test_series.py @@ -2934,3 +2934,9 @@ def test_empty_astype_always_castable(type1, type2, as_dtype, copy): assert ser._column is result._column else: assert ser._column is not result._column + + +def test_dtype_dtypes_equal(): + ser = cudf.Series([0]) + assert ser.dtype == ser.dtypes + assert ser.dtypes == ser.to_pandas().dtypes From d5ed146efef3979bb78748574b97b899fff6ed01 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Mon, 25 Nov 2024 13:48:41 -0600 Subject: [PATCH 3/7] Update python/dask_cudf/dask_cudf/io/parquet.py --- python/dask_cudf/dask_cudf/io/parquet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 9763fb0dfb6..2644d7c2f43 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -453,7 +453,6 @@ def _task(self, name, index: int): return Task( name, cudf.concat, - # [expr._filtered_task(name, i) for i in bucket], TaskList(*(expr._filtered_task(name, i) for i in bucket)), ) From 59671f209af5bbe473df08b190e9165a4602af13 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 26 Nov 2024 07:46:45 -0800 Subject: [PATCH 4/7] fix TaskList workaround --- python/dask_cudf/dask_cudf/io/parquet.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 2644d7c2f43..9168e179cd0 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -30,7 +30,10 @@ # TODO: Remove when dask>2024.11.2 from dask._task_spec import List as TaskList except ImportError: - TaskList = list + + def TaskList(*x): + return list(x) + import cudf From 5bbae1809857f899c4d789487e29538e822de508 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 26 Nov 2024 07:47:26 -0800 Subject: [PATCH 5/7] tweak comment --- python/dask_cudf/dask_cudf/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 9168e179cd0..ce9935c8b3c 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -27,7 +27,7 @@ from dask.utils import parse_bytes try: - # TODO: Remove when dask>2024.11.2 + # TODO: Remove try/except when dask>2024.11.2 from dask._task_spec import List as TaskList except ImportError: From 9abaa7bea620deecdc80849901e52e9464a51403 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 26 Nov 2024 13:52:18 -0800 Subject: [PATCH 6/7] add Series.dtypes doc target --- docs/cudf/source/user_guide/api_docs/series.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/cudf/source/user_guide/api_docs/series.rst b/docs/cudf/source/user_guide/api_docs/series.rst index 48a7dc8ff87..ca57cf935f4 100644 --- a/docs/cudf/source/user_guide/api_docs/series.rst +++ b/docs/cudf/source/user_guide/api_docs/series.rst @@ -22,6 +22,7 @@ Attributes Series.values Series.data Series.dtype + Series.dtypes Series.shape Series.ndim Series.nullable From 2a9c93cf02bcd5725593bd9db002b105346d4f9b Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 26 Nov 2024 14:46:44 -0800 Subject: [PATCH 7/7] address code review --- python/cudf/cudf/tests/test_series.py | 4 ++-- python/dask_cudf/dask_cudf/_legacy/io/orc.py | 11 ++++++----- python/dask_cudf/dask_cudf/_legacy/io/text.py | 12 ++++++------ 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/python/cudf/cudf/tests/test_series.py b/python/cudf/cudf/tests/test_series.py index adc5f1713e4..a040d1dc57f 100644 --- a/python/cudf/cudf/tests/test_series.py +++ b/python/cudf/cudf/tests/test_series.py @@ -2938,5 +2938,5 @@ def test_empty_astype_always_castable(type1, type2, as_dtype, copy): def test_dtype_dtypes_equal(): ser = cudf.Series([0]) - assert ser.dtype == ser.dtypes - assert ser.dtypes == ser.to_pandas().dtypes + assert ser.dtype is ser.dtypes + assert ser.dtypes is ser.to_pandas().dtypes diff --git a/python/dask_cudf/dask_cudf/_legacy/io/orc.py b/python/dask_cudf/dask_cudf/_legacy/io/orc.py index 9326f41f5a7..fcf684fd6c8 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/orc.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/orc.py @@ -12,9 +12,9 @@ import cudf -def _read_orc_stripe(input, columns=None, kwargs=None): +def _read_orc_stripe(source, fs, columns=None, kwargs=None): """Pull out specific columns from specific stripe""" - fs, path, stripe = input + path, stripe = source if kwargs is None: kwargs = {} with fs.open(path, "rb") as f: @@ -100,18 +100,19 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs): **kwargs, ) - inputs = [] + sources = [] for path, n in zip(paths, nstripes_per_file): for stripe in ( range(n) if filters is None else cudf.io.orc._filter_stripes(filters, path) ): - inputs.append((fs, path, stripe)) + sources.append((path, stripe)) return dd.from_map( _read_orc_stripe, - inputs, + sources, + args=[fs], columns=columns, kwargs=kwargs, meta=meta, diff --git a/python/dask_cudf/dask_cudf/_legacy/io/text.py b/python/dask_cudf/dask_cudf/_legacy/io/text.py index d0154411df2..3757c85c80c 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/text.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/text.py @@ -9,9 +9,9 @@ import cudf -def _read_text(input, **kwargs): +def _read_text(source, **kwargs): # Wrapper for cudf.read_text operation - fn, byte_range = input + fn, byte_range = source return cudf.read_text(fn, byte_range=byte_range, **kwargs) @@ -36,7 +36,7 @@ def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs): raise ValueError("Cannot specify both chunksize and byte_range.") if chunksize: - inputs = [] + sources = [] for fn in filenames: size = os.path.getsize(fn) for start in range(0, size, chunksize): @@ -44,13 +44,13 @@ def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs): start, chunksize, ) # specify which chunk of the file we care about - inputs.append((fn, byte_range)) + sources.append((fn, byte_range)) else: - inputs = [(fn, byte_range) for fn in filenames] + sources = [(fn, byte_range) for fn in filenames] return dd.from_map( _read_text, - inputs, + sources, meta=cudf.Series([], dtype="O"), **kwargs, )