Skip to content

Commit

Permalink
Refactor ORC dict encoding with new cuco static_map
Browse files Browse the repository at this point in the history
  • Loading branch information
mhaseeb123 committed Oct 10, 2024
1 parent 553d8ec commit 953eccb
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 85 deletions.
115 changes: 54 additions & 61 deletions cpp/src/io/orc/dict_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,6 @@ void rowgroup_char_counts(device_2dspan<size_type> counts,
counts, orc_columns, rowgroup_bounds, str_col_indexes);
}

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
initialize_dictionary_hash_maps_kernel(device_span<stripe_dictionary> dictionaries)
{
auto const dict_map = dictionaries[blockIdx.x].map_slots;
auto const t = threadIdx.x;
for (size_type i = 0; i < dict_map.size(); i += block_size) {
if (t + i < dict_map.size()) {
new (&dict_map[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
new (&dict_map[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
}
}
}

struct equality_functor {
column_device_view const& col;
__device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const
Expand Down Expand Up @@ -121,26 +107,35 @@ CUDF_KERNEL void __launch_bounds__(block_size)
auto const& col = columns[dict.column_idx];

// Make a view of the hash map
auto hash_map_mutable = map_type::device_mutable_view(dict.map_slots.data(),
dict.map_slots.size(),
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
using probing_scheme_type = cuco::linear_probing<map_cg_size, hash_functor>;

storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()};
// Make a view of the hash map.
auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
equality_fn,
probing_scheme_type{hash_fn},
cuco::thread_scope_block,
storage_ref};

// Create a map ref with `cuco::insert` operator
auto has_map_insert_ref = hash_map_ref.with_operators(cuco::insert);

auto const start_row = dict.start_row;
auto const end_row = dict.start_row + dict.num_rows;

size_type entry_count{0};
size_type char_count{0};

// all threads should loop the same number of times
for (thread_index_type cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) {
auto const is_valid = cur_row < end_row and col.is_valid(cur_row);

if (is_valid) {
// insert element at cur_row to hash map and count successful insertions
auto const is_unique =
hash_map_mutable.insert(std::pair(cur_row, cur_row), hash_fn, equality_fn);
auto const is_unique = has_map_insert_ref.insert(cuco::pair{cur_row, cur_row});

if (is_unique) {
++entry_count;
Expand Down Expand Up @@ -174,25 +169,22 @@ CUDF_KERNEL void __launch_bounds__(block_size)

if (not dict.is_enabled) { return; }

auto const t = threadIdx.x;
auto map = map_type::device_view(dict.map_slots.data(),
dict.map_slots.size(),
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

auto t = threadIdx.x;
__shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;

using cuda::std::memory_order_relaxed;
if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
__syncthreads();
for (size_type i = 0; i < dict.map_slots.size(); i += block_size) {
if (t + i < dict.map_slots.size()) {
auto* slot = reinterpret_cast<map_type::value_type*>(map.begin_slot() + t + i);
auto key = slot->first;

for (; t < dict.map_slots.size(); t += block_size) {
auto window = dict.map_slots.begin() + t;
// Collect all slots from each window.
for (auto& slot : *window) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
dict.data[loc] = key;
slot->second = loc;
slot.second = loc;
}
}
}
Expand All @@ -205,47 +197,48 @@ CUDF_KERNEL void __launch_bounds__(block_size)
{
auto const col_idx = blockIdx.x;
auto const stripe_idx = blockIdx.y;
auto const t = threadIdx.x;
auto const& dict = dictionaries[col_idx][stripe_idx];
auto const& col = columns[dict.column_idx];

if (not dict.is_enabled) { return; }

auto const t = threadIdx.x;
// Make a view of the hash map
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
using probing_scheme_type = cuco::linear_probing<map_cg_size, hash_functor>;

storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()};
// Make a view of the hash map.
auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
equality_fn,
probing_scheme_type{hash_fn},
cuco::thread_scope_block,
storage_ref};

// Create a map ref with `cuco::insert` operator
auto has_map_find_ref = hash_map_ref.with_operators(cuco::find);

auto const start_row = dict.start_row;
auto const end_row = dict.start_row + dict.num_rows;

auto const map = map_type::device_view(dict.map_slots.data(),
dict.map_slots.size(),
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

thread_index_type cur_row = start_row + t;
while (cur_row < end_row) {
for (thread_index_type cur_row = start_row + t; cur_row < end_row; cur_row += block_size) {
if (col.is_valid(cur_row)) {
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
auto const found_slot = map.find(cur_row, hash_fn, equality_fn);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto const val_ptr = reinterpret_cast<map_type::mapped_type const*>(&found_slot->second);
dict.index[cur_row] = *val_ptr;
}
dict.index[cur_row] = [&]() {
auto const found_slot = has_map_find_ref.find(cur_row);

// Fail if we didn't find the previously inserted key.
cudf_assert(found_slot != has_map_find_ref.end() &&
"Unable to find value in map in dictionary index construction");

// Return the found value.
return found_slot->second;
}();
}
cur_row += block_size;
}
}

void initialize_dictionary_hash_maps(device_2dspan<stripe_dictionary> dictionaries,
rmm::cuda_stream_view stream)
{
if (dictionaries.count() == 0) { return; }
constexpr int block_size = 1024;
initialize_dictionary_hash_maps_kernel<block_size>
<<<dictionaries.count(), block_size, 0, stream.value()>>>(dictionaries.flat_view());
}

void populate_dictionary_hash_maps(device_2dspan<stripe_dictionary> dictionaries,
device_span<orc_column_device_view const> columns,
rmm::cuda_stream_view stream)
Expand Down
39 changes: 24 additions & 15 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "io/utilities/column_buffer.hpp"
#include "orc.hpp"

#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/timezone.cuh>
#include <cudf/io/orc_types.hpp>
#include <cudf/io/types.hpp>
Expand All @@ -40,19 +41,27 @@ namespace gpu {
using cudf::detail::device_2dspan;
using cudf::detail::host_2dspan;

using key_type = size_type;
using mapped_type = size_type;
using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
auto constexpr window_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
using storage_type = cuco::aow_storage<slot_type,
window_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
using window_type = typename storage_type::window_type;
using slot_type = cuco::pair<key_type, mapped_type>;

auto constexpr KEY_SENTINEL = size_type{-1};
auto constexpr VALUE_SENTINEL = size_type{-1};

using map_type = cuco::legacy::static_map<size_type, size_type>;

/**
* @brief The alias of `map_type::pair_atomic_type` class.
*
* Declare this struct by trivial subclassing instead of type aliasing so we can have forward
* declaration of this struct somewhere else.
*/
struct slot_type : public map_type::slot_type {};

struct CompressedStreamInfo {
CompressedStreamInfo() = default;
explicit constexpr CompressedStreamInfo(uint8_t const* compressed_data_, size_t compressed_size_)
Expand Down Expand Up @@ -184,11 +193,11 @@ struct StripeStream {
*/
struct stripe_dictionary {
// input
device_span<slot_type> map_slots; // hash map storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
size_type num_rows = 0; // number of rows in the stripe
device_span<window_type> map_slots; // hash map (windows) storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
size_type num_rows = 0; // number of rows in the stripe

// output
device_span<uint32_t> data; // index of elements in the column to include in the dictionary
Expand Down
26 changes: 17 additions & 9 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

#include "io/comp/nvcomp_adapter.hpp"
#include "io/orc/orc_gpu.hpp"
#include "io/statistics/column_statistics.cuh"
#include "io/utilities/column_utils.cuh"
#include "writer_impl.hpp"
Expand Down Expand Up @@ -2110,7 +2111,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
bool sort_dictionaries,
rmm::cuda_stream_view stream)
{
std::vector<std::vector<rmm::device_uvector<gpu::slot_type>>> hash_maps_storage(
// Variable to keep track of the current total map storage size
size_t total_map_storage_size = 0;
std::vector<std::vector<size_t>> hash_maps_storage_offsets(
orc_table.string_column_indices.size());
for (auto col_idx : orc_table.string_column_indices) {
auto& str_column = orc_table.column(col_idx);
Expand All @@ -2119,14 +2122,21 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
stripe.size == 0 ? 0
: segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end -
segmentation.rowgroups[stripe.first][col_idx].begin;
hash_maps_storage[str_column.str_index()].emplace_back(stripe_num_rows * 1.43, stream);
hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size);
total_map_storage_size += stripe_num_rows * gpu::occupancy_factor;
}
hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size);
}

hostdevice_2dvector<gpu::stripe_dictionary> stripe_dicts(
orc_table.num_string_columns(), segmentation.num_stripes(), stream);
if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}};

// Create a single bulk storage used by all sub-dictionaries
auto map_storage = gpu::storage_type{
total_map_storage_size,
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream}};

// Initialize stripe dictionaries
for (auto col_idx : orc_table.string_column_indices) {
auto& str_column = orc_table.column(col_idx);
Expand All @@ -2137,7 +2147,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
auto const stripe_idx = stripe.id;
auto& sd = stripe_dicts[str_col_idx][stripe_idx];

sd.map_slots = hash_maps_storage[str_col_idx][stripe_idx];
sd.map_slots = {map_storage.data() + hash_maps_storage_offsets[str_col_idx][stripe_idx],
hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] -
hash_maps_storage_offsets[str_col_idx][stripe_idx]};
sd.column_idx = col_idx;
sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin;
sd.start_rowgroup = stripe.first;
Expand All @@ -2150,7 +2162,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
}
stripe_dicts.host_to_device_async(stream);

gpu::initialize_dictionary_hash_maps(stripe_dicts, stream);
map_storage.initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()});
gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream);
// Copy the entry counts and char counts from the device to the host
stripe_dicts.device_to_host_sync(stream);
Expand Down Expand Up @@ -2184,8 +2196,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
col_use_dictionary = true;
} else {
// Clear hash map storage as dictionary encoding is not used for this stripe
hash_maps_storage[str_col_idx][stripe_idx] = rmm::device_uvector<gpu::slot_type>(0, stream);
sd.map_slots = {};
sd.map_slots = {};
}
}
// If any stripe uses dictionary encoding, allocate index storage for the whole column
Expand All @@ -2202,9 +2213,6 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
gpu::collect_map_entries(stripe_dicts, stream);
gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream);

// deallocate hash map storage, unused after this point
hash_maps_storage.clear();

// Clear map slots and attach order buffers
auto dictionaries_flat = stripe_dicts.host_view().flat_view();
for (auto& sd : dictionaries_flat) {
Expand Down

0 comments on commit 953eccb

Please sign in to comment.