Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ORC dictionary encoding to migrate to the new cuco::static_map #17049

Merged
merged 10 commits into from
Oct 12, 2024
115 changes: 54 additions & 61 deletions cpp/src/io/orc/dict_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,6 @@ void rowgroup_char_counts(device_2dspan<size_type> counts,
counts, orc_columns, rowgroup_bounds, str_col_indexes);
}

/**
 * @brief Resets every slot of each stripe dictionary's hash map to the empty sentinels.
 *
 * The dictionary processed by a block is selected with `blockIdx.x`; the threads of the
 * block then stride over that dictionary's `map_slots` in steps of `block_size`.
 *
 * @tparam block_size Number of threads per block; also the stride between the slots
 * visited by a single thread
 * @param dictionaries Flat span of stripe dictionaries whose `map_slots` are initialized
 */
template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
  initialize_dictionary_hash_maps_kernel(device_span<stripe_dictionary> dictionaries)
{
  auto const dict_map  = dictionaries[blockIdx.x].map_slots;
  auto const num_slots = static_cast<std::size_t>(dict_map.size());

  // Index with a 64-bit unsigned type: a 32-bit signed counter would overflow before
  // reaching the end of a map that holds more than 2^31 - 1 slots.
  for (auto idx = static_cast<std::size_t>(threadIdx.x); idx < num_slots; idx += block_size) {
    // Construct the slot's key and value atomics in place, holding the sentinel values.
    new (&dict_map[idx].first) map_type::atomic_key_type{KEY_SENTINEL};
    new (&dict_map[idx].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
  }
}

struct equality_functor {
column_device_view const& col;
__device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const
Expand Down Expand Up @@ -121,26 +107,35 @@ CUDF_KERNEL void __launch_bounds__(block_size)
auto const& col = columns[dict.column_idx];

// Make a view of the hash map
auto hash_map_mutable = map_type::device_mutable_view(dict.map_slots.data(),
dict.map_slots.size(),
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
using probing_scheme_type = cuco::linear_probing<map_cg_size, hash_functor>;

storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()};
// Make a view of the hash map.
auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
equality_fn,
probing_scheme_type{hash_fn},
cuco::thread_scope_block,
storage_ref};

// Create a map ref with `cuco::insert` operator
auto has_map_insert_ref = hash_map_ref.with_operators(cuco::insert);
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

auto const start_row = dict.start_row;
auto const end_row = dict.start_row + dict.num_rows;

size_type entry_count{0};
size_type char_count{0};

// all threads should loop the same number of times
for (thread_index_type cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) {
auto const is_valid = cur_row < end_row and col.is_valid(cur_row);

if (is_valid) {
// insert element at cur_row to hash map and count successful insertions
auto const is_unique =
hash_map_mutable.insert(std::pair(cur_row, cur_row), hash_fn, equality_fn);
auto const is_unique = has_map_insert_ref.insert(cuco::pair{cur_row, cur_row});

if (is_unique) {
++entry_count;
Expand Down Expand Up @@ -174,25 +169,22 @@ CUDF_KERNEL void __launch_bounds__(block_size)

if (not dict.is_enabled) { return; }

auto const t = threadIdx.x;
auto map = map_type::device_view(dict.map_slots.data(),
dict.map_slots.size(),
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

auto t = threadIdx.x;
__shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;

using cuda::std::memory_order_relaxed;
if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
__syncthreads();
for (size_type i = 0; i < dict.map_slots.size(); i += block_size) {
if (t + i < dict.map_slots.size()) {
auto* slot = reinterpret_cast<map_type::value_type*>(map.begin_slot() + t + i);
auto key = slot->first;

for (; t < dict.map_slots.size(); t += block_size) {
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
auto window = dict.map_slots.begin() + t;
// Collect all slots from each window.
for (auto& slot : *window) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
dict.data[loc] = key;
slot->second = loc;
slot.second = loc;
}
}
}
Expand All @@ -205,47 +197,48 @@ CUDF_KERNEL void __launch_bounds__(block_size)
{
auto const col_idx = blockIdx.x;
auto const stripe_idx = blockIdx.y;
auto const t = threadIdx.x;
auto const& dict = dictionaries[col_idx][stripe_idx];
auto const& col = columns[dict.column_idx];

if (not dict.is_enabled) { return; }

auto const t = threadIdx.x;
// Make a view of the hash map
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
using probing_scheme_type = cuco::linear_probing<map_cg_size, hash_functor>;
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()};
// Make a view of the hash map.
auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
equality_fn,
probing_scheme_type{hash_fn},
cuco::thread_scope_block,
storage_ref};

// Create a map ref with `cuco::find` operator
auto has_map_find_ref = hash_map_ref.with_operators(cuco::find);
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

auto const start_row = dict.start_row;
auto const end_row = dict.start_row + dict.num_rows;

auto const map = map_type::device_view(dict.map_slots.data(),
dict.map_slots.size(),
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

thread_index_type cur_row = start_row + t;
while (cur_row < end_row) {
for (thread_index_type cur_row = start_row + t; cur_row < end_row; cur_row += block_size) {
if (col.is_valid(cur_row)) {
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
auto const found_slot = map.find(cur_row, hash_fn, equality_fn);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto const val_ptr = reinterpret_cast<map_type::mapped_type const*>(&found_slot->second);
dict.index[cur_row] = *val_ptr;
}
dict.index[cur_row] = [&]() {
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
auto const found_slot = has_map_find_ref.find(cur_row);

// Fail if we didn't find the previously inserted key.
cudf_assert(found_slot != has_map_find_ref.end() &&
"Unable to find value in map in dictionary index construction");

// Return the found value.
return found_slot->second;
}();
}
cur_row += block_size;
}
}

/**
 * @brief Launches the kernel that fills all dictionary hash map slots with sentinel values.
 *
 * One block of 1024 threads is launched per dictionary. Nothing is launched when there
 * are no dictionaries.
 *
 * @param dictionaries 2D span of stripe dictionaries whose hash maps are initialized
 * @param stream CUDA stream on which the kernel is launched
 */
void initialize_dictionary_hash_maps(device_2dspan<stripe_dictionary> dictionaries,
                                     rmm::cuda_stream_view stream)
{
  constexpr int block_size    = 1024;
  auto const num_dictionaries = dictionaries.count();

  // A zero-sized grid is an invalid launch configuration, so only launch for non-empty input.
  if (num_dictionaries != 0) {
    initialize_dictionary_hash_maps_kernel<block_size>
      <<<num_dictionaries, block_size, 0, stream.value()>>>(dictionaries.flat_view());
  }
}

void populate_dictionary_hash_maps(device_2dspan<stripe_dictionary> dictionaries,
device_span<orc_column_device_view const> columns,
rmm::cuda_stream_view stream)
Expand Down
39 changes: 24 additions & 15 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "io/utilities/column_buffer.hpp"
#include "orc.hpp"

#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/timezone.cuh>
#include <cudf/io/orc_types.hpp>
#include <cudf/io/types.hpp>
Expand All @@ -40,19 +41,27 @@ namespace gpu {
using cudf::detail::device_2dspan;
using cudf::detail::host_2dspan;

using key_type = size_type;
using mapped_type = size_type;
using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
auto constexpr window_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) ≈ 1.43 * N to target a 70% occupancy factor.
using storage_type = cuco::aow_storage<slot_type,
window_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
using window_type = typename storage_type::window_type;
using slot_type = cuco::pair<key_type, mapped_type>;

auto constexpr KEY_SENTINEL = size_type{-1};
auto constexpr VALUE_SENTINEL = size_type{-1};

using map_type = cuco::legacy::static_map<size_type, size_type>;

/**
* @brief The alias of `map_type::pair_atomic_type` class.
*
* Declare this struct by trivial subclassing instead of type aliasing so we can have forward
* declaration of this struct somewhere else.
*/
struct slot_type : public map_type::slot_type {};

struct CompressedStreamInfo {
CompressedStreamInfo() = default;
explicit constexpr CompressedStreamInfo(uint8_t const* compressed_data_, size_t compressed_size_)
Expand Down Expand Up @@ -184,11 +193,11 @@ struct StripeStream {
*/
struct stripe_dictionary {
// input
device_span<slot_type> map_slots; // hash map storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
size_type num_rows = 0; // number of rows in the stripe
device_span<window_type> map_slots; // hash map (windows) storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
size_type num_rows = 0; // number of rows in the stripe

// output
device_span<uint32_t> data; // index of elements in the column to include in the dictionary
Expand Down
26 changes: 17 additions & 9 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

#include "io/comp/nvcomp_adapter.hpp"
#include "io/orc/orc_gpu.hpp"
#include "io/statistics/column_statistics.cuh"
#include "io/utilities/column_utils.cuh"
#include "writer_impl.hpp"
Expand Down Expand Up @@ -2110,7 +2111,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
bool sort_dictionaries,
rmm::cuda_stream_view stream)
{
std::vector<std::vector<rmm::device_uvector<gpu::slot_type>>> hash_maps_storage(
// Variable to keep track of the current total map storage size
size_t total_map_storage_size = 0;
std::vector<std::vector<size_t>> hash_maps_storage_offsets(
orc_table.string_column_indices.size());
for (auto col_idx : orc_table.string_column_indices) {
auto& str_column = orc_table.column(col_idx);
Expand All @@ -2119,14 +2122,21 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
stripe.size == 0 ? 0
: segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end -
segmentation.rowgroups[stripe.first][col_idx].begin;
hash_maps_storage[str_column.str_index()].emplace_back(stripe_num_rows * 1.43, stream);
hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size);
total_map_storage_size += stripe_num_rows * gpu::occupancy_factor;
}
hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size);
}

hostdevice_2dvector<gpu::stripe_dictionary> stripe_dicts(
orc_table.num_string_columns(), segmentation.num_stripes(), stream);
if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}};

// Create a single bulk storage used by all sub-dictionaries
auto map_storage = gpu::storage_type{
total_map_storage_size,
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream}};

// Initialize stripe dictionaries
for (auto col_idx : orc_table.string_column_indices) {
auto& str_column = orc_table.column(col_idx);
Expand All @@ -2137,7 +2147,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
auto const stripe_idx = stripe.id;
auto& sd = stripe_dicts[str_col_idx][stripe_idx];

sd.map_slots = hash_maps_storage[str_col_idx][stripe_idx];
sd.map_slots = {map_storage.data() + hash_maps_storage_offsets[str_col_idx][stripe_idx],
hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] -
hash_maps_storage_offsets[str_col_idx][stripe_idx]};
sd.column_idx = col_idx;
sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin;
sd.start_rowgroup = stripe.first;
Expand All @@ -2150,7 +2162,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
}
stripe_dicts.host_to_device_async(stream);

gpu::initialize_dictionary_hash_maps(stripe_dicts, stream);
map_storage.initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()});
gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream);
// Copy the entry counts and char counts from the device to the host
stripe_dicts.device_to_host_sync(stream);
Expand Down Expand Up @@ -2184,8 +2196,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
col_use_dictionary = true;
} else {
// Clear hash map storage as dictionary encoding is not used for this stripe
hash_maps_storage[str_col_idx][stripe_idx] = rmm::device_uvector<gpu::slot_type>(0, stream);
sd.map_slots = {};
sd.map_slots = {};
}
}
// If any stripe uses dictionary encoding, allocate index storage for the whole column
Expand All @@ -2202,9 +2213,6 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
gpu::collect_map_entries(stripe_dicts, stream);
gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream);

// deallocate hash map storage, unused after this point
hash_maps_storage.clear();
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

// Clear map slots and attach order buffers
auto dictionaries_flat = stripe_dicts.host_view().flat_view();
for (auto& sd : dictionaries_flat) {
Expand Down
Loading