From 788287936f4f21abc34a6bf9fc04f310e6b2824c Mon Sep 17 00:00:00 2001 From: Paul Mattione Date: Wed, 18 Sep 2024 18:06:33 -0400 Subject: [PATCH] small tweaks --- cpp/src/io/parquet/decode_fixed.cu | 77 +++++++++++++----------------- 1 file changed, 32 insertions(+), 45 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index b47b96b91a2..ac7a628bc19 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -562,13 +562,13 @@ struct scan_results }; template -static __device__ void scan_block(uint32_t warp_bits, int warp_lane, int warp_index, uint32_t thread_mask, scan_results& results) +static __device__ void scan_block(uint32_t warp_bits, int warp_lane, int warp_index, uint32_t lane_mask, scan_results& results) { constexpr int num_warps = decode_block_size / cudf::detail::warp_size; results.warp_bits = warp_bits; results.warp_count = __popc(results.warp_bits); - results.thread_count_within_warp = __popc(results.warp_bits & thread_mask); + results.thread_count_within_warp = __popc(results.warp_bits & lane_mask); __shared__ uint32_t warp_counts[num_warps]; if(warp_lane == 0) { @@ -587,10 +587,10 @@ static __device__ void scan_block(uint32_t warp_bits, int warp_lane, int warp_in } template -static __device__ void scan_block(int thread_bit, int warp_lane, int warp_index, uint32_t thread_mask, scan_results& results) +static __device__ void scan_block(int thread_bit, int warp_lane, int warp_index, uint32_t lane_mask, scan_results& results) { uint32_t warp_bits = ballot(thread_bit); - scan_block(warp_bits, warp_lane, warp_index, thread_mask, results); + scan_block(warp_bits, warp_lane, warp_index, lane_mask, results); } template @@ -625,6 +625,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesLists( int const row_index_lower_bound = s->row_index_lower_bound; int const max_depth = s->col.max_nesting_depth - 1; + int max_depth_valid_count = s->nesting_info[max_depth].valid_count; __syncthreads(); @@ -963,38 +964,39 @@ if constexpr (enable_print_large_list) { // if this is valid and we're at the leaf, output dst_pos // Read these before the sync, so that when thread 0 modifies them we've already read their values int current_value_count = ni.value_count; - int current_valid_count = ni.valid_count; - __syncthreads(); // handle modification of ni.valid_count from below - if (is_valid && d_idx == max_depth) { - // for non-list types, the value count is always the same across - int const dst_pos = current_value_count + thread_value_count; - int const src_pos = current_valid_count + thread_valid_count; - int const output_index = rolling_index(src_pos); + __syncthreads(); // handle modification of ni.value_count from below + if (d_idx == max_depth) { + if (is_valid) { + // for non-list types, the value count is always the same across + int const dst_pos = current_value_count + thread_value_count; + int const src_pos = max_depth_valid_count + thread_valid_count; + int const output_index = rolling_index(src_pos); - if constexpr (enable_print || enable_print_range_error) { - if((output_index < 0) || (output_index >= state_buf::nz_buf_size)) { - printf("WHOA: output index STORE %d out of bounds!\n", output_index); + if constexpr (enable_print || enable_print_range_error) { + if((output_index < 0) || (output_index >= state_buf::nz_buf_size)) { + printf("WHOA: output index STORE %d out of bounds!\n", output_index); + } + if(dst_pos < 0) { printf("WHOA: dst_pos STORE %d out of bounds!\n", dst_pos); } } - if(dst_pos < 0) { printf("WHOA: dst_pos STORE %d out of bounds!\n", dst_pos); } - } - if constexpr (enable_print) { - if (t == 0) { printf("ni.value_count %d, ni.valid_count %d\n", int(ni.value_count), int(ni.valid_count)); } - if (t < 32) { printf("t %d, src_pos %d, output_index %d\n", t, src_pos, output_index); } + if constexpr (enable_print) { + if (t == 0) { printf("ni.value_count %d, max_depth_valid_count %d\n", int(ni.value_count), max_depth_valid_count); } + if (t < 32) { printf("t %d, src_pos %d, output_index %d\n", t, src_pos, output_index); } - if((t == 0) && (src_pos == 0)) {printf("SPECIAL: output_index %d, dst_pos %d, ni.value_count %d, ni.valid_count %d, thread_value_count %d, thread_valid_count %d\n", - output_index, dst_pos, ni.value_count, ni.valid_count, thread_value_count, thread_valid_count);} + if((t == 0) && (src_pos == 0)) {printf("SPECIAL: output_index %d, dst_pos %d, ni.value_count %d, max_depth_valid_count %d, thread_value_count %d, thread_valid_count %d\n", + output_index, dst_pos, ni.value_count, max_depth_valid_count, thread_value_count, thread_valid_count);} - if (t == 0) { printf("OUTPUT_INDICES: output_index %d, dst_pos %d\n", output_index, dst_pos); } - } + if (t == 0) { printf("OUTPUT_INDICES: output_index %d, dst_pos %d\n", output_index, dst_pos); } + } - //Index from rolling buffer of values (which doesn't include nulls) to final array (which includes gaps for nulls) - sb->nz_idx[output_index] = dst_pos; + //Index from rolling buffer of values (which doesn't include nulls) to final array (which includes gaps for nulls) + sb->nz_idx[output_index] = dst_pos; + } + max_depth_valid_count += block_valid_count; } // update stuff if (t == 0) { - ni.valid_count += block_valid_count; ni.value_count += block_value_count; ni.valid_map_offset += block_value_count; } @@ -1022,7 +1024,8 @@ if constexpr (enable_print_large_list) { if (t == 0) { // update valid value count for decoding and total # of values we've processed - s->nz_count = s->nesting_info[max_depth].valid_count; + s->nesting_info[max_depth].valid_count = max_depth_valid_count; + s->nz_count = max_depth_valid_count; s->input_value_count = value_count; // If we have lists # rows != # values @@ -1036,8 +1039,7 @@ if constexpr (enable_print_large_list) { } } - __syncthreads(); - return s->nesting_info[max_depth].valid_count; + return max_depth_valid_count; } // is the page marked nullable or not @@ -1094,7 +1096,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t) device_span chunks, size_t min_row, size_t num_rows, - kernel_error::pointer error_code /*, int page_idx = -1, int num_pages = -1*/) + kernel_error::pointer error_code) { constexpr int rolling_buf_size = decode_block_size_t * 2; constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); @@ -1108,11 +1110,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t) page_state_s* const s = &state_g; auto* const sb = &state_buffers; int const page_idx = blockIdx.x; -/* page_idx = (page_idx == -1) ? blockIdx.x : page_idx + blockIdx.x; - if((page_idx >= num_pages) && (num_pages != -1)) { - printf("BAIL ON PAGE %d of %d\n", page_idx, num_pages); - return; - }*/ int const t = threadIdx.x; PageInfo* pp = &pages[page_idx]; @@ -1357,13 +1354,6 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, dim3 dim_block(decode_block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page -/* - auto num_pages = pages.size(); - auto grid_dim = 1; //2, 10, 40, 100 no problem; all = problem - dim3 dim_grid(grid_dim, 1); // 1 threadblock per page - -for(decltype(num_pages) idx = 0; idx < num_pages; idx += grid_dim) { -*/ if (level_type_size == 1) { if (is_list) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); -// pages.device_ptr(), chunks, min_row, num_rows, error_code, idx, num_pages); } else if (has_nesting) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); -// pages.device_ptr(), chunks, min_row, num_rows, error_code, idx, num_pages); } else if (has_nesting) { gpuDecodePageDataGeneric pages,