Commit 089f909
Almost works. Need to do cg split work.
mhaseeb123 committed Aug 13, 2024
1 parent b09cfa0 commit 089f909
Showing 6 changed files with 184 additions and 130 deletions.
202 changes: 145 additions & 57 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -22,6 +22,7 @@

#include <rmm/exec_policy.hpp>

#include <cooperative_groups.h>
#include <cuda/atomic>

namespace cudf::io::parquet::detail {
@@ -32,47 +33,142 @@ namespace {
constexpr int DEFAULT_BLOCK_SIZE = 256;
}

template <typename T>
struct equality_functor {
column_device_view const& col;
template <typename T>
__device__ bool operator()(size_type lhs_idx, size_type rhs_idx)
__device__ bool operator()(key_type const lhs_idx, key_type const rhs_idx) const
{
// We don't call this for nulls so this is fine
auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
return equal(col.element<T>(lhs_idx), col.element<T>(rhs_idx));
// We don't call this for nulls so this is fine
auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
auto const result = equal(col.element<T>(lhs_idx), col.element<T>(rhs_idx));
printf("col_type_id:%d, equality idx1:%d, idx2:%d, eq:%d\n",
col.type().id(),
lhs_idx,
rhs_idx,
result);
return result;
}
};

template <typename T>
struct hash_functor {
column_device_view const& col;
__device__ auto operator()(key_type idx) const
{
auto const hashed = cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
printf("hashing idx: %d = %d\n", idx, hashed);
return hashed;
}
};

struct map_insert_fn {
storage_ref_type const& storage_ref;

template <typename T>
__device__ auto operator()(size_type idx) const
__device__ bool operator()(column_device_view const& col, key_type i)
{
if constexpr (column_device_view::has_element_accessor<T>()) {
return cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
using equality_fn_type = equality_functor<T>;
using hash_fn_type = hash_functor<T>;
using probing_scheme_type = cuco::linear_probing<cg_size, hash_fn_type>;

// Instantiate hash and equality functors.
auto hash_fn = hash_fn_type{col};
auto equal_fn = equality_fn_type{col};

// Make a view of the hash map
cuco::static_map_ref<key_type,
mapped_type,
SCOPE,
equality_fn_type,
probing_scheme_type,
storage_ref_type>
hash_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
{equal_fn},
{hash_fn},
{},
storage_ref};

// Create another map with insert operator
auto map_insert_ref = hash_map_ref.with_operators(cuco::insert_and_find);
// Insert
auto const [iter, inserted] = map_insert_ref.insert_and_find(cuco::pair{i, i});
printf("Inserted k=%d, v=%d, unique=%d\n", iter->first, iter->second, inserted);
return inserted;
} else {
return static_cast<uint32_t>(KEY_SENTINEL);
CUDF_UNREACHABLE("Unsupported type to insert in map");
}
}
};

struct map_find_fn {
storage_ref_type const& storage_ref;

template <typename T>
__device__ cuco::pair<key_type, mapped_type> operator()(column_device_view const& col, key_type i)
{
if constexpr (column_device_view::has_element_accessor<T>()) {
using equality_fn_type = equality_functor<T>;
using hash_fn_type = hash_functor<T>;
using probing_scheme_type = cuco::linear_probing<cg_size, hash_fn_type>;

// Instantiate hash and equality functors.
auto hash_fn = hash_fn_type{col};
auto equal_fn = equality_fn_type{col};

// Make a view of the hash map
cuco::static_map_ref<key_type,
mapped_type,
SCOPE,
equality_fn_type,
probing_scheme_type,
storage_ref_type>
hash_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
{equal_fn},
{hash_fn},
{},
storage_ref};

// Create another map with find operator
auto map_find_ref = hash_map_ref.with_operators(cuco::find);

// Find the key = i
auto found_slot = map_find_ref.find(i);

// Check if we found the previously inserted key.
cudf_assert(found_slot != map_find_ref.end() &&
"Unable to find value in map in dictionary index construction");

// Return a pair of the found key and value.
printf("Find=%d, Found slot: k=%d, v=%d\n", i, found_slot->first, found_slot->second);
return {found_slot->first, found_slot->second};
} else {
CUDF_UNREACHABLE("Unsupported type to find in map");
}
}
};

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
populate_chunk_hash_maps_kernel(map_ref_type* map_refs,
populate_chunk_hash_maps_kernel(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;
auto const col_idx = blockIdx.y;
auto const block_x = blockIdx.x;
auto const frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

using block_reduce = cub::BlockReduce<size_type, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

[[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
auto const t = cg::this_thread_block().thread_rank();

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

@@ -81,12 +177,11 @@ CUDF_KERNEL void __launch_bounds__(block_size)
size_type const end_value_idx = row_to_value_idx(end_row, *col);

column_device_view const& data_col = *col->leaf_column;

// Make a view of the hash map
auto hash_map_ref = *(map_refs + chunk->dict_map_idx);
storage_ref_type const storage_ref{chunk->dict_map_size, map_storage + chunk->dict_map_offset};

__shared__ size_type total_num_dict_entries;
thread_index_type val_idx = s_start_value_idx + t;

while (val_idx - block_size < end_value_idx) {
auto const is_valid =
val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
@@ -95,14 +190,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)
size_type is_unique = 0;
size_type uniq_elem_size = 0;
if (is_valid) {
auto const val = type_dispatcher(data_col.type(), hash_functor{data_col}, val_idx);
if (val != KEY_SENTINEL) {
auto map_insert_ref = hash_map_ref.with_operators(cuco::insert);
is_unique = map_insert_ref.insert(cuco::pair{val, val_idx});
} else {
is_unique = false;
}
// if (!t) printf("inserted val = %u, val_idx = %ld\n", val, val_idx);
is_unique =
type_dispatcher(data_col.type(), map_insert_fn{storage_ref}, data_col, val_idx);
uniq_elem_size = [&]() -> size_type {
if (not is_unique) { return 0; }
switch (col->physical_type) {
@@ -152,24 +241,30 @@

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
collect_map_entries_kernel(map_ref_type* map_refs, device_span<EncColumnChunk> chunks)
collect_map_entries_kernel(storage_type::window_type* map_storage,
device_span<EncColumnChunk> chunks)
{
auto& chunk = chunks[blockIdx.x];
if (not chunk.use_dictionary) { return; }

auto t = threadIdx.x;
auto map = *(map_refs + chunk.dict_map_idx);
[[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
auto const t = cg::this_thread_block().thread_rank();

storage_ref_type const storage_ref{chunk.dict_map_size, map_storage + chunk.dict_map_offset};

__shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;
using cuda::std::memory_order_relaxed;
if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
__syncthreads();

for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
auto* slot = reinterpret_cast<map_ref_type::value_type*>(&map + t + i);
auto* slot = reinterpret_cast<slot_type*>(storage_ref.data() + t + i);
auto key = slot->first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
printf("Writing %d at loc: %d\n", key, loc);
chunk.dict_data[loc] = key;
// If sorting dict page ever becomes a hard requirement, enable the following statement and
// add a dict sorting step before storing into the slot's second field.
@@ -182,18 +277,20 @@ CUDF_KERNEL void __launch_bounds__(block_size)

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
get_dictionary_indices_kernel(map_ref_type* map_refs,
get_dictionary_indices_kernel(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;
auto const col_idx = blockIdx.y;
auto const block_x = blockIdx.x;
auto const frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto const col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

[[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
auto const t = cg::this_thread_block().thread_rank();

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

@@ -203,54 +300,45 @@ CUDF_KERNEL void __launch_bounds__(block_size)
auto const end_value_idx = row_to_value_idx(end_row, *col);

column_device_view const& data_col = *col->leaf_column;

auto map = *(map_refs + chunk->dict_map_idx);
storage_ref_type const storage_ref{chunk->dict_map_size, map_storage + chunk->dict_map_offset};

thread_index_type val_idx = s_start_value_idx + t;
while (val_idx < end_value_idx) {
if (data_col.is_valid(val_idx)) {
auto val = type_dispatcher(data_col.type(), hash_functor{data_col}, val_idx);
if (val != static_cast<uint32_t>(KEY_SENTINEL)) {
auto map_find_ref = map.with_operators(cuco::find);
auto found_slot = map_find_ref.find(val);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto* val_ptr = reinterpret_cast<map_ref_type::mapped_type*>(&found_slot->second);
chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
}
}
auto [found_key, found_value] =
type_dispatcher(data_col.type(), map_find_fn{storage_ref}, data_col, val_idx);
// No need for atomic as this is not going to be modified by any other thread
chunk->dict_index[val_idx - s_ck_start_val_idx] = found_value;
}

val_idx += block_size;
}
}

void populate_chunk_hash_maps(map_ref_type* map_refs,
void populate_chunk_hash_maps(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
dim3 const dim_grid(frags.size().second, frags.size().first);
populate_chunk_hash_maps_kernel<DEFAULT_BLOCK_SIZE>
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_refs, frags);
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
}

void collect_map_entries(map_ref_type* map_refs,
void collect_map_entries(storage_type::window_type* map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 1024;
collect_map_entries_kernel<block_size>
<<<chunks.size(), block_size, 0, stream.value()>>>(map_refs, chunks);
<<<chunks.size(), block_size, 0, stream.value()>>>(map_storage, chunks);
}

void get_dictionary_indices(map_ref_type* map_refs,
void get_dictionary_indices(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
dim3 const dim_grid(frags.size().second, frags.size().first);
get_dictionary_indices_kernel<DEFAULT_BLOCK_SIZE>
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_refs, frags);
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
}
} // namespace cudf::io::parquet::detail
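For orientation, here is a minimal host-side sketch of how the three rewritten entry points chain together once every chunk's map lives in one bulk slot array. The wrapper name and call site are assumptions for illustration, not part of this commit:

namespace cudf::io::parquet::detail {

// Hypothetical wrapper: each chunk owns the window
// [dict_map_offset, dict_map_offset + dict_map_size) inside map_storage.
void build_chunk_dictionaries(storage_type::window_type* map_storage,
                              cudf::detail::device_2dspan<PageFragment const> frags,
                              device_span<EncColumnChunk> chunks,
                              rmm::cuda_stream_view stream)
{
  populate_chunk_hash_maps(map_storage, frags, stream);  // insert values, mark uniques
  collect_map_entries(map_storage, chunks, stream);      // compact keys into dict_data
  get_dictionary_indices(map_storage, frags, stream);    // translate values to dict indices
}

}  // namespace cudf::io::parquet::detail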
28 changes: 7 additions & 21 deletions cpp/src/io/parquet/parquet_gpu.cuh
@@ -31,42 +31,28 @@

namespace cudf::io::parquet::detail {

using key_type = uint32_t;
using mapped_type = uint32_t;
using key_type = size_type;
using mapped_type = size_type;

auto constexpr cg_size = 1; ///< Number of threads in the cooperative group handling each map operation (1 until the CG split work lands)
auto constexpr window_size = 1; ///< Number of concurrent slots handled by each thread

auto constexpr KEY_SENTINEL = key_type{std::numeric_limits<uint32_t>::max()};
auto constexpr VALUE_SENTINEL = mapped_type{std::numeric_limits<uint32_t>::max()};
auto constexpr KEY_SENTINEL = key_type{std::numeric_limits<key_type>::max()};
auto constexpr VALUE_SENTINEL = mapped_type{std::numeric_limits<mapped_type>::max()};
auto constexpr SCOPE = cuda::thread_scope_device;

using slot_type = cuco::pair<key_type, mapped_type>;

using storage_type = cuco::aow_storage<slot_type, window_size>;
using storage_ref_type = typename storage_type::ref_type;

template <typename T>
struct my_hasher {
__device__ auto operator()(T index) const { return index; }
};

using probing_scheme_type = cuco::linear_probing<cg_size, my_hasher<key_type>>;

using map_ref_type = cuco::static_map_ref<key_type,
mapped_type,
SCOPE,
thrust::equal_to<key_type>,
probing_scheme_type,
storage_ref_type>; ///< Map ref type

/**
* @brief Insert chunk values into their respective hash maps
*
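* @param map_storage Bulk storage backing the per-chunk hash maps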
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void populate_chunk_hash_maps(map_ref_type* map_refs,
void populate_chunk_hash_maps(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);

@@ -76,7 +62,7 @@ void populate_chunk_hash_maps(map_ref_type* map_refs,
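* @param map_storage Bulk storage backing the per-chunk hash maps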
* @param chunks Flat span of chunks to compact hash maps for
* @param stream CUDA stream to use
*/
void collect_map_entries(map_ref_type* map_refs,
void collect_map_entries(storage_type::window_type* map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream);

@@ -92,7 +78,7 @@ void collect_map_entries(map_ref_type* map_refs,
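* @param map_storage Bulk storage backing the per-chunk hash maps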
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void get_dictionary_indices(map_ref_type* map_refs,
void get_dictionary_indices(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);
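
The header gives each chunk a window into one shared aow_storage. A sketch of the host-side layout this implies; h_chunks and the running-offset scan are illustrative assumptions, not code from this commit:

// Assign every dictionary-enabled chunk a contiguous range of windows.
size_type total_windows = 0;
for (auto& chunk : h_chunks) {            // h_chunks: hypothetical host-side chunk array
  if (not chunk.use_dictionary) { continue; }
  chunk.dict_map_offset = total_windows;  // first window owned by this chunk
  total_windows += chunk.dict_map_size;   // reserve dict_map_size slots (window_size == 1)
}
// A single storage_type of total_windows windows, initialized with
// {KEY_SENTINEL, VALUE_SENTINEL}, then backs all the maps; its raw window
// pointer is what populate/collect/get_dictionary_indices receive.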

