From 82ea28fb1d202a2e35eb96498a94659dee106bac Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Mon, 12 Aug 2024 16:56:27 +0000
Subject: [PATCH] pr reviews

---
 cpp/src/io/json/read_json.cu          | 88 ++++++++++++++-------------
 cpp/tests/large_strings/json_tests.cu | 12 ++--
 2 files changed, 51 insertions(+), 49 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 8a51c02f343..a8f5a71259a 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -42,11 +42,11 @@ namespace cudf::io::json::detail {
 namespace {
 
 // Return total size of sources enclosing the passed range
-size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
-                    size_t range_offset,
-                    size_t range_size)
+std::size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
+                         std::size_t range_offset,
+                         std::size_t range_size)
 {
-  return std::accumulate(sources.begin(), sources.end(), 0ul, [=](size_t sum, auto& source) {
+  return std::accumulate(sources.begin(), sources.end(), 0ul, [=](std::size_t sum, auto& source) {
     auto const size = source->size();
     // TODO take care of 0, 0, or *, 0 case.
     return sum +
@@ -56,7 +56,7 @@ size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
 
 // Return estimated size of subchunk using a heuristic involving the byte range size and the minimum
 // subchunk size
-size_t estimate_size_per_subchunk(size_t chunk_size)
+std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 {
   auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
   // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
@@ -75,13 +75,13 @@ size_t estimate_size_per_subchunk(size_t chunk_size)
  *
  * @return size in bytes
  */
-size_t get_batch_size_upper_bound()
+std::size_t get_batch_size_upper_bound()
 {
-  auto const batch_size_str = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
-  int64_t const batch_size = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
-  auto const batch_limit = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
-  auto const batch_size_upper_bound =
-    static_cast<size_t>((batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
+  auto const batch_size_str         = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
+  int64_t const batch_size          = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
+  auto const batch_limit            = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
+  auto const batch_size_upper_bound = static_cast<std::size_t>(
+    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
   return batch_size_upper_bound;
 }
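The two helpers above are the knobs that size the batched read, and the subchunk heuristic is easiest to sanity-check outside the reader. Below is a minimal sketch of the same idea: the subchunk size is the geometric mean of a minimum subchunk size and the byte range's per-subchunk share. The constants num_subchunks = 10 and min_subchunk_size = 10000 are assumptions for illustration; the real constants live in the reader's internal headers and are not shown in this patch.

// Standalone sketch of the subchunk-size heuristic; assumed constants only.
#include <cmath>
#include <cstddef>
#include <iostream>

constexpr int num_subchunks             = 10;     // assumption, not from this patch
constexpr std::size_t min_subchunk_size = 10000;  // assumption, not from this patch

std::size_t estimate_size_per_subchunk_sketch(std::size_t chunk_size)
{
  auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
  // Geometric mean of the minimum subchunk size and the per-subchunk share of the range
  return static_cast<std::size_t>(geometric_mean(
    std::ceil(static_cast<double>(chunk_size) / num_subchunks), min_subchunk_size));
}

int main()
{
  // For a 100 MB byte range: sqrt((100e6 / 10) * 10e3) is roughly 316 kB per subchunk.
  std::cout << estimate_size_per_subchunk_sketch(100'000'000) << '\n';
}

The point of the sketch is only that the estimate grows with the square root of the range, so the preallocated subchunks stay small relative to the batch itself.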
@@ -125,12 +125,12 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
 {
   CUDF_FUNC_RANGE();
 
-  size_t const total_source_size = sources_size(sources, 0, 0);
+  std::size_t const total_source_size = sources_size(sources, 0, 0);
   auto constexpr num_delimiter_chars = 1;
   auto const num_extra_delimiters    = num_delimiter_chars * (sources.size() - 1);
   compression_type const reader_compression = reader_opts.get_compression();
-  size_t const chunk_offset = reader_opts.get_byte_range_offset();
-  size_t chunk_size         = reader_opts.get_byte_range_size();
+  std::size_t const chunk_offset = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size         = reader_opts.get_byte_range_size();
 
   CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
                "Invalid offsetting",
@@ -138,14 +138,14 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
 
   auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
   chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
-  int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
-  size_t const size_per_subchunk     = estimate_size_per_subchunk(chunk_size);
+  int const num_subchunks_prealloced  = should_load_all_sources ? 0 : max_subchunks_prealloced;
+  std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
 
   // The allocation for single source compressed input is estimated by assuming a ~4:1
   // compression ratio. For uncompressed inputs, we can get a better estimate using the idea
   // of subchunks.
   auto constexpr header_size = 4096;
-  size_t const buffer_size =
+  std::size_t const buffer_size =
     reader_compression != compression_type::NONE
       ? total_source_size * estimated_compression_ratio + header_size
       : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
@@ -167,8 +167,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
     return datasource::owning_buffer<rmm::device_uvector<char>>(std::move(empty_buf));
   } else if (!should_load_all_sources) {
     // Find next delimiter
-    std::int64_t next_delim_pos = -1;
-    size_t next_subchunk_start  = chunk_offset + chunk_size;
+    std::int64_t next_delim_pos     = -1;
+    std::size_t next_subchunk_start = chunk_offset + chunk_size;
     while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) {
       buffer_offset += readbufspan.size();
       readbufspan    = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
@@ -227,8 +227,8 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
 device_span<char const> ingest_raw_input(device_span<char> buffer,
                                          host_span<std::unique_ptr<datasource>> sources,
                                          compression_type compression,
-                                         size_t range_offset,
-                                         size_t range_size,
+                                         std::size_t range_offset,
+                                         std::size_t range_size,
                                          rmm::cuda_stream_view stream)
 {
   CUDF_FUNC_RANGE();
@@ -238,23 +238,24 @@ device_span<char const> ingest_raw_input(device_span<char> buffer,
   auto constexpr num_delimiter_chars = 1;
 
   if (compression == compression_type::NONE) {
-    auto delimiter_map = cudf::detail::make_empty_host_vector<size_t>(sources.size(), stream);
-    std::vector<size_t> prefsum_source_sizes(sources.size());
+    auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
+    std::vector<std::size_t> prefsum_source_sizes(sources.size());
     std::vector<std::unique_ptr<datasource::buffer>> h_buffers;
-    size_t bytes_read = 0;
+    std::size_t bytes_read = 0;
     std::transform_inclusive_scan(sources.begin(),
                                   sources.end(),
                                   prefsum_source_sizes.begin(),
-                                  std::plus<size_t>{},
+                                  std::plus<std::size_t>{},
                                   [](std::unique_ptr<datasource> const& s) { return s->size(); });
     auto upper =
       std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset);
-    size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);
+    std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);
 
     auto const total_bytes_to_read =
       std::min(range_size, prefsum_source_sizes.back() - range_offset);
     range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
-    for (size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
+    for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read;
+         i++) {
       if (sources[i]->is_empty()) continue;
       auto data_size =
         std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
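The loop above leans on a prefix-sum-plus-upper_bound pattern that is worth spelling out: an inclusive scan of the source sizes turns the global byte-range offset into a starting source index with one std::upper_bound, and subtracting the preceding prefix sum rebases the offset into that source. A self-contained model, with plain integers standing in for the datasource objects (all names here are illustrative):

// Model of the source-selection logic in ingest_raw_input.
#include <algorithm>
#include <cstddef>
#include <functional>
#include <iostream>
#include <numeric>
#include <vector>

int main()
{
  std::vector<std::size_t> source_sizes{100, 40, 0, 60};  // e.g. four input files
  std::vector<std::size_t> prefsum(source_sizes.size());
  std::inclusive_scan(
    source_sizes.begin(), source_sizes.end(), prefsum.begin(), std::plus<std::size_t>{});
  // prefsum = {100, 140, 140, 200}

  std::size_t range_offset = 140;  // requested global byte offset
  auto const upper = std::upper_bound(prefsum.begin(), prefsum.end(), range_offset);
  std::size_t const start_source = std::distance(prefsum.begin(), upper);
  // Rebase the global offset to a local offset within the starting source.
  auto const local_offset = range_offset - (start_source ? prefsum[start_source - 1] : 0);

  std::cout << start_source << ' ' << local_offset << '\n';  // prints: 3 0
}

Using upper_bound rather than lower_bound is what makes empty sources, and offsets that land exactly on a source boundary, fall through to the next non-empty source, as in the example.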
@@ -333,15 +334,16 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
    * Note that the batched reader does not work for compressed inputs or for regular
    * JSON inputs.
    */
-  size_t const total_source_size = sources_size(sources, 0, 0);
-  size_t chunk_offset            = reader_opts.get_byte_range_offset();
-  size_t chunk_size              = reader_opts.get_byte_range_size();
-  chunk_size = !chunk_size ? total_source_size - chunk_offset
-                           : std::min(chunk_size, total_source_size - chunk_offset);
+  std::size_t const total_source_size = sources_size(sources, 0, 0);
+  std::size_t chunk_offset            = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size              = reader_opts.get_byte_range_size();
+  chunk_size = !chunk_size ? total_source_size - chunk_offset
+                           : std::min(chunk_size, total_source_size - chunk_offset);
 
-  size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
-  size_t const batch_size_upper_bound = get_batch_size_upper_bound();
-  size_t const batch_size = batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+  std::size_t const size_per_subchunk      = estimate_size_per_subchunk(chunk_size);
+  std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
+  std::size_t const batch_size =
+    batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
 
   /*
    * Identify the position (zero-indexed) of starting source file from which to begin
@@ -351,10 +353,10 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
    */
 
   // Prefix sum of source file sizes
-  size_t pref_source_size = 0;
+  std::size_t pref_source_size = 0;
   // Starting source file from which to begin batching evaluated using byte range offset
-  size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
-    for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
+  std::size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
+    for (std::size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
       if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; }
       pref_source_size += sources[src_idx]->size();
     }
@@ -366,10 +368,10 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
    * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading
    * stops.
    */
-  size_t pref_bytes_size = chunk_offset;
-  size_t end_bytes_size  = chunk_offset + chunk_size;
-  std::vector<size_t> batch_offsets{pref_bytes_size};
-  for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
+  std::size_t pref_bytes_size = chunk_offset;
+  std::size_t end_bytes_size  = chunk_offset + chunk_size;
+  std::vector<std::size_t> batch_offsets{pref_bytes_size};
+  for (std::size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
     pref_source_size += sources[i]->size();
     // If the current source file can subsume multiple batches, we split the file until the
     // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`)
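Most of the body of this loop falls between the hunks, so as a reviewer's model of the construction: the requested byte range [chunk_offset, chunk_offset + chunk_size) is cut into offsets at most batch_size apart, and adjacent offsets later become the per-batch (offset, size) pairs handed to read_batch. A simplified, self-contained sketch; the actual loop additionally walks the per-source prefix sums shown above, and make_batch_offsets is an illustrative name:

// Simplified model of batch-offset construction.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

std::vector<std::size_t> make_batch_offsets(std::size_t chunk_offset,
                                            std::size_t chunk_size,
                                            std::size_t batch_size)
{
  std::size_t pref_bytes_size      = chunk_offset;
  std::size_t const end_bytes_size = chunk_offset + chunk_size;
  std::vector<std::size_t> batch_offsets{pref_bytes_size};
  while (pref_bytes_size < end_bytes_size) {
    auto const next_batch = std::min(batch_size, end_bytes_size - pref_bytes_size);
    batch_offsets.push_back(batch_offsets.back() + next_batch);
    pref_bytes_size += next_batch;
  }
  return batch_offsets;  // adjacent pairs give (offset, size) per batch
}

int main()
{
  for (auto o : make_batch_offsets(/*chunk_offset=*/10, /*chunk_size=*/25, /*batch_size=*/10))
    std::cout << o << ' ';  // prints: 10 20 30 35
  std::cout << '\n';
}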
@@ -393,7 +395,7 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
   // Dispatch individual batches to read_batch and push the resulting table into
   // partial_tables array. Note that the reader options need to be updated for each
   // batch to adjust byte range offset and byte range size.
-  for (size_t i = 0; i < batch_offsets.size() - 1; i++) {
+  for (std::size_t i = 0; i < batch_offsets.size() - 1; i++) {
     batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
     batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
     partial_tables.emplace_back(
diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu
index 04a32ddad8d..e34ab991c11 100644
--- a/cpp/tests/large_strings/json_tests.cu
+++ b/cpp/tests/large_strings/json_tests.cu
@@ -34,11 +34,11 @@ TEST_F(JsonLargeReaderTest, MultiBatch)
     { "a": { "y" : 6}, "b" : [6      ], "c": 13 }
     { "a": { "y" : 6}, "b" : [7      ], "c": 14 })";
 
-  size_t const batch_size_upper_bound = std::numeric_limits<int32_t>::max() / 16;
+  std::size_t const batch_size_upper_bound = std::numeric_limits<int32_t>::max() / 16;
   // set smaller batch_size to reduce file size and execution time
   setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(batch_size_upper_bound).c_str(), 1);
 
-  constexpr size_t expected_file_size = 1.5 * static_cast<double>(batch_size_upper_bound);
+  constexpr std::size_t expected_file_size = 1.5 * static_cast<double>(batch_size_upper_bound);
   std::size_t const log_repetitions =
     static_cast<std::size_t>(std::ceil(std::log2(expected_file_size / json_string.size())));
@@ -70,10 +70,10 @@ TEST_F(JsonLargeReaderTest, MultiBatch)
     datasources.emplace_back(cudf::io::datasource::create(hb));
   }
   // Test for different chunk sizes
-  std::vector<size_t> chunk_sizes{batch_size_upper_bound / 4,
-                                  batch_size_upper_bound / 2,
-                                  batch_size_upper_bound,
-                                  static_cast<size_t>(batch_size_upper_bound * 2)};
+  std::vector<std::size_t> chunk_sizes{batch_size_upper_bound / 4,
+                                       batch_size_upper_bound / 2,
+                                       batch_size_upper_bound,
+                                       static_cast<std::size_t>(batch_size_upper_bound * 2)};
 
   for (auto chunk_size : chunk_sizes) {
     auto const tables =
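For context on how the test drives the batched path: shrinking the batch size through LIBCUDF_JSON_BATCH_SIZE makes even a modest input span several batches, which is exactly what the chunk-size sweep above exercises. The same pattern from user code might look like the sketch below; it assumes the host-buffer source_info constructor and the JSON-lines builder options, and is illustrative rather than part of the patch:

// Force small batches, then read JSON lines as usual.
#include <cudf/io/json.hpp>

#include <cstdlib>
#include <string>

int main()
{
  // Any input larger than this will be read in multiple batches.
  setenv("LIBCUDF_JSON_BATCH_SIZE", "1048576", 1);  // 1 MiB

  std::string const json = "{ \"a\": 1 }\n{ \"a\": 2 }";
  auto const options =
    cudf::io::json_reader_options::builder(cudf::io::source_info{json.data(), json.size()})
      .lines(true)
      .build();
  auto const result = cudf::io::read_json(options);  // batching is transparent to the caller
  return result.tbl->num_columns() == 1 ? 0 : 1;
}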