diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9c287cd828b..c5fd1283356 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -392,7 +392,6 @@ add_library( src/io/csv/reader_impl.cu src/io/csv/writer_impl.cu src/io/functions.cpp - src/io/json/byte_range_info.cu src/io/json/json_column.cu src/io/json/json_normalization.cu src/io/json/json_tree.cu diff --git a/cpp/include/cudf/io/detail/json.hpp b/cpp/include/cudf/io/detail/json.hpp index 42b10a78ce8..38ba4f675c3 100644 --- a/cpp/include/cudf/io/detail/json.hpp +++ b/cpp/include/cudf/io/detail/json.hpp @@ -61,7 +61,7 @@ void write_json(data_sink* sink, * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ -void normalize_single_quotes(datasource::owning_buffer>& indata, +void normalize_single_quotes(datasource::owning_buffer& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); @@ -72,7 +72,7 @@ void normalize_single_quotes(datasource::owning_buffer * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ -void normalize_whitespace(datasource::owning_buffer>& indata, +void normalize_whitespace(datasource::owning_buffer& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); } // namespace io::json::detail diff --git a/cpp/include/cudf/strings/replace.hpp b/cpp/include/cudf/strings/replace.hpp index 5b4ffb98f99..f450b77ad7a 100644 --- a/cpp/include/cudf/strings/replace.hpp +++ b/cpp/include/cudf/strings/replace.hpp @@ -160,18 +160,6 @@ std::unique_ptr replace_multiple( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); -/** - * @copydoc cudf::strings::replace_multiple - * - * @deprecated since 24.08 - */ -[[deprecated]] std::unique_ptr replace( - strings_column_view const& input, - strings_column_view const& targets, - strings_column_view const& repls, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); - /** @} */ // end of doxygen group } // namespace strings } // namespace CUDF_EXPORT cudf diff --git a/cpp/include/cudf/utilities/type_checks.hpp b/cpp/include/cudf/utilities/type_checks.hpp index 4fcbca09d17..aeb5db57830 100644 --- a/cpp/include/cudf/utilities/type_checks.hpp +++ b/cpp/include/cudf/utilities/type_checks.hpp @@ -22,25 +22,6 @@ namespace CUDF_EXPORT cudf { -/** - * @brief Compare the types of two `column_view`s - * - * @deprecated Since 24.06. Use cudf::have_same_types instead. - * - * This function returns true if the type of `lhs` equals that of `rhs`. - * - For fixed point types, the scale is compared. - * - For dictionary types, the type of the keys are compared if both are - * non-empty columns. - * - For lists types, the type of child columns are compared recursively. - * - For struct types, the type of each field are compared in order. - * - For all other types, the `id` of `data_type` is compared. - * - * @param lhs The first `column_view` to compare - * @param rhs The second `column_view` to compare - * @return true if column types match - */ -[[deprecated]] bool column_types_equal(column_view const& lhs, column_view const& rhs); - /** * @brief Compare the type IDs of two `column_view`s * diff --git a/cpp/src/io/json/byte_range_info.cu b/cpp/src/io/json/byte_range_info.cu deleted file mode 100644 index 258a40b0dd3..00000000000 --- a/cpp/src/io/json/byte_range_info.cu +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include - -#include - -namespace cudf::io::json::detail { - -// Extract the first character position in the string. -size_type find_first_delimiter(device_span d_data, - char const delimiter, - rmm::cuda_stream_view stream) -{ - auto const first_delimiter_position = - thrust::find(rmm::exec_policy(stream), d_data.begin(), d_data.end(), delimiter); - return first_delimiter_position != d_data.end() ? first_delimiter_position - d_data.begin() : -1; -} - -} // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/json_normalization.cu b/cpp/src/io/json/json_normalization.cu index 760b2214365..cb8b4e97ebb 100644 --- a/cpp/src/io/json/json_normalization.cu +++ b/cpp/src/io/json/json_normalization.cu @@ -298,7 +298,7 @@ struct TransduceToNormalizedWS { namespace detail { -void normalize_single_quotes(datasource::owning_buffer>& indata, +void normalize_single_quotes(datasource::owning_buffer& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -311,22 +311,22 @@ void normalize_single_quotes(datasource::owning_buffer outbuf(indata.size() * 2, stream, mr); + rmm::device_buffer outbuf(indata.size() * 2, stream, mr); rmm::device_scalar outbuf_size(stream, mr); - parser.Transduce(indata.data(), + parser.Transduce(reinterpret_cast(indata.data()), static_cast(indata.size()), - outbuf.data(), + static_cast(outbuf.data()), thrust::make_discard_iterator(), outbuf_size.data(), normalize_quotes::start_state, stream); outbuf.resize(outbuf_size.value(stream), stream); - datasource::owning_buffer> outdata(std::move(outbuf)); + datasource::owning_buffer outdata(std::move(outbuf)); std::swap(indata, outdata); } -void normalize_whitespace(datasource::owning_buffer>& indata, +void normalize_whitespace(datasource::owning_buffer& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -339,18 +339,18 @@ void normalize_whitespace(datasource::owning_buffer normalize_whitespace::TransduceToNormalizedWS{}), stream); - rmm::device_uvector outbuf(indata.size(), stream, mr); + rmm::device_buffer outbuf(indata.size(), stream, mr); rmm::device_scalar outbuf_size(stream, mr); - parser.Transduce(indata.data(), + parser.Transduce(reinterpret_cast(indata.data()), static_cast(indata.size()), - outbuf.data(), + static_cast(outbuf.data()), thrust::make_discard_iterator(), outbuf_size.data(), normalize_whitespace::start_state, stream); outbuf.resize(outbuf_size.value(stream), stream); - datasource::owning_buffer> outdata(std::move(outbuf)); + datasource::owning_buffer outdata(std::move(outbuf)); std::swap(indata, outdata); } diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 590f70864b1..2658cbbed2f 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -31,6 +31,7 @@ #include #include +#include #include #include @@ -38,11 +39,14 @@ namespace cudf::io::json::detail { -size_t sources_size(host_span> const sources, - size_t range_offset, - size_t range_size) +namespace { + +// Return total size of sources enclosing the passed range +std::size_t sources_size(host_span> const sources, + std::size_t range_offset, + std::size_t range_size) { - return std::accumulate(sources.begin(), sources.end(), 0ul, [=](size_t sum, auto& source) { + return std::accumulate(sources.begin(), sources.end(), 0ul, [=](std::size_t sum, auto& source) { auto const size = source->size(); // TODO take care of 0, 0, or *, 0 case. return sum + @@ -50,109 +54,55 @@ size_t sources_size(host_span> const sources, }); } +// Return estimated size of subchunk using a heuristic involving the byte range size and the minimum +// subchunk size +std::size_t estimate_size_per_subchunk(std::size_t chunk_size) +{ + auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; + // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to + // 10kb) and the byte range size + return geometric_mean(std::ceil(static_cast(chunk_size) / num_subchunks), + min_subchunk_size); +} + /** - * @brief Read from array of data sources into RMM buffer. The size of the returned device span - can be larger than the number of bytes requested from the list of sources when - the range to be read spans across multiple sources. This is due to the delimiter - characters inserted after the end of each accessed source. + * @brief Return the upper bound on the batch size for the JSON reader. * - * @param buffer Device span buffer to which data is read - * @param sources Array of data sources - * @param compression Compression format of source - * @param range_offset Number of bytes to skip from source start - * @param range_size Number of bytes to read from source - * @param stream CUDA stream used for device memory operations and kernel launches - * @returns A subspan of the input device span containing data read + * The datasources passed to the JSON reader are split into batches demarcated by byte range + * offsets and read iteratively. The batch size is capped at INT_MAX bytes, which is the + * default value returned by the function. This value can be overridden at runtime using the + * environment variable LIBCUDF_JSON_BATCH_SIZE + * + * @return size in bytes */ -device_span ingest_raw_input(device_span buffer, - host_span> sources, - compression_type compression, - size_t range_offset, - size_t range_size, - rmm::cuda_stream_view stream) +std::size_t get_batch_size_upper_bound() { - CUDF_FUNC_RANGE(); - // We append a line delimiter between two files to make sure the last line of file i and the first - // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line - // delimiter. - auto constexpr num_delimiter_chars = 1; - - if (compression == compression_type::NONE) { - auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); - std::vector prefsum_source_sizes(sources.size()); - std::vector> h_buffers; - size_t bytes_read = 0; - std::transform_inclusive_scan(sources.begin(), - sources.end(), - prefsum_source_sizes.begin(), - std::plus{}, - [](std::unique_ptr const& s) { return s->size(); }); - auto upper = - std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); - size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - - auto const total_bytes_to_read = - std::min(range_size, prefsum_source_sizes.back() - range_offset); - range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; - for (size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) { - if (sources[i]->is_empty()) continue; - auto data_size = - std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); - auto destination = reinterpret_cast(buffer.data()) + bytes_read + - (num_delimiter_chars * delimiter_map.size()); - if (sources[i]->is_device_read_preferred(data_size)) { - bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); - } else { - h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); - auto const& h_buffer = h_buffers.back(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); - bytes_read += h_buffer->size(); - } - range_offset = 0; - delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); - } - // Removing delimiter inserted after last non-empty source is read - if (!delimiter_map.empty()) { delimiter_map.pop_back(); } - - // If this is a multi-file source, we scatter the JSON line delimiters between files - if (sources.size() > 1) { - static_assert(num_delimiter_chars == 1, - "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator('\n'); - auto const d_delimiter_map = cudf::detail::make_device_uvector_async( - delimiter_map, stream, rmm::mr::get_current_device_resource()); - thrust::scatter(rmm::exec_policy_nosync(stream), - delimiter_source, - delimiter_source + d_delimiter_map.size(), - d_delimiter_map.data(), - buffer.data()); - } - stream.synchronize(); - return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); - } - // TODO: allow byte range reading from multiple compressed files. - auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); - auto hbuffer = std::vector(remaining_bytes_to_read); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); - auto uncomp_data = decompress(compression, hbuffer); - CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(), - reinterpret_cast(uncomp_data.data()), - uncomp_data.size() * sizeof(char), - cudaMemcpyHostToDevice, - stream.value())); - stream.synchronize(); - return buffer.first(uncomp_data.size()); + auto const batch_size_str = std::getenv("LIBCUDF_JSON_BATCH_SIZE"); + int64_t const batch_size = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L; + auto const batch_limit = static_cast(std::numeric_limits::max()); + auto const batch_size_upper_bound = static_cast( + (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit); + return batch_size_upper_bound; } -size_t estimate_size_per_subchunk(size_t chunk_size) +/** + * @brief Extract the first delimiter character position in the string + * + * @param d_data Device span in which to search for delimiter character + * @param delimiter Delimiter character to search for + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Position of first delimiter character in device array + */ +size_type find_first_delimiter(device_span d_data, + char const delimiter, + rmm::cuda_stream_view stream) { - auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; - // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to - // 10kb) and the byte range size - return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); + auto const first_delimiter_position = + thrust::find(rmm::exec_policy(stream), d_data.begin(), d_data.end(), delimiter); + return first_delimiter_position != d_data.end() + ? static_cast(thrust::distance(d_data.begin(), first_delimiter_position)) + : -1; } /** @@ -168,19 +118,19 @@ size_t estimate_size_per_subchunk(size_t chunk_size) * @param stream CUDA stream used for device memory operations and kernel launches * @returns Data source owning buffer enclosing the bytes read */ -datasource::owning_buffer> get_record_range_raw_input( +datasource::owning_buffer get_record_range_raw_input( host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); - size_t const total_source_size = sources_size(sources, 0, 0); + std::size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); compression_type const reader_compression = reader_opts.get_compression(); - size_t const chunk_offset = reader_opts.get_byte_range_offset(); - size_t chunk_size = reader_opts.get_byte_range_size(); + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t chunk_size = reader_opts.get_byte_range_size(); CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, "Invalid offsetting", @@ -188,20 +138,20 @@ datasource::owning_buffer> get_record_range_raw_input( auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset; chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size; - int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced; - size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); + int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced; + std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea // of subchunks. auto constexpr header_size = 4096; - size_t const buffer_size = + std::size_t const buffer_size = reader_compression != compression_type::NONE ? total_source_size * estimated_compression_ratio + header_size : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; - rmm::device_uvector buffer(buffer_size, stream); - device_span bufspan(buffer); + rmm::device_buffer buffer(buffer_size, stream); + device_span bufspan(reinterpret_cast(buffer.data()), buffer.size()); // Offset within buffer indicating first read position std::int64_t buffer_offset = 0; @@ -213,12 +163,12 @@ datasource::owning_buffer> get_record_range_raw_input( chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream); if (first_delim_pos == -1) { // return empty owning datasource buffer - auto empty_buf = rmm::device_uvector(0, stream); - return datasource::owning_buffer>(std::move(empty_buf)); + auto empty_buf = rmm::device_buffer(0, stream); + return datasource::owning_buffer(std::move(empty_buf)); } else if (!should_load_all_sources) { // Find next delimiter - std::int64_t next_delim_pos = -1; - size_t next_subchunk_start = chunk_offset + chunk_size; + std::int64_t next_delim_pos = -1; + std::size_t next_subchunk_start = chunk_offset + chunk_size; while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) { buffer_offset += readbufspan.size(); readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset), @@ -232,24 +182,26 @@ datasource::owning_buffer> get_record_range_raw_input( } if (next_delim_pos < buffer_offset) next_delim_pos = buffer_offset + readbufspan.size(); - return datasource::owning_buffer>( + return datasource::owning_buffer( std::move(buffer), reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, next_delim_pos - first_delim_pos - shift_for_nonzero_offset); } - return datasource::owning_buffer>( + return datasource::owning_buffer( std::move(buffer), reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, readbufspan.size() - first_delim_pos - shift_for_nonzero_offset); } +// Helper function to read the current batch using byte range offsets and size +// passed table_with_metadata read_batch(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - datasource::owning_buffer> bufview = + datasource::owning_buffer bufview = get_record_range_raw_input(sources, reader_opts, stream); // If input JSON buffer has single quotes and option to normalize single quotes is enabled, @@ -270,6 +222,92 @@ table_with_metadata read_batch(host_span> sources, return device_parse_nested_json(buffer, reader_opts, stream, mr); } +} // anonymous namespace + +device_span ingest_raw_input(device_span buffer, + host_span> sources, + compression_type compression, + std::size_t range_offset, + std::size_t range_size, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + // We append a line delimiter between two files to make sure the last line of file i and the first + // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line + // delimiter. + auto constexpr num_delimiter_chars = 1; + + if (compression == compression_type::NONE) { + auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); + std::vector prefsum_source_sizes(sources.size()); + std::vector> h_buffers; + std::size_t bytes_read = 0; + std::transform_inclusive_scan(sources.begin(), + sources.end(), + prefsum_source_sizes.begin(), + std::plus{}, + [](std::unique_ptr const& s) { return s->size(); }); + auto upper = + std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); + std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); + + auto const total_bytes_to_read = + std::min(range_size, prefsum_source_sizes.back() - range_offset); + range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; + for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; + i++) { + if (sources[i]->is_empty()) continue; + auto data_size = + std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); + auto destination = reinterpret_cast(buffer.data()) + bytes_read + + (num_delimiter_chars * delimiter_map.size()); + if (sources[i]->is_device_read_preferred(data_size)) { + bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); + } else { + h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); + auto const& h_buffer = h_buffers.back(); + CUDF_CUDA_TRY(cudaMemcpyAsync( + destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); + bytes_read += h_buffer->size(); + } + range_offset = 0; + delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); + } + // Removing delimiter inserted after last non-empty source is read + if (!delimiter_map.empty()) { delimiter_map.pop_back(); } + + // If this is a multi-file source, we scatter the JSON line delimiters between files + if (sources.size() > 1) { + static_assert(num_delimiter_chars == 1, + "Currently only single-character delimiters are supported"); + auto const delimiter_source = thrust::make_constant_iterator('\n'); + auto const d_delimiter_map = cudf::detail::make_device_uvector_async( + delimiter_map, stream, rmm::mr::get_current_device_resource()); + thrust::scatter(rmm::exec_policy_nosync(stream), + delimiter_source, + delimiter_source + d_delimiter_map.size(), + d_delimiter_map.data(), + buffer.data()); + } + stream.synchronize(); + return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); + } + // TODO: allow byte range reading from multiple compressed files. + auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); + auto hbuffer = std::vector(remaining_bytes_to_read); + // Single read because only a single compressed source is supported + // Reading to host because decompression of a single block is much faster on the CPU + sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); + auto uncomp_data = decompress(compression, hbuffer); + CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(), + reinterpret_cast(uncomp_data.data()), + uncomp_data.size() * sizeof(char), + cudaMemcpyHostToDevice, + stream.value())); + stream.synchronize(); + return buffer.first(uncomp_data.size()); +} + table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, @@ -296,15 +334,16 @@ table_with_metadata read_json(host_span> sources, * Note that the batched reader does not work for compressed inputs or for regular * JSON inputs. */ - size_t const total_source_size = sources_size(sources, 0, 0); - size_t chunk_offset = reader_opts.get_byte_range_offset(); - size_t chunk_size = reader_opts.get_byte_range_size(); - chunk_size = !chunk_size ? total_source_size - chunk_offset - : std::min(chunk_size, total_source_size - chunk_offset); + std::size_t const total_source_size = sources_size(sources, 0, 0); + std::size_t chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t chunk_size = reader_opts.get_byte_range_size(); + chunk_size = !chunk_size ? total_source_size - chunk_offset + : std::min(chunk_size, total_source_size - chunk_offset); - size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); - size_t const batch_size_ub = - std::numeric_limits::max() - (max_subchunks_prealloced * size_per_subchunk); + std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); + std::size_t const batch_size_upper_bound = get_batch_size_upper_bound(); + std::size_t const batch_size = + batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk); /* * Identify the position (zero-indexed) of starting source file from which to begin @@ -314,10 +353,10 @@ table_with_metadata read_json(host_span> sources, */ // Prefix sum of source file sizes - size_t pref_source_size = 0; + std::size_t pref_source_size = 0; // Starting source file from which to being batching evaluated using byte range offset - size_t const start_source = [chunk_offset, &sources, &pref_source_size]() { - for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) { + std::size_t const start_source = [chunk_offset, &sources, &pref_source_size]() { + for (std::size_t src_idx = 0; src_idx < sources.size(); ++src_idx) { if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; } pref_source_size += sources[src_idx]->size(); } @@ -329,16 +368,16 @@ table_with_metadata read_json(host_span> sources, * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading * stops. */ - size_t pref_bytes_size = chunk_offset; - size_t end_bytes_size = chunk_offset + chunk_size; - std::vector batch_offsets{pref_bytes_size}; - for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) { + std::size_t pref_bytes_size = chunk_offset; + std::size_t end_bytes_size = chunk_offset + chunk_size; + std::vector batch_offsets{pref_bytes_size}; + for (std::size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) { pref_source_size += sources[i]->size(); // If the current source file can subsume multiple batches, we split the file until the // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`) while (pref_bytes_size < end_bytes_size && - pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) { - auto next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size); + pref_source_size >= std::min(pref_bytes_size + batch_size, end_bytes_size)) { + auto next_batch_size = std::min(batch_size, end_bytes_size - pref_bytes_size); batch_offsets.push_back(batch_offsets.back() + next_batch_size); pref_bytes_size += next_batch_size; } @@ -356,7 +395,7 @@ table_with_metadata read_json(host_span> sources, // Dispatch individual batches to read_batch and push the resulting table into // partial_tables array. Note that the reader options need to be updated for each // batch to adjust byte range offset and byte range size. - for (size_t i = 0; i < batch_offsets.size() - 1; i++) { + for (std::size_t i = 0; i < batch_offsets.size() - 1; i++) { batched_reader_opts.set_byte_range_offset(batch_offsets[i]); batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]); partial_tables.emplace_back( diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 32de4ebabfa..7e3a920f00d 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -37,6 +37,20 @@ constexpr size_t min_subchunk_size = 10000; constexpr int estimated_compression_ratio = 4; constexpr int max_subchunks_prealloced = 3; +/** + * @brief Read from array of data sources into RMM buffer. The size of the returned device span + can be larger than the number of bytes requested from the list of sources when + the range to be read spans across multiple sources. This is due to the delimiter + characters inserted after the end of each accessed source. + * + * @param buffer Device span buffer to which data is read + * @param sources Array of data sources + * @param compression Compression format of source + * @param range_offset Number of bytes to skip from source start + * @param range_size Number of bytes to read from source + * @param stream CUDA stream used for device memory operations and kernel launches + * @returns A subspan of the input device span containing data read + */ device_span ingest_raw_input(device_span buffer, host_span> sources, compression_type compression, @@ -44,14 +58,20 @@ device_span ingest_raw_input(device_span buffer, size_t range_size, rmm::cuda_stream_view stream); +/** + * @brief Reads and returns the entire data set in batches. + * + * @param sources Input `datasource` objects to read the dataset from + * @param reader_opts Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to use for device memory allocation + * + * @return cudf::table object that contains the array of cudf::column. + */ table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); -size_type find_first_delimiter(device_span d_data, - char const delimiter, - rmm::cuda_stream_view stream); - } // namespace io::json::detail } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/strings/replace/multi.cu b/cpp/src/strings/replace/multi.cu index 2ca22f0e017..b5248700d53 100644 --- a/cpp/src/strings/replace/multi.cu +++ b/cpp/src/strings/replace/multi.cu @@ -533,16 +533,5 @@ std::unique_ptr replace_multiple(strings_column_view const& strings, return detail::replace_multiple(strings, targets, repls, stream, mr); } -// deprecated in 24.08 -std::unique_ptr replace(strings_column_view const& strings, - strings_column_view const& targets, - strings_column_view const& repls, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - return detail::replace_multiple(strings, targets, repls, stream, mr); -} - } // namespace strings } // namespace cudf diff --git a/cpp/src/utilities/type_checks.cpp b/cpp/src/utilities/type_checks.cpp index dac981fb532..3095b342748 100644 --- a/cpp/src/utilities/type_checks.cpp +++ b/cpp/src/utilities/type_checks.cpp @@ -139,11 +139,6 @@ bool have_same_types(column_view const& lhs, column_view const& rhs) return type_dispatcher(lhs.type(), columns_equal_fn{}, lhs, rhs); } -bool column_types_equal(column_view const& lhs, column_view const& rhs) -{ - return have_same_types(lhs, rhs); -} - bool have_same_types(column_view const& lhs, scalar const& rhs) { return type_dispatcher(lhs.type(), column_scalar_equal_fn{}, lhs, rhs); diff --git a/cpp/tests/io/json/json_quote_normalization_test.cpp b/cpp/tests/io/json/json_quote_normalization_test.cpp index 55ad0afe499..3a9ba8d9f3b 100644 --- a/cpp/tests/io/json/json_quote_normalization_test.cpp +++ b/cpp/tests/io/json/json_quote_normalization_test.cpp @@ -26,7 +26,7 @@ #include #include -#include +#include #include #include @@ -42,12 +42,11 @@ void run_test(std::string const& host_input, std::string const& expected_host_ou std::make_shared(); auto stream_view = cudf::test::get_default_stream(); - auto device_input = cudf::detail::make_device_uvector_async( - host_input, stream_view, rmm::mr::get_current_device_resource()); + auto device_input = rmm::device_buffer( + host_input.c_str(), host_input.size(), stream_view, rmm::mr::get_current_device_resource()); // Preprocessing FST - cudf::io::datasource::owning_buffer> device_data( - std::move(device_input)); + cudf::io::datasource::owning_buffer device_data(std::move(device_input)); cudf::io::json::detail::normalize_single_quotes(device_data, stream_view, rsc.get()); std::string preprocessed_host_output(device_data.size(), 0); diff --git a/cpp/tests/io/json/json_whitespace_normalization_test.cu b/cpp/tests/io/json/json_whitespace_normalization_test.cu index 8ed5fa81b12..01dd17fab98 100644 --- a/cpp/tests/io/json/json_whitespace_normalization_test.cu +++ b/cpp/tests/io/json/json_whitespace_normalization_test.cu @@ -38,12 +38,11 @@ void run_test(std::string const& host_input, std::string const& expected_host_ou // Prepare cuda stream for data transfers & kernels auto stream_view = cudf::test::get_default_stream(); - auto device_input = cudf::detail::make_device_uvector_async( - host_input, stream_view, rmm::mr::get_current_device_resource()); + auto device_input = rmm::device_buffer( + host_input.c_str(), host_input.size(), stream_view, rmm::mr::get_current_device_resource()); // Preprocessing FST - cudf::io::datasource::owning_buffer> device_data( - std::move(device_input)); + cudf::io::datasource::owning_buffer device_data(std::move(device_input)); cudf::io::json::detail::normalize_whitespace( device_data, stream_view, rmm::mr::get_current_device_resource()); diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index 49abf7b484d..e34ab991c11 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -28,13 +28,17 @@ struct JsonLargeReaderTest : public cudf::test::StringsLargeTest {}; TEST_F(JsonLargeReaderTest, MultiBatch) { - std::string json_string = R"( + std::string json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - constexpr size_t batch_size_ub = std::numeric_limits::max(); - constexpr size_t expected_file_size = 1.5 * static_cast(batch_size_ub); + + std::size_t const batch_size_upper_bound = std::numeric_limits::max() / 16; + // set smaller batch_size to reduce file size and execution time + setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(batch_size_upper_bound).c_str(), 1); + + constexpr std::size_t expected_file_size = 1.5 * static_cast(batch_size_upper_bound); std::size_t const log_repetitions = static_cast(std::ceil(std::log2(expected_file_size / json_string.size()))); @@ -66,8 +70,11 @@ TEST_F(JsonLargeReaderTest, MultiBatch) datasources.emplace_back(cudf::io::datasource::create(hb)); } // Test for different chunk sizes - std::vector chunk_sizes{ - batch_size_ub / 4, batch_size_ub / 2, batch_size_ub, static_cast(batch_size_ub * 2)}; + std::vector chunk_sizes{batch_size_upper_bound / 4, + batch_size_upper_bound / 2, + batch_size_upper_bound, + static_cast(batch_size_upper_bound * 2)}; + for (auto chunk_size : chunk_sizes) { auto const tables = split_byte_range_reading(datasources, @@ -86,4 +93,7 @@ TEST_F(JsonLargeReaderTest, MultiBatch) // cannot use EQUAL due to concatenate removing null mask CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view()); } + + // go back to normal batch_size + unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD"); } diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 4e737451ed6..36e342cae13 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -1084,7 +1084,12 @@ private static DidViewChange gatherJSONColumns(Schema schema, TableWithMeta.Nest // The types don't match so just return the input unchanged... return DidViewChange.no(); } else { - String[] foundNames = children.getNames(); + String[] foundNames; + if (children == null) { + foundNames = new String[0]; + } else { + foundNames = children.getNames(); + } HashMap indices = new HashMap<>(); for (int i = 0; i < foundNames.length; i++) { indices.put(foundNames[i], i); @@ -1101,8 +1106,9 @@ private static DidViewChange gatherJSONColumns(Schema schema, TableWithMeta.Nest for (int i = 0; i < columns.length; i++) { String neededColumnName = neededNames[i]; Integer index = indices.get(neededColumnName); + Schema childSchema = schema.getChild(i); if (index != null) { - if (schema.getChild(i).isStructOrHasStructDescendant()) { + if (childSchema.isStructOrHasStructDescendant()) { ColumnView child = cv.getChildColumnView(index); boolean shouldCloseChild = true; try { @@ -1131,8 +1137,23 @@ private static DidViewChange gatherJSONColumns(Schema schema, TableWithMeta.Nest } } else { somethingChanged = true; - try (Scalar s = Scalar.fromNull(types[i])) { - columns[i] = ColumnVector.fromScalar(s, (int) cv.getRowCount()); + if (types[i] == DType.LIST) { + try (Scalar s = Scalar.listFromNull(childSchema.getChild(0).asHostDataType())) { + columns[i] = ColumnVector.fromScalar(s, (int) cv.getRowCount()); + } + } else if (types[i] == DType.STRUCT) { + int numStructChildren = childSchema.getNumChildren(); + HostColumnVector.DataType[] structChildren = new HostColumnVector.DataType[numStructChildren]; + for (int structChildIndex = 0; structChildIndex < numStructChildren; structChildIndex++) { + structChildren[structChildIndex] = childSchema.getChild(structChildIndex).asHostDataType(); + } + try (Scalar s = Scalar.structFromNull(structChildren)) { + columns[i] = ColumnVector.fromScalar(s, (int) cv.getRowCount()); + } + } else { + try (Scalar s = Scalar.fromNull(types[i])) { + columns[i] = ColumnVector.fromScalar(s, (int) cv.getRowCount()); + } } } } diff --git a/python/cudf/cudf/core/column_accessor.py b/python/cudf/cudf/core/column_accessor.py index 819d351b2c4..83596704672 100644 --- a/python/cudf/cudf/core/column_accessor.py +++ b/python/cudf/cudf/core/column_accessor.py @@ -530,6 +530,10 @@ def _select_by_label_list_like(self, key: Any) -> ColumnAccessor: ) else: data = {k: self._grouped_data[k] for k in key} + if len(data) != len(key): + raise ValueError( + "Selecting duplicate column labels is not supported." + ) if self.multiindex: data = dict(_to_flat_dict_inner(data)) return self.__class__( diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 094da09ab08..3eab27bd165 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -5,6 +5,7 @@ import operator import pickle import warnings +from collections.abc import Hashable from functools import cache, cached_property from numbers import Number from typing import TYPE_CHECKING, Any, Literal, MutableMapping, cast @@ -60,7 +61,7 @@ from cudf.utils.utils import _warn_no_dask_cudf, search_range if TYPE_CHECKING: - from collections.abc import Generator, Hashable, Iterable + from collections.abc import Generator, Iterable from datetime import tzinfo @@ -450,6 +451,16 @@ def __getitem__(self, index): return self.start + index * self.step return self._as_int_index()[index] + def _get_columns_by_label(self, labels) -> Index: + # used in .sort_values + if isinstance(labels, Hashable): + if labels == self.name: + return self._as_int_index() + elif is_list_like(labels): + if list(self.names) == list(labels): + return self._as_int_index() + raise KeyError(labels) + @_performance_tracking def equals(self, other) -> bool: if isinstance(other, RangeIndex): @@ -2403,11 +2414,13 @@ def day_name(self, locale: str | None = None) -> Index: >>> datetime_index = cudf.date_range("2016-12-31", "2017-01-08", freq="D") >>> datetime_index DatetimeIndex(['2016-12-31', '2017-01-01', '2017-01-02', '2017-01-03', - '2017-01-04', '2017-01-05', '2017-01-06', '2017-01-07'], + '2017-01-04', '2017-01-05', '2017-01-06', '2017-01-07', + '2017-01-08'], dtype='datetime64[ns]', freq='D') >>> datetime_index.day_name() Index(['Saturday', 'Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', - 'Friday', 'Saturday'], dtype='object') + 'Friday', 'Saturday', 'Sunday'], + dtype='object') """ day_names = self._column.get_day_names(locale) return Index._from_data({self.name: day_names}) diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 24d947a574a..3b44a0f5864 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -3592,10 +3592,34 @@ def sort_values( if len(self) == 0: return self + try: + by_in_columns = self._get_columns_by_label(by) + except KeyError: + by_in_columns = None + if self.ndim == 1: + # For Series case, we're never selecting an index level. + by_in_index = None + else: + try: + by_in_index = self.index._get_columns_by_label(by) + except KeyError: + by_in_index = None + + if by_in_columns is not None and by_in_index is not None: + raise ValueError( + f"{by=} appears in the {type(self).__name__} columns " + "and as an index level which is ambiguous." + ) + elif by_in_columns is not None: + by_columns = by_in_columns + elif by_in_index is not None: + by_columns = by_in_index + else: + raise KeyError(by) # argsort the `by` column out = self._gather( GatherMap.from_column_unchecked( - self._get_columns_by_label(by)._get_sorted_inds( + by_columns._get_sorted_inds( ascending=ascending, na_position=na_position ), len(self), diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index 9646b34830f..ab88b191570 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -1394,12 +1394,16 @@ def from_arrays( raise TypeError(error_msg) codes = [] levels = [] + names_from_arrays = [] for array in arrays: if not (is_list_like(array) or is_column_like(array)): raise TypeError(error_msg) code, level = factorize(array, sort=True) codes.append(code) levels.append(level) + names_from_arrays.append(getattr(array, "name", None)) + if names is None: + names = names_from_arrays return cls( codes=codes, levels=levels, sortorder=sortorder, names=names ) diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index de57ac5f290..53675d339ac 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -801,14 +801,17 @@ def dt(self): >>> s.dt.hour 0 12 1 13 + 2 14 dtype: int16 >>> s.dt.second 0 0 1 0 + 2 0 dtype: int16 >>> s.dt.day 0 3 1 3 + 2 3 dtype: int16 Returns diff --git a/python/cudf/cudf/core/tools/datetimes.py b/python/cudf/cudf/core/tools/datetimes.py index 2f77778116f..c50a36b68b5 100644 --- a/python/cudf/cudf/core/tools/datetimes.py +++ b/python/cudf/cudf/core/tools/datetimes.py @@ -951,7 +951,7 @@ def date_range( end = cudf.Scalar(end, dtype=dtype) _is_increment_sequence = end >= start - periods = math.ceil( + periods = math.floor( int(end - start) / _offset_to_nanoseconds_lower_bound(offset) ) @@ -959,9 +959,10 @@ def date_range( # Mismatched sign between (end-start) and offset, return empty # column periods = 0 - elif periods == 0: - # end == start, return exactly 1 timestamp (start) - periods = 1 + else: + # If end == start, periods == 0 and we return exactly 1 timestamp (start). + # Otherwise, since closed="both", we ensure the end point is included. + periods += 1 # We compute `end_estim` (the estimated upper bound of the date # range) below, but don't always use it. We do this to ensure diff --git a/python/cudf/cudf/tests/test_datetime.py b/python/cudf/cudf/tests/test_datetime.py index 6bc775d2a2c..7be4faa42c3 100644 --- a/python/cudf/cudf/tests/test_datetime.py +++ b/python/cudf/cudf/tests/test_datetime.py @@ -2536,3 +2536,9 @@ def test_dti_methods(method, kwargs): result = getattr(cudf_dti, method)(**kwargs) expected = getattr(pd_dti, method)(**kwargs) assert_eq(result, expected) + + +def test_date_range_start_end_divisible_by_freq(): + result = cudf.date_range("2011-01-01", "2011-01-02", freq="h") + expected = pd.date_range("2011-01-01", "2011-01-02", freq="h") + assert_eq(result, expected) diff --git a/python/cudf/cudf/tests/test_indexing.py b/python/cudf/cudf/tests/test_indexing.py index 7005cbc6834..716b4dc6acd 100644 --- a/python/cudf/cudf/tests/test_indexing.py +++ b/python/cudf/cudf/tests/test_indexing.py @@ -2361,3 +2361,11 @@ def test_sliced_categorical_as_ordered(): name="a", ) assert_eq(result, expected) + + +def test_duplicate_labels_raises(): + df = cudf.DataFrame([[1, 2]], columns=["a", "b"]) + with pytest.raises(ValueError): + df[["a", "a"]] + with pytest.raises(ValueError): + df.loc[:, ["a", "a"]] diff --git a/python/cudf/cudf/tests/test_multiindex.py b/python/cudf/cudf/tests/test_multiindex.py index b7314a36e73..a68f4574da3 100644 --- a/python/cudf/cudf/tests/test_multiindex.py +++ b/python/cudf/cudf/tests/test_multiindex.py @@ -2179,3 +2179,13 @@ def test_unique_level(): result = pd_mi.unique(level=1) expected = cudf_mi.unique(level=1) assert_eq(result, expected) + + +@pytest.mark.parametrize( + "idx", [pd.Index, pd.CategoricalIndex, pd.DatetimeIndex, pd.TimedeltaIndex] +) +def test_from_arrays_infer_names(idx): + arrays = [idx([1], name="foo"), idx([2], name="bar")] + expected = pd.MultiIndex.from_arrays(arrays) + result = cudf.MultiIndex.from_arrays(arrays) + assert_eq(result, expected) diff --git a/python/cudf/cudf/tests/test_sorting.py b/python/cudf/cudf/tests/test_sorting.py index a8ffce6e88b..2cf2259d9ec 100644 --- a/python/cudf/cudf/tests/test_sorting.py +++ b/python/cudf/cudf/tests/test_sorting.py @@ -405,3 +405,23 @@ def test_dataframe_scatter_by_map_empty(): df = DataFrame({"a": [], "b": []}, dtype="float64") scattered = df.scatter_by_map(df["a"]) assert len(scattered) == 0 + + +def test_sort_values_by_index_level(): + df = pd.DataFrame({"a": [1, 3, 2]}, index=pd.Index([1, 3, 2], name="b")) + cudf_df = DataFrame.from_pandas(df) + result = cudf_df.sort_values("b") + expected = df.sort_values("b") + assert_eq(result, expected) + + +def test_sort_values_by_ambiguous(): + df = pd.DataFrame({"a": [1, 3, 2]}, index=pd.Index([1, 3, 2], name="a")) + cudf_df = DataFrame.from_pandas(df) + + assert_exceptions_equal( + lfunc=df.sort_values, + rfunc=cudf_df.sort_values, + lfunc_args_and_kwargs=(["a"], {}), + rfunc_args_and_kwargs=(["a"], {}), + )