From 953eccb09f47d8a953281c719c77d1b97607fdd6 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 10 Oct 2024 18:46:30 +0000 Subject: [PATCH 1/7] Refactor ORC dict encoding with new cuco static_map --- cpp/src/io/orc/dict_enc.cu | 115 ++++++++++++++++------------------ cpp/src/io/orc/orc_gpu.hpp | 39 +++++++----- cpp/src/io/orc/writer_impl.cu | 26 +++++--- 3 files changed, 95 insertions(+), 85 deletions(-) diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 5be75350951..64f70ecf7be 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -77,20 +77,6 @@ void rowgroup_char_counts(device_2dspan counts, counts, orc_columns, rowgroup_bounds, str_col_indexes); } -template -CUDF_KERNEL void __launch_bounds__(block_size) - initialize_dictionary_hash_maps_kernel(device_span dictionaries) -{ - auto const dict_map = dictionaries[blockIdx.x].map_slots; - auto const t = threadIdx.x; - for (size_type i = 0; i < dict_map.size(); i += block_size) { - if (t + i < dict_map.size()) { - new (&dict_map[t + i].first) map_type::atomic_key_type{KEY_SENTINEL}; - new (&dict_map[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL}; - } - } -} - struct equality_functor { column_device_view const& col; __device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const @@ -121,26 +107,35 @@ CUDF_KERNEL void __launch_bounds__(block_size) auto const& col = columns[dict.column_idx]; // Make a view of the hash map - auto hash_map_mutable = map_type::device_mutable_view(dict.map_slots.data(), - dict.map_slots.size(), - cuco::empty_key{KEY_SENTINEL}, - cuco::empty_value{VALUE_SENTINEL}); - auto const hash_fn = hash_functor{col}; - auto const equality_fn = equality_functor{col}; + auto const hash_fn = hash_functor{col}; + auto const equality_fn = equality_functor{col}; + using probing_scheme_type = cuco::linear_probing; + + storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()}; + // Make a view of the hash map. + auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL}, + cuco::empty_value{VALUE_SENTINEL}, + equality_fn, + probing_scheme_type{hash_fn}, + cuco::thread_scope_block, + storage_ref}; + + // Create a map ref with `cuco::insert` operator + auto has_map_insert_ref = hash_map_ref.with_operators(cuco::insert); auto const start_row = dict.start_row; auto const end_row = dict.start_row + dict.num_rows; size_type entry_count{0}; size_type char_count{0}; + // all threads should loop the same number of times for (thread_index_type cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) { auto const is_valid = cur_row < end_row and col.is_valid(cur_row); if (is_valid) { // insert element at cur_row to hash map and count successful insertions - auto const is_unique = - hash_map_mutable.insert(std::pair(cur_row, cur_row), hash_fn, equality_fn); + auto const is_unique = has_map_insert_ref.insert(cuco::pair{cur_row, cur_row}); if (is_unique) { ++entry_count; @@ -174,25 +169,22 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (not dict.is_enabled) { return; } - auto const t = threadIdx.x; - auto map = map_type::device_view(dict.map_slots.data(), - dict.map_slots.size(), - cuco::empty_key{KEY_SENTINEL}, - cuco::empty_value{VALUE_SENTINEL}); - + auto t = threadIdx.x; __shared__ cuda::atomic counter; using cuda::std::memory_order_relaxed; if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); - for (size_type i = 0; i < dict.map_slots.size(); i += block_size) { - if (t + i < dict.map_slots.size()) { - auto* slot = reinterpret_cast(map.begin_slot() + t + i); - auto key = slot->first; + + for (; t < dict.map_slots.size(); t += block_size) { + auto window = dict.map_slots.begin() + t; + // Collect all slots from each window. + for (auto& slot : *window) { + auto const key = slot.first; if (key != KEY_SENTINEL) { auto loc = counter.fetch_add(1, memory_order_relaxed); dict.data[loc] = key; - slot->second = loc; + slot.second = loc; } } } @@ -205,47 +197,48 @@ CUDF_KERNEL void __launch_bounds__(block_size) { auto const col_idx = blockIdx.x; auto const stripe_idx = blockIdx.y; + auto const t = threadIdx.x; auto const& dict = dictionaries[col_idx][stripe_idx]; auto const& col = columns[dict.column_idx]; if (not dict.is_enabled) { return; } - auto const t = threadIdx.x; + // Make a view of the hash map + auto const hash_fn = hash_functor{col}; + auto const equality_fn = equality_functor{col}; + using probing_scheme_type = cuco::linear_probing; + + storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()}; + // Make a view of the hash map. + auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL}, + cuco::empty_value{VALUE_SENTINEL}, + equality_fn, + probing_scheme_type{hash_fn}, + cuco::thread_scope_block, + storage_ref}; + + // Create a map ref with `cuco::insert` operator + auto has_map_find_ref = hash_map_ref.with_operators(cuco::find); + auto const start_row = dict.start_row; auto const end_row = dict.start_row + dict.num_rows; - auto const map = map_type::device_view(dict.map_slots.data(), - dict.map_slots.size(), - cuco::empty_key{KEY_SENTINEL}, - cuco::empty_value{VALUE_SENTINEL}); - - thread_index_type cur_row = start_row + t; - while (cur_row < end_row) { + for (thread_index_type cur_row = start_row + t; cur_row < end_row; cur_row += block_size) { if (col.is_valid(cur_row)) { - auto const hash_fn = hash_functor{col}; - auto const equality_fn = equality_functor{col}; - auto const found_slot = map.find(cur_row, hash_fn, equality_fn); - cudf_assert(found_slot != map.end() && - "Unable to find value in map in dictionary index construction"); - if (found_slot != map.end()) { - // No need for atomic as this is not going to be modified by any other thread - auto const val_ptr = reinterpret_cast(&found_slot->second); - dict.index[cur_row] = *val_ptr; - } + dict.index[cur_row] = [&]() { + auto const found_slot = has_map_find_ref.find(cur_row); + + // Fail if we didn't find the previously inserted key. + cudf_assert(found_slot != has_map_find_ref.end() && + "Unable to find value in map in dictionary index construction"); + + // Return the found value. + return found_slot->second; + }(); } - cur_row += block_size; } } -void initialize_dictionary_hash_maps(device_2dspan dictionaries, - rmm::cuda_stream_view stream) -{ - if (dictionaries.count() == 0) { return; } - constexpr int block_size = 1024; - initialize_dictionary_hash_maps_kernel - <<>>(dictionaries.flat_view()); -} - void populate_dictionary_hash_maps(device_2dspan dictionaries, device_span columns, rmm::cuda_stream_view stream) diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 8c7ccf0527f..0949fafe9a4 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -21,6 +21,7 @@ #include "io/utilities/column_buffer.hpp" #include "orc.hpp" +#include #include #include #include @@ -40,19 +41,27 @@ namespace gpu { using cudf::detail::device_2dspan; using cudf::detail::host_2dspan; +using key_type = size_type; +using mapped_type = size_type; +using slot_type = cuco::pair; +auto constexpr map_cg_size = + 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. + ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. +auto constexpr window_size = + 1; ///< Number of concurrent slots (set for best performance) handled by each thread. +auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size + ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. +using storage_type = cuco::aow_storage, + cudf::detail::cuco_allocator>; +using storage_ref_type = typename storage_type::ref_type; +using window_type = typename storage_type::window_type; +using slot_type = cuco::pair; + auto constexpr KEY_SENTINEL = size_type{-1}; auto constexpr VALUE_SENTINEL = size_type{-1}; -using map_type = cuco::legacy::static_map; - -/** - * @brief The alias of `map_type::pair_atomic_type` class. - * - * Declare this struct by trivial subclassing instead of type aliasing so we can have forward - * declaration of this struct somewhere else. - */ -struct slot_type : public map_type::slot_type {}; - struct CompressedStreamInfo { CompressedStreamInfo() = default; explicit constexpr CompressedStreamInfo(uint8_t const* compressed_data_, size_t compressed_size_) @@ -184,11 +193,11 @@ struct StripeStream { */ struct stripe_dictionary { // input - device_span map_slots; // hash map storage - uint32_t column_idx = 0; // column index - size_type start_row = 0; // first row in the stripe - size_type start_rowgroup = 0; // first rowgroup in the stripe - size_type num_rows = 0; // number of rows in the stripe + device_span map_slots; // hash map (windows) storage + uint32_t column_idx = 0; // column index + size_type start_row = 0; // first row in the stripe + size_type start_rowgroup = 0; // first rowgroup in the stripe + size_type num_rows = 0; // number of rows in the stripe // output device_span data; // index of elements in the column to include in the dictionary diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 60a64fb0ee6..f0f818feb65 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -20,6 +20,7 @@ */ #include "io/comp/nvcomp_adapter.hpp" +#include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" #include "io/utilities/column_utils.cuh" #include "writer_impl.hpp" @@ -2110,7 +2111,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, bool sort_dictionaries, rmm::cuda_stream_view stream) { - std::vector>> hash_maps_storage( + // Variable to keep track of the current total map storage size + size_t total_map_storage_size = 0; + std::vector> hash_maps_storage_offsets( orc_table.string_column_indices.size()); for (auto col_idx : orc_table.string_column_indices) { auto& str_column = orc_table.column(col_idx); @@ -2119,14 +2122,21 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, stripe.size == 0 ? 0 : segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - segmentation.rowgroups[stripe.first][col_idx].begin; - hash_maps_storage[str_column.str_index()].emplace_back(stripe_num_rows * 1.43, stream); + hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size); + total_map_storage_size += stripe_num_rows * gpu::occupancy_factor; } + hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size); } hostdevice_2dvector stripe_dicts( orc_table.num_string_columns(), segmentation.num_stripes(), stream); if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}}; + // Create a single bulk storage used by all sub-dictionaries + auto map_storage = gpu::storage_type{ + total_map_storage_size, + cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}}; + // Initialize stripe dictionaries for (auto col_idx : orc_table.string_column_indices) { auto& str_column = orc_table.column(col_idx); @@ -2137,7 +2147,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, auto const stripe_idx = stripe.id; auto& sd = stripe_dicts[str_col_idx][stripe_idx]; - sd.map_slots = hash_maps_storage[str_col_idx][stripe_idx]; + sd.map_slots = {map_storage.data() + hash_maps_storage_offsets[str_col_idx][stripe_idx], + hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] - + hash_maps_storage_offsets[str_col_idx][stripe_idx]}; sd.column_idx = col_idx; sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin; sd.start_rowgroup = stripe.first; @@ -2150,7 +2162,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, } stripe_dicts.host_to_device_async(stream); - gpu::initialize_dictionary_hash_maps(stripe_dicts, stream); + map_storage.initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()}); gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); // Copy the entry counts and char counts from the device to the host stripe_dicts.device_to_host_sync(stream); @@ -2184,8 +2196,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, col_use_dictionary = true; } else { // Clear hash map storage as dictionary encoding is not used for this stripe - hash_maps_storage[str_col_idx][stripe_idx] = rmm::device_uvector(0, stream); - sd.map_slots = {}; + sd.map_slots = {}; } } // If any stripe uses dictionary encoding, allocate index storage for the whole column @@ -2202,9 +2213,6 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, gpu::collect_map_entries(stripe_dicts, stream); gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); - // deallocate hash map storage, unused after this point - hash_maps_storage.clear(); - // Clear map slots and attach order buffers auto dictionaries_flat = stripe_dicts.host_view().flat_view(); for (auto& sd : dictionaries_flat) { From 732eaf9bf59e0aaa572f1a7ee6239ba2a540ccc0 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:31:51 -0700 Subject: [PATCH 2/7] Change `with_*` with `rebing_*` Co-authored-by: Yunsong Wang --- cpp/src/io/orc/dict_enc.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 64f70ecf7be..631c3e09be1 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -121,7 +121,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) storage_ref}; // Create a map ref with `cuco::insert` operator - auto has_map_insert_ref = hash_map_ref.with_operators(cuco::insert); + auto has_map_insert_ref = hash_map_ref.rebind_operators(cuco::insert); auto const start_row = dict.start_row; auto const end_row = dict.start_row + dict.num_rows; @@ -218,7 +218,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) storage_ref}; // Create a map ref with `cuco::insert` operator - auto has_map_find_ref = hash_map_ref.with_operators(cuco::find); + auto has_map_find_ref = hash_map_ref.rebind_operators(cuco::find); auto const start_row = dict.start_row; auto const end_row = dict.start_row + dict.num_rows; From 5dacd83ef41543ea766f9fe27b2bab8b0cae4273 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 11 Oct 2024 19:12:38 +0000 Subject: [PATCH 3/7] Suggestions from review --- cpp/src/io/orc/dict_enc.cu | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 631c3e09be1..26e12027049 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -95,6 +95,9 @@ struct hash_functor { } }; +// Probing scheme to use for the hash map +using probing_scheme_type = cuco::linear_probing; + template CUDF_KERNEL void __launch_bounds__(block_size) populate_dictionary_hash_maps_kernel(device_2dspan dictionaries, @@ -107,9 +110,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) auto const& col = columns[dict.column_idx]; // Make a view of the hash map - auto const hash_fn = hash_functor{col}; - auto const equality_fn = equality_functor{col}; - using probing_scheme_type = cuco::linear_probing; + auto const hash_fn = hash_functor{col}; + auto const equality_fn = equality_functor{col}; storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()}; // Make a view of the hash map. @@ -204,9 +206,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (not dict.is_enabled) { return; } // Make a view of the hash map - auto const hash_fn = hash_functor{col}; - auto const equality_fn = equality_functor{col}; - using probing_scheme_type = cuco::linear_probing; + auto const hash_fn = hash_functor{col}; + auto const equality_fn = equality_functor{col}; storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()}; // Make a view of the hash map. From 62ca2ceed4ea5f058cbd1448c592f229e445bfb5 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 11 Oct 2024 23:10:08 +0000 Subject: [PATCH 4/7] Minor suggestions from reviewer comments --- cpp/src/io/orc/dict_enc.cu | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 26e12027049..0cb5c382631 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -171,22 +171,24 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (not dict.is_enabled) { return; } - auto t = threadIdx.x; + auto const t = threadIdx.x; __shared__ cuda::atomic counter; using cuda::std::memory_order_relaxed; if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); - for (; t < dict.map_slots.size(); t += block_size) { - auto window = dict.map_slots.begin() + t; - // Collect all slots from each window. - for (auto& slot : *window) { - auto const key = slot.first; - if (key != KEY_SENTINEL) { - auto loc = counter.fetch_add(1, memory_order_relaxed); - dict.data[loc] = key; - slot.second = loc; + for (size_type i = 0; i < dict.map_slots.size(); i += block_size) { + if (t + i < dict.map_slots.size()) { + auto window = dict.map_slots.begin() + t + i; + // Collect all slots from each window. + for (auto& slot : *window) { + auto const key = slot.first; + if (key != KEY_SENTINEL) { + auto loc = counter.fetch_add(1, memory_order_relaxed); + dict.data[loc] = key; + slot.second = loc; + } } } } @@ -226,16 +228,11 @@ CUDF_KERNEL void __launch_bounds__(block_size) for (thread_index_type cur_row = start_row + t; cur_row < end_row; cur_row += block_size) { if (col.is_valid(cur_row)) { - dict.index[cur_row] = [&]() { - auto const found_slot = has_map_find_ref.find(cur_row); - - // Fail if we didn't find the previously inserted key. - cudf_assert(found_slot != has_map_find_ref.end() && - "Unable to find value in map in dictionary index construction"); - - // Return the found value. - return found_slot->second; - }(); + auto const found_slot = has_map_find_ref.find(cur_row); + // Fail if we didn't find the previously inserted key. + cudf_assert(found_slot != has_map_find_ref.end() && + "Unable to find value in map in dictionary index construction"); + dict.index[cur_row] = found_slot->second; } } } From fba84c5e168f7b24440560d0340c83e6fe307d83 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 11 Oct 2024 23:14:12 +0000 Subject: [PATCH 5/7] Remove the unnecessary lambda --- cpp/src/io/parquet/chunk_dict.cu | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 1a2a9eac17d..b85ebf2fa1a 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -194,17 +194,12 @@ struct map_find_fn { val_idx += block_size) { // Find the key using a single thread for best performance for now. if (data_col.is_valid(val_idx)) { + auto const found_slot = map_find_ref.find(val_idx); + // Fail if we didn't find the previously inserted key. + cudf_assert(found_slot != map_find_ref.end() && + "Unable to find value in map in dictionary index construction"); // No need for atomic as this is not going to be modified by any other thread. - chunk->dict_index[val_idx - s_ck_start_val_idx] = [&]() { - auto const found_slot = map_find_ref.find(val_idx); - - // Fail if we didn't find the previously inserted key. - cudf_assert(found_slot != map_find_ref.end() && - "Unable to find value in map in dictionary index construction"); - - // Return the found value. - return found_slot->second; - }(); + chunk->dict_index[val_idx - s_ck_start_val_idx] = found_slot->second; } } } else { From dafad6cba6944008b97c6ca14f431944e71a667c Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 11 Oct 2024 23:35:57 +0000 Subject: [PATCH 6/7] Scope out `map_storage` --- cpp/src/io/orc/writer_impl.cu | 148 ++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 71 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index f0f818feb65..d5b645c3f11 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2132,86 +2132,92 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, orc_table.num_string_columns(), segmentation.num_stripes(), stream); if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}}; - // Create a single bulk storage used by all sub-dictionaries - auto map_storage = gpu::storage_type{ - total_map_storage_size, - cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}}; - - // Initialize stripe dictionaries - for (auto col_idx : orc_table.string_column_indices) { - auto& str_column = orc_table.column(col_idx); - auto const str_col_idx = str_column.str_index(); - str_column.attach_stripe_dicts(stripe_dicts[str_col_idx], - stripe_dicts.device_view()[str_col_idx]); - for (auto const& stripe : segmentation.stripes) { - auto const stripe_idx = stripe.id; - auto& sd = stripe_dicts[str_col_idx][stripe_idx]; - - sd.map_slots = {map_storage.data() + hash_maps_storage_offsets[str_col_idx][stripe_idx], - hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] - - hash_maps_storage_offsets[str_col_idx][stripe_idx]}; - sd.column_idx = col_idx; - sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin; - sd.start_rowgroup = stripe.first; - sd.num_rows = - segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - sd.start_row; - - sd.entry_count = 0; - sd.char_count = 0; - } - } - stripe_dicts.host_to_device_async(stream); - - map_storage.initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()}); - gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); - // Copy the entry counts and char counts from the device to the host - stripe_dicts.device_to_host_sync(stream); - // Data owners; can be cleared after encode std::vector> dict_data_owner; std::vector> dict_index_owner; std::vector> dict_order_owner; - // Make decision about which stripes to encode with dictionary encoding - for (auto col_idx : orc_table.string_column_indices) { - auto& str_column = orc_table.column(col_idx); - bool col_use_dictionary{false}; - for (auto const& stripe : segmentation.stripes) { - auto const stripe_idx = stripe.id; - auto const str_col_idx = str_column.str_index(); - auto& sd = stripe_dicts[str_col_idx][stripe_idx]; - auto const direct_char_count = std::accumulate( - thrust::make_counting_iterator(stripe.first), - thrust::make_counting_iterator(stripe.first + stripe.size), - 0, - [&](auto total, auto const& rg) { return total + str_column.rowgroup_char_count(rg); }); - // Enable dictionary encoding if the dictionary size is smaller than the direct encode size - // The estimate excludes the LENGTH stream size, which is present in both cases - sd.is_enabled = [&]() { - auto const dict_index_size = varint_size(sd.entry_count); - return sd.char_count + dict_index_size * sd.entry_count < direct_char_count; - }(); - if (sd.is_enabled) { - dict_data_owner.emplace_back(sd.entry_count, stream); - sd.data = dict_data_owner.back(); - col_use_dictionary = true; - } else { - // Clear hash map storage as dictionary encoding is not used for this stripe - sd.map_slots = {}; + + // Scope this section so that `map_storage` is destroyed once no longer needed. + { + // Create a single bulk storage used by all sub-dictionaries + auto map_storage = gpu::storage_type{ + total_map_storage_size, + cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}}; + + // Initialize stripe dictionaries + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + auto const str_col_idx = str_column.str_index(); + str_column.attach_stripe_dicts(stripe_dicts[str_col_idx], + stripe_dicts.device_view()[str_col_idx]); + for (auto const& stripe : segmentation.stripes) { + auto const stripe_idx = stripe.id; + auto& sd = stripe_dicts[str_col_idx][stripe_idx]; + + sd.map_slots = {map_storage.data() + hash_maps_storage_offsets[str_col_idx][stripe_idx], + hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] - + hash_maps_storage_offsets[str_col_idx][stripe_idx]}; + sd.column_idx = col_idx; + sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin; + sd.start_rowgroup = stripe.first; + sd.num_rows = + segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - sd.start_row; + + sd.entry_count = 0; + sd.char_count = 0; } } - // If any stripe uses dictionary encoding, allocate index storage for the whole column - if (col_use_dictionary) { - dict_index_owner.emplace_back(str_column.size(), stream); - for (auto& sd : stripe_dicts[str_column.str_index()]) { - sd.index = dict_index_owner.back(); + stripe_dicts.host_to_device_async(stream); + + map_storage.initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()}); + gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); + // Copy the entry counts and char counts from the device to the host + stripe_dicts.device_to_host_sync(stream); + + // Make decision about which stripes to encode with dictionary encoding + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + bool col_use_dictionary{false}; + for (auto const& stripe : segmentation.stripes) { + auto const stripe_idx = stripe.id; + auto const str_col_idx = str_column.str_index(); + auto& sd = stripe_dicts[str_col_idx][stripe_idx]; + auto const direct_char_count = std::accumulate( + thrust::make_counting_iterator(stripe.first), + thrust::make_counting_iterator(stripe.first + stripe.size), + 0, + [&](auto total, auto const& rg) { return total + str_column.rowgroup_char_count(rg); }); + // Enable dictionary encoding if the dictionary size is smaller than the direct encode size + // The estimate excludes the LENGTH stream size, which is present in both cases + sd.is_enabled = [&]() { + auto const dict_index_size = varint_size(sd.entry_count); + return sd.char_count + dict_index_size * sd.entry_count < direct_char_count; + }(); + if (sd.is_enabled) { + dict_data_owner.emplace_back(sd.entry_count, stream); + sd.data = dict_data_owner.back(); + col_use_dictionary = true; + } else { + // Clear hash map storage as dictionary encoding is not used for this stripe + sd.map_slots = {}; + } + } + // If any stripe uses dictionary encoding, allocate index storage for the whole column + if (col_use_dictionary) { + dict_index_owner.emplace_back(str_column.size(), stream); + for (auto& sd : stripe_dicts[str_column.str_index()]) { + sd.index = dict_index_owner.back(); + } } } - } - // Synchronize to ensure the copy is complete before we clear `map_slots` - stripe_dicts.host_to_device_sync(stream); + // Synchronize to ensure the copy is complete before we clear `map_slots` + stripe_dicts.host_to_device_sync(stream); - gpu::collect_map_entries(stripe_dicts, stream); - gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); + gpu::collect_map_entries(stripe_dicts, stream); + gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); + + // `map_storage` is destroyed here as no longer needed. + } // Clear map slots and attach order buffers auto dictionaries_flat = stripe_dicts.host_view().flat_view(); From 641ae2a9e5304deb371746652cd683421920ae35 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 11 Oct 2024 23:49:55 +0000 Subject: [PATCH 7/7] Use `unique_ptr` to destroy map_storage --- cpp/src/io/orc/writer_impl.cu | 149 +++++++++++++++++----------------- 1 file changed, 73 insertions(+), 76 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index d5b645c3f11..b09062f700e 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2132,92 +2132,89 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, orc_table.num_string_columns(), segmentation.num_stripes(), stream); if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}}; + // Create a single bulk storage to use for all sub-dictionaries + auto map_storage = std::make_unique( + total_map_storage_size, + cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}); + + // Initialize stripe dictionaries + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + auto const str_col_idx = str_column.str_index(); + str_column.attach_stripe_dicts(stripe_dicts[str_col_idx], + stripe_dicts.device_view()[str_col_idx]); + for (auto const& stripe : segmentation.stripes) { + auto const stripe_idx = stripe.id; + auto& sd = stripe_dicts[str_col_idx][stripe_idx]; + + sd.map_slots = {map_storage->data() + hash_maps_storage_offsets[str_col_idx][stripe_idx], + hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] - + hash_maps_storage_offsets[str_col_idx][stripe_idx]}; + sd.column_idx = col_idx; + sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin; + sd.start_rowgroup = stripe.first; + sd.num_rows = + segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - sd.start_row; + + sd.entry_count = 0; + sd.char_count = 0; + } + } + stripe_dicts.host_to_device_async(stream); + + map_storage->initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()}); + gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); + // Copy the entry counts and char counts from the device to the host + stripe_dicts.device_to_host_sync(stream); + // Data owners; can be cleared after encode std::vector> dict_data_owner; std::vector> dict_index_owner; std::vector> dict_order_owner; - - // Scope this section so that `map_storage` is destroyed once no longer needed. - { - // Create a single bulk storage used by all sub-dictionaries - auto map_storage = gpu::storage_type{ - total_map_storage_size, - cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}}; - - // Initialize stripe dictionaries - for (auto col_idx : orc_table.string_column_indices) { - auto& str_column = orc_table.column(col_idx); - auto const str_col_idx = str_column.str_index(); - str_column.attach_stripe_dicts(stripe_dicts[str_col_idx], - stripe_dicts.device_view()[str_col_idx]); - for (auto const& stripe : segmentation.stripes) { - auto const stripe_idx = stripe.id; - auto& sd = stripe_dicts[str_col_idx][stripe_idx]; - - sd.map_slots = {map_storage.data() + hash_maps_storage_offsets[str_col_idx][stripe_idx], - hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] - - hash_maps_storage_offsets[str_col_idx][stripe_idx]}; - sd.column_idx = col_idx; - sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin; - sd.start_rowgroup = stripe.first; - sd.num_rows = - segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - sd.start_row; - - sd.entry_count = 0; - sd.char_count = 0; + // Make decision about which stripes to encode with dictionary encoding + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + bool col_use_dictionary{false}; + for (auto const& stripe : segmentation.stripes) { + auto const stripe_idx = stripe.id; + auto const str_col_idx = str_column.str_index(); + auto& sd = stripe_dicts[str_col_idx][stripe_idx]; + auto const direct_char_count = std::accumulate( + thrust::make_counting_iterator(stripe.first), + thrust::make_counting_iterator(stripe.first + stripe.size), + 0, + [&](auto total, auto const& rg) { return total + str_column.rowgroup_char_count(rg); }); + // Enable dictionary encoding if the dictionary size is smaller than the direct encode size + // The estimate excludes the LENGTH stream size, which is present in both cases + sd.is_enabled = [&]() { + auto const dict_index_size = varint_size(sd.entry_count); + return sd.char_count + dict_index_size * sd.entry_count < direct_char_count; + }(); + if (sd.is_enabled) { + dict_data_owner.emplace_back(sd.entry_count, stream); + sd.data = dict_data_owner.back(); + col_use_dictionary = true; + } else { + // Clear hash map storage as dictionary encoding is not used for this stripe + sd.map_slots = {}; } } - stripe_dicts.host_to_device_async(stream); - - map_storage.initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()}); - gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); - // Copy the entry counts and char counts from the device to the host - stripe_dicts.device_to_host_sync(stream); - - // Make decision about which stripes to encode with dictionary encoding - for (auto col_idx : orc_table.string_column_indices) { - auto& str_column = orc_table.column(col_idx); - bool col_use_dictionary{false}; - for (auto const& stripe : segmentation.stripes) { - auto const stripe_idx = stripe.id; - auto const str_col_idx = str_column.str_index(); - auto& sd = stripe_dicts[str_col_idx][stripe_idx]; - auto const direct_char_count = std::accumulate( - thrust::make_counting_iterator(stripe.first), - thrust::make_counting_iterator(stripe.first + stripe.size), - 0, - [&](auto total, auto const& rg) { return total + str_column.rowgroup_char_count(rg); }); - // Enable dictionary encoding if the dictionary size is smaller than the direct encode size - // The estimate excludes the LENGTH stream size, which is present in both cases - sd.is_enabled = [&]() { - auto const dict_index_size = varint_size(sd.entry_count); - return sd.char_count + dict_index_size * sd.entry_count < direct_char_count; - }(); - if (sd.is_enabled) { - dict_data_owner.emplace_back(sd.entry_count, stream); - sd.data = dict_data_owner.back(); - col_use_dictionary = true; - } else { - // Clear hash map storage as dictionary encoding is not used for this stripe - sd.map_slots = {}; - } - } - // If any stripe uses dictionary encoding, allocate index storage for the whole column - if (col_use_dictionary) { - dict_index_owner.emplace_back(str_column.size(), stream); - for (auto& sd : stripe_dicts[str_column.str_index()]) { - sd.index = dict_index_owner.back(); - } + // If any stripe uses dictionary encoding, allocate index storage for the whole column + if (col_use_dictionary) { + dict_index_owner.emplace_back(str_column.size(), stream); + for (auto& sd : stripe_dicts[str_column.str_index()]) { + sd.index = dict_index_owner.back(); } } - // Synchronize to ensure the copy is complete before we clear `map_slots` - stripe_dicts.host_to_device_sync(stream); + } + // Synchronize to ensure the copy is complete before we clear `map_slots` + stripe_dicts.host_to_device_sync(stream); - gpu::collect_map_entries(stripe_dicts, stream); - gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); + gpu::collect_map_entries(stripe_dicts, stream); + gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); - // `map_storage` is destroyed here as no longer needed. - } + // deallocate hash map storage, unused after this point + map_storage.reset(); // Clear map slots and attach order buffers auto dictionaries_flat = stripe_dicts.host_view().flat_view();