Manual block scan
pmattione-nvidia committed Sep 10, 2024
1 parent 342c2f4 commit 50bbc94
Showing 1 changed file with 110 additions and 67 deletions.
177 changes: 110 additions & 67 deletions cpp/src/io/parquet/decode_fixed.cu
@@ -551,6 +551,48 @@ static __device__ int gpuUpdateValidityAndRowIndicesFlat(
return valid_count;
}

// Result of a block-wide exclusive scan over one input bit per thread.
struct scan_results
{
  uint32_t warp_bits;             // ballot of this warp's input bits
  int thread_count_within_warp;   // set bits below this thread's position within its warp
  int warp_count;                 // total set bits in this warp

  int thread_count_within_block;  // set bits below this thread's position within the block
  int block_count;                // total set bits in the block
};

template <int decode_block_size>
static __device__ void scan_block(uint32_t warp_bits, int warp_lane, int warp_index, uint32_t thread_mask, scan_results& results)
{
  constexpr int num_warps = decode_block_size / cudf::detail::warp_size;

  results.warp_bits                = warp_bits;
  results.warp_count               = __popc(results.warp_bits);
  results.thread_count_within_warp = __popc(results.warp_bits & thread_mask);

  // Publish each warp's count to the whole block. Sync before writing so that a prior
  // call to scan_block in this kernel cannot still be reading warp_counts.
  __shared__ uint32_t warp_counts[num_warps];
  __syncthreads();
  if (warp_lane == 0) { warp_counts[warp_index] = results.warp_count; }
  __syncthreads();

  // Block-wide total, plus this thread's exclusive offset: its within-warp count plus
  // the counts of all preceding warps.
  results.block_count               = 0;
  results.thread_count_within_block = results.thread_count_within_warp;
  for (int warp_idx = 0; warp_idx < num_warps; ++warp_idx) {
    results.block_count += warp_counts[warp_idx];
    if (warp_idx < warp_index) { results.thread_count_within_block += warp_counts[warp_idx]; }
  }
}

template <int decode_block_size>
static __device__ void scan_block(int thread_bit, int warp_lane, int warp_index, uint32_t thread_mask, scan_results& results)
{
  // Gather the per-thread bits into a warp-wide ballot, then run the block-level scan.
  uint32_t const warp_bits = ballot(thread_bit);
  scan_block<decode_block_size>(warp_bits, warp_lane, warp_index, thread_mask, results);
}
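
// Illustrative sketch (not part of this change): how the scan_block helper above is
// meant to be called. Each thread contributes a 0/1 flag; scan_block then yields the
// thread's exclusive position among the set flags (within its warp and within the
// block) plus the warp and block totals, replacing the cub::BlockScan ExclusiveSum
// used previously. The function name and signature here are hypothetical.
template <int decode_block_size>
static __device__ int example_exclusive_scan(int flag, int t, int& block_total)
{
  int const warp_lane      = t % cudf::detail::warp_size;
  int const warp_index     = t / cudf::detail::warp_size;
  uint32_t const lane_mask = (uint32_t(1) << warp_lane) - 1;

  scan_results results;
  scan_block<decode_block_size>(flag, warp_lane, warp_index, lane_mask, results);

  block_total = results.block_count;         // total number of set flags in the block
  return results.thread_count_within_block;  // exclusive prefix sum of flag for this thread
}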

template <int decode_block_size, bool nullable, typename level_t, typename state_buf>
static __device__ int gpuUpdateValidityAndRowIndicesLists(
int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def,
@@ -597,6 +639,11 @@ if constexpr (enable_print_large_list) {
using block_scan = cub::BlockScan<int, decode_block_size>;
__shared__ typename block_scan::TempStorage scan_storage;

int const warp_lane = t % cudf::detail::warp_size;
bool const is_first_lane = warp_lane == 0;
int const warp_index = t / cudf::detail::warp_size;
uint32_t const lane_mask = (uint32_t(1) << warp_lane) - 1;
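// lane_mask selects every lane below this one; passing it to scan_block as thread_mask
// makes thread_count_within_warp an exclusive count (this thread's own bit not included).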

while (value_count < target_value_count) {

if constexpr (enable_print) {
@@ -688,9 +735,15 @@ if constexpr (enable_print_large_list) {
}

// queries is_valid from all threads, stores the prior (exclusive) total and the block-wide total
int thread_value_count = 0, block_value_count = 0;
block_scan(scan_storage).ExclusiveSum(in_nesting_bounds, thread_value_count, block_value_count);
__syncthreads();

//WARP VALUE COUNT:
scan_results value_count_scan_results;
scan_block<decode_block_size>(in_nesting_bounds, warp_lane, warp_index, lane_mask, value_count_scan_results);

int thread_value_count_within_warp = value_count_scan_results.thread_count_within_warp;
int warp_value_count = value_count_scan_results.warp_count;
int thread_value_count = value_count_scan_results.thread_count_within_block;
int block_value_count = value_count_scan_results.block_count;

if constexpr (enable_print_large_list) {
if(in_nesting_bounds != (t % 4 == 0)) {
@@ -741,30 +794,22 @@ if constexpr (enable_print_large_list) {
// the validity bit for thread t might actually represent output value t-6. the correct
// position for thread t's bit is thread_value_count.

static_assert(decode_block_size <= 8*sizeof(__uint128_t),
"This code relies on bits for block threads fitting within a uint128!");

auto shifted_validity = static_cast<__uint128_t>(is_valid) << thread_value_count;
auto or_reducer = [](const __uint128_t& lhs, const __uint128_t& rhs){
return lhs | rhs;
};

using block_reduce = cub::BlockReduce<__uint128_t, decode_block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;
__uint128_t block_valid_mask = block_reduce(reduce_storage).Reduce(shifted_validity, or_reducer);
// WARP VALID COUNT:
// For nested schemas, it's more complicated. This warp will visit 32 incoming values,
// however not all of them will necessarily represent a value at this nesting level. So
// the validity bit for thread t might actually represent output value t-6; the correct
// position for thread t's bit within its warp is thread_value_count_within_warp. For
// CUDA 11 we could use __reduce_or_sync(), but until then we have to do a warp reduce.
uint32_t const warp_valid_mask = WarpReduceOr32((uint32_t)is_valid << thread_value_count_within_warp);
auto const thread_mask = (uint32_t(1) << thread_value_count_within_warp) - 1;

//Reduction result is only visible to thread zero, must share with other threads:
__shared__ __uint128_t block_valid_mask_storage;
if(t == 0) { block_valid_mask_storage = block_valid_mask; }
__syncthreads();
block_valid_mask = block_valid_mask_storage;
scan_results valid_count_scan_results;
scan_block<decode_block_size>(warp_valid_mask, warp_lane, warp_index, thread_mask, valid_count_scan_results);

auto count_set_bits = [](__uint128_t bits){
return __popcll((uint64_t)bits) + __popcll((uint64_t)(bits >> 64));
};
auto thread_mask = (__uint128_t(1) << thread_value_count) - 1;
int const thread_valid_count = count_set_bits(block_valid_mask & thread_mask);
//int const block_valid_count = count_set_bits(block_valid_mask);
int warp_valid_count = valid_count_scan_results.warp_count;
int thread_valid_count = valid_count_scan_results.thread_count_within_block;
int block_valid_count = valid_count_scan_results.block_count;
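// Worked example (illustrative numbers, assuming threads 4..31 are out of row bounds):
// if warp 0's threads 0..3 see in_nesting_bounds = {1,1,0,1} and is_valid = {1,0,0,1},
// then thread 3's thread_value_count_within_warp is 2, so its validity bit lands at
// bit 2 and warp_valid_mask == 0b101. Hence warp_valid_count == 2, and thread 3's
// exclusive thread_valid_count == popc(0b101 & 0b011) == 1.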

if constexpr (enable_print_large_list) {
if(((d_idx == 0) && (is_valid != (t % 4 == 0))) || ((d_idx == 1) && !is_valid)) {
@@ -775,36 +820,44 @@ if constexpr (enable_print_large_list) {
printf("CUB GARBAGE: blockIdx.x %d, value_count %d, target_value_count %d, t %d, d_idx %d, thread_valid_count %d\n",
blockIdx.x, value_count, target_value_count, t, d_idx, thread_valid_count);
}
/* if(((d_idx == 0) && (block_valid_count != 32)) || ((d_idx == 1) && (block_valid_count != 128))) {
if(((d_idx == 0) && (block_valid_count != 32)) || ((d_idx == 1) && (block_valid_count != 128))) {
printf("CUB GARBAGE: blockIdx.x %d, value_count %d, target_value_count %d, t %d, d_idx %d, block_valid_count %d\n",
blockIdx.x, value_count, target_value_count, t, d_idx, block_valid_count);
}*/
}
}

if constexpr (enable_print) {
if((block_valid_mask == 0) && (t == 0) && (d_idx == max_depth)) {
if((block_valid_count == 0) && (t == 0) && (d_idx == max_depth)) {
printf("EMPTY VALID MASK: def_level %d, max_def_level %d, in_nesting_bounds %d, start_depth %d, "
"end_depth %d, in_row_bounds %d, row_index %d, row_index_lower_bound %d, last_row %d, input_row_count %d\n",
def_level, ni.max_def_level, in_nesting_bounds, start_depth, end_depth, in_row_bounds, row_index,
row_index_lower_bound, last_row, input_row_count); }

if (t == 0) { printf("block_valid_mask %u\n", int(block_valid_mask)); }
if (t == 0) { printf("block_valid_count %u\n", int(block_valid_count)); }
if (t < 32) { printf("t %d, thread_valid_count %d\n", t, thread_valid_count); }
}

// compute warp and thread value counts for the -next- nesting level. we need to
// do this for nested schemas so that we can emit an offset for the -current- nesting
// level. more concretely : the offset for the current nesting level == current length of the
// next nesting level
int32_t next_thread_value_count = 0, next_block_value_count = 0;
int next_thread_value_count_within_warp = 0, next_warp_value_count = 0;
int next_thread_value_count = 0, next_block_value_count = 0;
int next_in_nesting_bounds = 0;
if (d_idx < max_depth) {
//mask is different between depths
next_in_nesting_bounds =
(d_idx + 1 >= start_depth && d_idx + 1 <= end_depth && in_row_bounds) ? 1 : 0;

block_scan(scan_storage).ExclusiveSum(next_in_nesting_bounds, next_thread_value_count, next_block_value_count);
__syncthreads();
//NEXT WARP VALUE COUNT:
scan_results next_value_count_scan_results;
scan_block<decode_block_size>(next_in_nesting_bounds, warp_lane, warp_index, lane_mask, next_value_count_scan_results);

next_thread_value_count_within_warp = next_value_count_scan_results.thread_count_within_warp;
next_warp_value_count = next_value_count_scan_results.warp_count;
next_thread_value_count = next_value_count_scan_results.thread_count_within_block;
next_block_value_count = next_value_count_scan_results.block_count;
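// Illustrative restatement: for a list<int32> column, the offset written for a value at
// the list (current) level is the leaf (next) level's running value count at that point,
// i.e. how many leaf values precede that list entry's first element.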


if constexpr (enable_print_large_list) {
if(next_in_nesting_bounds != 1) {
@@ -873,49 +926,38 @@ if constexpr (enable_print_large_list) {
// (that is, read and write positions are already pre-bounded by first_row/num_rows).
// since we are about to write the validity vector
// here we need to adjust our computed mask to take into account the write row bounds.
int warp_null_count = 0;
if constexpr (nullable) {
if (ni.valid_map != nullptr) {
//TODO: Consider OR'ING for next_thread_value_count and popc() for next_thread_value_count
//so that we don't have to take a ballot here. Is uint128 so may deconstruct to this anyway ...
uint32_t const warp_count_mask = ballot(in_nesting_bounds);
if ((t % cudf::detail::warp_size) == 0) {
// last bit in the warp to store //in old is warp_valid_mask_bit_count

int warp_null_count = 0;
if (is_first_lane && (ni.valid_map != nullptr) && (warp_value_count > 0)) {
// warp_value_count (warp_valid_mask_bit_count in the old code) is the number of bits this
// warp stores: everything in nesting bounds at this level, where a bit is zero if the
// value is NULL at this level
int const bit_count = __popc(warp_count_mask);
if(bit_count > 0) {

// absolute bit offset into the output validity map
//is cumulative sum of bit_count at the given nesting depth
// DON'T subtract by first_row: since it's lists it's not 1-row-per-value
int const bit_offset = ni.valid_map_offset + thread_value_count;
auto const shifted_valid_mask = static_cast<uint32_t>(block_valid_mask >> thread_value_count);
auto const bit_range_mask = (1 << bit_count) - 1; //mainly needed for warp_null_count
auto const warp_validity_mask = shifted_valid_mask & bit_range_mask;

store_validity(bit_offset, ni.valid_map, warp_validity_mask, bit_count);
warp_null_count = bit_count - __popc(warp_validity_mask);

if constexpr (enable_print) {
printf("STORE VALIDITY: t %d, depth %d, thread_value_count %d, valid_map_offset %d, bit_offset %d, bit_count %d, warp_validity_mask %u\n",
t, d_idx, thread_value_count, ni.valid_map_offset, bit_offset, bit_count, warp_validity_mask);
printf("NUM NULLS: t %d, depth %d, warp_null_count %d\n", t, d_idx, warp_null_count);
}

// absolute bit offset into the output validity map: valid_map_offset plus this
// thread's cumulative value count at the given nesting depth.
// DON'T subtract first_row: since these are lists it's not 1-row-per-value
int const bit_offset = ni.valid_map_offset + thread_value_count;
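// store_validity writes the low warp_value_count bits of warp_valid_mask into
// ni.valid_map starting at bit position bit_offset.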

store_validity(bit_offset, ni.valid_map, warp_valid_mask, warp_value_count);
warp_null_count = warp_value_count - warp_valid_count;

if constexpr (enable_print) {
printf("STORE VALIDITY: t %d, depth %d, thread_value_count %d, valid_map_offset %d, bit_offset %d, warp_value_count %d, warp_valid_mask %u\n",
t, d_idx, thread_value_count, ni.valid_map_offset, bit_offset, warp_value_count, warp_valid_mask);
printf("NUM NULLS: t %d, depth %d, warp_null_count %d\n", t, d_idx, warp_null_count);
}
}
}

// sum null counts. we have to do it this way instead of just incrementing by (value_count -
// valid_count) because valid_count also includes rows that potentially start before our row
// bounds. if we could come up with a way to clean that up, we could remove this and just
// compute it directly at the end of the kernel.
size_type const block_null_count =
cudf::detail::single_lane_block_sum_reduce<decode_block_size, 0>(warp_null_count);
if constexpr (enable_print) {
if (t == 0) { printf("BLOCK NULLS: depth %d, prior %d, block_null_count %u\n",
d_idx, ni.null_count, block_null_count); }
if (t == 0) {
size_type const block_null_count = block_value_count - block_valid_count;
if constexpr (enable_print) {
if (t == 0) { printf("BLOCK NULLS: depth %d, prior %d, block_null_count %u\n",
d_idx, ni.null_count, block_null_count); }
}
ni.null_count += block_null_count;
}
if (t == 0) { ni.null_count += block_null_count; }
}

// if this is valid and we're at the leaf, output dst_pos
@@ -952,7 +994,6 @@ if constexpr (enable_print_large_list) {

// update stuff
if (t == 0) {
int const block_valid_count = count_set_bits(block_valid_mask);
ni.valid_count += block_valid_count;
ni.value_count += block_value_count;
ni.valid_map_offset += block_value_count;
@@ -963,6 +1004,8 @@ if constexpr (enable_print_large_list) {
block_value_count = next_block_value_count;
thread_value_count = next_thread_value_count;
in_nesting_bounds = next_in_nesting_bounds;
warp_value_count = next_warp_value_count;
thread_value_count_within_warp = next_thread_value_count_within_warp;
} //END OF DEPTH LOOP

if constexpr (enable_print) {
