From 50f8ab87412dbc9b534f0f800f37120005b2d37b Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 20 Feb 2024 09:51:12 -0800 Subject: [PATCH 01/12] rle_stream with dictionary support + micro kernels for fixed and fixed dictionary Signed-off-by: Alessandro Bellina --- cpp/src/io/parquet/decode_fixed.cu | 26 ++++++++++++++++++ cpp/src/io/parquet/decode_fixed.hpp | 42 +++++++++++++++++++++++++++++ cpp/src/io/parquet/page_data.cuh | 5 ++++ cpp/src/io/parquet/page_hdr.cu | 26 ++++++++++++++---- cpp/src/io/parquet/reader_impl.cpp | 1 + cpp/src/io/parquet/rle_stream.cuh | 2 +- 6 files changed, 96 insertions(+), 6 deletions(-) create mode 100644 cpp/src/io/parquet/decode_fixed.hpp diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index bfd89200786..c961d869d9d 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -129,6 +129,7 @@ static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat( if (is_valid) { int const dst_pos = (value_count + thread_value_count) - 1; int const src_pos = (valid_count + thread_valid_count) - 1; + auto ix = rolling_index(src_pos); sb->nz_idx[rolling_index(src_pos)] = dst_pos; } @@ -168,6 +169,7 @@ __device__ inline void gpuDecodeValues( int const src_pos = pos + t; // the position in the output column/buffer + auto nz_idx = sb->nz_idx[rolling_index(src_pos)]; int dst_pos = sb->nz_idx[rolling_index(src_pos)] - s->first_row; // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values @@ -387,7 +389,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) __shared__ rle_run def_runs[rle_run_buffer_size]; rle_stream def_decoder{def_runs}; + bool const nullable = s->col.max_level[level_type::DEFINITION] > 0; + // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. + // + // corner case: in the case of lists, we can have pages that contain "0" rows if the current row + // starts before this page and ends after this page: + // P0 P1 P2 + // |---------|---------|----------| + // ^------------------^ + // row start row end + // P1 will contain 0 rows + // if (s->num_rows == 0) { return; } bool const nullable = is_nullable(s); @@ -494,13 +507,26 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) return; } + // the level stream decoders + // rolling_buf_size = 256 __shared__ rle_run def_runs[rle_run_buffer_size]; rle_stream def_decoder{def_runs}; __shared__ rle_run dict_runs[rle_run_buffer_size]; rle_stream dict_stream{dict_runs}; + bool const nullable = s->col.max_level[level_type::DEFINITION] > 0; + // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. + // + // corner case: in the case of lists, we can have pages that contain "0" rows if the current row + // starts before this page and ends after this page: + // P0 P1 P2 + // |---------|---------|----------| + // ^------------------^ + // row start row end + // P1 will contain 0 rows + // if (s->num_rows == 0) { return; } bool const nullable = is_nullable(s); diff --git a/cpp/src/io/parquet/decode_fixed.hpp b/cpp/src/io/parquet/decode_fixed.hpp new file mode 100644 index 00000000000..cc26ae3e8df --- /dev/null +++ b/cpp/src/io/parquet/decode_fixed.hpp @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "parquet_gpu.hpp" + +namespace cudf { +namespace io { +namespace parquet { +namespace detail { +void DecodePageDataFixed(cudf::detail::hostdevice_vector& pages, + cudf::detail::hostdevice_vector const& chunks, + std::size_t num_rows, + size_t min_row, + int level_type_size, + rmm::cuda_stream_view stream); + +void DecodePageDataFixedDict(cudf::detail::hostdevice_vector& pages, + cudf::detail::hostdevice_vector const& chunks, + std::size_t num_rows, + size_t min_row, + int level_type_size, + rmm::cuda_stream_view stream); + +} // namespace detail +} // namespace parquet +} // namespace io +} // namespace cudf diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index f182747650e..c920c30288b 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -18,8 +18,13 @@ #include "page_decode.cuh" +#include + #include +#include +#include + namespace cudf::io::parquet::detail { /** diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index cf0dd85e490..b93a059d993 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -163,10 +163,20 @@ __device__ inline bool is_boolean(ColumnChunkDesc const& chunk) * @return `kernel_mask_bits` value for the given page */ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, - ColumnChunkDesc const& chunk) + ColumnChunkDesc const& chunk, + int use_fixed_op) { if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return decode_kernel_mask::NONE; } - + // TODO: once we remove `use_fixed_op` we should also simplify this + // to use some functions (e.g. is_fixed_width, is_dictionary) + if (use_fixed_op != 0 && !is_string_col(chunk) && chunk.max_nesting_depth == 1 && + (chunk.data_type & 7) != BYTE_ARRAY && (chunk.data_type & 7) != BOOLEAN) { + if (page.encoding == Encoding::PLAIN) { + return decode_kernel_mask::FIXED_WIDTH_NO_DICT; + } else if (use_fixed_op == 2 && page.encoding == Encoding::PLAIN_DICTIONARY) { + return decode_kernel_mask::FIXED_WIDTH_DICT; + } + } if (page.encoding == Encoding::DELTA_BINARY_PACKED) { return decode_kernel_mask::DELTA_BINARY; } else if (page.encoding == Encoding::DELTA_BYTE_ARRAY) { @@ -383,7 +393,8 @@ CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks, chunk_page_info* chunk_pages, int32_t num_chunks, - kernel_error::pointer error_code) + kernel_error::pointer error_code, + int use_fixed_op) { using cudf::detail::warp_size; gpuParsePageHeader parse_page_header; @@ -485,7 +496,7 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks, error[warp_id] |= static_cast(decode_error::DATA_STREAM_OVERRUN); } - bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck); + bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck, use_fixed_op); } else { bs->cur = bs->end; } @@ -574,8 +585,13 @@ void __host__ DecodePageHeaders(ColumnChunkDesc* chunks, dim3 dim_block(128, 1); dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block + // TODO: note we will no longer need this env variable and use_fixed_op + // once it's enabled by default. + char* opt = std::getenv("USE_FIXED_OP"); + int use_fixed_op = (opt == nullptr || opt[0] == '0') ? 0 : (opt[0] == '1' ? 1 : 2); + gpuDecodePageHeaders<<>>( - chunks, chunk_pages, num_chunks, error_code); + chunks, chunk_pages, num_chunks, error_code, use_fixed_op); } void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks, diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 3af4d5cdb86..7ebbd5fd4db 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -16,6 +16,7 @@ #include "reader_impl.hpp" +#include "decode_fixed.hpp" #include "error.hpp" #include diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index 4a0791d5c54..79fff990bcb 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -300,7 +300,7 @@ struct rle_stream { if (warp_id == 0) { // fill the next set of runs. fill_runs will generally be the bottleneck for any // kernel that uses an rle_stream. - if (warp_lane == 0) { + if (!warp_lane) { fill_run_batch(); if (decode_index == -1) { // first time, set it to the beginning of the buffer (rolled) From 990a849f3d2c3d43af3d9df1cf4da4b6c04631e7 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Thu, 29 Feb 2024 15:36:11 -0800 Subject: [PATCH 02/12] load balancing experiment --- cpp/examples/basic/CMakeLists.txt | 4 +- cpp/examples/build.sh | 2 +- cpp/src/io/parquet/rle_stream.cuh | 71 +++++++++++++++++++++---------- 3 files changed, 52 insertions(+), 25 deletions(-) diff --git a/cpp/examples/basic/CMakeLists.txt b/cpp/examples/basic/CMakeLists.txt index a3fe699667a..5c52cbe8428 100644 --- a/cpp/examples/basic/CMakeLists.txt +++ b/cpp/examples/basic/CMakeLists.txt @@ -17,8 +17,8 @@ project( include(../fetch_dependencies.cmake) # Configure your project here -add_executable(basic_example src/process_csv.cpp) -target_link_libraries(basic_example PRIVATE cudf::cudf) +add_executable(basic_example src/process_parquet.cpp) +target_link_libraries(basic_example PRIVATE cudf::cudf cudf::cudftest_default_stream cudf::cudftestutil) target_compile_features(basic_example PRIVATE cxx_std_17) install(TARGETS basic_example DESTINATION bin/examples/libcudf) diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh index bde6ef7d69c..955be032b32 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -47,7 +47,7 @@ build_example() { build_dir="${example_dir}/build" # Configure - cmake -S ${example_dir} -B ${build_dir} -Dcudf_ROOT="${LIB_BUILD_DIR}" + cmake -S ${example_dir} -B ${build_dir} -Dcudf_ROOT="${LIB_BUILD_DIR}" -DCMAKE_BUILD_TYPE=Debug # Build cmake --build ${build_dir} -j${PARALLEL_LEVEL} # Install if needed diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index 79fff990bcb..be7521ce131 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -158,6 +158,7 @@ struct rle_run { uint8_t const* start; int level_run; // level_run header value int remaining; // number of output items remaining to be decoded + int run_offset; }; // a stream of rle_runs @@ -189,6 +190,9 @@ struct rle_stream { int fill_index; int decode_index; + int last_run_bytes; + int last_run_remaining; + __device__ rle_stream(rle_run* _runs) : runs(_runs) {} __device__ inline bool is_last_decode_warp(int warp_id) @@ -214,6 +218,9 @@ struct rle_stream { cur_values = 0; fill_index = 0; decode_index = -1; // signals the first iteration. Nothing to decode. + + last_run_bytes = 0; + last_run_remaining = 0; } __device__ inline void fill_run_batch() @@ -229,30 +236,48 @@ struct rle_stream { auto& run = runs[rolling_index(fill_index)]; // Encoding::RLE - // bytes for the varint header - uint8_t const* _cur = cur; - int const level_run = get_vlq32(_cur, end); - // run_bytes includes the header size - int run_bytes = _cur - cur; - - // literal run - if (is_literal_run(level_run)) { - // from the parquet spec: literal runs always come in multiples of 8 values. - run.size = (level_run >> 1) * 8; - run_bytes += ((run.size * level_bits) + 7) >> 3; + if (last_run_remaining == 0) { + uint8_t const* _cur = cur; + int const level_run = get_vlq32(_cur, end); + // run_bytes includes the header size + int run_bytes = _cur - cur; + + // literal run + if (is_literal_run(level_run)) { + // from the parquet spec: literal runs always come in multiples of 8 values. + run.size = (level_run >> 1) * 8; + run_bytes += ((run.size * level_bits) + 7) >> 3; + } + // repeated value run + else { + run.size = (level_run >> 1); + run_bytes += ((level_bits) + 7) >> 3; + } + last_run_bytes = run_bytes; + last_run_remaining = run.size; + int const batch = min(96, last_run_remaining); + run.output_pos = output_pos; + run.start = _cur; + run.level_run = level_run; + run.remaining = batch; + run.run_offset = run.size - last_run_remaining; + last_run_remaining -= batch; + } else { + auto& prior = runs[rolling_index(fill_index - 1)]; + int const batch = min(96, last_run_remaining); + run.output_pos = prior.output_pos; + run.start = prior.start; + run.level_run = prior.level_run; + run.remaining = batch; + run.size = prior.size; + run.run_offset = run.size - last_run_remaining; + last_run_remaining -= batch; } - // repeated value run - else { - run.size = (level_run >> 1); - run_bytes += ((level_bits) + 7) >> 3; + if (last_run_remaining == 0) { + cur += last_run_bytes; + output_pos += run.size; } - run.output_pos = output_pos; - run.start = _cur; - run.level_run = level_run; - run.remaining = run.size; - cur += run_bytes; - output_pos += run.size; fill_index++; } } @@ -324,7 +349,7 @@ struct rle_stream { // if it's supposed to fit in this call to `decode_next`. if (max_count > run.output_pos) { int remaining = run.remaining; - int const run_offset = run.size - remaining; + int run_offset = run.run_offset; // last_run_pos is the absolute position of the run, including // what was decoded last time. int const last_run_pos = run.output_pos + run_offset; @@ -358,6 +383,8 @@ struct rle_stream { decode_index_shared = run_index + 1; } run.remaining = remaining; + run_offset += batch_len; + run.run_offset = run_offset; } } } From 46e8294ff6b3ce53f5033f4439942a905aa89906 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 1 Mar 2024 06:31:21 -0800 Subject: [PATCH 03/12] parquet process exampel --- cpp/examples/basic/src/process_parquet.cpp | 216 +++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 cpp/examples/basic/src/process_parquet.cpp diff --git a/cpp/examples/basic/src/process_parquet.cpp b/cpp/examples/basic/src/process_parquet.cpp new file mode 100644 index 00000000000..306d96989be --- /dev/null +++ b/cpp/examples/basic/src/process_parquet.cpp @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +cudf::io::table_with_metadata read_parquet( + std::string const& file_path, + std::string const& col_name) +{ + std::cout << "reading parquet file: " << file_path << " column: " << col_name << std::endl; + auto source_info = cudf::io::source_info(file_path); + auto builder = cudf::io::parquet_reader_options::builder(source_info); + cudf::io::parquet_reader_options options; + if (col_name == "ALL") { + options = builder.build(); + } else { + options = builder.columns({col_name}).build(); + } + auto reader = cudf::io::chunked_parquet_reader( + 1L*1024*1024*1024, + 4L*1024*1024*1024, options); + cudf::io::table_with_metadata res; + while (reader.has_next()) { + res = reader.read_chunk(); + std::cout << "table of " << res.tbl->num_rows() << " rows scanned" << std::endl; + for (int i = 0; i < res.tbl->num_columns(); ++i) { + std::cout << "Col " << i + << " num_rows: " << res.tbl->get_column(i).size() + << " num_nulls: " << res.tbl->get_column(i).null_count() << std::endl; + } + } + return res; +} + +void simple_int_column(int num_rows) +{ + std::string filepath("/home/abellina/table_with_dict.parquet"); + + [[maybe_unused]] auto valids = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return 1; }); //i % 2 == 0; }); + //0, [](auto i) { return i == 123 || i == 555 ? 0 : 1; }); + + /// 0, [](auto i) { return 1; }); + // 0, [](auto i) { return i == 123 || i == 777 ? 0 : 1; }); + auto iter1 = cudf::detail::make_counting_transform_iterator(0, [](int i) { return i % 10; }); + cudf::test::fixed_width_column_wrapper col1(iter1, iter1 + num_rows, valids); + //cudf::test::fixed_width_column_wrapper col1(iter1, iter1 + num_rows); + auto tbl = cudf::table_view{{col1}}; + + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl) + .dictionary_policy(cudf::io::dictionary_policy::ALWAYS); + cudf::io::write_parquet(out_opts); +} + +int main(int argc, char** argv) +{ + cudaSetDevice(0); + + //auto resource = cudf::test::create_memory_resource("pool"); + //auto resource = cudf::test::create_memory_resource("cuda"); + auto resource = cudf::test::create_memory_resource("cuda"); + rmm::mr::set_current_device_resource(resource.get()); + + // Read data + //auto store_sales = read_parquet("/home/abellina/part-00191-9dcfb50c-76b0-4dbf-882b-b60e7ad5b925.c000.snappy.parquet"); + //auto store_sales = read_parquet("/home/abellina/cudf/part-00000-f7d7d84a-8f00-4921-95d1-ef17638a1c83-c000.snappy.parquet"); + //auto store_sales = read_parquet("/home/abellina/cudf/second_mill.snappy.parquet"); + //auto store_sales = read_parquet("/home/abellina/cudf/s_1.5.snappy.parquet"); + //auto store_sales = read_parquet("/home/abellina/cudf/s_1.25.snappy.parquet"); + //auto store_sales = read_parquet("/home/abellina/cudf/s_1.125.snappy.parquet"); + //auto store_sales = read_parquet("/home/abellina/cudf/s_1.005.snappy.parquet"); + //auto store_sales = read_parquet("/home/abellina/cudf/part-00000-f7d7d84a-8f00-4921-95d1-ef17638a1c83-c000.snappy.parquet"); + ///cudaDeviceSynchronize(); +// std::cout <<"done1"<view(), actual.tbl->view()); +// std::cout << "done" << std::endl; +//} +for (int i = 0; i < 1; ++i) { + setenv("USE_FIXED_OP", "0", 1); + auto expected = read_parquet(name, "ALL"); + cudaDeviceSynchronize(); + std::cout << "op0:" << cudf::test::to_string(expected.tbl->get_column(0).view(), std::string(",")) << std::endl; + + setenv("USE_FIXED_OP", "2", 1); + auto actual = read_parquet(name, "ALL"); + //read_parquet(name, "ALL"); + cudaDeviceSynchronize(); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected.tbl->view(), actual.tbl->view()); + std::cout << "op2:" << cudf::test::to_string(actual.tbl->get_column(0).view(), std::string(",")) << std::endl; + std::cout << "done " << i << std::endl; +} + + + //if (argc > 1) { + // num_rows = atoi(argv[1]); + //} + //simple_int_column(17); + //read_parquet("/home/abellina/table_with_dict.parquet", "ALL"); + +// std::cout << "over here: " << cudf::test::to_string(simple.tbl->get_column(0).view(), std::string(",")) << std::endl; + //std::cout << "done" << std::endl; + + return 0; +} From 03f202cd61bbeb7b6ae5410aeae4b2da6bea5cbf Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Tue, 14 May 2024 21:30:42 -0700 Subject: [PATCH 04/12] rebase 24.06 Signed-off-by: Gera Shegalov --- cpp/src/io/parquet/decode_fixed.cu | 4 ---- cpp/src/io/parquet/page_hdr.cu | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index c961d869d9d..c4c35bab831 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -389,8 +389,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) __shared__ rle_run def_runs[rle_run_buffer_size]; rle_stream def_decoder{def_runs}; - bool const nullable = s->col.max_level[level_type::DEFINITION] > 0; - // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. // // corner case: in the case of lists, we can have pages that contain "0" rows if the current row @@ -515,8 +513,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) __shared__ rle_run dict_runs[rle_run_buffer_size]; rle_stream dict_stream{dict_runs}; - bool const nullable = s->col.max_level[level_type::DEFINITION] > 0; - // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. // // corner case: in the case of lists, we can have pages that contain "0" rows if the current row diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index b93a059d993..a370ea9e688 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -170,7 +170,7 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, // TODO: once we remove `use_fixed_op` we should also simplify this // to use some functions (e.g. is_fixed_width, is_dictionary) if (use_fixed_op != 0 && !is_string_col(chunk) && chunk.max_nesting_depth == 1 && - (chunk.data_type & 7) != BYTE_ARRAY && (chunk.data_type & 7) != BOOLEAN) { + chunk.physical_type != BYTE_ARRAY && chunk.physical_type != BOOLEAN) { if (page.encoding == Encoding::PLAIN) { return decode_kernel_mask::FIXED_WIDTH_NO_DICT; } else if (use_fixed_op == 2 && page.encoding == Encoding::PLAIN_DICTIONARY) { From b11b74fae6d9d47a0442dce970a2288d924df74e Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Thu, 16 May 2024 23:04:31 -0700 Subject: [PATCH 05/12] suggestions 1 Signed-off-by: Gera Shegalov --- cpp/src/io/parquet/decode_fixed.cu | 2 -- cpp/src/io/parquet/page_hdr.cu | 16 ---------------- 2 files changed, 18 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index c4c35bab831..7361bc9b5ac 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -129,7 +129,6 @@ static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat( if (is_valid) { int const dst_pos = (value_count + thread_value_count) - 1; int const src_pos = (valid_count + thread_valid_count) - 1; - auto ix = rolling_index(src_pos); sb->nz_idx[rolling_index(src_pos)] = dst_pos; } @@ -169,7 +168,6 @@ __device__ inline void gpuDecodeValues( int const src_pos = pos + t; // the position in the output column/buffer - auto nz_idx = sb->nz_idx[rolling_index(src_pos)]; int dst_pos = sb->nz_idx[rolling_index(src_pos)] - s->first_row; // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index a370ea9e688..2f535cf4559 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -167,16 +167,6 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, int use_fixed_op) { if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return decode_kernel_mask::NONE; } - // TODO: once we remove `use_fixed_op` we should also simplify this - // to use some functions (e.g. is_fixed_width, is_dictionary) - if (use_fixed_op != 0 && !is_string_col(chunk) && chunk.max_nesting_depth == 1 && - chunk.physical_type != BYTE_ARRAY && chunk.physical_type != BOOLEAN) { - if (page.encoding == Encoding::PLAIN) { - return decode_kernel_mask::FIXED_WIDTH_NO_DICT; - } else if (use_fixed_op == 2 && page.encoding == Encoding::PLAIN_DICTIONARY) { - return decode_kernel_mask::FIXED_WIDTH_DICT; - } - } if (page.encoding == Encoding::DELTA_BINARY_PACKED) { return decode_kernel_mask::DELTA_BINARY; } else if (page.encoding == Encoding::DELTA_BYTE_ARRAY) { @@ -584,12 +574,6 @@ void __host__ DecodePageHeaders(ColumnChunkDesc* chunks, { dim3 dim_block(128, 1); dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block - - // TODO: note we will no longer need this env variable and use_fixed_op - // once it's enabled by default. - char* opt = std::getenv("USE_FIXED_OP"); - int use_fixed_op = (opt == nullptr || opt[0] == '0') ? 0 : (opt[0] == '1' ? 1 : 2); - gpuDecodePageHeaders<<>>( chunks, chunk_pages, num_chunks, error_code, use_fixed_op); } From 35b00ee913009519d7fc052461147878c26dc099 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Thu, 16 May 2024 23:14:22 -0700 Subject: [PATCH 06/12] suggestions Signed-off-by: Gera Shegalov --- cpp/src/io/parquet/page_hdr.cu | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 2f535cf4559..85ec623ad44 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -163,8 +163,7 @@ __device__ inline bool is_boolean(ColumnChunkDesc const& chunk) * @return `kernel_mask_bits` value for the given page */ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, - ColumnChunkDesc const& chunk, - int use_fixed_op) + ColumnChunkDesc const& chunk) { if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return decode_kernel_mask::NONE; } if (page.encoding == Encoding::DELTA_BINARY_PACKED) { @@ -383,8 +382,7 @@ CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks, chunk_page_info* chunk_pages, int32_t num_chunks, - kernel_error::pointer error_code, - int use_fixed_op) + kernel_error::pointer error_code) { using cudf::detail::warp_size; gpuParsePageHeader parse_page_header; @@ -486,7 +484,7 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks, error[warp_id] |= static_cast(decode_error::DATA_STREAM_OVERRUN); } - bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck, use_fixed_op); + bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck); } else { bs->cur = bs->end; } @@ -575,7 +573,7 @@ void __host__ DecodePageHeaders(ColumnChunkDesc* chunks, dim3 dim_block(128, 1); dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block gpuDecodePageHeaders<<>>( - chunks, chunk_pages, num_chunks, error_code, use_fixed_op); + chunks, chunk_pages, num_chunks, error_code); } void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks, From 4c9a1ca28af373fd9a528329b28a95da6a7dd9a5 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 5 Jun 2024 21:38:50 -0700 Subject: [PATCH 07/12] Robert Maynard's patch Signed-off-by: Gera Shegalov --- cpp/tests/CMakeLists.txt | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 2f2c12f265c..a0d9083c4a4 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -68,12 +68,14 @@ function(ConfigureTest CMAKE_TEST_NAME) INSTALL_COMPONENT_SET testing ) - set_tests_properties( - ${CMAKE_TEST_NAME} - PROPERTIES - ENVIRONMENT - "GTEST_CUDF_STREAM_MODE=new_${_CUDF_TEST_STREAM_MODE}_default;LD_PRELOAD=$" - ) + if(CUDF_BUILD_STREAMS_TEST_UTIL) + set_tests_properties( + ${CMAKE_TEST_NAME} + PROPERTIES + ENVIRONMENT + "GTEST_CUDF_STREAM_MODE=new_${_CUDF_TEST_STREAM_MODE}_default;LD_PRELOAD=$" + ) + endif() endfunction() # ################################################################################################## @@ -401,14 +403,10 @@ ConfigureTest(SPAN_TEST utilities_tests/span_tests.cu) ConfigureTest(SPAN_TEST_DEVICE_VECTOR utilities_tests/span_tests.cu) # Overwrite the environments set by ConfigureTest -set_tests_properties( - SPAN_TEST - PROPERTIES - ENVIRONMENT - "GTEST_FILTER=-${_allowlist_filter};GTEST_CUDF_STREAM_MODE=new_cudf_default;LD_PRELOAD=$" -) -set_tests_properties( - SPAN_TEST_DEVICE_VECTOR PROPERTIES ENVIRONMENT "GTEST_FILTER=${_allowlist_filter}" +set_property( + TEST SPAN_TEST SPAN_TEST_DEVICE_VECTOR + APPEND + PROPERTY ENVIRONMENT "GTEST_FILTER=-${_allowlist_filter}" ) # ################################################################################################## @@ -671,9 +669,11 @@ target_include_directories(JIT_PARSER_TEST PRIVATE "$ Date: Thu, 18 Jul 2024 01:00:05 -0700 Subject: [PATCH 08/12] Delete process-parquet Signed-off-by: Gera Shegalov --- cpp/examples/basic/CMakeLists.txt | 4 +- cpp/examples/basic/src/process_parquet.cpp | 216 --------------------- cpp/examples/build.sh | 2 +- 3 files changed, 3 insertions(+), 219 deletions(-) delete mode 100644 cpp/examples/basic/src/process_parquet.cpp diff --git a/cpp/examples/basic/CMakeLists.txt b/cpp/examples/basic/CMakeLists.txt index 5c52cbe8428..a3fe699667a 100644 --- a/cpp/examples/basic/CMakeLists.txt +++ b/cpp/examples/basic/CMakeLists.txt @@ -17,8 +17,8 @@ project( include(../fetch_dependencies.cmake) # Configure your project here -add_executable(basic_example src/process_parquet.cpp) -target_link_libraries(basic_example PRIVATE cudf::cudf cudf::cudftest_default_stream cudf::cudftestutil) +add_executable(basic_example src/process_csv.cpp) +target_link_libraries(basic_example PRIVATE cudf::cudf) target_compile_features(basic_example PRIVATE cxx_std_17) install(TARGETS basic_example DESTINATION bin/examples/libcudf) diff --git a/cpp/examples/basic/src/process_parquet.cpp b/cpp/examples/basic/src/process_parquet.cpp deleted file mode 100644 index 306d96989be..00000000000 --- a/cpp/examples/basic/src/process_parquet.cpp +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -cudf::io::table_with_metadata read_parquet( - std::string const& file_path, - std::string const& col_name) -{ - std::cout << "reading parquet file: " << file_path << " column: " << col_name << std::endl; - auto source_info = cudf::io::source_info(file_path); - auto builder = cudf::io::parquet_reader_options::builder(source_info); - cudf::io::parquet_reader_options options; - if (col_name == "ALL") { - options = builder.build(); - } else { - options = builder.columns({col_name}).build(); - } - auto reader = cudf::io::chunked_parquet_reader( - 1L*1024*1024*1024, - 4L*1024*1024*1024, options); - cudf::io::table_with_metadata res; - while (reader.has_next()) { - res = reader.read_chunk(); - std::cout << "table of " << res.tbl->num_rows() << " rows scanned" << std::endl; - for (int i = 0; i < res.tbl->num_columns(); ++i) { - std::cout << "Col " << i - << " num_rows: " << res.tbl->get_column(i).size() - << " num_nulls: " << res.tbl->get_column(i).null_count() << std::endl; - } - } - return res; -} - -void simple_int_column(int num_rows) -{ - std::string filepath("/home/abellina/table_with_dict.parquet"); - - [[maybe_unused]] auto valids = cudf::detail::make_counting_transform_iterator( - 0, [](auto i) { return 1; }); //i % 2 == 0; }); - //0, [](auto i) { return i == 123 || i == 555 ? 0 : 1; }); - - /// 0, [](auto i) { return 1; }); - // 0, [](auto i) { return i == 123 || i == 777 ? 0 : 1; }); - auto iter1 = cudf::detail::make_counting_transform_iterator(0, [](int i) { return i % 10; }); - cudf::test::fixed_width_column_wrapper col1(iter1, iter1 + num_rows, valids); - //cudf::test::fixed_width_column_wrapper col1(iter1, iter1 + num_rows); - auto tbl = cudf::table_view{{col1}}; - - cudf::io::parquet_writer_options out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl) - .dictionary_policy(cudf::io::dictionary_policy::ALWAYS); - cudf::io::write_parquet(out_opts); -} - -int main(int argc, char** argv) -{ - cudaSetDevice(0); - - //auto resource = cudf::test::create_memory_resource("pool"); - //auto resource = cudf::test::create_memory_resource("cuda"); - auto resource = cudf::test::create_memory_resource("cuda"); - rmm::mr::set_current_device_resource(resource.get()); - - // Read data - //auto store_sales = read_parquet("/home/abellina/part-00191-9dcfb50c-76b0-4dbf-882b-b60e7ad5b925.c000.snappy.parquet"); - //auto store_sales = read_parquet("/home/abellina/cudf/part-00000-f7d7d84a-8f00-4921-95d1-ef17638a1c83-c000.snappy.parquet"); - //auto store_sales = read_parquet("/home/abellina/cudf/second_mill.snappy.parquet"); - //auto store_sales = read_parquet("/home/abellina/cudf/s_1.5.snappy.parquet"); - //auto store_sales = read_parquet("/home/abellina/cudf/s_1.25.snappy.parquet"); - //auto store_sales = read_parquet("/home/abellina/cudf/s_1.125.snappy.parquet"); - //auto store_sales = read_parquet("/home/abellina/cudf/s_1.005.snappy.parquet"); - //auto store_sales = read_parquet("/home/abellina/cudf/part-00000-f7d7d84a-8f00-4921-95d1-ef17638a1c83-c000.snappy.parquet"); - ///cudaDeviceSynchronize(); -// std::cout <<"done1"<view(), actual.tbl->view()); -// std::cout << "done" << std::endl; -//} -for (int i = 0; i < 1; ++i) { - setenv("USE_FIXED_OP", "0", 1); - auto expected = read_parquet(name, "ALL"); - cudaDeviceSynchronize(); - std::cout << "op0:" << cudf::test::to_string(expected.tbl->get_column(0).view(), std::string(",")) << std::endl; - - setenv("USE_FIXED_OP", "2", 1); - auto actual = read_parquet(name, "ALL"); - //read_parquet(name, "ALL"); - cudaDeviceSynchronize(); - CUDF_TEST_EXPECT_TABLES_EQUAL(expected.tbl->view(), actual.tbl->view()); - std::cout << "op2:" << cudf::test::to_string(actual.tbl->get_column(0).view(), std::string(",")) << std::endl; - std::cout << "done " << i << std::endl; -} - - - //if (argc > 1) { - // num_rows = atoi(argv[1]); - //} - //simple_int_column(17); - //read_parquet("/home/abellina/table_with_dict.parquet", "ALL"); - -// std::cout << "over here: " << cudf::test::to_string(simple.tbl->get_column(0).view(), std::string(",")) << std::endl; - //std::cout << "done" << std::endl; - - return 0; -} diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh index 580803a0163..dce81fb1677 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -47,7 +47,7 @@ build_example() { build_dir="${example_dir}/build" # Configure - cmake -S ${example_dir} -B ${build_dir} -Dcudf_ROOT="${LIB_BUILD_DIR}" -DCMAKE_BUILD_TYPE=Debug + cmake -S ${example_dir} -B ${build_dir} -Dcudf_ROOT="${LIB_BUILD_DIR}" # Build cmake --build ${build_dir} -j${PARALLEL_LEVEL} # Install if needed From 861f9f76415009cab53289f1bec5e117950e3d2d Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 2 Aug 2024 22:26:50 -0700 Subject: [PATCH 09/12] review Signed-off-by: Gera Shegalov --- cpp/src/io/parquet/decode_fixed.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.hpp b/cpp/src/io/parquet/decode_fixed.hpp index cc26ae3e8df..170292e4b4f 100644 --- a/cpp/src/io/parquet/decode_fixed.hpp +++ b/cpp/src/io/parquet/decode_fixed.hpp @@ -25,14 +25,14 @@ namespace detail { void DecodePageDataFixed(cudf::detail::hostdevice_vector& pages, cudf::detail::hostdevice_vector const& chunks, std::size_t num_rows, - size_t min_row, + std::size_t min_row, int level_type_size, rmm::cuda_stream_view stream); void DecodePageDataFixedDict(cudf::detail::hostdevice_vector& pages, cudf::detail::hostdevice_vector const& chunks, std::size_t num_rows, - size_t min_row, + std::size_t min_row, int level_type_size, rmm::cuda_stream_view stream); From 90fa87b61319425a264fee394bbf0f3c8b2deb9d Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Sat, 21 Sep 2024 02:04:17 -0700 Subject: [PATCH 10/12] GERA_DEBUG log don't forget to undo --- cpp/src/io/parquet/rle_stream.cuh | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index be7521ce131..6b197a79e71 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -358,6 +358,7 @@ struct rle_stream { // space available in the output buffer (for that last run at the end of // a call to decode_next). int const batch_len = min(remaining, max_count - last_run_pos); + printf("GERA_DEBUG batch_len=%d\n"); decode(output, run.level_run, run.start, From edfed2e9345f9a234d46427adbbbd9b2b6f3e2e7 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Sun, 22 Sep 2024 11:53:52 -0700 Subject: [PATCH 11/12] DEBUG LOG warp thread batch Signed-off-by: Gera Shegalov --- cpp/src/io/parquet/rle_stream.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index 6b197a79e71..2f569999eda 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -358,7 +358,7 @@ struct rle_stream { // space available in the output buffer (for that last run at the end of // a call to decode_next). int const batch_len = min(remaining, max_count - last_run_pos); - printf("GERA_DEBUG batch_len=%d\n"); + printf("GERA_DEBUG thread=%d warp_id=%d warp_lane=%d => batch_len=%d\n", t, warp_id, warp_lane, batch_len); decode(output, run.level_run, run.start, From 09dd99e67f50cabd4327964f091f8ef17ecbb7f9 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 25 Sep 2024 13:33:01 -0700 Subject: [PATCH 12/12] Acommodate for the output offset Signed-off-by: Gera Shegalov --- cpp/src/io/parquet/rle_stream.cuh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index 2f569999eda..2059f2790ad 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -347,7 +347,7 @@ struct rle_stream { int const max_count = cur_values + output_count; // run.output_pos is absolute position, we start decoding // if it's supposed to fit in this call to `decode_next`. - if (max_count > run.output_pos) { + if (max_count > run.output_pos + run.run_offset) { int remaining = run.remaining; int run_offset = run.run_offset; // last_run_pos is the absolute position of the run, including @@ -358,7 +358,6 @@ struct rle_stream { // space available in the output buffer (for that last run at the end of // a call to decode_next). int const batch_len = min(remaining, max_count - last_run_pos); - printf("GERA_DEBUG thread=%d warp_id=%d warp_lane=%d => batch_len=%d\n", t, warp_id, warp_lane, batch_len); decode(output, run.level_run, run.start,