From cc4ab533d18bf8914562a8a76820db0453d4c247 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 17 Sep 2024 14:28:18 -0700 Subject: [PATCH 1/9] works --- cpp/src/io/csv/reader_impl.cu | 244 ++++++++++++++++++++-------------- 1 file changed, 146 insertions(+), 98 deletions(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index ebca334a715..2c55cbc504e 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -46,11 +46,8 @@ #include #include -#include #include -#include #include -#include #include #include #include @@ -196,15 +193,10 @@ void erase_except_last(C& container, rmm::cuda_stream_view stream) container.resize(1, stream); } -size_t find_first_row_start(char row_terminator, host_span data) +constexpr std::array UTF8_BOM = {0xEF, 0xBB, 0xBF}; +[[nodiscard]] bool has_utf8_bom(host_span data) { - // For now, look for the first terminator (assume the first terminator isn't within a quote) - // TODO: Attempt to infer this from the data - size_t pos = 0; - while (pos < data.size() && data[pos] != row_terminator) { - ++pos; - } - return std::min(pos + 1, data.size()); + return data.size() >= 3 && memcmp(data.data(), UTF8_BOM.data(), 3) == 0; } /** @@ -213,20 +205,30 @@ size_t find_first_row_start(char row_terminator, host_span data) * This function scans the input data to record the row offsets (relative to the start of the * input data). A row is actually the data/offset between two termination symbols. * - * @param data Uncompressed input data in host memory - * @param range_begin Only include rows starting after this position - * @param range_end Only include rows starting before this position - * @param skip_rows Number of rows to skip from the start - * @param num_rows Number of rows to read; -1: all remaining data - * @param load_whole_file Hint that the entire data will be needed on gpu - * @param stream CUDA stream used for device memory operations and kernel launches - * @return Input data and row offsets in the device memory + * @param[in] source The source data (may be compressed) + * @param[in] reader_opts Settings for controlling reading behavior + * @param[in] parse_opts Settings for controlling parsing behavior + * @param[out] header The header row, if any + * @param[in] data Host buffer containing uncompressed data, if input is compressed + * @param[in] has_bom Indicates if the data has a BOM + * @param[in] byte_range_offset Offset of the byte range + * @param[in] range_begin Start of the first row, relative to the byte range start + * @param[in] range_end End of the data to read, relative to the byte range start; equal to the + * data size if all data after byte_range_offset needs to be read + * @param[in] skip_rows Number of rows to skip from the start + * @param[in] num_rows Number of rows to read; -1 means all + * @param[in] load_whole_file Indicates if the whole file should be read + * @param[in] stream CUDA stream used for device memory operations and kernel launches + * @return Input data and row offsets in the device memory */ std::pair, selected_rows_offsets> load_data_and_gather_row_offsets( + cudf::io::datasource* source, csv_reader_options const& reader_opts, parse_options const& parse_opts, std::vector& header, - host_span data, + std::optional> data, + bool has_bom, + size_t byte_range_offset, size_t range_begin, size_t range_end, size_t skip_rows, @@ -235,35 +237,54 @@ std::pair, selected_rows_offsets> load_data_and_gather rmm::cuda_stream_view stream) { constexpr size_t max_chunk_bytes = 64 * 1024 * 1024; // 64MB - size_t buffer_size = std::min(max_chunk_bytes, data.size()); - size_t max_blocks = + + auto const data_size = data.has_value() ? data->size() : source->size(); + auto const range_size_padded = + std::min(reader_opts.get_byte_range_size_with_padding(), data_size - byte_range_offset); + auto const max_read_size = std::min(range_size_padded ? range_size_padded : data_size, + data_size - byte_range_offset); + size_t const buffer_size = std::min(max_chunk_bytes, data_size); + size_t const max_blocks = std::max((buffer_size / cudf::io::csv::gpu::rowofs_block_bytes) + 1, 2); + cudf::detail::hostdevice_vector row_ctx(max_blocks, stream); - size_t buffer_pos = std::min(range_begin - std::min(range_begin, sizeof(char)), data.size()); - size_t pos = std::min(range_begin, data.size()); + size_t pos = has_bom ? sizeof(UTF8_BOM) : range_begin; + // Need the line terminator of last line before the byte range + size_t buffer_pos = range_begin != 0 ? range_begin - 1 : pos; size_t header_rows = (reader_opts.get_header() >= 0) ? reader_opts.get_header() + 1 : 0; uint64_t ctx = 0; // For compatibility with the previous parser, a row is considered in-range if the // previous row terminator is within the given range - range_end += (range_end < data.size()); + range_end += (range_end < data_size); - // Reserve memory by allocating and then resetting the size - rmm::device_uvector d_data{ - (load_whole_file) ? data.size() : std::min(buffer_size * 2, data.size()), stream}; - d_data.resize(0, stream); + rmm::device_uvector d_data{0, stream}; + d_data.reserve((load_whole_file) ? data_size : std::min(buffer_size * 2, max_read_size), stream); rmm::device_uvector all_row_offsets{0, stream}; do { - size_t target_pos = std::min(pos + max_chunk_bytes, data.size()); + size_t target_pos = std::min(pos + max_chunk_bytes, max_read_size); size_t chunk_size = target_pos - pos; auto const previous_data_size = d_data.size(); d_data.resize(target_pos - buffer_pos, stream); - CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.begin() + previous_data_size, - data.begin() + buffer_pos + previous_data_size, - target_pos - buffer_pos - previous_data_size, - cudaMemcpyDefault, - stream.value())); + if (data.has_value()) { + CUDF_CUDA_TRY( + cudaMemcpyAsync(d_data.begin() + previous_data_size, + data->begin() + byte_range_offset + buffer_pos + previous_data_size, + target_pos - buffer_pos - previous_data_size, + cudaMemcpyDefault, + stream.value())); + } else { + // TODO use device_read + auto const buffer = source->host_read(buffer_pos + byte_range_offset + previous_data_size, + target_pos - buffer_pos - previous_data_size); + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.begin() + previous_data_size, + buffer->data(), + buffer->size(), + cudaMemcpyDefault, + stream.value())); + stream.synchronize(); + } // Pass 1: Count the potential number of rows in each character block for each // possible parser state at the beginning of the block. @@ -274,7 +295,7 @@ std::pair, selected_rows_offsets> load_data_and_gather chunk_size, pos, buffer_pos, - data.size(), + max_read_size, range_begin, range_end, skip_rows, @@ -313,13 +334,13 @@ std::pair, selected_rows_offsets> load_data_and_gather chunk_size, pos, buffer_pos, - data.size(), + max_read_size, range_begin, range_end, skip_rows, stream); // With byte range, we want to keep only one row out of the specified range - if (range_end < data.size()) { + if (range_end < data_size) { CUDF_CUDA_TRY(cudaMemcpyAsync(row_ctx.host_ptr(), row_ctx.device_ptr(), num_blocks * sizeof(uint64_t), @@ -360,7 +381,7 @@ std::pair, selected_rows_offsets> load_data_and_gather } } pos = target_pos; - } while (pos < data.size()); + } while (pos < max_read_size); auto const non_blank_row_offsets = io::csv::gpu::remove_blank_rows(parse_opts.view(), d_data, all_row_offsets, stream); @@ -378,9 +399,18 @@ std::pair, selected_rows_offsets> load_data_and_gather auto const header_start = buffer_pos + row_ctx[0]; auto const header_end = buffer_pos + row_ctx[1]; - CUDF_EXPECTS(header_start <= header_end && header_end <= data.size(), + CUDF_EXPECTS(header_start <= header_end && header_end <= max_read_size, "Invalid csv header location"); - header.assign(data.begin() + header_start, data.begin() + header_end); + header.resize(header_end - header_start); + if (data.has_value()) { + std::copy(data->begin() + byte_range_offset + header_start, + data->begin() + byte_range_offset + header_end, + header.begin()); + } else { + source->host_read(header_start + byte_range_offset, + header_end - header_start, + reinterpret_cast(header.data())); + } if (header_rows > 0) { row_offsets.erase_first_n(header_rows); } } // Apply num_rows limit @@ -397,73 +427,91 @@ std::pair, selected_rows_offsets> select_data_and_row_ parse_options const& parse_opts, rmm::cuda_stream_view stream) { - auto range_offset = reader_opts.get_byte_range_offset(); - auto range_size = reader_opts.get_byte_range_size(); - auto range_size_padded = reader_opts.get_byte_range_size_with_padding(); - auto skip_rows = reader_opts.get_skiprows(); - auto skip_end_rows = reader_opts.get_skipfooter(); - auto num_rows = reader_opts.get_nrows(); + auto range_offset = reader_opts.get_byte_range_offset(); + auto range_size = reader_opts.get_byte_range_size(); + auto skip_rows = reader_opts.get_skiprows(); + auto skip_end_rows = reader_opts.get_skipfooter(); + auto num_rows = reader_opts.get_nrows(); if (range_offset > 0 || range_size > 0) { CUDF_EXPECTS(reader_opts.get_compression() == compression_type::NONE, "Reading compressed data using `byte range` is unsupported"); } + // TODO: Allow parsing the header outside the mapped range + CUDF_EXPECTS((range_offset == 0 || reader_opts.get_header() < 0), + "byte_range offset with header not supported"); - // Transfer source data to GPU - if (!source->is_empty()) { - auto buffer = - source->host_read(range_offset, range_size_padded != 0 ? range_size_padded : source->size()); - auto h_data = - host_span(reinterpret_cast(buffer->data()), buffer->size()); - - std::vector h_uncomp_data_owner; - if (reader_opts.get_compression() != compression_type::NONE) { - h_uncomp_data_owner = - decompress(reader_opts.get_compression(), {buffer->data(), buffer->size()}); - h_data = {reinterpret_cast(h_uncomp_data_owner.data()), - h_uncomp_data_owner.size()}; - buffer.reset(); - } + if (source->is_empty()) { + return {rmm::device_uvector{0, stream}, selected_rows_offsets{stream}}; + } - // check for and skip UTF-8 BOM - uint8_t const UTF8_BOM[] = {0xEF, 0xBB, 0xBF}; - if (h_data.size() >= sizeof(UTF8_BOM) && - memcmp(h_data.data(), UTF8_BOM, sizeof(UTF8_BOM)) == 0) { - h_data = h_data.subspan(sizeof(UTF8_BOM), h_data.size() - sizeof(UTF8_BOM)); - } + std::optional> h_data; + std::vector h_uncomp_data_owner; + if (reader_opts.get_compression() != compression_type::NONE) { + auto const h_comp_data = source->host_read(0, source->size()); + h_uncomp_data_owner = + decompress(reader_opts.get_compression(), {h_comp_data->data(), h_comp_data->size()}); + h_data = host_span{reinterpret_cast(h_uncomp_data_owner.data()), + h_uncomp_data_owner.size()}; + } - // None of the parameters for row selection is used, we are parsing the entire file - bool const load_whole_file = range_offset == 0 && range_size == 0 && skip_rows <= 0 && - skip_end_rows <= 0 && num_rows == -1; - - // With byte range, find the start of the first data row - size_t const data_start_offset = - (range_offset != 0) ? find_first_row_start(parse_opts.terminator, h_data) : 0; - - // TODO: Allow parsing the header outside the mapped range - CUDF_EXPECTS((range_offset == 0 || reader_opts.get_header() < 0), - "byte_range offset with header not supported"); - - // Gather row offsets - auto data_row_offsets = - load_data_and_gather_row_offsets(reader_opts, - parse_opts, - header, - h_data, - data_start_offset, - (range_size) ? range_size : h_data.size(), - (skip_rows > 0) ? skip_rows : 0, - num_rows, - load_whole_file, - stream); - auto& row_offsets = data_row_offsets.second; - // Exclude the rows that are to be skipped from the end - if (skip_end_rows > 0 && static_cast(skip_end_rows) < row_offsets.size()) { - row_offsets.shrink(row_offsets.size() - skip_end_rows); + size_t data_start_offset = range_offset; + bool has_bom = false; + if (h_data.has_value()) { + has_bom = has_utf8_bom(*h_data); + } else { + if (range_offset == 0) { + auto bom_buffer = source->host_read(0, std::min(source->size(), sizeof(UTF8_BOM))); + auto bom_chars = host_span{reinterpret_cast(bom_buffer->data()), + bom_buffer->size()}; + has_bom = has_utf8_bom(bom_chars); + } else { + constexpr auto find_data_start_chunk_size = 4ul * 1024; + + while (data_start_offset < source->size()) { + auto const read_size = + std::min(find_data_start_chunk_size, source->size() - data_start_offset); + auto buffer = source->host_read(data_start_offset, read_size); + auto buffer_chars = + host_span{reinterpret_cast(buffer->data()), buffer->size()}; + + if (auto first_row_start = + std::find(buffer_chars.begin(), buffer_chars.end(), parse_opts.terminator); + first_row_start != buffer_chars.end()) { + data_start_offset += std::distance(buffer_chars.begin(), first_row_start) + 1; + break; + } + data_start_offset += read_size; + } } - return data_row_offsets; } - return {rmm::device_uvector{0, stream}, selected_rows_offsets{stream}}; + + // None of the parameters for row selection is used, we are parsing the entire file + bool const load_whole_file = + range_offset == 0 && range_size == 0 && skip_rows <= 0 && skip_end_rows <= 0 && num_rows == -1; + + // Transfer source data to GPU and gather row offsets + auto const uncomp_size = h_data.has_value() ? h_data->size() : source->size(); + auto data_row_offsets = load_data_and_gather_row_offsets(source, + reader_opts, + parse_opts, + header, + h_data, + has_bom, + range_offset, + data_start_offset - range_offset, + (range_size) ? range_size : uncomp_size, + (skip_rows > 0) ? skip_rows : 0, + num_rows, + load_whole_file, + stream); + auto& row_offsets = data_row_offsets.second; + // Exclude the rows that are to be skipped from the end + if (skip_end_rows > 0 && static_cast(skip_end_rows) < row_offsets.size()) { + row_offsets.shrink(row_offsets.size() - skip_end_rows); + } + + return data_row_offsets; } void select_data_types(host_span user_dtypes, From 879d450d8162a7b16f084d65a5cbc26df0f32fd1 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 17 Sep 2024 16:54:26 -0700 Subject: [PATCH 2/9] mild clean up --- cpp/src/io/csv/reader_impl.cu | 69 ++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 2c55cbc504e..d1c5ef17b28 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -210,7 +210,6 @@ constexpr std::array UTF8_BOM = {0xEF, 0xBB, 0xBF}; * @param[in] parse_opts Settings for controlling parsing behavior * @param[out] header The header row, if any * @param[in] data Host buffer containing uncompressed data, if input is compressed - * @param[in] has_bom Indicates if the data has a BOM * @param[in] byte_range_offset Offset of the byte range * @param[in] range_begin Start of the first row, relative to the byte range start * @param[in] range_end End of the data to read, relative to the byte range start; equal to the @@ -227,7 +226,6 @@ std::pair, selected_rows_offsets> load_data_and_gather parse_options const& parse_opts, std::vector& header, std::optional> data, - bool has_bom, size_t byte_range_offset, size_t range_begin, size_t range_end, @@ -238,19 +236,20 @@ std::pair, selected_rows_offsets> load_data_and_gather { constexpr size_t max_chunk_bytes = 64 * 1024 * 1024; // 64MB - auto const data_size = data.has_value() ? data->size() : source->size(); - auto const range_size_padded = - std::min(reader_opts.get_byte_range_size_with_padding(), data_size - byte_range_offset); - auto const max_read_size = std::min(range_size_padded ? range_size_padded : data_size, - data_size - byte_range_offset); - size_t const buffer_size = std::min(max_chunk_bytes, data_size); - size_t const max_blocks = - std::max((buffer_size / cudf::io::csv::gpu::rowofs_block_bytes) + 1, 2); + auto const data_size = data.has_value() ? data->size() : source->size(); + size_t const buffer_size = std::min(max_chunk_bytes, data_size); + auto const max_input_size = [&]() { + if (range_end == data_size) { + return data_size - byte_range_offset; + } else { + return std::min(reader_opts.get_byte_range_size_with_padding(), + data_size - byte_range_offset); + } + }(); - cudf::detail::hostdevice_vector row_ctx(max_blocks, stream); - size_t pos = has_bom ? sizeof(UTF8_BOM) : range_begin; - // Need the line terminator of last line before the byte range - size_t buffer_pos = range_begin != 0 ? range_begin - 1 : pos; + size_t pos = range_begin; + // When using byta range, need the line terminator of last line before the range + size_t input_pos = byte_range_offset == 0 ? pos : pos - 1; size_t header_rows = (reader_opts.get_header() >= 0) ? reader_opts.get_header() + 1 : 0; uint64_t ctx = 0; @@ -259,25 +258,29 @@ std::pair, selected_rows_offsets> load_data_and_gather range_end += (range_end < data_size); rmm::device_uvector d_data{0, stream}; - d_data.reserve((load_whole_file) ? data_size : std::min(buffer_size * 2, max_read_size), stream); + d_data.reserve((load_whole_file) ? data_size : std::min(buffer_size * 2, max_input_size), stream); rmm::device_uvector all_row_offsets{0, stream}; + + size_t const max_blocks = + std::max((buffer_size / cudf::io::csv::gpu::rowofs_block_bytes) + 1, 2); + cudf::detail::hostdevice_vector row_ctx(max_blocks, stream); do { - size_t target_pos = std::min(pos + max_chunk_bytes, max_read_size); + size_t target_pos = std::min(pos + max_chunk_bytes, max_input_size); size_t chunk_size = target_pos - pos; auto const previous_data_size = d_data.size(); - d_data.resize(target_pos - buffer_pos, stream); + d_data.resize(target_pos - input_pos, stream); if (data.has_value()) { CUDF_CUDA_TRY( cudaMemcpyAsync(d_data.begin() + previous_data_size, - data->begin() + byte_range_offset + buffer_pos + previous_data_size, - target_pos - buffer_pos - previous_data_size, + data->begin() + byte_range_offset + input_pos + previous_data_size, + target_pos - input_pos - previous_data_size, cudaMemcpyDefault, stream.value())); } else { // TODO use device_read - auto const buffer = source->host_read(buffer_pos + byte_range_offset + previous_data_size, - target_pos - buffer_pos - previous_data_size); + auto const buffer = source->host_read(input_pos + byte_range_offset + previous_data_size, + target_pos - input_pos - previous_data_size); CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.begin() + previous_data_size, buffer->data(), buffer->size(), @@ -294,8 +297,8 @@ std::pair, selected_rows_offsets> load_data_and_gather d_data, chunk_size, pos, - buffer_pos, - max_read_size, + input_pos, + max_input_size, range_begin, range_end, skip_rows, @@ -333,8 +336,8 @@ std::pair, selected_rows_offsets> load_data_and_gather d_data, chunk_size, pos, - buffer_pos, - max_read_size, + input_pos, + max_input_size, range_begin, range_end, skip_rows, @@ -377,11 +380,11 @@ std::pair, selected_rows_offsets> load_data_and_gather size_t discard_bytes = std::max(d_data.size(), sizeof(char)) - sizeof(char); if (discard_bytes != 0) { erase_except_last(d_data, stream); - buffer_pos += discard_bytes; + input_pos += discard_bytes; } } pos = target_pos; - } while (pos < max_read_size); + } while (pos < max_input_size); auto const non_blank_row_offsets = io::csv::gpu::remove_blank_rows(parse_opts.view(), d_data, all_row_offsets, stream); @@ -397,9 +400,9 @@ std::pair, selected_rows_offsets> load_data_and_gather stream.value())); stream.synchronize(); - auto const header_start = buffer_pos + row_ctx[0]; - auto const header_end = buffer_pos + row_ctx[1]; - CUDF_EXPECTS(header_start <= header_end && header_end <= max_read_size, + auto const header_start = input_pos + row_ctx[0]; + auto const header_end = input_pos + row_ctx[1]; + CUDF_EXPECTS(header_start <= header_end && header_end <= max_input_size, "Invalid csv header location"); header.resize(header_end - header_start); if (data.has_value()) { @@ -456,15 +459,14 @@ std::pair, selected_rows_offsets> select_data_and_row_ } size_t data_start_offset = range_offset; - bool has_bom = false; if (h_data.has_value()) { - has_bom = has_utf8_bom(*h_data); + if (has_utf8_bom(*h_data)) { data_start_offset += sizeof(UTF8_BOM); } } else { if (range_offset == 0) { auto bom_buffer = source->host_read(0, std::min(source->size(), sizeof(UTF8_BOM))); auto bom_chars = host_span{reinterpret_cast(bom_buffer->data()), bom_buffer->size()}; - has_bom = has_utf8_bom(bom_chars); + if (has_utf8_bom(bom_chars)) { data_start_offset += sizeof(UTF8_BOM); } } else { constexpr auto find_data_start_chunk_size = 4ul * 1024; @@ -497,7 +499,6 @@ std::pair, selected_rows_offsets> select_data_and_row_ parse_opts, header, h_data, - has_bom, range_offset, data_start_offset - range_offset, (range_size) ? range_size : uncomp_size, From b7b49355b1ba9e35b70dc5e85f4d8696eafbf000 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 17 Sep 2024 17:30:48 -0700 Subject: [PATCH 3/9] bit more clean up --- cpp/src/io/csv/reader_impl.cu | 46 +++++++++++++++++------------------ 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index d1c5ef17b28..38fc709a0ad 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -237,7 +237,7 @@ std::pair, selected_rows_offsets> load_data_and_gather constexpr size_t max_chunk_bytes = 64 * 1024 * 1024; // 64MB auto const data_size = data.has_value() ? data->size() : source->size(); - size_t const buffer_size = std::min(max_chunk_bytes, data_size); + auto const buffer_size = std::min(max_chunk_bytes, data_size); auto const max_input_size = [&]() { if (range_end == data_size) { return data_size - byte_range_offset; @@ -246,27 +246,27 @@ std::pair, selected_rows_offsets> load_data_and_gather data_size - byte_range_offset); } }(); - - size_t pos = range_begin; - // When using byta range, need the line terminator of last line before the range - size_t input_pos = byte_range_offset == 0 ? pos : pos - 1; - size_t header_rows = (reader_opts.get_header() >= 0) ? reader_opts.get_header() + 1 : 0; - uint64_t ctx = 0; + auto const header_rows = (reader_opts.get_header() >= 0) ? reader_opts.get_header() + 1 : 0; // For compatibility with the previous parser, a row is considered in-range if the // previous row terminator is within the given range range_end += (range_end < data_size); + auto pos = range_begin; + // When using byta range, need the line terminator of last line before the range + auto input_pos = byte_range_offset == 0 ? pos : pos - 1; + uint64_t ctx = 0; + rmm::device_uvector d_data{0, stream}; d_data.reserve((load_whole_file) ? data_size : std::min(buffer_size * 2, max_input_size), stream); rmm::device_uvector all_row_offsets{0, stream}; - size_t const max_blocks = + auto const max_blocks = std::max((buffer_size / cudf::io::csv::gpu::rowofs_block_bytes) + 1, 2); cudf::detail::hostdevice_vector row_ctx(max_blocks, stream); do { - size_t target_pos = std::min(pos + max_chunk_bytes, max_input_size); - size_t chunk_size = target_pos - pos; + auto const target_pos = std::min(pos + max_chunk_bytes, max_input_size); + auto const chunk_size = target_pos - pos; auto const previous_data_size = d_data.size(); d_data.resize(target_pos - input_pos, stream); @@ -291,18 +291,18 @@ std::pair, selected_rows_offsets> load_data_and_gather // Pass 1: Count the potential number of rows in each character block for each // possible parser state at the beginning of the block. - uint32_t num_blocks = cudf::io::csv::gpu::gather_row_offsets(parse_opts.view(), - row_ctx.device_ptr(), - device_span(), - d_data, - chunk_size, - pos, - input_pos, - max_input_size, - range_begin, - range_end, - skip_rows, - stream); + auto const num_blocks = cudf::io::csv::gpu::gather_row_offsets(parse_opts.view(), + row_ctx.device_ptr(), + device_span(), + d_data, + chunk_size, + pos, + input_pos, + max_input_size, + range_begin, + range_end, + skip_rows, + stream); CUDF_CUDA_TRY(cudaMemcpyAsync(row_ctx.host_ptr(), row_ctx.device_ptr(), num_blocks * sizeof(uint64_t), @@ -391,7 +391,7 @@ std::pair, selected_rows_offsets> load_data_and_gather auto row_offsets = selected_rows_offsets{std::move(all_row_offsets), non_blank_row_offsets}; // Remove header rows and extract header - size_t const header_row_index = std::max(header_rows, 1) - 1; + auto const header_row_index = std::max(header_rows, 1) - 1; if (header_row_index + 1 < row_offsets.size()) { CUDF_CUDA_TRY(cudaMemcpyAsync(row_ctx.host_ptr(), row_offsets.data() + header_row_index, From 63abe6aabcc917b0457cc595a6cbcdbe3f06ebc9 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 17 Sep 2024 17:55:31 -0700 Subject: [PATCH 4/9] well.. there it is --- cpp/src/io/csv/reader_impl.cu | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 38fc709a0ad..94cb283e8bb 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -270,23 +270,30 @@ std::pair, selected_rows_offsets> load_data_and_gather auto const previous_data_size = d_data.size(); d_data.resize(target_pos - input_pos, stream); + + auto const read_offset = byte_range_offset + input_pos + previous_data_size; + auto const read_size = target_pos - input_pos - previous_data_size; if (data.has_value()) { - CUDF_CUDA_TRY( - cudaMemcpyAsync(d_data.begin() + previous_data_size, - data->begin() + byte_range_offset + input_pos + previous_data_size, - target_pos - input_pos - previous_data_size, - cudaMemcpyDefault, - stream.value())); - } else { - // TODO use device_read - auto const buffer = source->host_read(input_pos + byte_range_offset + previous_data_size, - target_pos - input_pos - previous_data_size); - CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.begin() + previous_data_size, - buffer->data(), - buffer->size(), + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data() + previous_data_size, + data->data() + read_offset, + target_pos - input_pos - previous_data_size, cudaMemcpyDefault, stream.value())); - stream.synchronize(); + } else { + if (source->is_device_read_preferred(read_size)) { + source->device_read_async(read_offset, + read_size, + reinterpret_cast(d_data.data() + previous_data_size), + stream); + } else { + auto const buffer = source->host_read(read_offset, read_size); + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data() + previous_data_size, + buffer->data(), + buffer->size(), + cudaMemcpyDefault, + stream.value())); + stream.synchronize(); + } } // Pass 1: Count the potential number of rows in each character block for each From 9a2610268105ce26e6ff49722c26adc8f9f59fc5 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 18 Sep 2024 13:06:53 -0700 Subject: [PATCH 5/9] well.. there it is --- cpp/src/io/csv/reader_impl.cu | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 38fc709a0ad..8396f65c564 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -270,23 +270,30 @@ std::pair, selected_rows_offsets> load_data_and_gather auto const previous_data_size = d_data.size(); d_data.resize(target_pos - input_pos, stream); + + auto const read_offset = byte_range_offset + input_pos + previous_data_size; + auto const read_size = target_pos - input_pos - previous_data_size; if (data.has_value()) { - CUDF_CUDA_TRY( - cudaMemcpyAsync(d_data.begin() + previous_data_size, - data->begin() + byte_range_offset + input_pos + previous_data_size, - target_pos - input_pos - previous_data_size, - cudaMemcpyDefault, - stream.value())); - } else { - // TODO use device_read - auto const buffer = source->host_read(input_pos + byte_range_offset + previous_data_size, - target_pos - input_pos - previous_data_size); - CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.begin() + previous_data_size, - buffer->data(), - buffer->size(), + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data() + previous_data_size, + data->data() + read_offset, + target_pos - input_pos - previous_data_size, cudaMemcpyDefault, stream.value())); - stream.synchronize(); + } else { + if (source->is_device_read_preferred(read_size)) { + source->device_read(read_offset, + read_size, + reinterpret_cast(d_data.data() + previous_data_size), + stream); + } else { + auto const buffer = source->host_read(read_offset, read_size); + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data() + previous_data_size, + buffer->data(), + buffer->size(), + cudaMemcpyDefault, + stream.value())); + stream.synchronize(); + } } // Pass 1: Count the potential number of rows in each character block for each From 4e192527171310a2fc08c17d7404df200aa689a3 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 24 Sep 2024 16:42:54 -0700 Subject: [PATCH 6/9] Apply suggestions from code review Co-authored-by: MithunR --- cpp/src/io/csv/reader_impl.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 8396f65c564..e06204a0c3c 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -238,7 +238,7 @@ std::pair, selected_rows_offsets> load_data_and_gather auto const data_size = data.has_value() ? data->size() : source->size(); auto const buffer_size = std::min(max_chunk_bytes, data_size); - auto const max_input_size = [&]() { + auto const max_input_size = [&] { if (range_end == data_size) { return data_size - byte_range_offset; } else { @@ -253,7 +253,7 @@ std::pair, selected_rows_offsets> load_data_and_gather range_end += (range_end < data_size); auto pos = range_begin; - // When using byta range, need the line terminator of last line before the range + // When using byte range, need the line terminator of last line before the range auto input_pos = byte_range_offset == 0 ? pos : pos - 1; uint64_t ctx = 0; From e8dc274a00433e5c75e348adb5761debc27ac1b5 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 25 Sep 2024 09:44:49 -0700 Subject: [PATCH 7/9] comment Co-authored-by: MithunR --- cpp/src/io/csv/reader_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index e06204a0c3c..87794d4face 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -292,7 +292,7 @@ std::pair, selected_rows_offsets> load_data_and_gather buffer->size(), cudaMemcpyDefault, stream.value())); - stream.synchronize(); + stream.synchronize(); // To prevent buffer going out of scope before we copy the data. } } From 02f88763176192bb76613231d453c9a50c53d5bb Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 25 Sep 2024 09:50:14 -0700 Subject: [PATCH 8/9] explicit ctor --- cpp/src/io/csv/reader_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 87794d4face..d1cbfd1609a 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -85,7 +85,7 @@ class selected_rows_offsets { : all{std::move(data)}, selected{selected_span} { } - selected_rows_offsets(rmm::cuda_stream_view stream) : all{0, stream}, selected{all} {} + explicit selected_rows_offsets(rmm::cuda_stream_view stream) : all{0, stream}, selected{all} {} operator device_span() const { return selected; } void shrink(size_t size) From 98a22ae45b40248fffb3bcd73b5f9e8abe50e821 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 27 Sep 2024 20:09:05 -0700 Subject: [PATCH 9/9] code review suggestions --- cpp/src/io/csv/reader_impl.cu | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index d1cbfd1609a..8c32fc85f78 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -196,7 +196,8 @@ void erase_except_last(C& container, rmm::cuda_stream_view stream) constexpr std::array UTF8_BOM = {0xEF, 0xBB, 0xBF}; [[nodiscard]] bool has_utf8_bom(host_span data) { - return data.size() >= 3 && memcmp(data.data(), UTF8_BOM.data(), 3) == 0; + return data.size() >= UTF8_BOM.size() && + memcmp(data.data(), UTF8_BOM.data(), UTF8_BOM.size()) == 0; } /** @@ -475,8 +476,7 @@ std::pair, selected_rows_offsets> select_data_and_row_ bom_buffer->size()}; if (has_utf8_bom(bom_chars)) { data_start_offset += sizeof(UTF8_BOM); } } else { - constexpr auto find_data_start_chunk_size = 4ul * 1024; - + auto find_data_start_chunk_size = 1024ul; while (data_start_offset < source->size()) { auto const read_size = std::min(find_data_start_chunk_size, source->size() - data_start_offset); @@ -491,6 +491,7 @@ std::pair, selected_rows_offsets> select_data_and_row_ break; } data_start_offset += read_size; + find_data_start_chunk_size *= 2; } } }