pr reviews
shrshi committed Aug 12, 2024
1 parent dfcc657 commit 82ea28f
Showing 2 changed files with 51 additions and 49 deletions.
88 changes: 45 additions & 43 deletions cpp/src/io/json/read_json.cu
@@ -42,11 +42,11 @@ namespace cudf::io::json::detail {
 namespace {
 
 // Return total size of sources enclosing the passed range
-size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
-                    size_t range_offset,
-                    size_t range_size)
+std::size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
+                         std::size_t range_offset,
+                         std::size_t range_size)
 {
-  return std::accumulate(sources.begin(), sources.end(), 0ul, [=](size_t sum, auto& source) {
+  return std::accumulate(sources.begin(), sources.end(), 0ul, [=](std::size_t sum, auto& source) {
     auto const size = source->size();
     // TODO take care of 0, 0, or *, 0 case.
     return sum +
@@ -56,7 +56,7 @@ size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
 
 // Return estimated size of subchunk using a heuristic involving the byte range size and the minimum
 // subchunk size
-size_t estimate_size_per_subchunk(size_t chunk_size)
+std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 {
   auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
   // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
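
A minimal sketch of the heuristic above, using hypothetical byte counts rather than the constants in this file:

```cpp
#include <cmath>
#include <cstdio>

int main()
{
  auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
  // A 10 KB minimum subchunk and a 1 MB byte range give a 100 KB estimate:
  // the subchunk size grows with the range but is damped toward the minimum.
  std::printf("%.0f\n", geometric_mean(10'000.0, 1'000'000.0));  // prints 100000
  return 0;
}
```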
@@ -75,13 +75,13 @@ size_t estimate_size_per_subchunk(size_t chunk_size)
  *
  * @return size in bytes
  */
-size_t get_batch_size_upper_bound()
+std::size_t get_batch_size_upper_bound()
 {
-  auto const batch_size_str = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
-  int64_t const batch_size  = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
-  auto const batch_limit    = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
-  auto const batch_size_upper_bound =
-    static_cast<size_t>((batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
+  auto const batch_size_str = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
+  int64_t const batch_size  = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
+  auto const batch_limit    = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
+  auto const batch_size_upper_bound = static_cast<std::size_t>(
+    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
   return batch_size_upper_bound;
 }
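
The clamping above can be exercised in isolation; this standalone sketch (a hypothetical harness, not part of the patch) mirrors the function's behavior:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <limits>

int main()
{
  // A non-positive or out-of-range LIBCUDF_JSON_BATCH_SIZE falls back to
  // the int32_t maximum, as in get_batch_size_upper_bound().
  auto const* batch_size_str      = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
  std::int64_t const batch_size   = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
  auto const batch_limit          = static_cast<std::int64_t>(std::numeric_limits<std::int32_t>::max());
  auto const upper_bound          = static_cast<std::size_t>(
    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
  std::printf("batch size upper bound: %zu\n", upper_bound);
  return 0;
}
```

For example, `LIBCUDF_JSON_BATCH_SIZE=1048576` yields 1048576, while an unset or zero value yields 2147483647.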

@@ -125,27 +125,27 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
 {
   CUDF_FUNC_RANGE();
 
-  size_t const total_source_size = sources_size(sources, 0, 0);
+  std::size_t const total_source_size = sources_size(sources, 0, 0);
   auto constexpr num_delimiter_chars = 1;
   auto const num_extra_delimiters    = num_delimiter_chars * (sources.size() - 1);
   compression_type const reader_compression = reader_opts.get_compression();
-  size_t const chunk_offset = reader_opts.get_byte_range_offset();
-  size_t chunk_size         = reader_opts.get_byte_range_size();
+  std::size_t const chunk_offset = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size         = reader_opts.get_byte_range_size();
 
   CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
                "Invalid offsetting",
                std::invalid_argument);
   auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
   chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
 
-  int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
-  size_t const size_per_subchunk     = estimate_size_per_subchunk(chunk_size);
+  int const num_subchunks_prealloced  = should_load_all_sources ? 0 : max_subchunks_prealloced;
+  std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
 
   // The allocation for single source compressed input is estimated by assuming a ~4:1
   // compression ratio. For uncompressed inputs, we can get a better estimate using the idea
   // of subchunks.
   auto constexpr header_size = 4096;
-  size_t const buffer_size =
+  std::size_t const buffer_size =
     reader_compression != compression_type::NONE
       ? total_source_size * estimated_compression_ratio + header_size
       : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
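
A worked example of the compressed-path allocation above, assuming `estimated_compression_ratio` is 4 (the ~4:1 figure from the comment) and a hypothetical 100 MiB source:

```cpp
#include <cstddef>

constexpr std::size_t total_source_size = 100u * 1024 * 1024;  // hypothetical input
constexpr std::size_t header_size       = 4096;                // mirrors the constant above
constexpr std::size_t ratio             = 4;                   // assumed compression ratio

// Compressed branch of the buffer_size expression: ~400 MiB + 4 KiB.
constexpr std::size_t buffer_size = total_source_size * ratio + header_size;
static_assert(buffer_size == 419'434'496);
```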
@@ -167,8 +167,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
     return datasource::owning_buffer<rmm::device_uvector<char>>(std::move(empty_buf));
   } else if (!should_load_all_sources) {
     // Find next delimiter
-    std::int64_t next_delim_pos = -1;
-    size_t next_subchunk_start  = chunk_offset + chunk_size;
+    std::int64_t next_delim_pos     = -1;
+    std::size_t next_subchunk_start = chunk_offset + chunk_size;
     while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) {
       buffer_offset += readbufspan.size();
       readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
@@ -227,8 +227,8 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
 device_span<char> ingest_raw_input(device_span<char> buffer,
                                    host_span<std::unique_ptr<datasource>> sources,
                                    compression_type compression,
-                                   size_t range_offset,
-                                   size_t range_size,
+                                   std::size_t range_offset,
+                                   std::size_t range_size,
                                    rmm::cuda_stream_view stream)
 {
   CUDF_FUNC_RANGE();
@@ -238,23 +238,24 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
   auto constexpr num_delimiter_chars = 1;
 
   if (compression == compression_type::NONE) {
-    auto delimiter_map = cudf::detail::make_empty_host_vector<size_t>(sources.size(), stream);
-    std::vector<size_t> prefsum_source_sizes(sources.size());
+    auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
+    std::vector<std::size_t> prefsum_source_sizes(sources.size());
     std::vector<std::unique_ptr<datasource::buffer>> h_buffers;
-    size_t bytes_read = 0;
+    std::size_t bytes_read = 0;
     std::transform_inclusive_scan(sources.begin(),
                                   sources.end(),
                                   prefsum_source_sizes.begin(),
-                                  std::plus<size_t>{},
+                                  std::plus<std::size_t>{},
                                   [](std::unique_ptr<datasource> const& s) { return s->size(); });
     auto upper =
       std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset);
-    size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);
+    std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);
 
     auto const total_bytes_to_read =
       std::min(range_size, prefsum_source_sizes.back() - range_offset);
     range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
-    for (size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
+    for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read;
+         i++) {
       if (sources[i]->is_empty()) continue;
       auto data_size =
         std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
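
The `transform_inclusive_scan`/`upper_bound` pattern in this hunk maps a byte offset to the source containing it; a self-contained sketch with hypothetical sizes:

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <numeric>
#include <vector>

int main()
{
  // Hypothetical per-source byte counts; prefsum holds inclusive prefix sums.
  std::vector<std::size_t> const sizes{100, 250, 50};
  std::vector<std::size_t> prefsum(sizes.size());
  std::inclusive_scan(sizes.begin(), sizes.end(), prefsum.begin());  // {100, 350, 400}

  // upper_bound returns the first prefix sum strictly greater than the
  // offset, i.e. the source that owns that byte: offset 100 is the first
  // byte of source 1, since source 0 covers bytes [0, 100).
  std::size_t const range_offset = 100;
  auto const upper = std::upper_bound(prefsum.begin(), prefsum.end(), range_offset);
  std::size_t const start_source = std::distance(prefsum.begin(), upper);  // == 1
  return static_cast<int>(start_source);
}
```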
@@ -333,15 +333,16 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
    * Note that the batched reader does not work for compressed inputs or for regular
    * JSON inputs.
    */
-  size_t const total_source_size = sources_size(sources, 0, 0);
-  size_t chunk_offset            = reader_opts.get_byte_range_offset();
-  size_t chunk_size              = reader_opts.get_byte_range_size();
-  chunk_size = !chunk_size ? total_source_size - chunk_offset
-                           : std::min(chunk_size, total_source_size - chunk_offset);
+  std::size_t const total_source_size = sources_size(sources, 0, 0);
+  std::size_t chunk_offset            = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size              = reader_opts.get_byte_range_size();
+  chunk_size = !chunk_size ? total_source_size - chunk_offset
+                           : std::min(chunk_size, total_source_size - chunk_offset);
 
-  size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
-  size_t const batch_size_upper_bound = get_batch_size_upper_bound();
-  size_t const batch_size = batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+  std::size_t const size_per_subchunk      = estimate_size_per_subchunk(chunk_size);
+  std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
+  std::size_t const batch_size =
+    batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
 
   /*
    * Identify the position (zero-indexed) of starting source file from which to begin
@@ -351,10 +351,10 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
    */
 
   // Prefix sum of source file sizes
-  size_t pref_source_size = 0;
+  std::size_t pref_source_size = 0;
   // Starting source file from which to begin batching, evaluated using byte range offset
-  size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
-    for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
+  std::size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
+    for (std::size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
       if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; }
       pref_source_size += sources[src_idx]->size();
     }
@@ -366,10 +366,10 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
    * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading
    * stops.
    */
-  size_t pref_bytes_size = chunk_offset;
-  size_t end_bytes_size  = chunk_offset + chunk_size;
-  std::vector<size_t> batch_offsets{pref_bytes_size};
-  for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
+  std::size_t pref_bytes_size = chunk_offset;
+  std::size_t end_bytes_size  = chunk_offset + chunk_size;
+  std::vector<std::size_t> batch_offsets{pref_bytes_size};
+  for (std::size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
     pref_source_size += sources[i]->size();
     // If the current source file can subsume multiple batches, we split the file until the
     // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`)
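
Stripped of the source-boundary bookkeeping, the loop above chops `[chunk_offset, chunk_offset + chunk_size)` into batch-sized pieces; a simplified sketch (hypothetical helper, ignoring per-file splits):

```cpp
#include <cstddef>
#include <vector>

std::vector<std::size_t> make_batch_offsets(std::size_t begin,
                                            std::size_t end,
                                            std::size_t batch_size)
{
  // Each adjacent pair of offsets becomes one batch's (offset, size).
  std::vector<std::size_t> offsets{begin};
  while (offsets.back() + batch_size < end) {
    offsets.push_back(offsets.back() + batch_size);
  }
  offsets.push_back(end);
  return offsets;
}

// make_batch_offsets(0, 1000, 300) -> {0, 300, 600, 900, 1000}
```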
@@ -393,7 +393,7 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
   // Dispatch individual batches to read_batch and push the resulting table into
   // partial_tables array. Note that the reader options need to be updated for each
   // batch to adjust byte range offset and byte range size.
-  for (size_t i = 0; i < batch_offsets.size() - 1; i++) {
+  for (std::size_t i = 0; i < batch_offsets.size() - 1; i++) {
     batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
     batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
     partial_tables.emplace_back(
12 changes: 6 additions & 6 deletions cpp/tests/large_strings/json_tests.cu
@@ -34,11 +34,11 @@ TEST_F(JsonLargeReaderTest, MultiBatch)
     { "a": { "y" : 6}, "b" : [6 ], "c": 13 }
     { "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
 
-  size_t const batch_size_upper_bound = std::numeric_limits<int32_t>::max() / 16;
+  std::size_t const batch_size_upper_bound = std::numeric_limits<int32_t>::max() / 16;
   // set smaller batch_size to reduce file size and execution time
   setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(batch_size_upper_bound).c_str(), 1);
 
-  constexpr size_t expected_file_size = 1.5 * static_cast<double>(batch_size_upper_bound);
+  constexpr std::size_t expected_file_size = 1.5 * static_cast<double>(batch_size_upper_bound);
   std::size_t const log_repetitions =
     static_cast<std::size_t>(std::ceil(std::log2(expected_file_size / json_string.size())));
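
The repetition count follows from the sizes involved; a hedged worked example (the real `json_string` length differs):

```cpp
#include <cmath>
#include <cstddef>

// Illustrative only: assume a ~320-byte json_string and the test's
// expected_file_size of 1.5 * (INT32_MAX / 16), roughly 201 MB. Doubling
// the string ceil(log2(201'326'590 / 320)) == 20 times gives
// 320 * 2^20 ~= 335 MB, comfortably past the target size.
std::size_t log_reps(std::size_t target, std::size_t str_size)
{
  return static_cast<std::size_t>(
    std::ceil(std::log2(static_cast<double>(target) / str_size)));
}
```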

@@ -70,10 +70,10 @@ TEST_F(JsonLargeReaderTest, MultiBatch)
     datasources.emplace_back(cudf::io::datasource::create(hb));
   }
   // Test for different chunk sizes
-  std::vector<size_t> chunk_sizes{batch_size_upper_bound / 4,
-                                  batch_size_upper_bound / 2,
-                                  batch_size_upper_bound,
-                                  static_cast<size_t>(batch_size_upper_bound * 2)};
+  std::vector<std::size_t> chunk_sizes{batch_size_upper_bound / 4,
+                                       batch_size_upper_bound / 2,
+                                       batch_size_upper_bound,
+                                       static_cast<std::size_t>(batch_size_upper_bound * 2)};
 
   for (auto chunk_size : chunk_sizes) {
     auto const tables =
