diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 0ead5c56264..ea52dce020e 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -50,7 +50,10 @@ size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
 }
 
 /**
- * @brief Read from array of data sources into RMM buffer
+ * @brief Read from array of data sources into RMM buffer. The size of the returned device span
+ * can be larger than the number of bytes requested from the list of sources when the range to be
+ * read spans across multiple sources. This is due to the delimiter characters inserted after the
+ * end of each accessed source.
  *
  * @param buffer Device span buffer to which data is read
  * @param sources Array of data sources
@@ -72,7 +75,6 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
   // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
   // delimiter.
   auto constexpr num_delimiter_chars = 1;
-  auto const num_extra_delimiters    = num_delimiter_chars * (sources.size() - 1);
 
   if (compression == compression_type::NONE) {
     std::vector<std::size_t> delimiter_map{};
@@ -89,28 +91,29 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
       std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset);
     size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);
 
-    auto remaining_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
+    auto const total_bytes_to_read =
+      std::min(range_size, prefsum_source_sizes.back() - range_offset);
     range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
-    for (size_t i = start_source; i < sources.size() && remaining_bytes_to_read; i++) {
+    for (size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
       if (sources[i]->is_empty()) continue;
-      auto data_size   = std::min(sources[i]->size() - range_offset, remaining_bytes_to_read);
-      auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read;
+      auto data_size =
+        std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
+      auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
+                         (num_delimiter_chars * delimiter_map.size());
       if (sources[i]->is_device_read_preferred(data_size)) {
         bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
       } else {
         h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
         auto const& h_buffer = h_buffers.back();
         CUDF_CUDA_TRY(cudaMemcpyAsync(
-          destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value()));
+          destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value()));
         bytes_read += h_buffer->size();
       }
       range_offset = 0;
-      remaining_bytes_to_read -= bytes_read;
-      delimiter_map.push_back(bytes_read);
-      bytes_read += num_delimiter_chars;
+      delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size()));
     }
-    // In the case where all sources are empty, bytes_read is zero
-    if (bytes_read) bytes_read -= num_delimiter_chars;
+    // Remove the delimiter inserted after the last non-empty source was read
+    if (!delimiter_map.empty()) { delimiter_map.pop_back(); }
 
     // If this is a multi-file source, we scatter the JSON line delimiters between files
     if (sources.size() > 1) {
@@ -118,9 +121,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
                    "Currently only single-character delimiters are supported");
       auto const delimiter_source = thrust::make_constant_iterator('\n');
       auto const d_delimiter_map  = cudf::detail::make_device_uvector_async(
-        host_span<size_t const>{delimiter_map.data(), delimiter_map.size() - 1},
-        stream,
-        rmm::mr::get_current_device_resource());
+        delimiter_map, stream, rmm::mr::get_current_device_resource());
       thrust::scatter(rmm::exec_policy_nosync(stream),
                       delimiter_source,
                       delimiter_source + d_delimiter_map.size(),
@@ -128,7 +129,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
                       buffer.data());
     }
     stream.synchronize();
-    return buffer.first(bytes_read);
+    return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
   }
   // TODO: allow byte range reading from multiple compressed files.
   auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset);
@@ -151,17 +152,15 @@ size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
                                         char const delimiter,
                                         rmm::cuda_stream_view stream)
 {
-  auto const total_source_size = sources_size(sources,
-                                              reader_opts.get_byte_range_offset(),
-                                              reader_opts.get_byte_range_size());
+  auto const total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1);
   rmm::device_uvector<char> buffer(total_source_size, stream);
-  ingest_raw_input(buffer,
-                   sources,
-                   reader_opts.get_compression(),
-                   reader_opts.get_byte_range_offset(),
-                   reader_opts.get_byte_range_size(),
-                   stream);
-  return find_first_delimiter(buffer, delimiter, stream);
+  auto readbufspan = ingest_raw_input(buffer,
+                                      sources,
+                                      reader_opts.get_compression(),
+                                      reader_opts.get_byte_range_offset(),
+                                      reader_opts.get_byte_range_size(),
+                                      stream);
+  return find_first_delimiter(readbufspan, '\n', stream);
 }
 
 /**
@@ -195,8 +194,7 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
   CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
                "Invalid offsetting");
   auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
-  chunk_size =
-    should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size;
+  chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
 
   // Some magic numbers
   constexpr int num_subchunks = 10;  // per chunk_size
@@ -217,7 +215,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
   size_t const buffer_size =
     reader_compression != compression_type::NONE
       ? total_source_size * estimated_compression_ratio + header_size
-      : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk);
+      : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
+          num_extra_delimiters;
   rmm::device_uvector<char> buffer(buffer_size, stream);
   device_span<char> bufspan(buffer);
 
diff --git a/cpp/tests/io/json_chunked_reader.cpp b/cpp/tests/io/json_chunked_reader.cpp
index ef69ee5239d..7482cb1b70d 100644
--- a/cpp/tests/io/json_chunked_reader.cpp
+++ b/cpp/tests/io/json_chunked_reader.cpp
@@ -24,11 +24,19 @@
 
 #include <rmm/cuda_stream_view.hpp>
 
+#include <fstream>
+#include <memory>
+#include <vector>
+
 /**
  * @brief Base test fixture for JSON reader tests
  */
 struct JsonReaderTest : public cudf::test::BaseFixture {};
 
+cudf::test::TempDirTestEnvironment* const temp_env =
+  static_cast<cudf::test::TempDirTestEnvironment*>(
+    ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));
+
 // function to extract first delimiter in the string in each chunk,
 // collate together and form byte_range for each chunk,
 // parse separately.
@@ -41,7 +49,6 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
 {
   using namespace cudf::io::json::detail;
   using cudf::size_type;
-  // assuming single source.
   size_t total_source_size = 0;
   for (auto const& source : sources) { total_source_size += source->size(); }
 
@@ -77,7 +84,9 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
   std::vector<cudf::io::table_with_metadata> tables;
   // Process each chunk in parallel.
   for (auto const& [chunk_start, chunk_end] : record_ranges) {
-    if (chunk_start == -1 or chunk_end == -1) continue;
+    if (chunk_start == -1 or chunk_end == -1 or
+        static_cast<size_t>(chunk_start) >= total_source_size)
+      continue;
     reader_opts_chunk.set_byte_range_offset(chunk_start);
     reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
     tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
@@ -87,7 +96,7 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
   return tables;
 }
 
-TEST_F(JsonReaderTest, ByteRange)
+TEST_F(JsonReaderTest, ByteRange_SingleSource)
 {
   std::string const json_string = R"(
     { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
@@ -126,3 +135,98 @@ TEST_F(JsonReaderTest, ByteRange)
     CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
   }
 }
+
+TEST_F(JsonReaderTest, ReadCompleteFiles)
+{
+  std::string const json_string = R"(
+    { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
+    { "a": { "y" : 6}, "b" : [4, 5   ], "c": 12 }
+    { "a": { "y" : 6}, "b" : [6      ], "c": 13 }
+    { "a": { "y" : 6}, "b" : [7      ], "c": 14 })";
+  auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json";
+  {
+    std::ofstream outfile(filename, std::ofstream::out);
+    outfile << json_string;
+  }
+
+  constexpr int num_sources = 5;
+  std::vector<std::string> filepaths(num_sources, filename);
+
+  cudf::io::json_reader_options in_options =
+    cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths})
+      .lines(true)
+      .recovery_mode(cudf::io::json_recovery_mode_t::FAIL);
+
+  cudf::io::table_with_metadata result = cudf::io::read_json(in_options);
+
+  std::vector<cudf::io::table_with_metadata> part_tables;
+  for (auto filepath : filepaths) {
+    cudf::io::json_reader_options part_in_options =
+      cudf::io::json_reader_options::builder(cudf::io::source_info{filepath})
+        .lines(true)
+        .recovery_mode(cudf::io::json_recovery_mode_t::FAIL);
+
+    part_tables.push_back(cudf::io::read_json(part_in_options));
+  }
+
+  auto part_table_views = std::vector<cudf::table_view>(part_tables.size());
+  std::transform(part_tables.begin(), part_tables.end(), part_table_views.begin(), [](auto& table) {
+    return table.tbl->view();
+  });
+
+  auto expected_result = cudf::concatenate(part_table_views);
+
+  CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result.tbl->view(), expected_result->view());
+}
+
+TEST_F(JsonReaderTest, ByteRange_MultiSource)
+{
+  std::string const json_string = R"(
+    { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
+    { "a": { "y" : 6}, "b" : [4, 5   ], "c": 12 }
+    { "a": { "y" : 6}, "b" : [6      ], "c": 13 }
+    { "a": { "y" : 6}, "b" : [7      ], "c": 14 })";
+  auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json";
+  {
+    std::ofstream outfile(filename, std::ofstream::out);
+    outfile << json_string;
+  }
+
+  constexpr int num_sources = 5;
+  std::vector<std::string> filepaths(num_sources, filename);
+
+  // Initialize parsing options (reading json lines)
+  cudf::io::json_reader_options json_lines_options =
+    cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths})
+      .lines(true)
+      .compression(cudf::io::compression_type::NONE)
+      .recovery_mode(cudf::io::json_recovery_mode_t::FAIL);
+
+  // Read full test data via existing, nested JSON lines reader
+  cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options);
+
+  auto file_paths = json_lines_options.get_source().filepaths();
+  std::vector<std::unique_ptr<cudf::io::datasource>> datasources;
+  for (auto& fp : file_paths) {
+    datasources.emplace_back(cudf::io::datasource::create(fp));
+  }
+
+  // Test for different chunk sizes
+  for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) {
+    auto const tables = skeleton_for_parellel_chunk_reader(datasources,
+                                                           json_lines_options,
+                                                           chunk_size,
+                                                           cudf::get_default_stream(),
+                                                           rmm::mr::get_current_device_resource());
+
+    auto table_views = std::vector<cudf::table_view>(tables.size());
+    std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
+      return table.tbl->view();
+    });
+    auto result = cudf::concatenate(table_views);
+
+    // Verify that the data read via chunked reader matches the data read via nested JSON reader
+    // cannot use EQUAL due to concatenate removing null mask
+    CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
+  }
+}
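
For intuition about the delimiter bookkeeping above, here is a minimal host-only C++
sketch (not part of the patch) that mirrors the same arithmetic: a std::string stands in
for the RMM device buffer, plain writes stand in for device_read/cudaMemcpyAsync and
thrust::scatter, and the mock sources are made up for illustration.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main()
{
  // Three mock JSON-lines sources; the empty one is skipped, as in ingest_raw_input.
  std::vector<std::string> const sources = {R"({"a":1})", "", R"({"a":2})"};
  constexpr std::size_t num_delimiter_chars = 1;

  // Size the buffer for all source bytes plus one delimiter slot between sources,
  // mirroring the num_extra_delimiters term added to buffer_size in the patch.
  std::size_t total_source_size = 0;
  for (auto const& src : sources) { total_source_size += src.size(); }
  std::string buffer(total_source_size + (sources.size() - 1) * num_delimiter_chars, '\0');

  std::size_t bytes_read = 0;              // counts bytes copied from sources only
  std::vector<std::size_t> delimiter_map;  // buffer offsets where '\n' will be scattered

  for (auto const& src : sources) {
    if (src.empty()) continue;
    // Shift the write position by the delimiter slots already reserved, like
    // destination = buffer.data() + bytes_read + num_delimiter_chars * delimiter_map.size().
    auto const destination = bytes_read + num_delimiter_chars * delimiter_map.size();
    buffer.replace(destination, src.size(), src);
    bytes_read += src.size();
    // Reserve a delimiter slot directly behind this source's bytes.
    delimiter_map.push_back(bytes_read + num_delimiter_chars * delimiter_map.size());
  }
  // Drop the slot reserved after the last non-empty source; nothing follows it.
  if (!delimiter_map.empty()) { delimiter_map.pop_back(); }

  // Scatter '\n' into the reserved slots (thrust::scatter in the real code).
  for (auto const offset : delimiter_map) { buffer[offset] = '\n'; }

  // The span handed back covers source bytes plus the delimiters actually inserted,
  // i.e. buffer.first(bytes_read + delimiter_map.size() * num_delimiter_chars).
  auto const used = bytes_read + delimiter_map.size() * num_delimiter_chars;
  std::cout << buffer.substr(0, used) << '\n';  // prints two records separated by '\n'
}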
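
Likewise, a rough sketch (again illustrative only, with made-up data and chunk size) of
the chunking scheme that skeleton_for_parellel_chunk_reader exercises: each byte-range
chunk is snapped forward to the first record delimiter at or after its nominal offset,
so every record range contains only whole JSON lines. The real code finds the per-chunk
delimiter on the device via find_first_delimiter_in_chunk.

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main()
{
  std::string const data  = "{\"a\":1}\n{\"a\":22}\n{\"a\":333}\n";
  std::size_t const chunk = 10;  // nominal byte-range size per chunk

  // Equivalent of find_first_delimiter: first '\n' at or after offset, or -1.
  auto first_delim = [&](std::size_t offset) -> long {
    auto const pos = data.find('\n', offset);
    return pos == std::string::npos ? -1 : static_cast<long>(pos);
  };

  // A record range ends just past the first delimiter found in the next chunk, so a
  // JSON line that straddles a chunk boundary belongs to the earlier range.
  std::vector<std::pair<long, long>> record_ranges;
  long start = 0;
  for (std::size_t offset = chunk; offset < data.size(); offset += chunk) {
    auto const pos = first_delim(offset);
    if (pos == -1) { continue; }  // no delimiter here; the current range keeps growing
    record_ranges.emplace_back(start, pos + 1);
    start = pos + 1;
  }
  if (start < static_cast<long>(data.size())) {
    record_ranges.emplace_back(start, static_cast<long>(data.size()));
  }

  for (auto const& [s, e] : record_ranges) {
    std::cout << "[" << s << ", " << e << "): " << data.substr(s, e - s);
  }
}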