Fix multi-source reading in JSON byte range reader (#15671)
This PR fixes the number of bytes read and corrects the offsets for the delimiters added to the buffer when reading across multiple sources.
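For illustration only, below is a minimal host-side C++ sketch of the corrected bookkeeping. It is not the actual device code from this PR: the sources, buffer size, and output are made up, though `bytes_read`, `delimiter_map`, and `num_delimiter_chars` mirror the identifiers used in `ingest_raw_input`. Each source's bytes are copied to an offset that also accounts for the delimiters already inserted, the delimiter positions are recorded, and the final size is the payload bytes plus one delimiter per boundary between sources.

```cpp
// Simplified, host-only sketch (assumed example, not the cuDF implementation):
// data from each source lands at (payload bytes copied so far) + (delimiters
// already inserted), delimiter positions are recorded in delimiter_map, and the
// returned size is bytes_read + one delimiter per boundary between sources.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main()
{
  std::vector<std::string> const sources = {R"({"a":1})", R"({"a":2})", R"({"a":3})"};
  std::size_t constexpr num_delimiter_chars = 1;

  std::string buffer(64, '\0');  // stand-in for the preallocated device buffer
  std::vector<std::size_t> delimiter_map;
  std::size_t bytes_read = 0;

  for (auto const& src : sources) {
    // Destination offset = payload bytes copied so far + delimiters already inserted.
    std::size_t const destination = bytes_read + num_delimiter_chars * delimiter_map.size();
    std::copy(src.begin(), src.end(), buffer.begin() + destination);
    bytes_read += src.size();
    // Record where the delimiter that follows this source should go in the buffer.
    delimiter_map.push_back(bytes_read + num_delimiter_chars * delimiter_map.size());
  }
  // No delimiter is needed after the last source.
  if (!delimiter_map.empty()) { delimiter_map.pop_back(); }
  // Mirrors the scatter of '\n' characters at the recorded positions.
  for (auto const pos : delimiter_map) { buffer[pos] = '\n'; }

  std::size_t const total_size = bytes_read + delimiter_map.size() * num_delimiter_chars;
  std::cout << buffer.substr(0, total_size) << '\n';  // {"a":1}\n{"a":2}\n{"a":3}
  return 0;
}
```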

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #15671
shrshi authored May 10, 2024
1 parent c3f3409 commit b5a9c4b
Showing 2 changed files with 135 additions and 32 deletions.
57 changes: 28 additions & 29 deletions cpp/src/io/json/read_json.cu
@@ -50,7 +50,10 @@ size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
}

/**
* @brief Read from array of data sources into RMM buffer
* @brief Read from array of data sources into RMM buffer. The size of the returned device span
can be larger than the number of bytes requested from the list of sources when
the range to be read spans across multiple sources. This is due to the delimiter
characters inserted after the end of each accessed source.
*
* @param buffer Device span buffer to which data is read
* @param sources Array of data sources
@@ -72,7 +75,6 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
// delimiter.
auto constexpr num_delimiter_chars = 1;
auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1);

if (compression == compression_type::NONE) {
std::vector<size_type> delimiter_map{};
@@ -89,46 +91,45 @@
std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset);
size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);

auto remaining_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
auto const total_bytes_to_read =
std::min(range_size, prefsum_source_sizes.back() - range_offset);
range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
for (size_t i = start_source; i < sources.size() && remaining_bytes_to_read; i++) {
for (size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
if (sources[i]->is_empty()) continue;
auto data_size = std::min(sources[i]->size() - range_offset, remaining_bytes_to_read);
auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read;
auto data_size =
std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
(num_delimiter_chars * delimiter_map.size());
if (sources[i]->is_device_read_preferred(data_size)) {
bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
} else {
h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
auto const& h_buffer = h_buffers.back();
CUDF_CUDA_TRY(cudaMemcpyAsync(
destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value()));
destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value()));
bytes_read += h_buffer->size();
}
range_offset = 0;
remaining_bytes_to_read -= bytes_read;
delimiter_map.push_back(bytes_read);
bytes_read += num_delimiter_chars;
delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size()));
}
// In the case where all sources are empty, bytes_read is zero
if (bytes_read) bytes_read -= num_delimiter_chars;
// Removing delimiter inserted after last non-empty source is read
if (!delimiter_map.empty()) { delimiter_map.pop_back(); }

// If this is a multi-file source, we scatter the JSON line delimiters between files
if (sources.size() > 1) {
static_assert(num_delimiter_chars == 1,
"Currently only single-character delimiters are supported");
auto const delimiter_source = thrust::make_constant_iterator('\n');
auto const d_delimiter_map = cudf::detail::make_device_uvector_async(
host_span<size_type const>{delimiter_map.data(), delimiter_map.size() - 1},
stream,
rmm::mr::get_current_device_resource());
delimiter_map, stream, rmm::mr::get_current_device_resource());
thrust::scatter(rmm::exec_policy_nosync(stream),
delimiter_source,
delimiter_source + d_delimiter_map.size(),
d_delimiter_map.data(),
buffer.data());
}
stream.synchronize();
return buffer.first(bytes_read);
return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
}
// TODO: allow byte range reading from multiple compressed files.
auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset);
@@ -151,17 +152,15 @@ size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::data
char const delimiter,
rmm::cuda_stream_view stream)
{
auto const total_source_size =
sources_size(sources, reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size()) +
(sources.size() - 1);
auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1);
rmm::device_uvector<char> buffer(total_source_size, stream);
ingest_raw_input(buffer,
sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size(),
stream);
return find_first_delimiter(buffer, delimiter, stream);
auto readbufspan = ingest_raw_input(buffer,
sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size(),
stream);
return find_first_delimiter(readbufspan, '\n', stream);
}

/**
@@ -195,8 +194,7 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
"Invalid offsetting");
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size =
should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

// Some magic numbers
constexpr int num_subchunks = 10; // per chunk_size
@@ -217,7 +215,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
size_t const buffer_size =
reader_compression != compression_type::NONE
? total_source_size * estimated_compression_ratio + header_size
: std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk);
: std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
num_extra_delimiters;
rmm::device_uvector<char> buffer(buffer_size, stream);
device_span<char> bufspan(buffer);

110 changes: 107 additions & 3 deletions cpp/tests/io/json_chunked_reader.cpp
@@ -24,11 +24,19 @@

#include <rmm/resource_ref.hpp>

#include <fstream>
#include <string>
#include <vector>

/**
* @brief Base test fixture for JSON reader tests
*/
struct JsonReaderTest : public cudf::test::BaseFixture {};

cudf::test::TempDirTestEnvironment* const temp_env =
static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

// function to extract first delimiter in the string in each chunk,
// collate together and form byte_range for each chunk,
// parse separately.
@@ -41,7 +49,6 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
{
using namespace cudf::io::json::detail;
using cudf::size_type;
// assuming single source.
size_t total_source_size = 0;
for (auto const& source : sources) {
total_source_size += source->size();
@@ -77,7 +84,9 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
std::vector<cudf::io::table_with_metadata> tables;
// Process each chunk in parallel.
for (auto const& [chunk_start, chunk_end] : record_ranges) {
if (chunk_start == -1 or chunk_end == -1) continue;
if (chunk_start == -1 or chunk_end == -1 or
static_cast<size_t>(chunk_start) >= total_source_size)
continue;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
@@ -87,7 +96,7 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
return tables;
}

TEST_F(JsonReaderTest, ByteRange)
TEST_F(JsonReaderTest, ByteRange_SingleSource)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
@@ -126,3 +135,98 @@ TEST_F(JsonReaderTest, ByteRange)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
}
}

TEST_F(JsonReaderTest, ReadCompleteFiles)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json";
{
std::ofstream outfile(filename, std::ofstream::out);
outfile << json_string;
}

constexpr int num_sources = 5;
std::vector<std::string> filepaths(num_sources, filename);

cudf::io::json_reader_options in_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths})
.lines(true)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL);

cudf::io::table_with_metadata result = cudf::io::read_json(in_options);

std::vector<cudf::io::table_with_metadata> part_tables;
for (auto filepath : filepaths) {
cudf::io::json_reader_options part_in_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepath})
.lines(true)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL);

part_tables.push_back(cudf::io::read_json(part_in_options));
}

auto part_table_views = std::vector<cudf::table_view>(part_tables.size());
std::transform(part_tables.begin(), part_tables.end(), part_table_views.begin(), [](auto& table) {
return table.tbl->view();
});

auto expected_result = cudf::concatenate(part_table_views);

CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result.tbl->view(), expected_result->view());
}

TEST_F(JsonReaderTest, ByteRange_MultiSource)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json";
{
std::ofstream outfile(filename, std::ofstream::out);
outfile << json_string;
}

constexpr int num_sources = 5;
std::vector<std::string> filepaths(num_sources, filename);

// Initialize parsing options (reading json lines)
cudf::io::json_reader_options json_lines_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths})
.lines(true)
.compression(cudf::io::compression_type::NONE)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL);

// Read full test data via existing, nested JSON lines reader
cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options);

auto file_paths = json_lines_options.get_source().filepaths();
std::vector<std::unique_ptr<cudf::io::datasource>> datasources;
for (auto& fp : file_paths) {
datasources.emplace_back(cudf::io::datasource::create(fp));
}

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) {
auto const tables = skeleton_for_parellel_chunk_reader(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
return table.tbl->view();
});
auto result = cudf::concatenate(table_views);

// Verify that the data read via chunked reader matches the data read via nested JSON reader
// cannot use EQUAL due to concatenate removing null mask
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
}
}
