small tweaks
pmattione-nvidia committed Sep 18, 2024
1 parent 50bbc94 commit 7882879
Showing 1 changed file with 32 additions and 45 deletions.
77 changes: 32 additions & 45 deletions cpp/src/io/parquet/decode_fixed.cu
@@ -562,13 +562,13 @@ struct scan_results
};

template <int decode_block_size>
static __device__ void scan_block(uint32_t warp_bits, int warp_lane, int warp_index, uint32_t thread_mask, scan_results& results)
static __device__ void scan_block(uint32_t warp_bits, int warp_lane, int warp_index, uint32_t lane_mask, scan_results& results)
{
constexpr int num_warps = decode_block_size / cudf::detail::warp_size;

results.warp_bits = warp_bits;
results.warp_count = __popc(results.warp_bits);
results.thread_count_within_warp = __popc(results.warp_bits & thread_mask);
results.thread_count_within_warp = __popc(results.warp_bits & lane_mask);

__shared__ uint32_t warp_counts[num_warps];
if(warp_lane == 0) {
@@ -587,10 +587,10 @@ static __device__ void scan_block(uint32_t warp_bits, int warp_lane, int warp_in
}

template <int decode_block_size>
static __device__ void scan_block(int thread_bit, int warp_lane, int warp_index, uint32_t thread_mask, scan_results& results)
static __device__ void scan_block(int thread_bit, int warp_lane, int warp_index, uint32_t lane_mask, scan_results& results)
{
uint32_t warp_bits = ballot(thread_bit);
scan_block<decode_block_size>(warp_bits, warp_lane, warp_index, thread_mask, results);
scan_block<decode_block_size>(warp_bits, warp_lane, warp_index, lane_mask, results);
}
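Note on the renamed parameter: lane_mask selects a prefix of lanes relative to the calling thread, so __popc(warp_bits & lane_mask) gives each thread its running count of set votes within its warp. Below is a minimal, self-contained sketch of that warp-level step, illustrative only and not part of this commit (the real scan_block additionally reduces the per-warp counts through shared memory, as shown above, and the wrapper ballot() stands in for __ballot_sync):

__device__ int warp_rank_of_valid(int is_valid_flag, int warp_lane)
{
  // Each lane contributes one bit; bit i is set if lane i voted "valid".
  uint32_t const warp_bits = __ballot_sync(0xffffffffu, is_valid_flag);
  // Mask of the lanes strictly below this one, e.g. warp_lane 5 -> 0b11111
  // (an exclusive mask; an inclusive variant would also include this lane's bit).
  uint32_t const lane_mask = (1u << warp_lane) - 1u;
  // Number of valid lanes preceding this thread within its warp.
  return __popc(warp_bits & lane_mask);
}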

template <int decode_block_size, bool nullable, typename level_t, typename state_buf>
@@ -625,6 +625,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesLists(

int const row_index_lower_bound = s->row_index_lower_bound;
int const max_depth = s->col.max_nesting_depth - 1;
int max_depth_valid_count = s->nesting_info[max_depth].valid_count;

__syncthreads();

@@ -963,38 +964,39 @@ if constexpr (enable_print_large_list) {
// if this is valid and we're at the leaf, output dst_pos
// Read these before the sync, so that when thread 0 modifies them we've already read their values
int current_value_count = ni.value_count;
int current_valid_count = ni.valid_count;
__syncthreads(); // handle modification of ni.valid_count from below
if (is_valid && d_idx == max_depth) {
// for non-list types, the value count is always the same across
int const dst_pos = current_value_count + thread_value_count;
int const src_pos = current_valid_count + thread_valid_count;
int const output_index = rolling_index<state_buf::nz_buf_size>(src_pos);
__syncthreads(); // handle modification of ni.value_count from below
if (d_idx == max_depth) {
if (is_valid) {
// for non-list types, the value count is always the same across
int const dst_pos = current_value_count + thread_value_count;
int const src_pos = max_depth_valid_count + thread_valid_count;
int const output_index = rolling_index<state_buf::nz_buf_size>(src_pos);

if constexpr (enable_print || enable_print_range_error) {
if((output_index < 0) || (output_index >= state_buf::nz_buf_size)) {
printf("WHOA: output index STORE %d out of bounds!\n", output_index);
if constexpr (enable_print || enable_print_range_error) {
if((output_index < 0) || (output_index >= state_buf::nz_buf_size)) {
printf("WHOA: output index STORE %d out of bounds!\n", output_index);
}
if(dst_pos < 0) { printf("WHOA: dst_pos STORE %d out of bounds!\n", dst_pos); }
}
if(dst_pos < 0) { printf("WHOA: dst_pos STORE %d out of bounds!\n", dst_pos); }
}

if constexpr (enable_print) {
if (t == 0) { printf("ni.value_count %d, ni.valid_count %d\n", int(ni.value_count), int(ni.valid_count)); }
if (t < 32) { printf("t %d, src_pos %d, output_index %d\n", t, src_pos, output_index); }
if constexpr (enable_print) {
if (t == 0) { printf("ni.value_count %d, max_depth_valid_count %d\n", int(ni.value_count), max_depth_valid_count); }
if (t < 32) { printf("t %d, src_pos %d, output_index %d\n", t, src_pos, output_index); }

if((t == 0) && (src_pos == 0)) {printf("SPECIAL: output_index %d, dst_pos %d, ni.value_count %d, ni.valid_count %d, thread_value_count %d, thread_valid_count %d\n",
output_index, dst_pos, ni.value_count, ni.valid_count, thread_value_count, thread_valid_count);}
if((t == 0) && (src_pos == 0)) {printf("SPECIAL: output_index %d, dst_pos %d, ni.value_count %d, max_depth_valid_count %d, thread_value_count %d, thread_valid_count %d\n",
output_index, dst_pos, ni.value_count, max_depth_valid_count, thread_value_count, thread_valid_count);}

if (t == 0) { printf("OUTPUT_INDICES: output_index %d, dst_pos %d\n", output_index, dst_pos); }
}
if (t == 0) { printf("OUTPUT_INDICES: output_index %d, dst_pos %d\n", output_index, dst_pos); }
}

//Index from rolling buffer of values (which doesn't include nulls) to final array (which includes gaps for nulls)
sb->nz_idx[output_index] = dst_pos;
//Index from rolling buffer of values (which doesn't include nulls) to final array (which includes gaps for nulls)
sb->nz_idx[output_index] = dst_pos;
}
max_depth_valid_count += block_valid_count;
}

// update stuff
if (t == 0) {
ni.valid_count += block_valid_count;
ni.value_count += block_value_count;
ni.valid_map_offset += block_value_count;
}
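A note on the nz_idx store above: dst_pos indexes the final output column, which keeps gaps for nulls, while src_pos counts only valid leaf values and is wrapped by rolling_index into the bounded nz_idx scratch buffer. A simplified sketch of that wrapping, written here as assumed behavior for illustration rather than the actual cuDF helper:

// Map a monotonically growing position into a ring buffer of buffer_size
// entries; the decode step and the output step stay within one buffer of
// each other, so older slots can be reused safely.
template <int buffer_size>
__device__ constexpr int rolling_index_sketch(int pos)
{
  return pos % buffer_size;  // buffer_size (nz_buf_size here) is a compile-time constant
}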
@@ -1022,7 +1024,8 @@ if constexpr (enable_print_large_list) {

if (t == 0) {
// update valid value count for decoding and total # of values we've processed
s->nz_count = s->nesting_info[max_depth].valid_count;
s->nesting_info[max_depth].valid_count = max_depth_valid_count;
s->nz_count = max_depth_valid_count;
s->input_value_count = value_count;

// If we have lists # rows != # values
@@ -1036,8 +1039,7 @@
}
}

__syncthreads();
return s->nesting_info[max_depth].valid_count;
return max_depth_valid_count;
}
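The function now returns the per-thread register copy max_depth_valid_count instead of re-reading shared state, which is what allows the trailing __syncthreads() to be dropped: block_valid_count comes from the block-wide scan and is identical in every thread, so all register copies advance in lockstep. A small sketch of that pattern with illustrative names, not code from this commit:

// Every thread keeps a private running count; thread 0 publishes it to the
// shared page state, but callers can use the returned register value without
// waiting on that write.
__device__ int accumulate_and_publish(int running_count, int block_total, int* shared_count)
{
  running_count += block_total;                              // same in every thread
  if (threadIdx.x == 0) { *shared_count = running_count; }   // publish once per batch
  return running_count;                                      // no __syncthreads() needed
}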

// is the page marked nullable or not
@@ -1094,7 +1096,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
kernel_error::pointer error_code /*, int page_idx = -1, int num_pages = -1*/)
kernel_error::pointer error_code)
{
constexpr int rolling_buf_size = decode_block_size_t * 2;
constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size<decode_block_size_t>();
@@ -1108,11 +1110,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t)
page_state_s* const s = &state_g;
auto* const sb = &state_buffers;
int const page_idx = blockIdx.x;
/* page_idx = (page_idx == -1) ? blockIdx.x : page_idx + blockIdx.x;
if((page_idx >= num_pages) && (num_pages != -1)) {
printf("BAIL ON PAGE %d of %d\n", page_idx, num_pages);
return;
}*/
int const t = threadIdx.x;
PageInfo* pp = &pages[page_idx];

@@ -1357,13 +1354,6 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span<PageInfo> pages,

dim3 dim_block(decode_block_size, 1);
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page
/*
auto num_pages = pages.size();
auto grid_dim = 1; //2, 10, 40, 100 no problem; all = problem
dim3 dim_grid(grid_dim, 1); // 1 threadblock per page
for(decltype(num_pages) idx = 0; idx < num_pages; idx += grid_dim) {
*/
if (level_type_size == 1) {
if (is_list) {
gpuDecodePageDataGeneric<uint8_t,
@@ -1375,7 +1365,6 @@
decode_fixed_width_values_func>
<<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
// pages.device_ptr(), chunks, min_row, num_rows, error_code, idx, num_pages);
} else if (has_nesting) {
gpuDecodePageDataGeneric<uint8_t,
decode_block_size,
@@ -1408,7 +1397,6 @@ for(decltype(num_pages) idx = 0; idx < num_pages; idx += grid_dim) {
decode_fixed_width_values_func>
<<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
// pages.device_ptr(), chunks, min_row, num_rows, error_code, idx, num_pages);
} else if (has_nesting) {
gpuDecodePageDataGeneric<uint16_t,
decode_block_size,
@@ -1431,7 +1419,6 @@ for(decltype(num_pages) idx = 0; idx < num_pages; idx += grid_dim) {
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}
//}
}

void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span<PageInfo> pages,
