diff --git a/README.md b/README.md index 205e16ea0e5..75ee405bc1f 100644 --- a/README.md +++ b/README.md @@ -14,13 +14,8 @@ You can import `cudf` directly and use it like `pandas`: ```python import cudf -import requests -from io import StringIO -url = "https://github.com/plotly/datasets/raw/master/tips.csv" -content = requests.get(url).content.decode("utf-8") - -tips_df = cudf.read_csv(StringIO(content)) +tips_df = cudf.read_csv("https://github.com/plotly/datasets/raw/master/tips.csv") tips_df["tip_percentage"] = tips_df["tip"] / tips_df["total_bill"] * 100 # display average tip by dining party size @@ -36,13 +31,8 @@ supported operations and falling back to pandas when needed: %load_ext cudf.pandas # pandas operations now use the GPU! import pandas as pd -import requests -from io import StringIO - -url = "https://github.com/plotly/datasets/raw/master/tips.csv" -content = requests.get(url).content.decode("utf-8") -tips_df = pd.read_csv(StringIO(content)) +tips_df = pd.read_csv("https://github.com/plotly/datasets/raw/master/tips.csv") tips_df["tip_percentage"] = tips_df["tip"] / tips_df["total_bill"] * 100 # display average tip by dining party size diff --git a/cpp/include/cudf/io/csv.hpp b/cpp/include/cudf/io/csv.hpp index fdceda40e92..a20f75cecd7 100644 --- a/cpp/include/cudf/io/csv.hpp +++ b/cpp/include/cudf/io/csv.hpp @@ -106,6 +106,9 @@ class csv_reader_options { char _quotechar = '"'; // Whether a quote inside a value is double-quoted bool _doublequote = true; + // Whether to detect quotes surrounded by spaces e.g. ` "data" `. This flag has no effect when + // _doublequote is true + bool _detect_whitespace_around_quotes = false; // Names of columns to read as datetime std::vector _parse_dates_names; // Indexes of columns to read as datetime @@ -375,6 +378,17 @@ class csv_reader_options { */ [[nodiscard]] bool is_enabled_doublequote() const { return _doublequote; } + /** + * @brief Whether to detect quotes surrounded by spaces e.g. ` "data" `. This flag has no + * effect when _doublequote is true + * + * @return `true` if detect_whitespace_around_quotes is enabled + */ + [[nodiscard]] bool is_enabled_detect_whitespace_around_quotes() const + { + return _detect_whitespace_around_quotes; + } + /** * @brief Returns names of columns to read as datetime. * @@ -698,6 +712,14 @@ class csv_reader_options { */ void enable_doublequote(bool val) { _doublequote = val; } + /** + * @brief Sets whether to detect quotes surrounded by spaces e.g. ` "data" `. This flag has no + * effect when _doublequote is true + * + * @param val Boolean value to enable/disable + */ + void enable_detect_whitespace_around_quotes(bool val) { _detect_whitespace_around_quotes = val; } + /** * @brief Sets names of columns to read as datetime. * @@ -1126,6 +1148,19 @@ class csv_reader_options_builder { return *this; } + /** + * @brief Sets whether to detect quotes surrounded by spaces e.g. ` "data" `. This flag has no + * effect when _doublequote is true + * + * @param val Boolean value to enable/disable + * @return this for chaining + */ + csv_reader_options_builder& detect_whitespace_around_quotes(bool val) + { + options._detect_whitespace_around_quotes = val; + return *this; + } + /** * @brief Sets names of columns to read as datetime. 
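The new `detect_whitespace_around_quotes` option only matters when quotes are being stripped from string fields and `doublequote` is disabled: the parser first trims whitespace around the field and only then looks for the surrounding quote characters, so whitespace *inside* the quotes is preserved while whitespace *around* a quoted value is dropped along with the quotes. A rough pure-Python sketch of that rule (the `strip_field` helper is invented here purely for illustration; the real implementation is the CUDA kernel change below):

```python
def strip_field(field: str, quotechar: str = '"',
                detect_whitespace_around_quotes: bool = False) -> str:
    # Plain case: the quotes hug the value, so just drop them.
    if len(field) >= 2 and field[0] == quotechar and field[-1] == quotechar:
        return field[1:-1]
    if detect_whitespace_around_quotes:
        # New behaviour: trim whitespace first, then look for the quotes.
        trimmed = field.strip(" \t")
        if len(trimmed) >= 2 and trimmed[0] == quotechar and trimmed[-1] == quotechar:
            return trimmed[1:-1]
    # Unquoted fields keep their whitespace either way.
    return field

assert strip_field(' "G"', detect_whitespace_around_quotes=True) == "G"
assert strip_field(' " I " ', detect_whitespace_around_quotes=True) == " I "
assert strip_field(" D ", detect_whitespace_around_quotes=True) == " D "
assert strip_field(' "G"') == ' "G"'  # old behaviour: whitespace hides the quotes
```

The expected values here mirror the `StringsQuotesWhitespace` test added further down.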
* diff --git a/cpp/src/io/csv/csv_gpu.cu b/cpp/src/io/csv/csv_gpu.cu index 9c186f161b3..7a05d0aebaf 100644 --- a/cpp/src/io/csv/csv_gpu.cu +++ b/cpp/src/io/csv/csv_gpu.cu @@ -351,9 +351,19 @@ CUDF_KERNEL void __launch_bounds__(csvparse_block_dim) if (dtypes[actual_col].id() == cudf::type_id::STRING) { auto end = next_delimiter; if (not options.keepquotes) { - if ((*field_start == options.quotechar) && (*(end - 1) == options.quotechar)) { - ++field_start; - --end; + if (not options.detect_whitespace_around_quotes) { + if ((*field_start == options.quotechar) && (*(end - 1) == options.quotechar)) { + ++field_start; + --end; + } + } else { + // If the string is quoted, whitespace around the quotes get removed as well + auto const trimmed_field = trim_whitespaces(field_start, end); + if ((*trimmed_field.first == options.quotechar) && + (*(trimmed_field.second - 1) == options.quotechar)) { + field_start = trimmed_field.first + 1; + end = trimmed_field.second - 1; + } } } auto str_list = static_cast*>(columns[actual_col]); diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 67c1194578a..5dee0c17a33 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -951,8 +951,10 @@ parse_options make_parse_options(csv_reader_options const& reader_opts, parse_opts.terminator = reader_opts.get_lineterminator(); if (reader_opts.get_quotechar() != '\0' && reader_opts.get_quoting() != quote_style::NONE) { - parse_opts.quotechar = reader_opts.get_quotechar(); - parse_opts.keepquotes = false; + parse_opts.quotechar = reader_opts.get_quotechar(); + parse_opts.keepquotes = false; + parse_opts.detect_whitespace_around_quotes = + reader_opts.is_enabled_detect_whitespace_around_quotes(); parse_opts.doublequote = reader_opts.is_enabled_doublequote(); } else { parse_opts.quotechar = '\0'; diff --git a/cpp/src/io/utilities/parsing_utils.cuh b/cpp/src/io/utilities/parsing_utils.cuh index 06a0a63c0ab..faee05541cc 100644 --- a/cpp/src/io/utilities/parsing_utils.cuh +++ b/cpp/src/io/utilities/parsing_utils.cuh @@ -63,6 +63,7 @@ struct parse_options_view { char thousands; char comment; bool keepquotes; + bool detect_whitespace_around_quotes; bool doublequote; bool dayfirst; bool skipblanklines; @@ -80,6 +81,7 @@ struct parse_options { char thousands; char comment; bool keepquotes; + bool detect_whitespace_around_quotes; bool doublequote; bool dayfirst; bool skipblanklines; @@ -105,6 +107,7 @@ struct parse_options { thousands, comment, keepquotes, + detect_whitespace_around_quotes, doublequote, dayfirst, skipblanklines, diff --git a/cpp/tests/io/csv_test.cpp b/cpp/tests/io/csv_test.cpp index 8e3ecd817e4..880dc911954 100644 --- a/cpp/tests/io/csv_test.cpp +++ b/cpp/tests/io/csv_test.cpp @@ -1018,6 +1018,47 @@ TEST_F(CsvReaderTest, StringsQuotesIgnored) view.column(1)); } +TEST_F(CsvReaderTest, StringsQuotesWhitespace) +{ + std::vector names{"line", "verse"}; + + auto filepath = temp_env->get_temp_dir() + "StringsQuotesIgnored.csv"; + { + std::ofstream outfile(filepath, std::ofstream::out); + outfile << names[0] << ',' << names[1] << '\n'; + outfile << "A,a" << '\n'; // unquoted no whitespace + outfile << " B,b" << '\n'; // unquoted leading whitespace + outfile << "C ,c" << '\n'; // unquoted trailing whitespace + outfile << " D ,d" << '\n'; // unquoted leading and trailing whitespace + outfile << "\"E\",e" << '\n'; // quoted no whitespace + outfile << "\"F\" ,f" << '\n'; // quoted trailing whitespace + outfile << " \"G\",g" << '\n'; // quoted leading whitespace + outfile << " 
\"H\" ,h" << '\n'; // quoted leading and trailing whitespace + outfile << " \" I \" ,i" + << '\n'; // quoted leading and trailing whitespace with spaces inside quotes + } + + cudf::io::csv_reader_options in_opts = + cudf::io::csv_reader_options::builder(cudf::io::source_info{filepath}) + .names(names) + .dtypes(std::vector{dtype(), dtype()}) + .quoting(cudf::io::quote_style::ALL) + .doublequote(false) + .detect_whitespace_around_quotes(true); + auto result = cudf::io::read_csv(in_opts); + + auto const view = result.tbl->view(); + ASSERT_EQ(2, view.num_columns()); + ASSERT_EQ(type_id::STRING, view.column(0).type().id()); + ASSERT_EQ(type_id::STRING, view.column(1).type().id()); + + expect_column_data_equal( + std::vector{"A", " B", "C ", " D ", "E", "F", "G", "H", " I "}, + view.column(0)); + expect_column_data_equal(std::vector{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, + view.column(1)); +} + TEST_F(CsvReaderTest, SkiprowsNrows) { auto filepath = temp_env->get_temp_dir() + "SkiprowsNrows.csv"; diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 70acb7f917b..f0eef9be124 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -408,6 +408,10 @@ def write_parquet( object force_nullable_schema=False, header_version="1.0", use_dictionary=True, + object skip_compression=None, + object column_encoding=None, + object column_type_length=None, + object output_as_binary=None, ): """ Cython function to call into libcudf API, see `write_parquet`. @@ -458,7 +462,12 @@ def write_parquet( _set_col_metadata( table[name]._column, tbl_meta.column_metadata[i], - force_nullable_schema + force_nullable_schema, + None, + skip_compression, + column_encoding, + column_type_length, + output_as_binary ) cdef map[string, string] tmp_user_data @@ -810,16 +819,62 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression): raise ValueError("Unsupported `compression` type") +cdef cudf_io_types.column_encoding _get_encoding_type(object encoding): + if encoding is None: + return cudf_io_types.column_encoding.USE_DEFAULT + + enc = str(encoding).upper() + if enc == "PLAIN": + return cudf_io_types.column_encoding.PLAIN + elif enc == "DICTIONARY": + return cudf_io_types.column_encoding.DICTIONARY + elif enc == "DELTA_BINARY_PACKED": + return cudf_io_types.column_encoding.DELTA_BINARY_PACKED + elif enc == "DELTA_LENGTH_BYTE_ARRAY": + return cudf_io_types.column_encoding.DELTA_LENGTH_BYTE_ARRAY + elif enc == "DELTA_BYTE_ARRAY": + return cudf_io_types.column_encoding.DELTA_BYTE_ARRAY + elif enc == "BYTE_STREAM_SPLIT": + return cudf_io_types.column_encoding.BYTE_STREAM_SPLIT + elif enc == "USE_DEFAULT": + return cudf_io_types.column_encoding.USE_DEFAULT + else: + raise ValueError("Unsupported `column_encoding` type") + + cdef _set_col_metadata( Column col, column_in_metadata& col_meta, bool force_nullable_schema=False, + str path=None, + object skip_compression=None, + object column_encoding=None, + object column_type_length=None, + object output_as_binary=None, ): + need_path = (skip_compression is not None or column_encoding is not None or + column_type_length is not None or output_as_binary is not None) + name = col_meta.get_name().decode('UTF-8') if need_path else None + full_path = path + "." + name if path is not None else name + if force_nullable_schema: # Only set nullability if `force_nullable_schema` # is true. 
col_meta.set_nullability(True) + if skip_compression is not None and full_path in skip_compression: + col_meta.set_skip_compression(True) + + if column_encoding is not None and full_path in column_encoding: + col_meta.set_encoding(_get_encoding_type(column_encoding[full_path])) + + if column_type_length is not None and full_path in column_type_length: + col_meta.set_output_as_binary(True) + col_meta.set_type_length(column_type_length[full_path]) + + if output_as_binary is not None and full_path in output_as_binary: + col_meta.set_output_as_binary(True) + if isinstance(col.dtype, cudf.StructDtype): for i, (child_col, name) in enumerate( zip(col.children, list(col.dtype.fields)) @@ -828,13 +883,26 @@ cdef _set_col_metadata( _set_col_metadata( child_col, col_meta.child(i), - force_nullable_schema + force_nullable_schema, + full_path, + skip_compression, + column_encoding, + column_type_length, + output_as_binary ) elif isinstance(col.dtype, cudf.ListDtype): + if full_path is not None: + full_path = full_path + ".list" + col_meta.child(1).set_name("element".encode()) _set_col_metadata( col.children[1], col_meta.child(1), - force_nullable_schema + force_nullable_schema, + full_path, + skip_compression, + column_encoding, + column_type_length, + output_as_binary ) elif isinstance(col.dtype, cudf.core.dtypes.DecimalDtype): col_meta.set_decimal_precision(col.dtype.precision) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/csv.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/csv.pxd index 754dd37d53f..b5ff6558cd8 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/csv.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/csv.pxd @@ -50,6 +50,7 @@ cdef extern from "cudf/io/csv.hpp" \ cudf_io_types.quote_style get_quoting() except + char get_quotechar() except + bool is_enabled_doublequote() except + + bool is_enabled_updated_quotes_detection() except + vector[string] get_parse_dates_names() except + vector[int] get_parse_dates_indexes() except + vector[string] get_parse_hex_names() except + @@ -95,6 +96,7 @@ cdef extern from "cudf/io/csv.hpp" \ void set_quoting(cudf_io_types.quote_style style) except + void set_quotechar(char val) except + void set_doublequote(bool val) except + + void set_detect_whitespace_around_quotes(bool val) except + void set_parse_dates(vector[string]) except + void set_parse_dates(vector[int]) except + void set_parse_hex(vector[string]) except + @@ -163,6 +165,7 @@ cdef extern from "cudf/io/csv.hpp" \ ) except + csv_reader_options_builder& quotechar(char val) except + csv_reader_options_builder& doublequote(bool val) except + + csv_reader_options_builder& detect_whitespace_around_quotes(bool val) except + csv_reader_options_builder& parse_dates(vector[string]) except + csv_reader_options_builder& parse_dates(vector[int]) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd index 4725c4e5937..38fae1df1e5 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd @@ -1,6 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. 
-from libc.stdint cimport uint8_t +from libc.stdint cimport int32_t, uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport shared_ptr, unique_ptr @@ -57,6 +57,19 @@ cdef extern from "cudf/io/types.hpp" \ ADAPTIVE = 1, ALWAYS = 2, + cdef extern from "cudf/io/types.hpp" namespace "cudf::io" nogil: + cpdef enum class column_encoding: + USE_DEFAULT = -1 + DICTIONARY = 0 + PLAIN = 1 + DELTA_BINARY_PACKED = 2 + DELTA_LENGTH_BYTE_ARRAY =3 + DELTA_BYTE_ARRAY = 4 + BYTE_STREAM_SPLIT = 5 + DIRECT = 6 + DIRECT_V2 = 7 + DICTIONARY_V2 = 8 + cdef cppclass column_name_info: string name vector[column_name_info] children @@ -81,6 +94,9 @@ cdef extern from "cudf/io/types.hpp" \ column_in_metadata& set_decimal_precision(uint8_t precision) column_in_metadata& child(size_type i) column_in_metadata& set_output_as_binary(bool binary) + column_in_metadata& set_type_length(int32_t type_length) + column_in_metadata& set_skip_compression(bool skip) + column_in_metadata& set_encoding(column_encoding enc) string get_name() cdef cppclass table_input_metadata: diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 1785eb834b2..59bae179497 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -333,16 +333,27 @@ def from_arrow(cls, array: pa.Array) -> ColumnBase: """ if not isinstance(array, (pa.Array, pa.ChunkedArray)): raise TypeError("array should be PyArrow array or chunked array") - - data = pa.table([array], [None]) - - if ( - isinstance(array.type, pa.TimestampType) - and array.type.tz is not None - ): + elif pa.types.is_float16(array.type): + raise NotImplementedError( + "Type casting from `float16` to `float32` is not " + "yet supported in pyarrow, see: " + "https://github.com/apache/arrow/issues/20213" + ) + elif pa.types.is_timestamp(array.type) and array.type.tz is not None: raise NotImplementedError( "cuDF does not yet support timezone-aware datetimes" ) + elif isinstance(array.type, ArrowIntervalType): + return cudf.core.column.IntervalColumn.from_arrow(array) + elif pa.types.is_large_string(array.type): + # Pandas-2.2+: Pandas defaults to `large_string` type + # instead of `string` without data-introspection. + # Temporary workaround until cudf has native + # support for `LARGE_STRING` i.e., 64 bit offsets + array = array.cast(pa.string()) + + data = pa.table([array], [None]) + if isinstance(array.type, pa.DictionaryType): indices_table = pa.table( { @@ -371,8 +382,6 @@ def from_arrow(cls, array: pa.Array) -> ColumnBase: size=codes.size, ordered=array.type.ordered, ) - elif isinstance(array.type, ArrowIntervalType): - return cudf.core.column.IntervalColumn.from_arrow(array) result = libcudf.interop.from_arrow(data)[0] @@ -1809,27 +1818,7 @@ def as_column( return col elif isinstance(arbitrary, (pa.Array, pa.ChunkedArray)): - if pa.types.is_large_string(arbitrary.type): - # Pandas-2.2+: Pandas defaults to `large_string` type - # instead of `string` without data-introspection. 
- # Temporary workaround until cudf has native - # support for `LARGE_STRING` i.e., 64 bit offsets - arbitrary = arbitrary.cast(pa.string()) - - if pa.types.is_float16(arbitrary.type): - raise NotImplementedError( - "Type casting from `float16` to `float32` is not " - "yet supported in pyarrow, see: " - "https://github.com/apache/arrow/issues/20213" - ) - elif ( - pa.types.is_timestamp(arbitrary.type) - and arbitrary.type.tz is not None - ): - raise NotImplementedError( - "cuDF does not yet support timezone-aware datetimes" - ) - elif (nan_as_null is None or nan_as_null) and pa.types.is_floating( + if (nan_as_null is None or nan_as_null) and pa.types.is_floating( arbitrary.type ): arbitrary = pc.if_else( @@ -1837,31 +1826,12 @@ def as_column( pa.nulls(len(arbitrary), type=arbitrary.type), arbitrary, ) + elif dtype is None and pa.types.is_null(arbitrary.type): + # default "empty" type + dtype = "str" col = ColumnBase.from_arrow(arbitrary) - if isinstance(arbitrary, pa.NullArray): - if dtype is not None: - # Cast the column to the `dtype` if specified. - new_dtype = dtype - elif len(arbitrary) == 0: - # If the column is empty, it has to be - # a `str` dtype. - new_dtype = cudf.dtype("str") - else: - # If the null column is not empty, it has to - # be of `object` dtype. - new_dtype = cudf.dtype(arbitrary.type.to_pandas_dtype()) - - if cudf.get_option( - "mode.pandas_compatible" - ) and new_dtype == cudf.dtype("O"): - # We internally raise if we do `astype("object")`, hence - # need to cast to `str` since this is safe to do so because - # it is a null-array. - new_dtype = "str" - - col = col.astype(new_dtype) - elif dtype is not None: + if dtype is not None: col = col.astype(dtype) return col diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 0b7c40ff516..1f530aa3108 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -254,7 +254,7 @@ def _getitem_tuple_arg(self, arg): # Step 1: Gather columns if isinstance(arg, tuple): columns_df = self._frame._get_columns_by_label(arg[1]) - columns_df._index = self._frame._index + columns_df.index = self._frame.index else: columns_df = self._frame @@ -545,7 +545,7 @@ def __getitem__(self, arg): @_cudf_nvtx_annotate def _setitem_tuple_arg(self, key, value): columns_df = self._frame._from_data( - self._frame._data.select_by_index(key[1]), self._frame._index + self._frame._data.select_by_index(key[1]), self._frame.index ) if is_scalar(value): @@ -710,11 +710,11 @@ def __init__( if index is not None: if not data.index.equals(index): data = data.reindex(index) - index = data._index + index = data.index else: index = as_index(index) else: - index = data._index + index = data.index self._index = index @@ -1176,7 +1176,7 @@ def _constructor_expanddim(self): def serialize(self): header, frames = super().serialize() - header["index"], index_frames = self._index.serialize() + header["index"], index_frames = self.index.serialize() header["index_frame_count"] = len(index_frames) # For backwards compatibility with older versions of cuDF, index # columns are placed before data columns. 
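These serialize hunks are part of the broader `_index` → `index` property cleanup; the public behaviour is unchanged. A minimal round-trip sketch, assuming the dask-style `serialize`/`deserialize` protocol that these methods implement:

```python
import cudf

df = cudf.DataFrame({"a": [1, 2, 3]}, index=[10, 20, 30])

# serialize() splits the frame into a picklable header plus a list of
# frames (device buffers); deserialize() rebuilds it, index included.
header, frames = df.serialize()
roundtrip = cudf.DataFrame.deserialize(header, frames)

assert roundtrip.index.equals(df.index)
assert roundtrip["a"].equals(df["a"])
```

The `deserialize` hunk that follows mirrors the same property change on the reconstruction side.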
@@ -1193,7 +1193,7 @@ def deserialize(cls, header, frames): idx_typ = pickle.loads(header["index"]["type-serialized"]) index = idx_typ.deserialize(header["index"], frames[:index_nframes]) - obj._index = index + obj.index = index return obj @@ -1396,27 +1396,27 @@ def __setitem__(self, arg, value): else: if arg in self._data: if not is_scalar(value) and len(self) == 0: + value = column.as_column(value) + length = len(value) + new_columns = ( + value + if key == arg + else column.column_empty_like( + col, masked=True, newsize=length + ) + for key, col in self._data.items() + ) + self._data = self._data._from_columns_like_self( + new_columns, verify=False + ) if isinstance(value, (pd.Series, Series)): self._index = as_index(value.index) elif len(value) > 0: - self._index = RangeIndex(start=0, stop=len(value)) - value = column.as_column(value) - new_data = self._data.__class__() - for key in self._data: - if key == arg: - new_data[key] = value - else: - new_data[key] = column.column_empty_like( - self._data[key], - masked=True, - newsize=len(value), - ) - - self._data = new_data + self._index = RangeIndex(length) return elif isinstance(value, (pd.Series, Series)): value = Series(value)._align_to_index( - self._index, + self.index, how="right", sort=False, allow_non_unique=True, @@ -1489,7 +1489,7 @@ def memory_usage(self, index=True, deep=False): mem_usage = [col.memory_usage for col in self._data.columns] names = [str(name) for name in self._data.names] if index: - mem_usage.append(self._index.memory_usage()) + mem_usage.append(self.index.memory_usage()) names.append("Index") return Series._from_data( data={None: as_column(mem_usage)}, @@ -1698,7 +1698,7 @@ def _concat( [] if are_all_range_index or (ignore_index and not empty_has_index) - else list(f._index._data.columns) + else list(f.index._data.columns) ) + [f._data[name] if name in f._data else None for name in names] for f in objs @@ -1761,11 +1761,9 @@ def _concat( # least one input frame has an index, assign a new RangeIndex # to the result frame. 
if empty_has_index and num_empty_input_frames == len(objs): - out._index = cudf.RangeIndex(result_index_length) + out.index = cudf.RangeIndex(result_index_length) elif are_all_range_index and not ignore_index: - out._index = cudf.core.index.Index._concat( - [o._index for o in objs] - ) + out.index = cudf.core.index.Index._concat([o.index for o in objs]) # Reassign the categories for any categorical table cols _reassign_categories( @@ -1773,14 +1771,14 @@ def _concat( ) # Reassign the categories for any categorical index cols - if not isinstance(out._index, cudf.RangeIndex): + if not isinstance(out.index, cudf.RangeIndex): _reassign_categories( categories, - out._index._data, + out.index._data, indices[:first_data_column_position], ) - if not isinstance(out._index, MultiIndex) and isinstance( - out._index.dtype, cudf.CategoricalDtype + if not isinstance(out.index, MultiIndex) and isinstance( + out.index.dtype, cudf.CategoricalDtype ): out = out.set_index( cudf.core.index.as_index(out.index._values) @@ -1796,8 +1794,8 @@ def _concat( else: out.columns = names if not ignore_index: - out._index.name = objs[0]._index.name - out._index.names = objs[0]._index.names + out.index.name = objs[0].index.name + out.index.names = objs[0].index.names return out @@ -1965,7 +1963,7 @@ def _get_renderable_dataframe(self): output = cudf.concat([upper, lower]) output = self._clean_nulls_from_dataframe(output) - output._index = output._index._clean_nulls_from_index() + output.index = output.index._clean_nulls_from_index() return output @@ -2036,7 +2034,7 @@ def _make_operands_and_index_for_binop( bool, ]: lhs, rhs = self._data, other - index = self._index + index = self.index fill_requires_key = False left_default: Any = False equal_columns = False @@ -2081,7 +2079,7 @@ def _make_operands_and_index_for_binop( "Can only compare identically-labeled DataFrame objects" ) new_lhs, new_rhs = _align_indices(self, other) - index = new_lhs._index + index = new_lhs.index lhs, rhs = new_lhs._data, new_rhs._data fill_requires_key = True # For DataFrame-DataFrame ops, always default to operating against @@ -2455,7 +2453,7 @@ def scatter_by_map( ) partitioned_columns, output_offsets = libcudf.partitioning.partition( - [*(self._index._columns if keep_index else ()), *self._columns], + [*(self.index._columns if keep_index else ()), *self._columns], map_index, map_size, ) @@ -3248,23 +3246,28 @@ def _insert(self, loc, name, value, nan_as_null=None, ignore_index=True): if len(self) == 0: if isinstance(value, (pd.Series, Series)): if not ignore_index: - self._index = as_index(value.index) - elif len(value) > 0: - self._index = RangeIndex(start=0, stop=len(value)) - new_data = self._data.__class__() + self.index = as_index(value.index) + elif (length := len(value)) > 0: if num_cols != 0: - for col_name in self._data: - new_data[col_name] = column.column_empty_like( - self._data[col_name], - masked=True, - newsize=len(value), - ) - self._data = new_data + ca = self._data._from_columns_like_self( + ( + column.column_empty_like( + col_data, masked=True, newsize=length + ) + for col_data in self._data.values() + ), + verify=False, + ) + else: + ca = ColumnAccessor({}) + self._data = ca + self._index = RangeIndex(length) + elif isinstance(value, (pd.Series, Series)): value = Series(value, nan_as_null=nan_as_null) if not ignore_index: value = value._align_to_index( - self._index, how="right", sort=False + self.index, how="right", sort=False ) value = column.as_column(value, nan_as_null=nan_as_null) @@ -3293,7 +3296,7 @@ def axes(self): 
Index(['key', 'k2', 'val', 'temp'], dtype='object')] """ - return [self._index, self._data.to_pandas_index()] + return [self.index, self._data.to_pandas_index()] def diff(self, periods=1, axis=0): """ @@ -4853,8 +4856,8 @@ def partition_by_hash(self, columns, nparts, keep_index=True): """ key_indices = [self._column_names.index(k) for k in columns] if keep_index: - cols = [*self._index._columns, *self._columns] - key_indices = [i + len(self._index._columns) for i in key_indices] + cols = [*self.index._columns, *self._columns] + key_indices = [i + len(self.index._columns) for i in key_indices] else: cols = [*self._columns] @@ -5019,13 +5022,13 @@ def info( lines = [str(type(self))] - index_name = type(self._index).__name__ - if len(self._index) > 0: - entries_summary = f", {self._index[0]} to {self._index[-1]}" + index_name = type(self.index).__name__ + if len(self.index) > 0: + entries_summary = f", {self.index[0]} to {self.index[-1]}" else: entries_summary = "" index_summary = ( - f"{index_name}: {len(self._index)} entries{entries_summary}" + f"{index_name}: {len(self.index)} entries{entries_summary}" ) lines.append(index_summary) @@ -5629,7 +5632,7 @@ def from_records(cls, data, index=None, columns=None, nan_as_null=False): num_cols = len(data[0]) if columns is None and data.dtype.names is None: - names = [i for i in range(num_cols)] + names = range(num_cols) elif data.dtype.names is not None: names = data.dtype.names @@ -5642,28 +5645,43 @@ def from_records(cls, data, index=None, columns=None, nan_as_null=False): ) names = columns - df = DataFrame() - if data.ndim == 2: - for i, k in enumerate(names): - df._data[k] = column.as_column( - data[:, i], nan_as_null=nan_as_null - ) + ca_data = { + k: column.as_column(data[:, i], nan_as_null=nan_as_null) + for i, k in enumerate(names) + } elif data.ndim == 1: - for k in names: - df._data[k] = column.as_column( - data[k], nan_as_null=nan_as_null - ) + ca_data = { + name: column.as_column(data[name], nan_as_null=nan_as_null) + for name in names + } - if index is None: - df._index = RangeIndex(start=0, stop=len(data)) - elif is_scalar(index): - df._index = RangeIndex(start=0, stop=len(data)) - df = df.set_index(index) + if not is_scalar(index): + new_index = as_index(index) + else: + new_index = None + + if isinstance(columns, (pd.Index, cudf.Index)): + level_names = tuple(columns.names) else: - df._index = as_index(index) - if isinstance(columns, pd.Index): - df._data._level_names = tuple(columns.names) + level_names = None + + df = cls._from_data( + ColumnAccessor( + data=ca_data, + multiindex=isinstance( + columns, (pd.MultiIndex, cudf.MultiIndex) + ), + rangeindex=isinstance( + columns, (range, pd.RangeIndex, cudf.RangeIndex) + ), + level_names=level_names, + label_dtype=getattr(columns, "dtype", None), + ), + index=new_index, + ) + if is_scalar(index) and index is not None: + df = df.set_index(index) return df @classmethod @@ -5712,26 +5730,38 @@ def _from_arrays(cls, data, index=None, columns=None, nan_as_null=False): raise ValueError("Duplicate column names are not allowed") names = columns - df = cls() if data.ndim == 2: - for i, k in enumerate(names): - df._data[k] = column.as_column( - data[:, i], nan_as_null=nan_as_null - ) + ca_data = { + k: column.as_column(data[:, i], nan_as_null=nan_as_null) + for i, k in enumerate(names) + } elif data.ndim == 1: - df._data[names[0]] = column.as_column( - data, nan_as_null=nan_as_null - ) - if isinstance(columns, pd.Index): - df._data._level_names = tuple(columns.names) - if isinstance(columns, 
(range, pd.RangeIndex, cudf.RangeIndex)): - df._data.rangeindex = True + ca_data = { + names[0]: column.as_column(data, nan_as_null=nan_as_null) + } - if index is None: - df._index = RangeIndex(start=0, stop=len(data)) + if index is not None: + index = as_index(index) + + if isinstance(columns, (pd.Index, cudf.Index)): + level_names = tuple(columns.names) else: - df._index = as_index(index) - return df + level_names = None + + return cls._from_data( + ColumnAccessor( + data=ca_data, + multiindex=isinstance( + columns, (pd.MultiIndex, cudf.MultiIndex) + ), + rangeindex=isinstance( + columns, (range, pd.RangeIndex, cudf.RangeIndex) + ), + level_names=level_names, + label_dtype=getattr(columns, "dtype", None), + ), + index=index, + ) @_cudf_nvtx_annotate def interpolate( @@ -6677,6 +6707,10 @@ def to_parquet( return_metadata=False, use_dictionary=True, header_version="1.0", + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, *args, **kwargs, ): @@ -6703,6 +6737,10 @@ def to_parquet( return_metadata=return_metadata, use_dictionary=use_dictionary, header_version=header_version, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, *args, **kwargs, ) @@ -7006,7 +7044,7 @@ def stack(self, level=-1, dropna=no_default, future_stack=False): # Assemble the final index new_index_columns = [*repeated_index._columns, *tiled_index] - index_names = [*self._index.names, *unique_named_levels.names] + index_names = [*self.index.names, *unique_named_levels.names] new_index = MultiIndex.from_frame( DataFrame._from_data( dict(zip(range(0, len(new_index_columns)), new_index_columns)) @@ -7797,7 +7835,7 @@ def value_counts( result = result / result._column.sum() # Pandas always returns MultiIndex even if only one column. if not isinstance(result.index, MultiIndex): - result.index = MultiIndex._from_data(result._index._data) + result.index = MultiIndex._from_data(result.index._data) result.name = "proportion" if normalize else "count" return result diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 3e4b8192888..3e7a1ee6026 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -40,6 +40,15 @@ from cudf.utils.utils import GetAttrGetItemMixin +def _deprecate_collect(): + warnings.warn( + "Groupby.collect is deprecated and " + "will be removed in a future version. 
" + "Use `.agg(list)` instead.", + FutureWarning, + ) + + # The three functions below return the quantiles [25%, 50%, 75%] # respectively, which are called in the describe() method to output # the summary stats of a GroupBy object @@ -940,7 +949,7 @@ def nth(self, n): result = result[sizes > n] - result._index = self.obj.index.take( + result.index = self.obj.index.take( result._data["__groupbynth_order__"] ) del result._data["__groupbynth_order__"] @@ -1029,7 +1038,7 @@ def ngroup(self, ascending=True): if has_null_group: group_ids.iloc[-1] = cudf.NA - group_ids._index = index + group_ids.index = index return self._broadcast(group_ids) def sample( @@ -1199,7 +1208,7 @@ def deserialize(cls, header, frames): def _grouped(self, *, include_groups: bool = True): offsets, grouped_key_cols, grouped_value_cols = self._groupby.groups( - [*self.obj._index._columns, *self.obj._columns] + [*self.obj.index._columns, *self.obj._columns] ) grouped_keys = cudf.core.index._index_from_data( dict(enumerate(grouped_key_cols)) @@ -2180,7 +2189,8 @@ def func(x): @_cudf_nvtx_annotate def collect(self): """Get a list of all the values for each column in each group.""" - return self.agg("collect") + _deprecate_collect() + return self.agg(list) @_cudf_nvtx_annotate def unique(self): @@ -2839,8 +2849,8 @@ def _handle_label(self, by): self._key_columns.append(self._obj._data[by]) except KeyError as e: # `by` can be index name(label) too. - if by in self._obj._index.names: - self._key_columns.append(self._obj._index._data[by]) + if by in self._obj.index.names: + self._key_columns.append(self._obj.index._data[by]) else: raise e self.names.append(by) diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 7aae0d1729e..a166c256689 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -289,11 +289,11 @@ def __init__(self, data=None, index=None): @property def _num_rows(self) -> int: # Important to use the index because the data may be empty. - return len(self._index) + return len(self.index) @property def _index_names(self) -> Tuple[Any, ...]: # TODO: Tuple[str]? - return self._index._data.names + return self.index._data.names @classmethod def _from_data( @@ -307,7 +307,7 @@ def _from_data( @_cudf_nvtx_annotate def _from_data_like_self(self, data: MutableMapping): - out = self._from_data(data, self._index) + out = self._from_data(data, self.index) out._data._level_names = self._data._level_names return out @@ -350,7 +350,7 @@ def _from_columns_like_self( frame = self.__class__._from_data(data) if index is not None: - frame._index = index + frame.index = index return frame._copy_type_metadata( self, include_index=bool(index_names), @@ -367,7 +367,7 @@ def _mimic_inplace( self, result: Self, inplace: bool = False ) -> Optional[Self]: if inplace: - self._index = result._index + self._index = result.index return super()._mimic_inplace(result, inplace) # Scans @@ -442,15 +442,15 @@ def _scan(self, op, axis=None, skipna=True): # pandas returns an int64 dtype for all int or bool dtypes. result_col = result_col.astype(np.int64) results[name] = getattr(result_col, op)() - return self._from_data(results, self._index) + return self._from_data(results, self.index) def _check_data_index_length_match(self) -> None: # Validate that the number of rows in the data matches the index if the # data is not empty. This is a helper for the constructor. 
- if self._data.nrows > 0 and self._data.nrows != len(self._index): + if self._data.nrows > 0 and self._data.nrows != len(self.index): raise ValueError( f"Length of values ({self._data.nrows}) does not " - f"match length of index ({len(self._index)})" + f"match length of index ({len(self.index)})" ) @property @@ -618,14 +618,14 @@ def copy(self, deep: bool = True) -> Self: return self._from_data( self._data.copy(deep=deep), # Indexes are immutable so copies can always be shallow. - self._index.copy(deep=False), + self.index.copy(deep=False), ) @_cudf_nvtx_annotate def equals(self, other): # noqa: D102 if not super().equals(other): return False - return self._index.equals(other._index) + return self.index.equals(other.index) @property def index(self): @@ -908,7 +908,7 @@ def replace( else: copy_data = self._data.copy(deep=True) - result = self._from_data(copy_data, self._index) + result = self._from_data(copy_data, self.index) return self._mimic_inplace(result, inplace=inplace) @@ -1033,7 +1033,7 @@ def clip(self, lower=None, upper=None, inplace=False, axis=1): name: col.clip(lower[i], upper[i]) for i, (name, col) in enumerate(self._data.items()) } - output = self._from_data(data, self._index) + output = self._from_data(data, self.index) output._copy_type_metadata(self, include_index=False) return self._mimic_inplace(output, inplace=inplace) @@ -1935,29 +1935,27 @@ def _copy_type_metadata( super()._copy_type_metadata(other, override_dtypes=override_dtypes) if ( include_index - and self._index is not None - and other._index is not None + and self.index is not None + and other.index is not None ): - self._index._copy_type_metadata(other._index) - # When other._index is a CategoricalIndex, the current index + self.index._copy_type_metadata(other.index) + # When other.index is a CategoricalIndex, the current index # will be a NumericalIndex with an underlying CategoricalColumn # (the above _copy_type_metadata call will have converted the # column). Calling cudf.Index on that column generates the # appropriate index. 
if isinstance( - other._index, cudf.core.index.CategoricalIndex - ) and not isinstance( - self._index, cudf.core.index.CategoricalIndex - ): - self._index = cudf.Index( - cast("cudf.Index", self._index)._column, - name=self._index.name, + other.index, cudf.core.index.CategoricalIndex + ) and not isinstance(self.index, cudf.core.index.CategoricalIndex): + self.index = cudf.Index( + cast("cudf.Index", self.index)._column, + name=self.index.name, ) - elif isinstance(other._index, cudf.MultiIndex) and not isinstance( - self._index, cudf.MultiIndex + elif isinstance(other.index, cudf.MultiIndex) and not isinstance( + self.index, cudf.MultiIndex ): - self._index = cudf.MultiIndex._from_data( - self._index._data, name=self._index.name + self.index = cudf.MultiIndex._from_data( + self.index._data, name=self.index.name ) return self @@ -2017,8 +2015,8 @@ def interpolate( data = self - if not isinstance(data._index, cudf.RangeIndex): - perm_sort = data._index.argsort() + if not isinstance(data.index, cudf.RangeIndex): + perm_sort = data.index.argsort() data = data._gather( GatherMap.from_column_unchecked( cudf.core.column.as_column(perm_sort), @@ -2040,13 +2038,13 @@ def interpolate( col = col.astype("float64").fillna(np.nan) # Interpolation methods may or may not need the index - columns[colname] = interpolator(col, index=data._index) + columns[colname] = interpolator(col, index=data.index) - result = self._from_data(columns, index=data._index) + result = self._from_data(columns, index=data.index) return ( result - if isinstance(data._index, cudf.RangeIndex) + if isinstance(data.index, cudf.RangeIndex) # TODO: This should be a scatter, avoiding an argsort. else result._gather( GatherMap.from_column_unchecked( @@ -2070,7 +2068,7 @@ def shift(self, periods=1, freq=None, axis=0, fill_value=None): col.shift(periods, fill_value) for col in self._columns ) return self.__class__._from_data( - zip(self._column_names, data_columns), self._index + zip(self._column_names, data_columns), self.index ) @_cudf_nvtx_annotate @@ -2254,7 +2252,7 @@ def truncate(self, before=None, after=None, axis=0, copy=True): if not copy: raise ValueError("Truncating with copy=False is not supported.") axis = self._get_axis_from_axis_arg(axis) - ax = self._index if axis == 0 else self._data.to_pandas_index() + ax = self.index if axis == 0 else self._data.to_pandas_index() if not ax.is_monotonic_increasing and not ax.is_monotonic_decreasing: raise ValueError("truncate requires a sorted index") @@ -2585,7 +2583,7 @@ def scale(self): vmin = self.min() vmax = self.max() scaled = (self - vmin) / (vmax - vmin) - scaled._index = self._index.copy(deep=False) + scaled.index = self.index.copy(deep=False) return scaled @_cudf_nvtx_annotate @@ -2919,14 +2917,14 @@ def _gather( raise IndexError("Gather map is out of bounds") return self._from_columns_like_self( libcudf.copying.gather( - list(self._index._columns + self._columns) + list(self.index._columns + self._columns) if keep_index else list(self._columns), gather_map.column, nullify=gather_map.nullify, ), self._column_names, - self._index.names if keep_index else None, + self.index.names if keep_index else None, ) def _slice(self, arg: slice, keep_index: bool = True) -> Self: @@ -3000,7 +2998,7 @@ def _slice(self, arg: slice, keep_index: bool = True) -> Self: columns_to_slice = [ *( - self._index._data.columns + self.index._data.columns if keep_index and not has_range_index else [] ), @@ -3009,7 +3007,7 @@ def _slice(self, arg: slice, keep_index: bool = True) -> Self: result = 
self._from_columns_like_self( libcudf.copying.columns_slice(columns_to_slice, [start, stop])[0], self._column_names, - None if has_range_index or not keep_index else self._index.names, + None if has_range_index or not keep_index else self.index.names, ) result._data.label_dtype = self._data.label_dtype result._data.rangeindex = self._data.rangeindex @@ -3028,7 +3026,7 @@ def _positions_from_column_names( indices returned corresponds to the column order in this Frame. """ num_index_columns = ( - len(self._index._data) if offset_by_index_columns else 0 + len(self.index._data) if offset_by_index_columns else 0 ) return [ i + num_index_columns @@ -3073,13 +3071,13 @@ def drop_duplicates( libcudf.stream_compaction.drop_duplicates( list(self._columns) if ignore_index - else list(self._index._columns + self._columns), + else list(self.index._columns + self._columns), keys=keys, keep=keep, nulls_are_equal=nulls_are_equal, ), self._column_names, - self._index.names if not ignore_index else None, + self.index.names if not ignore_index else None, ) @_cudf_nvtx_annotate @@ -3197,12 +3195,12 @@ def _empty_like(self, keep_index=True) -> Self: result = self._from_columns_like_self( libcudf.copying.columns_empty_like( [ - *(self._index._data.columns if keep_index else ()), + *(self.index._data.columns if keep_index else ()), *self._columns, ] ), self._column_names, - self._index.names if keep_index else None, + self.index.names if keep_index else None, ) result._data.label_dtype = self._data.label_dtype result._data.rangeindex = self._data.rangeindex @@ -3214,7 +3212,7 @@ def _split(self, splits, keep_index=True): columns_split = libcudf.copying.columns_split( [ - *(self._index._data.columns if keep_index else []), + *(self.index._data.columns if keep_index else []), *self._columns, ], splits, @@ -3224,7 +3222,7 @@ def _split(self, splits, keep_index=True): self._from_columns_like_self( columns_split[i], self._column_names, - self._index.names if keep_index else None, + self.index.names if keep_index else None, ) for i in range(len(splits) + 1) ] @@ -3244,12 +3242,12 @@ def fillna( "Use obj.ffill() or obj.bfill() instead.", FutureWarning, ) - old_index = self._index + old_index = self.index ret = super().fillna(value, method, axis, inplace, limit) if inplace: - self._index = old_index + self.index = old_index else: - ret._index = old_index + ret.index = old_index return ret @_cudf_nvtx_annotate @@ -3479,7 +3477,7 @@ def _apply(self, func, kernel_getter, *args, **kwargs): col = _post_process_output_col(ans_col, retty) col.set_base_mask(libcudf.transform.bools_to_mask(ans_mask)) - result = cudf.Series._from_data({None: col}, self._index) + result = cudf.Series._from_data({None: col}, self.index) return result @@ -3706,12 +3704,12 @@ def _reindex( df = self if index is not None: - if not df._index.is_unique: + if not df.index.is_unique: raise ValueError( "cannot reindex on an axis with duplicate labels" ) index = cudf.core.index.as_index( - index, name=getattr(index, "name", self._index.name) + index, name=getattr(index, "name", self.index.name) ) idx_dtype_match = (df.index.nlevels == index.nlevels) and all( @@ -3739,7 +3737,7 @@ def _reindex( else name: col for name, col in df._data.items() }, - index=df._index, + index=df.index, ) df = lhs.join(rhs, how="left", sort=True) # double-argsort to map back from sorted to unsorted positions @@ -3915,7 +3913,7 @@ def round(self, decimals=0, how="half_even"): multiindex=self._data.multiindex, level_names=self._data.level_names, ), - index=self._index, + 
index=self.index, ) def resample( @@ -4267,7 +4265,7 @@ def _drop_na_rows(self, how="any", subset=None, thresh=None): return self._from_columns_like_self( libcudf.stream_compaction.drop_nulls( - [*self._index._data.columns, *data_columns], + [*self.index._data.columns, *data_columns], how=how, keys=self._positions_from_column_names( subset, offset_by_index_columns=True @@ -4275,7 +4273,7 @@ def _drop_na_rows(self, how="any", subset=None, thresh=None): thresh=thresh, ), self._column_names, - self._index.names, + self.index.names, ) def _apply_boolean_mask(self, boolean_mask: BooleanMask, keep_index=True): @@ -4292,13 +4290,13 @@ def _apply_boolean_mask(self, boolean_mask: BooleanMask, keep_index=True): ) return self._from_columns_like_self( libcudf.stream_compaction.apply_boolean_mask( - list(self._index._columns + self._columns) + list(self.index._columns + self._columns) if keep_index else list(self._columns), boolean_mask.column, ), column_names=self._column_names, - index_names=self._index.names if keep_index else None, + index_names=self.index.names if keep_index else None, ) def take(self, indices, axis=0): @@ -4358,7 +4356,7 @@ def _reset_index(self, level, drop, col_level=0, col_fill=""): ) if not isinstance(level, (tuple, list)): level = (level,) - _check_duplicate_level_names(level, self._index.names) + _check_duplicate_level_names(level, self.index.names) index = self.index._new_index_for_reset_index(level, self.index.name) if index is None: @@ -4394,7 +4392,7 @@ def _first_or_last( self, offset, idx: int, op: Callable, side: str, slice_func: Callable ) -> "IndexedFrame": """Shared code path for ``first`` and ``last``.""" - if not isinstance(self._index, cudf.core.index.DatetimeIndex): + if not isinstance(self.index, cudf.core.index.DatetimeIndex): raise TypeError("'first' only supports a DatetimeIndex index.") if not isinstance(offset, str): raise NotImplementedError( @@ -4406,20 +4404,20 @@ def _first_or_last( pd_offset = pd.tseries.frequencies.to_offset(offset) to_search = op( - pd.Timestamp(self._index._column.element_indexing(idx)), pd_offset + pd.Timestamp(self.index._column.element_indexing(idx)), pd_offset ) if ( idx == 0 and not isinstance(pd_offset, pd.tseries.offsets.Tick) - and pd_offset.is_on_offset(pd.Timestamp(self._index[0])) + and pd_offset.is_on_offset(pd.Timestamp(self.index[0])) ): # Special handle is required when the start time of the index # is on the end of the offset. See pandas gh29623 for detail. 
to_search = to_search - pd_offset.base return self.loc[:to_search] - needle = as_column(to_search, dtype=self._index.dtype) + needle = as_column(to_search, dtype=self.index.dtype) end_point = int( - self._index._column.searchsorted( + self.index._column.searchsorted( needle, side=side ).element_indexing(0) ) @@ -4802,7 +4800,7 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): name: (col, None, False, None) for name, col in self._data.items() } - index = self._index + index = self.index data = self._apply_cupy_ufunc_to_operands( ufunc, cupy_func, inputs, **kwargs @@ -4880,7 +4878,7 @@ def repeat(self, repeats, axis=None): """ res = self._from_columns_like_self( Frame._repeat( - [*self._index._data.columns, *self._columns], repeats, axis + [*self.index._data.columns, *self._columns], repeats, axis ), self._column_names, self._index_names, @@ -5011,7 +5009,7 @@ def astype( raise e return self - return self._from_data(data, index=self._index) + return self._from_data(data, index=self.index) @_cudf_nvtx_annotate def drop( @@ -5220,8 +5218,7 @@ def drop( columns = _get_host_unique(columns) _drop_columns(dropped, columns, errors) - out._data = dropped._data - out._index = dropped._index + out._mimic_inplace(dropped, inplace=True) if not inplace: return out @@ -5234,18 +5231,18 @@ def _explode(self, explode_column: Any, ignore_index: bool): # exploded and will be replaced with a `RangeIndex`. if not isinstance(self._data[explode_column].dtype, ListDtype): data = self._data.copy(deep=True) - idx = None if ignore_index else self._index.copy(deep=True) + idx = None if ignore_index else self.index.copy(deep=True) return self.__class__._from_data(data, index=idx) column_index = self._column_names.index(explode_column) - if not ignore_index and self._index is not None: - index_offset = self._index.nlevels + if not ignore_index and self.index is not None: + index_offset = self.index.nlevels else: index_offset = 0 exploded = libcudf.lists.explode_outer( [ - *(self._index._data.columns if not ignore_index else ()), + *(self.index._data.columns if not ignore_index else ()), *self._columns, ], column_index + index_offset, @@ -5292,7 +5289,7 @@ def tile(self, count): """ return self._from_columns_like_self( libcudf.reshape.tile( - [*self._index._columns, *self._columns], count + [*self.index._columns, *self._columns], count ), column_names=self._column_names, index_names=self._index_names, @@ -6273,7 +6270,7 @@ def rank( return self.__class__._from_data( dict(zip(source._column_names, result_columns)), - index=source._index, + index=source.index, ).astype(np.float64) def convert_dtypes( @@ -6505,7 +6502,7 @@ def _is_series(obj): Checks if the `obj` is of type `cudf.Series` instead of checking for isinstance(obj, cudf.Series) """ - return isinstance(obj, Frame) and obj.ndim == 1 and obj._index is not None + return isinstance(obj, Frame) and obj.ndim == 1 and obj.index is not None @_cudf_nvtx_annotate @@ -6518,7 +6515,7 @@ def _drop_rows_by_labels( """Remove rows specified by `labels`. If `errors="raise"`, an error is raised if some items in `labels` do not - exist in `obj._index`. + exist in `obj.index`. Will raise if level(int) is greater or equal to index nlevels. """ @@ -6539,17 +6536,17 @@ def _drop_rows_by_labels( if isinstance(level, int): ilevel = level else: - ilevel = obj._index.names.index(level) + ilevel = obj.index.names.index(level) # 1. 
Merge Index df and data df along column axis: - # | id | ._index df | data column(s) | - idx_nlv = obj._index.nlevels - working_df = obj._index.to_frame(index=False) + # | id | .index df | data column(s) | + idx_nlv = obj.index.nlevels + working_df = obj.index.to_frame(index=False) working_df.columns = list(range(idx_nlv)) for i, col in enumerate(obj._data): working_df[idx_nlv + i] = obj._data[col] # 2. Set `level` as common index: - # | level | ._index df w/o level | data column(s) | + # | level | .index df w/o level | data column(s) | working_df = working_df.set_index(level) # 3. Use "leftanti" join to drop @@ -6560,11 +6557,11 @@ def _drop_rows_by_labels( # 4. Reconstruct original layout, and rename join_res._insert( - ilevel, name=join_res._index.name, value=join_res._index + ilevel, name=join_res.index.name, value=join_res.index ) midx = cudf.MultiIndex.from_frame( - join_res.iloc[:, 0:idx_nlv], names=obj._index.names + join_res.iloc[:, 0:idx_nlv], names=obj.index.names ) if isinstance(obj, cudf.Series): @@ -6596,7 +6593,7 @@ def _drop_rows_by_labels( # Join changes the index to common type, # but we need to preserve the type of # index being returned, Hence this type-cast. - res._index = res.index.astype(obj.index.dtype) + res.index = res.index.astype(obj.index.dtype) return res diff --git a/python/cudf/cudf/core/join/_join_helpers.py b/python/cudf/cudf/core/join/_join_helpers.py index 6a619945e75..05cbb4429b9 100644 --- a/python/cudf/cudf/core/join/_join_helpers.py +++ b/python/cudf/cudf/core/join/_join_helpers.py @@ -43,10 +43,10 @@ def set(self, obj: cudf.DataFrame, value: ColumnBase, validate=False): class _IndexIndexer(_Indexer): def get(self, obj: cudf.DataFrame) -> ColumnBase: - return obj._index._data[self.name] + return obj.index._data[self.name] def set(self, obj: cudf.DataFrame, value: ColumnBase, validate=False): - obj._index._data.set_by_label(self.name, value, validate=validate) + obj.index._data.set_by_label(self.name, value, validate=validate) def _match_join_keys( diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 1ef2915bc59..da999441ca3 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -373,10 +373,10 @@ def _merge_results( index: Optional[cudf.BaseIndex] if self._using_right_index: # right_index and left_on - index = left_result._index + index = left_result.index elif self._using_left_index: # left_index and right_on - index = right_result._index + index = right_result.index else: index = None @@ -400,7 +400,7 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame: # producing the input result. 
by: List[Any] = [] if self._using_left_index and self._using_right_index: - by.extend(result._index._data.columns) + by.extend(result.index._data.columns) if not self._using_left_index: by.extend([result._data[col.name] for col in self._left_keys]) if not self._using_right_index: @@ -408,8 +408,8 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame: if by: keep_index = self._using_left_index or self._using_right_index if keep_index: - to_sort = [*result._index._columns, *result._columns] - index_names = result._index.names + to_sort = [*result.index._columns, *result._columns] + index_names = result.index.names else: to_sort = [*result._columns] index_names = None @@ -547,4 +547,4 @@ class MergeSemi(Merge): def _merge_results(self, lhs: cudf.DataFrame, rhs: cudf.DataFrame): # semi-join result includes only lhs columns - return lhs._data, lhs._index + return lhs._data, lhs.index diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index 0b44ab58f30..d4772d5b4c2 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -836,7 +836,7 @@ def get_dummies( dtype=dtype, ) result_data.update(col_enc_data) - return cudf.DataFrame._from_data(result_data, index=df._index) + return cudf.DataFrame._from_data(result_data, index=df.index) else: ser = cudf.Series(df) unique = _get_unique(column=ser._column, dummy_na=dummy_na) @@ -847,7 +847,7 @@ def get_dummies( prefix_sep=prefix_sep, dtype=dtype, ) - return cudf.DataFrame._from_data(data, index=ser._index) + return cudf.DataFrame._from_data(data, index=ser.index) def _merge_sorted( @@ -899,7 +899,7 @@ def _merge_sorted( raise ValueError("`by_index` and `ignore_index` cannot both be True") if by_index: - key_columns_indices = list(range(0, objs[0]._index.nlevels)) + key_columns_indices = list(range(0, objs[0].index.nlevels)) else: if keys is None: key_columns_indices = list(range(0, objs[0]._num_columns)) @@ -909,12 +909,12 @@ def _merge_sorted( ] if not ignore_index: key_columns_indices = [ - idx + objs[0]._index.nlevels for idx in key_columns_indices + idx + objs[0].index.nlevels for idx in key_columns_indices ] columns = [ [ - *(obj._index._data.columns if not ignore_index else ()), + *(obj.index._data.columns if not ignore_index else ()), *obj._columns, ] for obj in objs diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index c7bc97edd68..41fbf269699 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -296,7 +296,7 @@ def __getitem__(self, arg: Any) -> Union[ScalarLike, DataFrameOrSeries]: result = self._frame.index._get_row_major(self._frame, row_arg) if ( isinstance(arg, tuple) - and len(arg) == self._frame._index.nlevels + and len(arg) == self._frame.index.nlevels and not any(isinstance(x, slice) for x in arg) ): result = result.iloc[0] @@ -318,7 +318,7 @@ def __setitem__(self, key, value): and not isinstance(self._frame.index, cudf.MultiIndex) and is_scalar(value) ): - idx = self._frame._index + idx = self._frame.index if isinstance(idx, cudf.RangeIndex): if isinstance(key, int) and (key == idx[-1] + idx.step): idx_copy = cudf.RangeIndex( @@ -682,7 +682,7 @@ def _from_data( @_cudf_nvtx_annotate def __contains__(self, item): - return item in self._index + return item in self.index @classmethod @_cudf_nvtx_annotate @@ -832,7 +832,7 @@ def hasnans(self): def serialize(self): header, frames = super().serialize() - header["index"], index_frames = self._index.serialize() + header["index"], index_frames = 
self.index.serialize() header["index_frame_count"] = len(index_frames) # For backwards compatibility with older versions of cuDF, index # columns are placed before data columns. @@ -850,7 +850,7 @@ def deserialize(cls, header, frames): idx_typ = pickle.loads(header["index"]["type-serialized"]) index = idx_typ.deserialize(header["index"], frames[:index_nframes]) - obj._index = index + obj.index = index return obj @@ -995,7 +995,7 @@ def reindex(self, *args, **kwargs): "'index' passed as both positional and keyword argument" ) else: - index = kwargs.get("index", self._index) + index = kwargs.get("index", self.index) name = self.name or 0 series = self._reindex( @@ -1140,7 +1140,7 @@ def to_frame(self, name=None): @_cudf_nvtx_annotate def memory_usage(self, index=True, deep=False): return self._column.memory_usage + ( - self._index.memory_usage() if index else 0 + self.index.memory_usage() if index else 0 ) @_cudf_nvtx_annotate @@ -1506,7 +1506,7 @@ def _make_operands_and_index_for_binop( can_use_self_column_name = False operands = lhs._make_operands_for_binop(other, fill_value, reflect) - return operands, lhs._index, can_use_self_column_name + return operands, lhs.index, can_use_self_column_name @copy_docstring(CategoricalAccessor) # type: ignore @property @@ -1917,7 +1917,7 @@ def between(self, left, right, inclusive="both") -> Series: "Inclusive has to be either string of 'both', " "'left', 'right', or 'neither'." ) - return self._from_data({self.name: lmask & rmask}, self._index) + return self._from_data({self.name: lmask & rmask}, self.index) @_cudf_nvtx_annotate def all(self, axis=0, bool_only=None, skipna=True, **kwargs): @@ -3119,7 +3119,7 @@ def value_counts( # TODO: Remove this workaround once `observed` # parameter support is added to `groupby` res = res.reindex(self.dtype.categories).fillna(0) - res._index = res._index.astype(self.dtype) + res.index = res.index.astype(self.dtype) res.index.name = self.name @@ -3927,7 +3927,7 @@ def microsecond(self): * cudf.Scalar(1000, dtype="int32") ) + self.series._column.get_dt_field("microsecond"), - index=self.series._index, + index=self.series.index, name=self.series.name, ) @@ -4161,7 +4161,7 @@ def is_leap_year(self): res = libcudf.datetime.is_leap_year(self.series._column).fillna(False) return Series._from_data( ColumnAccessor({None: res}), - index=self.series._index, + index=self.series.index, name=self.series.name, ) @@ -4195,7 +4195,7 @@ def quarter(self): ) return Series._from_data( {None: res}, - index=self.series._index, + index=self.series.index, name=self.series.name, ) @@ -4299,7 +4299,7 @@ def days_in_month(self): res = libcudf.datetime.days_in_month(self.series._column) return Series._from_data( ColumnAccessor({None: res}), - index=self.series._index, + index=self.series.index, name=self.series.name, ) @@ -4345,7 +4345,7 @@ def is_month_end(self): last_day = libcudf.datetime.last_day_of_month(self.series._column) last_day = Series._from_data( ColumnAccessor({None: last_day}), - index=self.series._index, + index=self.series.index, name=self.series.name, ) return (self.day == last_day.dt.day).fillna(False) @@ -4395,7 +4395,7 @@ def is_quarter_start(self): result = ((day == cudf.Scalar(1)) & first_month).fillna(False) return Series._from_data( {None: result}, - index=self.series._index, + index=self.series.index, name=self.series.name, ) @@ -4446,7 +4446,7 @@ def is_quarter_end(self): result = ((day == last_day) & last_month).fillna(False) return Series._from_data( {None: result}, - index=self.series._index, + 
index=self.series.index, name=self.series.name, ) @@ -4481,7 +4481,7 @@ def is_year_start(self): ) == cudf.Scalar(1) return Series._from_data( {None: outcol.fillna(False)}, - index=self.series._index, + index=self.series.index, name=self.series.name, ) @@ -4520,7 +4520,7 @@ def is_year_end(self): result = result.fillna(False) return Series._from_data( {None: result}, - index=self.series._index, + index=self.series.index, name=self.series.name, ) @@ -4528,7 +4528,7 @@ def is_year_end(self): def _get_dt_field(self, field): out_column = self.series._column.get_dt_field(field) return Series( - data=out_column, index=self.series._index, name=self.series.name + data=out_column, index=self.series.index, name=self.series.name ) @_cudf_nvtx_annotate @@ -4565,7 +4565,7 @@ def ceil(self, freq): out_column = self.series._column.ceil(freq) return Series._from_data( - data={self.series.name: out_column}, index=self.series._index + data={self.series.name: out_column}, index=self.series.index ) @_cudf_nvtx_annotate @@ -4602,7 +4602,7 @@ def floor(self, freq): out_column = self.series._column.floor(freq) return Series._from_data( - data={self.series.name: out_column}, index=self.series._index + data={self.series.name: out_column}, index=self.series.index ) @_cudf_nvtx_annotate @@ -4642,7 +4642,7 @@ def round(self, freq): out_column = self.series._column.round(freq) return Series._from_data( - data={self.series.name: out_column}, index=self.series._index + data={self.series.name: out_column}, index=self.series.index ) @_cudf_nvtx_annotate @@ -4724,7 +4724,7 @@ def strftime(self, date_format, *args, **kwargs): dtype="str", format=date_format ) return Series( - data=str_col, index=self.series._index, name=self.series.name + data=str_col, index=self.series.index, name=self.series.name ) @copy_docstring(DatetimeIndex.tz_localize) @@ -4739,7 +4739,7 @@ def tz_localize( ) return Series._from_data( data={self.series.name: result_col}, - index=self.series._index, + index=self.series.index, ) @copy_docstring(DatetimeIndex.tz_convert) @@ -4755,7 +4755,7 @@ def tz_convert(self, tz: str | None): """ result_col = self.series._column.tz_convert(tz) return Series._from_data( - {self.series.name: result_col}, index=self.series._index + {self.series.name: result_col}, index=self.series.index ) @@ -4993,13 +4993,13 @@ def components(self): 3 0 0 35 35 656 0 0 4 37 13 12 14 234 0 0 """ # noqa: E501 - return self.series._column.components(index=self.series._index) + return self.series._column.components(index=self.series.index) @_cudf_nvtx_annotate def _get_td_field(self, field): out_column = getattr(self.series._column, field) return Series( - data=out_column, index=self.series._index, name=self.series.name + data=out_column, index=self.series.index, name=self.series.name ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a6c67d22af7..dbdb2093b72 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -69,6 +69,10 @@ def _write_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -102,6 +106,10 @@ def _write_parquet( "force_nullable_schema": force_nullable_schema, "header_version": header_version, "use_dictionary": use_dictionary, + "skip_compression": skip_compression, + "column_encoding": column_encoding, + "column_type_length": column_type_length, + 
"output_as_binary": output_as_binary, } if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs): with ExitStack() as stack: @@ -140,6 +148,12 @@ def write_to_dataset( max_page_size_rows=None, storage_options=None, force_nullable_schema=False, + header_version="1.0", + use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, ): """Wraps `to_parquet` to write partitioned Parquet datasets. For each combination of partition group and value, @@ -204,6 +218,30 @@ def write_to_dataset( If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, otherwise as `not null`. + header_version : {{'1.0', '2.0'}}, default "1.0" + Controls whether to use version 1.0 or version 2.0 page headers when + encoding. Version 1.0 is more portable, but version 2.0 enables the + use of newer encoding schemes. + force_nullable_schema : bool, default False. + If True, writes all columns as `null` in schema. + If False, columns are written as `null` if they contain null values, + otherwise as `not null`. + skip_compression : set, optional, default None + If a column name is present in the set, that column will not be compressed, + regardless of the ``compression`` setting. + column_encoding : dict, optional, default None + Sets the page encoding to use on a per-column basis. The key is a column + name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or + 'USE_DEFAULT'. + column_type_length : dict, optional, default None + Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. + The key is a column name and the value is an integer. The named column + will be output as unannotated binary (i.e. the column will behave as if + ``output_as_binary`` was set). + output_as_binary : set, optional, default None + If a column name is present in the set, that column will be output as + unannotated binary, rather than the default 'UTF-8'. 
""" fs = ioutils._ensure_filesystem(fs, root_path, storage_options) @@ -241,6 +279,12 @@ def write_to_dataset( max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) else: @@ -262,6 +306,12 @@ def write_to_dataset( max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) return metadata @@ -906,6 +956,10 @@ def to_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, *args, **kwargs, ): @@ -955,6 +1009,12 @@ def to_parquet( return_metadata=return_metadata, storage_options=storage_options, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) partition_info = ( @@ -983,6 +1043,10 @@ def to_parquet( force_nullable_schema=force_nullable_schema, header_version=header_version, use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) else: diff --git a/python/cudf/cudf/tests/test_dlpack.py b/python/cudf/cudf/tests/test_dlpack.py index 6e34817c4fd..aafe920d3a1 100644 --- a/python/cudf/cudf/tests/test_dlpack.py +++ b/python/cudf/cudf/tests/test_dlpack.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. 
import itertools from contextlib import ExitStack as does_not_raise @@ -201,12 +201,7 @@ def test_to_dlpack_mixed_dtypes(): "shape", [ (0, 3), - pytest.param( - (3, 0), - marks=pytest.mark.xfail( - reason="Index information not available via from_dlpack" - ), - ), + (3, 0), (0, 0), ], ) diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py index 3033a3e75e3..51287fe26a0 100644 --- a/python/cudf/cudf/tests/test_json.py +++ b/python/cudf/cudf/tests/test_json.py @@ -355,6 +355,14 @@ def test_json_lines_basic(json_input, engine): np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_numpy()) +@pytest.mark.filterwarnings("ignore:Using CPU") +@pytest.mark.parametrize("engine", ["auto", "cudf", "pandas"]) +def test_nonexistent_json_correct_error(engine): + json_input = "doesnotexist.json" + with pytest.raises(FileNotFoundError): + cudf.read_json(json_input, engine=engine) + + @pytest.mark.skipif( PANDAS_VERSION < PANDAS_CURRENT_SUPPORTED_VERSION, reason="warning not present in older pandas versions", diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index b2896d55b80..e32fdacd8d6 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2870,6 +2870,82 @@ def flba(i): assert_eq(expect, got) +def test_parquet_flba_round_trip(tmpdir): + def flba(i): + hasher = hashlib.sha256() + hasher.update(i.to_bytes(4, "little")) + return hasher.digest() + + # use pyarrow to write table of fixed_len_byte_array + num_rows = 200 + data = pa.array([flba(i) for i in range(num_rows)], type=pa.binary(32)) + padf = pa.Table.from_arrays([data], names=["flba"]) + padf_fname = tmpdir.join("padf.parquet") + pq.write_table(padf, padf_fname) + + # round trip data with cudf + cdf = cudf.read_parquet(padf_fname) + cdf_fname = tmpdir.join("cdf.parquet") + cdf.to_parquet(cdf_fname, column_type_length={"flba": 32}) + + # now read back in with pyarrow to test it was written properly by cudf + padf2 = pq.read_table(padf_fname) + padf3 = pq.read_table(cdf_fname) + assert_eq(padf2, padf3) + assert_eq(padf2.schema[0].type, padf3.schema[0].type) + + +@pytest.mark.parametrize( + "encoding", + [ + "PLAIN", + "DICTIONARY", + "DELTA_BINARY_PACKED", + "BYTE_STREAM_SPLIT", + "USE_DEFAULT", + ], +) +def test_per_column_options(tmpdir, encoding): + pdf = pd.DataFrame({"ilist": [[1, 2, 3, 1, 2, 3]], "i1": [1]}) + cdf = cudf.from_pandas(pdf) + fname = tmpdir.join("ilist.parquet") + cdf.to_parquet( + fname, + column_encoding={"ilist.list.element": encoding}, + compression="SNAPPY", + skip_compression={"ilist.list.element"}, + ) + # DICTIONARY and USE_DEFAULT should both result in a PLAIN_DICTIONARY encoding in parquet + encoding_name = ( + "PLAIN_DICTIONARY" + if encoding == "DICTIONARY" or encoding == "USE_DEFAULT" + else encoding + ) + pf = pq.ParquetFile(fname) + fmd = pf.metadata + assert encoding_name in fmd.row_group(0).column(0).encodings + assert fmd.row_group(0).column(0).compression == "UNCOMPRESSED" + assert fmd.row_group(0).column(1).compression == "SNAPPY" + + +@pytest.mark.parametrize( + "encoding", + ["DELTA_LENGTH_BYTE_ARRAY", "DELTA_BYTE_ARRAY"], +) +def test_per_column_options_string_col(tmpdir, encoding): + pdf = pd.DataFrame({"s": ["a string"], "i1": [1]}) + cdf = cudf.from_pandas(pdf) + fname = tmpdir.join("strcol.parquet") + cdf.to_parquet( + fname, + column_encoding={"s": encoding}, + compression="SNAPPY", + ) + pf = pq.ParquetFile(fname) + fmd = pf.metadata + assert encoding in 
fmd.row_group(0).column(0).encodings + + def test_parquet_reader_rle_boolean(datadir): fname = datadir / "rle_boolean_encoding.parquet" diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 18e81078587..1366a0b8e84 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -306,6 +306,22 @@ If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, otherwise as `not null`. +skip_compression : set, optional, default None + If a column name is present in the set, that column will not be compressed, + regardless of the ``compression`` setting. +column_encoding : dict, optional, default None + Sets the page encoding to use on a per-column basis. The key is a column + name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or + 'USE_DEFAULT'. +column_type_length : dict, optional, default None + Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. + The key is a column name and the value is an integer. The named column + will be output as unannotated binary (i.e. the column will behave as if + ``output_as_binary`` was set). +output_as_binary : set, optional, default None + If a column name is present in the set, that column will be output as + unannotated binary, rather than the default 'UTF-8'. **kwargs Additional parameters will be passed to execution engines other than ``cudf``. @@ -1718,10 +1734,32 @@ def get_reader_filepath_or_buffer( if _is_local_filesystem(fs): # Doing this as `read_json` accepts a json string # path_or_data need not be a filepath like string + + # helper for checking if raw text looks like a json filename + compression_extensions = [ + ".tar", + ".tar.gz", + ".tar.bz2", + ".tar.xz", + ".gz", + ".bz2", + ".zip", + ".xz", + ".zst", + "", + ] + if len(paths): if fs.exists(paths[0]): path_or_data = paths if len(paths) > 1 else paths[0] - elif not allow_raw_text_input: + + # raise FileNotFound if path looks like json + # following pandas + # see + # https://github.com/pandas-dev/pandas/pull/46718/files#diff-472ce5fe087e67387942e1e1c409a5bc58dde9eb8a2db6877f1a45ae4974f694R724-R729 + elif not allow_raw_text_input or paths[0].lower().endswith( + tuple(f".json{c}" for c in compression_extensions) + ): raise FileNotFoundError( f"{path_or_data} could not be resolved to any files" ) diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index 3f0cfeb6d2c..3bd455a3a57 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -167,6 +167,14 @@ def set_index( pre_sorted = sorted del sorted + if divisions == "quantile": + warnings.warn( + "Using divisions='quantile' is now deprecated. 
" + "Please raise an issue on github if you believe " + "this feature is necessary.", + FutureWarning, + ) + if ( divisions == "quantile" or isinstance(divisions, (cudf.DataFrame, cudf.Series)) diff --git a/python/dask_cudf/dask_cudf/expr/_collection.py b/python/dask_cudf/dask_cudf/expr/_collection.py index 926b7cfaf0e..f60e4ff81ef 100644 --- a/python/dask_cudf/dask_cudf/expr/_collection.py +++ b/python/dask_cudf/dask_cudf/expr/_collection.py @@ -15,7 +15,6 @@ from dask import config from dask.dataframe.core import is_dataframe_like -from dask.dataframe.dispatch import is_categorical_dtype import cudf @@ -82,23 +81,23 @@ def from_dict(cls, *args, **kwargs): with config.set({"dataframe.backend": "cudf"}): return DXDataFrame.from_dict(*args, **kwargs) - def sort_values( + def set_index( self, - by, + *args, + divisions=None, **kwargs, ): - # Raise if the first column is categorical, otherwise the - # upstream divisions logic may produce errors - # (See: https://github.com/rapidsai/cudf/issues/11795) - check_by = by[0] if isinstance(by, list) else by - if is_categorical_dtype(self.dtypes.get(check_by, None)): - raise NotImplementedError( - "Dask-cudf does not support sorting on categorical " - "columns when query-planning is enabled. Please use " - "the legacy API for now." + if divisions == "quantile": + divisions = None + warnings.warn( + "Ignoring divisions='quantile'. This option is now " + "deprecated. Please use the legacy API and raise an " + "issue on github if this feature is necessary." f"\n{_LEGACY_WORKAROUND}", + FutureWarning, ) - return super().sort_values(by, **kwargs) + + return super().set_index(*args, divisions=divisions, **kwargs) def groupby( self, diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index ff037b9520c..8fccaccb695 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -1,11 +1,14 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
import functools +import dask_expr._shuffle as _shuffle_module +from dask_expr import new_collection from dask_expr._cumulative import CumulativeBlockwise from dask_expr._expr import Expr, VarColumns from dask_expr._reductions import Reduction, Var from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty +from dask.dataframe.dispatch import is_categorical_dtype ## ## Custom expression patching @@ -121,3 +124,25 @@ def _patched_var( Expr.var = _patched_var + + +# Temporary work-around for missing cudf + categorical support +# See: https://github.com/rapidsai/cudf/issues/11795 +# TODO: Fix RepartitionQuantiles and remove this in cudf>24.06 + +_original_get_divisions = _shuffle_module._get_divisions + + +def _patched_get_divisions(frame, other, *args, **kwargs): + # NOTE: The following two lines contains the "patch" + # (we simply convert the partitioning column to pandas) + if is_categorical_dtype(other._meta.dtype) and hasattr( + other.frame._meta, "to_pandas" + ): + other = new_collection(other).to_backend("pandas")._expr + + # Call "original" function + return _original_get_divisions(frame, other, *args, **kwargs) + + +_shuffle_module._get_divisions = _patched_get_divisions diff --git a/python/dask_cudf/dask_cudf/expr/_groupby.py b/python/dask_cudf/dask_cudf/expr/_groupby.py index 116893891e3..65688115b59 100644 --- a/python/dask_cudf/dask_cudf/expr/_groupby.py +++ b/python/dask_cudf/dask_cudf/expr/_groupby.py @@ -9,19 +9,21 @@ from dask.dataframe.groupby import Aggregation +from cudf.core.groupby.groupby import _deprecate_collect + ## ## Custom groupby classes ## -class Collect(SingleAggregation): +class ListAgg(SingleAggregation): @staticmethod def groupby_chunk(arg): - return arg.agg("collect") + return arg.agg(list) @staticmethod def groupby_aggregate(arg): - gb = arg.agg("collect") + gb = arg.agg(list) if gb.ndim > 1: for col in gb.columns: gb[col] = gb[col].list.concat() @@ -30,10 +32,10 @@ def groupby_aggregate(arg): return gb.list.concat() -collect_aggregation = Aggregation( - name="collect", - chunk=Collect.groupby_chunk, - agg=Collect.groupby_aggregate, +list_aggregation = Aggregation( + name="list", + chunk=ListAgg.groupby_chunk, + agg=ListAgg.groupby_aggregate, ) @@ -41,13 +43,13 @@ def _translate_arg(arg): # Helper function to translate args so that # they can be processed correctly by upstream # dask & dask-expr. Right now, the only necessary - # translation is "collect" aggregations. + # translation is list aggregations. 
if isinstance(arg, dict): return {k: _translate_arg(v) for k, v in arg.items()} elif isinstance(arg, list): return [_translate_arg(x) for x in arg] elif arg in ("collect", "list", list): - return collect_aggregation + return list_aggregation else: return arg @@ -84,7 +86,8 @@ def __getitem__(self, key): return g def collect(self, **kwargs): - return self._single_agg(Collect, **kwargs) + _deprecate_collect() + return self._single_agg(ListAgg, **kwargs) def aggregate(self, arg, **kwargs): return super().aggregate(_translate_arg(arg), **kwargs) @@ -96,7 +99,8 @@ def __init__(self, *args, observed=None, **kwargs): super().__init__(*args, observed=observed, **kwargs) def collect(self, **kwargs): - return self._single_agg(Collect, **kwargs) + _deprecate_collect() + return self._single_agg(ListAgg, **kwargs) def aggregate(self, arg, **kwargs): return super().aggregate(_translate_arg(arg), **kwargs) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 43ad4f0fee3..ef47ea436c7 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -15,6 +15,7 @@ from dask.utils import funcname import cudf +from cudf.core.groupby.groupby import _deprecate_collect from cudf.utils.nvtx_annotation import _dask_cudf_nvtx_annotate from dask_cudf.sorting import _deprecate_shuffle_kwarg @@ -28,7 +29,7 @@ "sum", "min", "max", - "collect", + list, "first", "last", ) @@ -164,9 +165,10 @@ def max(self, split_every=None, split_out=1): @_dask_cudf_nvtx_annotate @_check_groupby_optimized def collect(self, split_every=None, split_out=1): + _deprecate_collect() return _make_groupby_agg_call( self, - self._make_groupby_method_aggs("collect"), + self._make_groupby_method_aggs(list), split_every, split_out, ) @@ -308,9 +310,10 @@ def max(self, split_every=None, split_out=1): @_dask_cudf_nvtx_annotate @_check_groupby_optimized def collect(self, split_every=None, split_out=1): + _deprecate_collect() return _make_groupby_agg_call( self, - {self._slice: "collect"}, + {self._slice: list}, split_every, split_out, )[self._slice] @@ -472,7 +475,7 @@ def groupby_agg( This aggregation algorithm only supports the following options - * "collect" + * "list" * "count" * "first" * "last" @@ -667,8 +670,8 @@ def _redirect_aggs(arg): sum: "sum", max: "max", min: "min", - list: "collect", - "list": "collect", + "collect": list, + "list": list, } if isinstance(arg, dict): new_arg = dict() @@ -704,7 +707,7 @@ def _aggs_optimized(arg, supported: set): _global_set = set(arg) return bool(_global_set.issubset(supported)) - elif isinstance(arg, str): + elif isinstance(arg, (str, type)): return arg in supported return False @@ -783,6 +786,8 @@ def _tree_node_agg(df, gb_cols, dropna, sort, sep): agg = col.split(sep)[-1] if agg in ("count", "sum"): agg_dict[col] = ["sum"] + elif agg == "list": + agg_dict[col] = [list] elif agg in OPTIMIZED_AGGS: agg_dict[col] = [agg] else: @@ -873,8 +878,8 @@ def _finalize_gb_agg( gb.drop(columns=[sum_name], inplace=True) if "count" not in agg_list: gb.drop(columns=[count_name], inplace=True) - if "collect" in agg_list: - collect_name = _make_name((col, "collect"), sep=sep) + if list in agg_list: + collect_name = _make_name((col, "list"), sep=sep) gb[collect_name] = gb[collect_name].list.concat() # Ensure sorted keys if `sort=True` diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index 18a9e3b496f..7f8a619ae22 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ 
b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -231,7 +231,6 @@ def test_set_index(nelem): dd.assert_eq(expect, got, check_index=False, check_divisions=False) -@xfail_dask_expr("missing support for divisions='quantile'") @pytest.mark.parametrize("by", ["a", "b"]) @pytest.mark.parametrize("nelem", [10, 500]) @pytest.mark.parametrize("nparts", [1, 10]) @@ -241,7 +240,8 @@ def test_set_index_quantile(nelem, nparts, by): df["b"] = np.random.choice(cudf.datasets.names, size=nelem) ddf = dd.from_pandas(df, npartitions=nparts) - got = ddf.set_index(by, divisions="quantile") + with pytest.warns(FutureWarning, match="deprecated"): + got = ddf.set_index(by, divisions="quantile") expect = df.sort_values(by=by).set_index(by) dd.assert_eq(got, expect) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index dc279bfa690..cf916b713b2 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -9,6 +9,7 @@ from dask.utils_test import hlg_layer import cudf +from cudf.testing._utils import expect_warning_if import dask_cudf from dask_cudf.groupby import OPTIMIZED_AGGS, _aggs_optimized @@ -47,7 +48,13 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) +# NOTE: We only want to test aggregation "methods" here, +# so we need to leave out `list`. We also include a +# deprecation check for "collect". +@pytest.mark.parametrize( + "aggregation", + sorted(tuple(set(OPTIMIZED_AGGS) - {list}) + ("collect",)), +) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) @@ -62,8 +69,9 @@ def test_groupby_basic(series, aggregation, pdf): check_dtype = aggregation != "count" - expect = getattr(gdf_grouped, aggregation)() - actual = getattr(ddf_grouped, aggregation)() + with expect_warning_if(aggregation == "collect"): + expect = getattr(gdf_grouped, aggregation)() + actual = getattr(ddf_grouped, aggregation)() if not QUERY_PLANNING_ON: assert_cudf_groupby_layers(actual) diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index 9d9fe297248..9bbbbc79561 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -10,7 +10,7 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import QUERY_PLANNING_ON, xfail_dask_expr +from dask_cudf.tests.utils import xfail_dask_expr @pytest.mark.parametrize("ascending", [True, False]) @@ -20,12 +20,7 @@ "a", "b", "c", - pytest.param( - "d", - marks=xfail_dask_expr( - "Possible segfault when sorting by categorical column.", - ), - ), + "d", ["a", "b"], ["c", "d"], ], @@ -47,20 +42,6 @@ def test_sort_values(nelem, nparts, by, ascending): dd.assert_eq(got, expect, check_index=False) -@pytest.mark.parametrize("by", ["b", ["b", "a"]]) -def test_sort_values_categorical_raises(by): - df = cudf.DataFrame() - df["a"] = np.ascontiguousarray(np.arange(10)[::-1]) - df["b"] = df["a"].astype("category") - ddf = dd.from_pandas(df, npartitions=10) - - if QUERY_PLANNING_ON: - with pytest.raises( - NotImplementedError, match="sorting on categorical" - ): - ddf.sort_values(by=by) - - @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) def test_sort_values_single_partition(by, ascending):
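Taken together, the groupby changes above replace the string aggregation name `"collect"` with Python's built-in `list` throughout dask_cudf and route the old `.collect()` method through cudf's `_deprecate_collect()` helper. A small sketch of the new and old spellings (the data is made up; the deprecation warning on `.collect()` follows the updated groupby tests above):

```python
import cudf
import dask_cudf

ddf = dask_cudf.from_cudf(
    cudf.DataFrame({"key": [0, 0, 1], "val": [1, 2, 3]}), npartitions=2
)

# New spelling: aggregate with the built-in `list`
per_key_lists = ddf.groupby("key").agg({"val": list}).compute()

# Old spelling: still works, but now emits a deprecation warning
per_key_lists_old = ddf.groupby("key")["val"].collect().compute()
```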