diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 2811711d58c..7999ada9282 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -925,6 +925,11 @@ if(CUDF_BUILD_STREAMS_TEST_UTIL) add_library( ${_tgt} SHARED src/utilities/stacktrace.cpp tests/utilities/identify_stream_usage.cpp ) + if(CUDF_USE_PER_THREAD_DEFAULT_STREAM) + target_compile_definitions( + ${_tgt} PUBLIC CUDA_API_PER_THREAD_DEFAULT_STREAM CUDF_USE_PER_THREAD_DEFAULT_STREAM + ) + endif() set_target_properties( ${_tgt} diff --git a/cpp/src/interop/to_arrow.cu b/cpp/src/interop/to_arrow.cu index 2b3aa2f08f1..62b85891adb 100644 --- a/cpp/src/interop/to_arrow.cu +++ b/cpp/src/interop/to_arrow.cu @@ -376,7 +376,12 @@ std::shared_ptr dispatch_to_arrow::operator()( metadata.children_meta.empty() ? std::vector{{}, {}} : metadata.children_meta; auto child_arrays = fetch_child_array(input_view, children_meta, ar_mr, stream); if (child_arrays.empty()) { - return std::make_shared(arrow::list(arrow::null()), 0, nullptr, nullptr); + // Empty list will have only one value in offset of 4 bytes + auto tmp_offset_buffer = allocate_arrow_buffer(sizeof(int32_t), ar_mr); + memset(tmp_offset_buffer->mutable_data(), 0, sizeof(int32_t)); + + return std::make_shared( + arrow::list(arrow::null()), 0, std::move(tmp_offset_buffer), nullptr); } auto offset_buffer = child_arrays[0]->data()->buffers[1]; diff --git a/java/README.md b/java/README.md index 2d8e2190fee..0d9e060b7cd 100644 --- a/java/README.md +++ b/java/README.md @@ -51,9 +51,13 @@ CUDA 11.0: ## Build From Source Build [libcudf](../cpp) first, and make sure the JDK is installed and available. Specify -the cmake option `-DCUDF_USE_ARROW_STATIC=ON -DCUDF_ENABLE_ARROW_S3=OFF` when building so -that Apache Arrow is linked statically to libcudf, as this will help create a jar that -does not require Arrow and its dependencies to be available in the runtime environment. +the following cmake options to the libcudf build: +``` +-DCUDF_LARGE_STRINGS_DISABLED=ON -DCUDF_USE_ARROW_STATIC=ON -DCUDF_ENABLE_ARROW_S3=OFF +``` +These options: +- Disable large string support, see https://github.com/rapidsai/cudf/issues/16215 +- Statically link Arrow to libcudf to remove Arrow as a runtime dependency. After building libcudf, the Java bindings can be built via Maven, e.g.: ``` diff --git a/java/ci/build-in-docker.sh b/java/ci/build-in-docker.sh index 72b1742f7cb..5a429bdc739 100755 --- a/java/ci/build-in-docker.sh +++ b/java/ci/build-in-docker.sh @@ -1,7 +1,7 @@ #!/bin/bash # -# Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -58,6 +58,7 @@ cmake .. -G"${CMAKE_GENERATOR}" \ -DCMAKE_INSTALL_PREFIX=$INSTALL_PREFIX \ -DCUDA_STATIC_RUNTIME=$ENABLE_CUDA_STATIC_RUNTIME \ -DUSE_NVTX=$ENABLE_NVTX \ + -DCUDF_LARGE_STRINGS_DISABLED=ON \ -DCUDF_USE_ARROW_STATIC=ON \ -DCUDF_ENABLE_ARROW_S3=OFF \ -DBUILD_TESTS=$BUILD_CPP_TESTS \ diff --git a/python/cudf/cudf/_lib/io/utils.pxd b/python/cudf/cudf/_lib/io/utils.pxd index 252d986843a..680a87c789e 100644 --- a/python/cudf/cudf/_lib/io/utils.pxd +++ b/python/cudf/cudf/_lib/io/utils.pxd @@ -16,6 +16,10 @@ cdef source_info make_source_info(list src) except* cdef sink_info make_sinks_info( list src, vector[unique_ptr[data_sink]] & data) except* cdef sink_info make_sink_info(src, unique_ptr[data_sink] & data) except* +cdef add_df_col_struct_names( + df, + child_names_dict +) cdef update_struct_field_names( table, vector[column_name_info]& schema_info) diff --git a/python/cudf/cudf/_lib/io/utils.pyx b/python/cudf/cudf/_lib/io/utils.pyx index 1d7c56888d9..58956b9e9b7 100644 --- a/python/cudf/cudf/_lib/io/utils.pyx +++ b/python/cudf/cudf/_lib/io/utils.pyx @@ -147,10 +147,37 @@ cdef cppclass iobase_data_sink(data_sink): return buf.tell() +cdef add_df_col_struct_names(df, child_names_dict): + for name, child_names in child_names_dict.items(): + col = df._data[name] + + df._data[name] = update_col_struct_field_names(col, child_names) + + +cdef update_col_struct_field_names(Column col, child_names): + if col.children: + children = list(col.children) + for i, (child, names) in enumerate(zip(children, child_names.values())): + children[i] = update_col_struct_field_names( + child, + names + ) + col.set_base_children(tuple(children)) + + if isinstance(col.dtype, StructDtype): + col = col._rename_fields( + child_names.keys() + ) + + return col + + cdef update_struct_field_names( table, vector[column_name_info]& schema_info ): + # Deprecated, remove in favor of add_col_struct_names + # when a reader is ported to pylibcudf for i, (name, col) in enumerate(table._data.items()): table._data[name] = update_column_struct_field_names( col, schema_info[i] diff --git a/python/cudf/cudf/_lib/json.pyx b/python/cudf/cudf/_lib/json.pyx index 22e34feb547..9c646e3357b 100644 --- a/python/cudf/cudf/_lib/json.pyx +++ b/python/cudf/cudf/_lib/json.pyx @@ -8,26 +8,16 @@ import cudf from cudf.core.buffer import acquire_spill_lock from libcpp cimport bool -from libcpp.map cimport map -from libcpp.string cimport string -from libcpp.utility cimport move -from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types -from cudf._lib.io.utils cimport make_source_info, update_struct_field_names -from cudf._lib.pylibcudf.libcudf.io.json cimport ( - json_reader_options, - json_recovery_mode_t, - read_json as libcudf_read_json, - schema_element, -) -from cudf._lib.pylibcudf.libcudf.io.types cimport ( - compression_type, - table_with_metadata, -) -from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type +from cudf._lib.io.utils cimport add_df_col_struct_names +from cudf._lib.pylibcudf.io.types cimport compression_type +from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t +from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type +from cudf._lib.pylibcudf.libcudf.types cimport data_type, type_id +from cudf._lib.pylibcudf.types cimport DataType from cudf._lib.types cimport dtype_to_data_type -from cudf._lib.utils cimport data_from_unique_ptr +from cudf._lib.utils cimport data_from_pylibcudf_io import cudf._lib.pylibcudf as plc @@ -62,6 +52,7 @@ cpdef read_json(object filepaths_or_buffers, # If input data is a JSON string (or StringIO), hold a reference to # the encoded memoryview externally to ensure the encoded buffer # isn't destroyed before calling libcudf `read_json()` + for idx in range(len(filepaths_or_buffers)): if isinstance(filepaths_or_buffers[idx], io.StringIO): filepaths_or_buffers[idx] = \ @@ -71,17 +62,7 @@ cpdef read_json(object filepaths_or_buffers, filepaths_or_buffers[idx] = filepaths_or_buffers[idx].encode() # Setup arguments - cdef vector[data_type] c_dtypes_list - cdef map[string, schema_element] c_dtypes_schema_map cdef cudf_io_types.compression_type c_compression - # Determine byte read offsets if applicable - cdef size_type c_range_offset = ( - byte_range[0] if byte_range is not None else 0 - ) - cdef size_type c_range_size = ( - byte_range[1] if byte_range is not None else 0 - ) - cdef bool c_lines = lines if compression is not None: if compression == 'gzip': @@ -94,56 +75,50 @@ cpdef read_json(object filepaths_or_buffers, c_compression = cudf_io_types.compression_type.AUTO else: c_compression = cudf_io_types.compression_type.NONE - is_list_like_dtypes = False + + processed_dtypes = None + if dtype is False: raise ValueError("False value is unsupported for `dtype`") elif dtype is not True: + processed_dtypes = [] if isinstance(dtype, abc.Mapping): for k, v in dtype.items(): - c_dtypes_schema_map[str(k).encode()] = \ - _get_cudf_schema_element_from_dtype(v) + # Make sure keys are string + k = str(k) + lib_type, child_types = _get_cudf_schema_element_from_dtype(v) + processed_dtypes.append((k, lib_type, child_types)) elif isinstance(dtype, abc.Collection): - is_list_like_dtypes = True - c_dtypes_list.reserve(len(dtype)) for col_dtype in dtype: - c_dtypes_list.push_back( - _get_cudf_data_type_from_dtype( - col_dtype)) + processed_dtypes.append( + # Ignore child columns since we cannot specify their dtypes + # when passing a list + _get_cudf_schema_element_from_dtype(col_dtype)[0] + ) else: raise TypeError("`dtype` must be 'list like' or 'dict'") - cdef json_reader_options opts = move( - json_reader_options.builder(make_source_info(filepaths_or_buffers)) - .compression(c_compression) - .lines(c_lines) - .byte_range_offset(c_range_offset) - .byte_range_size(c_range_size) - .recovery_mode(_get_json_recovery_mode(on_bad_lines)) - .build() + table_w_meta = plc.io.json.read_json( + plc.io.SourceInfo(filepaths_or_buffers), + processed_dtypes, + c_compression, + lines, + byte_range_offset = byte_range[0] if byte_range is not None else 0, + byte_range_size = byte_range[1] if byte_range is not None else 0, + keep_quotes = keep_quotes, + mixed_types_as_string = mixed_types_as_string, + prune_columns = prune_columns, + recovery_mode = _get_json_recovery_mode(on_bad_lines) ) - if is_list_like_dtypes: - opts.set_dtypes(c_dtypes_list) - else: - opts.set_dtypes(c_dtypes_schema_map) - - opts.enable_keep_quotes(keep_quotes) - opts.enable_mixed_types_as_string(mixed_types_as_string) - opts.enable_prune_columns(prune_columns) - - # Read JSON - cdef cudf_io_types.table_with_metadata c_result - with nogil: - c_result = move(libcudf_read_json(opts)) - - meta_names = [info.name.decode() for info in c_result.metadata.schema_info] - df = cudf.DataFrame._from_data(*data_from_unique_ptr( - move(c_result.tbl), - column_names=meta_names - )) - - update_struct_field_names(df, c_result.metadata.schema_info) + df = cudf.DataFrame._from_data( + *data_from_pylibcudf_io( + table_w_meta + ) + ) + # Post-processing to add in struct column names + add_df_col_struct_names(df, table_w_meta.child_names) return df @@ -192,28 +167,32 @@ def write_json( ) -cdef schema_element _get_cudf_schema_element_from_dtype(object dtype) except *: - cdef schema_element s_element - cdef data_type lib_type +cdef _get_cudf_schema_element_from_dtype(object dtype) except *: dtype = cudf.dtype(dtype) if isinstance(dtype, cudf.CategoricalDtype): raise NotImplementedError( "CategoricalDtype as dtype is not yet " "supported in JSON reader" ) - lib_type = dtype_to_data_type(dtype) - s_element.type = lib_type + + lib_type = DataType.from_libcudf(dtype_to_data_type(dtype)) + child_types = [] + if isinstance(dtype, cudf.StructDtype): for name, child_type in dtype.fields.items(): - s_element.child_types[name.encode()] = \ + child_lib_type, grandchild_types = \ _get_cudf_schema_element_from_dtype(child_type) + child_types.append((name, child_lib_type, grandchild_types)) elif isinstance(dtype, cudf.ListDtype): - s_element.child_types["offsets".encode()] = \ - _get_cudf_schema_element_from_dtype(cudf.dtype("int32")) - s_element.child_types["element".encode()] = \ + child_lib_type, grandchild_types = \ _get_cudf_schema_element_from_dtype(dtype.element_type) - return s_element + child_types = [ + ("offsets", DataType.from_libcudf(data_type(type_id.INT32)), []), + ("element", child_lib_type, grandchild_types) + ] + + return lib_type, child_types cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *: diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pxd b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd index a91d574131f..f7f733a493d 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/json.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd @@ -1,11 +1,30 @@ # Copyright (c) 2024, NVIDIA CORPORATION. - from libcpp cimport bool -from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata +from cudf._lib.pylibcudf.io.types cimport ( + SinkInfo, + SourceInfo, + TableWithMetadata, + compression_type, +) +from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t from cudf._lib.pylibcudf.libcudf.types cimport size_type +cpdef TableWithMetadata read_json( + SourceInfo source_info, + list dtypes = *, + compression_type compression = *, + bool lines = *, + size_type byte_range_offset = *, + size_type byte_range_size = *, + bool keep_quotes = *, + bool mixed_types_as_string = *, + bool prune_columns = *, + json_recovery_mode_t recovery_mode = *, +) + + cpdef void write_json( SinkInfo sink_info, TableWithMetadata tbl, diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pyx b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx index 7530eba3803..354cb4981de 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/json.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx @@ -1,16 +1,130 @@ # Copyright (c) 2024, NVIDIA CORPORATION. - from libcpp cimport bool from libcpp.limits cimport numeric_limits +from libcpp.map cimport map from libcpp.string cimport string +from libcpp.utility cimport move +from libcpp.vector cimport vector -from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata +from cudf._lib.pylibcudf.io.types cimport ( + SinkInfo, + SourceInfo, + TableWithMetadata, +) from cudf._lib.pylibcudf.libcudf.io.json cimport ( + json_reader_options, + json_recovery_mode_t, json_writer_options, + read_json as cpp_read_json, + schema_element, write_json as cpp_write_json, ) -from cudf._lib.pylibcudf.libcudf.io.types cimport table_metadata -from cudf._lib.pylibcudf.types cimport size_type +from cudf._lib.pylibcudf.libcudf.io.types cimport ( + compression_type, + table_metadata, + table_with_metadata, +) +from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type +from cudf._lib.pylibcudf.types cimport DataType + + +cdef map[string, schema_element] _generate_schema_map(list dtypes): + cdef map[string, schema_element] schema_map + cdef schema_element s_elem + cdef string c_name + + for name, dtype, child_dtypes in dtypes: + if not (isinstance(name, str) and + isinstance(dtype, DataType) and + isinstance(child_dtypes, list)): + + raise ValueError("Must pass a list of a tuple containing " + "(column_name, column_dtype, list of child_dtypes)") + + c_name = name.encode() + + s_elem.type = (dtype).c_obj + s_elem.child_types = _generate_schema_map(child_dtypes) + + schema_map[c_name] = s_elem + return schema_map + + +cpdef TableWithMetadata read_json( + SourceInfo source_info, + list dtypes = None, + compression_type compression = compression_type.AUTO, + bool lines = False, + size_type byte_range_offset = 0, + size_type byte_range_size = 0, + bool keep_quotes = False, + bool mixed_types_as_string = False, + bool prune_columns = False, + json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, +): + """Reads an JSON file into a :py:class:`~.types.TableWithMetadata`. + + Parameters + ---------- + source_info : SourceInfo + The SourceInfo object to read the JSON file from. + dtypes : list, default None + Set data types for the columns in the JSON file. + + Each element of the list has the format + (column_name, column_dtype, list of child dtypes), where + the list of child dtypes is an empty list if the child is not + a nested type (list or struct dtype), and is of format + (column_child_name, column_child_type, list of grandchild dtypes). + compression_type: CompressionType, default CompressionType.AUTO + The compression format of the JSON source. + byte_range_offset : size_type, default 0 + Number of bytes to skip from source start. + byte_range_size : size_type, default 0 + Number of bytes to read. By default, will read all bytes. + keep_quotes : bool, default False + Whether the reader should keep quotes of string values. + prune_columns : bool, default False + Whether to only read columns specified in dtypes. + recover_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL + Whether to raise an error or set corresponding values to null + when encountering an invalid JSON line. + + Returns + ------- + TableWithMetadata + The Table and its corresponding metadata (column names) that were read in. + """ + cdef vector[data_type] types_vec + cdef json_reader_options opts = move( + json_reader_options.builder(source_info.c_obj) + .compression(compression) + .lines(lines) + .byte_range_offset(byte_range_offset) + .byte_range_size(byte_range_size) + .recovery_mode(recovery_mode) + .build() + ) + + if dtypes is not None: + if isinstance(dtypes[0], tuple): + opts.set_dtypes(move(_generate_schema_map(dtypes))) + else: + for dtype in dtypes: + types_vec.push_back((dtype).c_obj) + opts.set_dtypes(types_vec) + + opts.enable_keep_quotes(keep_quotes) + opts.enable_mixed_types_as_string(mixed_types_as_string) + opts.enable_prune_columns(prune_columns) + + # Read JSON + cdef table_with_metadata c_result + + with nogil: + c_result = move(cpp_read_json(opts)) + + return TableWithMetadata.from_libcudf(c_result) cpdef void write_json( diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd index 88daf54f33b..ab223c16a72 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd @@ -28,6 +28,11 @@ cdef class TableWithMetadata: cdef vector[column_name_info] _make_column_info(self, list column_names) + cdef list _make_columns_list(self, dict child_dict) + + @staticmethod + cdef dict _parse_col_names(vector[column_name_info] infos) + @staticmethod cdef TableWithMetadata from_libcudf(table_with_metadata& tbl) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx index f94e20970a4..df0b729b711 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx @@ -22,6 +22,11 @@ import errno import io import os +from cudf._lib.pylibcudf.libcudf.io.json import \ + json_recovery_mode_t as JSONRecoveryMode # no-cython-lint +from cudf._lib.pylibcudf.libcudf.io.types import \ + compression_type as CompressionType # no-cython-lint + cdef class TableWithMetadata: """A container holding a table and its associated metadata @@ -69,16 +74,44 @@ cdef class TableWithMetadata: """ return self.tbl.columns() - @property - def column_names(self): + cdef list _make_columns_list(self, dict child_dict): + cdef list names = [] + for child in child_dict: + grandchildren = self._make_columns_list(child_dict[child]) + names.append((child, grandchildren)) + return names + + def column_names(self, include_children=False): """ Return a list containing the column names of the table """ cdef list names = [] + cdef str name + cdef dict child_names = self.child_names for col_info in self.metadata.schema_info: - # TODO: Handle nesting (columns with child columns) - assert col_info.children.size() == 0, "Child column names are not handled!" - names.append(col_info.name.decode()) + name = col_info.name.decode() + if include_children: + children = self._make_columns_list(child_names[name]) + names.append((name, children)) + else: + names.append(name) + return names + + @property + def child_names(self): + """ + Return a dictionary mapping the names of columns with children + to the names of their child columns + """ + return TableWithMetadata._parse_col_names(self.metadata.schema_info) + + @staticmethod + cdef dict _parse_col_names(vector[column_name_info] infos): + cdef dict child_names = dict() + cdef dict names = dict() + for col_info in infos: + child_names = TableWithMetadata._parse_col_names(col_info.children) + names[col_info.name.decode()] = child_names return names @staticmethod @@ -137,6 +170,15 @@ cdef class SourceInfo: cdef vector[host_buffer] c_host_buffers cdef const unsigned char[::1] c_buffer cdef bint empty_buffer = False + cdef list new_sources = [] + + if isinstance(sources[0], io.StringIO): + for buffer in sources: + if not isinstance(buffer, io.StringIO): + raise ValueError("All sources must be of the same type!") + new_sources.append(buffer.read().encode()) + sources = new_sources + if isinstance(sources[0], bytes): empty_buffer = True for buffer in sources: @@ -156,7 +198,10 @@ cdef class SourceInfo: c_buffer.shape[0])) else: raise ValueError("Sources must be a list of str/paths, " - "bytes, io.BytesIO, or a Datasource") + "bytes, io.BytesIO, io.StringIO, or a Datasource") + + if empty_buffer is True: + c_host_buffers.push_back(host_buffer(NULL, 0)) self.c_obj = source_info(c_host_buffers) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt index 6c66d01ca57..699e85ce567 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt @@ -22,4 +22,5 @@ rapids_cython_create_modules( SOURCE_FILES "${cython_sources}" LINKED_LIBRARIES "${linked_libraries}" ASSOCIATED_TARGETS cudf MODULE_PREFIX cpp ) +add_subdirectory(io) add_subdirectory(strings) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/CMakeLists.txt new file mode 100644 index 00000000000..6831063ecb9 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/CMakeLists.txt @@ -0,0 +1,26 @@ +# ============================================================================= +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# ============================================================================= + +set(cython_sources json.pyx types.pyx) + +set(linked_libraries cudf::cudf) + +rapids_cython_create_modules( + CXX + SOURCE_FILES "${cython_sources}" + LINKED_LIBRARIES "${linked_libraries}" ASSOCIATED_TARGETS cudf MODULE_PREFIX cpp_io_ +) + +set(targets_using_arrow_headers cpp_io_json cpp_io_types) +link_to_pyarrow_headers("${targets_using_arrow_headers}") diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/json.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/json.pxd index 2e50cccd132..86621ae184f 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/json.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/json.pxd @@ -1,6 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from libc.stdint cimport uint8_t +from libc.stdint cimport int32_t, uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport shared_ptr, unique_ptr @@ -19,9 +19,9 @@ cdef extern from "cudf/io/json.hpp" \ data_type type map[string, schema_element] child_types - cdef enum json_recovery_mode_t: - FAIL "cudf::io::json_recovery_mode_t::FAIL" - RECOVER_WITH_NULL "cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL" + cpdef enum class json_recovery_mode_t(int32_t): + FAIL + RECOVER_WITH_NULL cdef cppclass json_reader_options: json_reader_options() except + diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/json.pyx b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/json.pyx new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pyx new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/lists_column_view.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/lists_column_view.pxd index fd21e7b334b..8917a6ac899 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/lists_column_view.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/lists_column_view.pxd @@ -10,7 +10,9 @@ from cudf._lib.pylibcudf.libcudf.types cimport size_type cdef extern from "cudf/lists/lists_column_view.hpp" namespace "cudf" nogil: cdef cppclass lists_column_view(column_view): lists_column_view() except + + lists_column_view(const lists_column_view& lists_column) except + lists_column_view(const column_view& lists_column) except + + lists_column_view& operator=(const lists_column_view&) except + column_view parent() except + column_view offsets() except + column_view child() except + diff --git a/python/cudf/cudf/_lib/utils.pyx b/python/cudf/cudf/_lib/utils.pyx index de6b9f690b6..f136cd997a7 100644 --- a/python/cudf/cudf/_lib/utils.pyx +++ b/python/cudf/cudf/_lib/utils.pyx @@ -322,7 +322,7 @@ cdef data_from_pylibcudf_io(tbl_with_meta): """ return _data_from_columns( columns=[Column.from_pylibcudf(plc) for plc in tbl_with_meta.columns], - column_names=tbl_with_meta.column_names, + column_names=tbl_with_meta.column_names(include_children=False), index_names=None ) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index b249410c2e4..3e5ff9c18b5 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -462,6 +462,10 @@ def _setitem_tuple_arg(self, key, value): self._frame[col].loc[key[0]] = value[i] +class _DataFrameAtIndexer(_DataFrameLocIndexer): + pass + + class _DataFrameIlocIndexer(_DataFrameIndexer): """ For selection by index. @@ -584,6 +588,10 @@ def _setitem_tuple_arg(self, key, value): self._frame[col].iloc[key[0]] = value[i] +class _DataFrameiAtIndexer(_DataFrameIlocIndexer): + pass + + class DataFrame(IndexedFrame, Serializable, GetAttrGetItemMixin): """ A GPU Dataframe object. @@ -2581,14 +2589,14 @@ def iat(self): """ Alias for ``DataFrame.iloc``; provided for compatibility with Pandas. """ - return self.iloc + return _DataFrameiAtIndexer(self) @property def at(self): """ Alias for ``DataFrame.loc``; provided for compatibility with Pandas. """ - return self.loc + return _DataFrameAtIndexer(self) @property # type: ignore @_external_only_api( diff --git a/python/cudf/cudf/pandas/_wrappers/pandas.py b/python/cudf/cudf/pandas/_wrappers/pandas.py index a64bf7772fe..dd6f6fe76ba 100644 --- a/python/cudf/cudf/pandas/_wrappers/pandas.py +++ b/python/cudf/cudf/pandas/_wrappers/pandas.py @@ -775,6 +775,18 @@ def Index__new__(cls, *args, **kwargs): pd.core.indexing._LocIndexer, ) +_AtIndexer = make_intermediate_proxy_type( + "_AtIndexer", + cudf.core.dataframe._DataFrameAtIndexer, + pd.core.indexing._AtIndexer, +) + +_iAtIndexer = make_intermediate_proxy_type( + "_iAtIndexer", + cudf.core.dataframe._DataFrameiAtIndexer, + pd.core.indexing._iAtIndexer, +) + FixedForwardWindowIndexer = make_final_proxy_type( "FixedForwardWindowIndexer", _Unusable, diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index d41e6c720bf..46603ff32b8 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -8,13 +8,14 @@ import pytest from cudf._lib import pylibcudf as plc +from cudf._lib.pylibcudf.io.types import CompressionType def metadata_from_arrow_type( pa_type: pa.Array, name: str = "", ) -> plc.interop.ColumnMetadata | None: - metadata = plc.interop.ColumnMetadata(name) # None + metadata = plc.interop.ColumnMetadata(name) if pa.types.is_list(pa_type): child_meta = [plc.interop.ColumnMetadata("offsets")] for i in range(pa_type.num_fields): @@ -39,9 +40,25 @@ def metadata_from_arrow_type( def assert_column_eq( - lhs: pa.Array | plc.Column, rhs: pa.Array | plc.Column + lhs: pa.Array | plc.Column, + rhs: pa.Array | plc.Column, + check_field_nullability=True, ) -> None: - """Verify that a pylibcudf array and PyArrow array are equal.""" + """Verify that a pylibcudf array and PyArrow array are equal. + + Parameters + ---------- + lhs: Union[pa.Array, plc.Column] + The array with the expected values + rhs: Union[pa.Array, plc.Column] + The array to check + check_field_nullability: + For list/struct dtypes, whether to check if the nullable attributes + on child fields are equal. + + Useful for checking roundtripping of lossy formats like JSON that may not + preserve this information. + """ # Nested types require children metadata to be passed to the conversion function. if isinstance(lhs, (pa.Array, pa.ChunkedArray)) and isinstance( rhs, plc.Column @@ -65,6 +82,33 @@ def assert_column_eq( if isinstance(rhs, pa.ChunkedArray): rhs = rhs.combine_chunks() + def _make_fields_nullable(typ): + new_fields = [] + for i in range(typ.num_fields): + child_field = typ.field(i) + if not child_field.nullable: + child_type = child_field.type + if isinstance(child_field.type, (pa.StructType, pa.ListType)): + child_type = _make_fields_nullable(child_type) + new_fields.append( + pa.field(child_field.name, child_type, nullable=True) + ) + else: + new_fields.append(child_field) + + if isinstance(typ, pa.StructType): + return pa.struct(new_fields) + elif isinstance(typ, pa.ListType): + return pa.list_(new_fields[0]) + return typ + + if not check_field_nullability: + rhs_type = _make_fields_nullable(rhs.type) + rhs = rhs.cast(rhs_type) + + lhs_type = _make_fields_nullable(lhs.type) + lhs = rhs.cast(lhs_type) + assert lhs.equals(rhs) @@ -78,20 +122,24 @@ def assert_table_eq(pa_table: pa.Table, plc_table: plc.Table) -> None: def assert_table_and_meta_eq( - plc_table_w_meta: plc.io.types.TableWithMetadata, pa_table: pa.Table + pa_table: pa.Table, + plc_table_w_meta: plc.io.types.TableWithMetadata, + check_field_nullability=True, ) -> None: """Verify that the pylibcudf TableWithMetadata and PyArrow table are equal""" plc_table = plc_table_w_meta.tbl plc_shape = (plc_table.num_rows(), plc_table.num_columns()) - assert plc_shape == pa_table.shape + assert ( + plc_shape == pa_table.shape + ), f"{plc_shape} is not equal to {pa_table.shape}" for plc_col, pa_col in zip(plc_table.columns(), pa_table.columns): - assert_column_eq(plc_col, pa_col) + assert_column_eq(pa_col, plc_col, check_field_nullability) # Check column name equality - assert plc_table_w_meta.column_names == pa_table.column_names + assert plc_table_w_meta.column_names() == pa_table.column_names def cudf_raises(expected_exception: BaseException, *args, **kwargs): @@ -182,4 +230,26 @@ def sink_to_str(sink): + DEFAULT_PA_STRUCT_TESTING_TYPES ) +# Map pylibcudf compression types to pandas ones +# Not all compression types map cleanly, read the comments to learn more! +# If a compression type is unsupported, it maps to False. + +COMPRESSION_TYPE_TO_PANDAS = { + CompressionType.NONE: None, + # Users of this dict will have to special case + # AUTO + CompressionType.AUTO: None, + CompressionType.GZIP: "gzip", + CompressionType.BZIP2: "bz2", + CompressionType.ZIP: "zip", + CompressionType.XZ: "xz", + CompressionType.ZSTD: "zstd", + # Unsupported + CompressionType.ZLIB: False, + CompressionType.LZ4: False, + CompressionType.LZO: False, + # These only work for parquet + CompressionType.SNAPPY: "snappy", + CompressionType.BROTLI: "brotli", +} ALL_PA_TYPES = DEFAULT_PA_TYPES diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index e4760ea7ac8..39832eb4bba 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -121,6 +121,11 @@ def source_or_sink(request, tmp_path): return fp_or_buf() +@pytest.fixture(params=[opt for opt in plc.io.types.CompressionType]) +def compression_type(request): + return request.param + + @pytest.fixture( scope="session", params=[opt for opt in plc.types.Interpolation] ) diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_avro.py b/python/cudf/cudf/pylibcudf_tests/io/test_avro.py index d6cd86768cd..061d6792ce3 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_avro.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_avro.py @@ -120,4 +120,4 @@ def test_read_avro(avro_dtypes, avro_dtype_data, row_opts, columns, nullable): if columns != []: expected = expected.select(columns) - assert_table_and_meta_eq(res, expected) + assert_table_and_meta_eq(expected, res) diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_json.py b/python/cudf/cudf/pylibcudf_tests/io/test_json.py index d6b8bfa6976..c13eaf40625 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_json.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_json.py @@ -1,11 +1,49 @@ # Copyright (c) 2024, NVIDIA CORPORATION. import io +import pandas as pd import pyarrow as pa import pytest -from utils import sink_to_str +from utils import ( + COMPRESSION_TYPE_TO_PANDAS, + assert_table_and_meta_eq, + sink_to_str, +) import cudf._lib.pylibcudf as plc +from cudf._lib.pylibcudf.io.types import CompressionType + + +def make_json_source(path_or_buf, pa_table, **kwargs): + """ + Uses pandas to write a pyarrow Table to a JSON file. + + The caller is responsible for making sure that no arguments + unsupported by pandas are passed in. + """ + df = pa_table.to_pandas() + if "compression" in kwargs: + kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[ + kwargs["compression"] + ] + df.to_json(path_or_buf, orient="records", **kwargs) + if isinstance(path_or_buf, io.IOBase): + path_or_buf.seek(0) + return path_or_buf + + +def write_json_bytes(source, json_str): + """ + Write a JSON string to the source + """ + if not isinstance(source, io.IOBase): + with open(source, "w") as source_f: + source_f.write(json_str) + else: + if isinstance(source, io.BytesIO): + json_str = json_str.encode("utf-8") + source.write(json_str) + source.seek(0) @pytest.mark.parametrize("rows_per_chunk", [8, 100]) @@ -114,3 +152,238 @@ def test_write_json_bool_opts(true_value, false_value): pd_result = pd_result.replace("false", false_value) assert str_result == pd_result + + +@pytest.mark.parametrize("lines", [True, False]) +def test_read_json_basic( + table_data, source_or_sink, lines, compression_type, request +): + if compression_type in { + # Not supported by libcudf + CompressionType.SNAPPY, + CompressionType.XZ, + CompressionType.ZSTD, + # Not supported by pandas + # TODO: find a way to test these + CompressionType.BROTLI, + CompressionType.LZ4, + CompressionType.LZO, + CompressionType.ZLIB, + }: + pytest.skip("unsupported compression type by pandas/libcudf") + + # can't compress non-binary data with pandas + if isinstance(source_or_sink, io.StringIO): + compression_type = CompressionType.NONE + + _, pa_table = table_data + + source = make_json_source( + source_or_sink, pa_table, lines=lines, compression=compression_type + ) + + request.applymarker( + pytest.mark.xfail( + condition=( + len(pa_table) > 0 + and compression_type + not in {CompressionType.NONE, CompressionType.AUTO} + ), + # note: wasn't able to narrow down the specific types that were failing + # seems to be a little non-deterministic, but always fails with + # cudaErrorInvalidValue invalid argument + reason="libcudf json reader crashes on compressed non empty table_data", + ) + ) + + if isinstance(source, io.IOBase): + source.seek(0) + + res = plc.io.json.read_json( + plc.io.SourceInfo([source]), + compression=compression_type, + lines=lines, + ) + + # Adjustments to correct for the fact orient=records is lossy + # and doesn't + # 1) preserve colnames when zero rows in table + # 2) preserve struct nullability + # 3) differentiate int64/uint64 + if len(pa_table) == 0: + pa_table = pa.table([]) + + new_fields = [] + for i in range(len(pa_table.schema)): + curr_field = pa_table.schema.field(i) + if curr_field.type == pa.uint64(): + try: + curr_field = curr_field.with_type(pa.int64()) + except OverflowError: + # There will be no confusion, values are too large + # for int64 anyways + pass + new_fields.append(curr_field) + + pa_table = pa_table.cast(pa.schema(new_fields)) + + # Convert non-nullable struct fields to nullable fields + # since nullable=False cannot roundtrip through orient='records' + # JSON format + assert_table_and_meta_eq(pa_table, res, check_field_nullability=False) + + +def test_read_json_dtypes(table_data, source_or_sink): + # Simple test for dtypes where we read in + # all numeric data as floats + _, pa_table = table_data + source = make_json_source( + source_or_sink, + pa_table, + lines=True, + ) + + dtypes = [] + new_fields = [] + for i in range(len(pa_table.schema)): + field = pa_table.schema.field(i) + child_types = [] + + def get_child_types(typ): + typ_child_types = [] + for i in range(typ.num_fields): + curr_field = typ.field(i) + typ_child_types.append( + ( + curr_field.name, + curr_field.type, + get_child_types(curr_field.type), + ) + ) + return typ_child_types + + plc_type = plc.interop.from_arrow(field.type) + if pa.types.is_integer(field.type) or pa.types.is_unsigned_integer( + field.type + ): + plc_type = plc.interop.from_arrow(pa.float64()) + field = field.with_type(pa.float64()) + + dtypes.append((field.name, plc_type, child_types)) + + new_fields.append(field) + + new_schema = pa.schema(new_fields) + + res = plc.io.json.read_json( + plc.io.SourceInfo([source]), dtypes=dtypes, lines=True + ) + new_table = pa_table.cast(new_schema) + + # orient=records is lossy + # and doesn't preserve column names when there's zero rows in the table + if len(new_table) == 0: + new_table = pa.table([]) + + assert_table_and_meta_eq(new_table, res, check_field_nullability=False) + + +@pytest.mark.parametrize("chunk_size", [10, 15, 20]) +def test_read_json_lines_byte_range(source_or_sink, chunk_size): + source = source_or_sink + if isinstance(source_or_sink, io.StringIO): + pytest.skip("byte_range doesn't work on StringIO") + + json_str = "[1, 2, 3]\n[4, 5, 6]\n[7, 8, 9]\n" + write_json_bytes(source, json_str) + + tbls_w_meta = [] + for chunk_start in range(0, len(json_str.encode("utf-8")), chunk_size): + tbls_w_meta.append( + plc.io.json.read_json( + plc.io.SourceInfo([source]), + lines=True, + byte_range_offset=chunk_start, + byte_range_size=chunk_start + chunk_size, + ) + ) + + if isinstance(source, io.IOBase): + source.seek(0) + exp = pd.read_json(source, orient="records", lines=True) + + # TODO: can do this operation using pylibcudf + tbls = [] + for tbl_w_meta in tbls_w_meta: + if tbl_w_meta.tbl.num_rows() > 0: + tbls.append(plc.interop.to_arrow(tbl_w_meta.tbl)) + full_tbl = pa.concat_tables(tbls) + + full_tbl_plc = plc.io.TableWithMetadata( + plc.interop.from_arrow(full_tbl), + tbls_w_meta[0].column_names(include_children=True), + ) + assert_table_and_meta_eq(pa.Table.from_pandas(exp), full_tbl_plc) + + +@pytest.mark.parametrize("keep_quotes", [True, False]) +def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink): + source = source_or_sink + + json_bytes = '["a", "b", "c"]\n' + write_json_bytes(source, json_bytes) + + tbl_w_meta = plc.io.json.read_json( + plc.io.SourceInfo([source]), lines=True, keep_quotes=keep_quotes + ) + + template = "{0}" + if keep_quotes: + template = '"{0}"' + + exp = pa.Table.from_arrays( + [ + [template.format("a")], + [template.format("b")], + [template.format("c")], + ], + names=["0", "1", "2"], + ) + + assert_table_and_meta_eq(exp, tbl_w_meta) + + +@pytest.mark.parametrize( + "recovery_mode", [opt for opt in plc.io.types.JSONRecoveryMode] +) +def test_read_json_lines_recovery_mode(recovery_mode, source_or_sink): + source = source_or_sink + + json_bytes = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n' + write_json_bytes(source, json_bytes) + + if recovery_mode == plc.io.types.JSONRecoveryMode.FAIL: + with pytest.raises(RuntimeError): + plc.io.json.read_json( + plc.io.SourceInfo([source]), + lines=True, + recovery_mode=recovery_mode, + ) + else: + # Recover case (bad values replaced with nulls) + tbl_w_meta = plc.io.json.read_json( + plc.io.SourceInfo([source]), + lines=True, + recovery_mode=recovery_mode, + ) + exp = pa.Table.from_arrays( + [[1, 2, None, 3], [10, 11, None, 12]], names=["a", "b"] + ) + assert_table_and_meta_eq(exp, tbl_w_meta) + + +# TODO: Add tests for these! +# Tests were not added in the initial PR porting the JSON reader to pylibcudf +# to save time (and since there are no existing tests for these in Python cuDF) +# mixed_types_as_string = mixed_types_as_string, +# prune_columns = prune_columns, diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py index 297040b6d95..9222f6d23db 100644 --- a/python/cudf/cudf/tests/test_json.py +++ b/python/cudf/cudf/tests/test_json.py @@ -1077,8 +1077,13 @@ def test_json_dtypes_nested_data(): ) pdf = pd.read_json( - StringIO(expected_json_str), orient="records", lines=True + StringIO(expected_json_str), + orient="records", + lines=True, ) + + assert_eq(df, pdf) + pdf.columns = pdf.columns.astype("str") pa_table_pdf = pa.Table.from_pandas( pdf, schema=df.to_arrow().schema, safe=False diff --git a/python/cudf/cudf_pandas_tests/test_cudf_pandas.py b/python/cudf/cudf_pandas_tests/test_cudf_pandas.py index f51ce103677..b0aeaba3916 100644 --- a/python/cudf/cudf_pandas_tests/test_cudf_pandas.py +++ b/python/cudf/cudf_pandas_tests/test_cudf_pandas.py @@ -1566,3 +1566,22 @@ def test_arrow_string_arrays(): ) tm.assert_equal(cu_arr, pd_arr) + + +@pytest.mark.parametrize("indexer", ["at", "iat"]) +def test_at_iat(indexer): + df = xpd.DataFrame(range(3)) + result = getattr(df, indexer)[0, 0] + assert result == 0 + + getattr(df, indexer)[0, 0] = 1 + expected = pd.DataFrame([1, 1, 2]) + tm.assert_frame_equal(df, expected) + + +def test_at_setitem_empty(): + df = xpd.DataFrame({"name": []}, dtype="float64") + df.at[0, "name"] = 1.0 + df.at[0, "new"] = 2.0 + expected = pd.DataFrame({"name": [1.0], "new": [2.0]}) + tm.assert_frame_equal(df, expected)