Skip to content

Commit

Permalink
Tests working
Browse files Browse the repository at this point in the history
  • Loading branch information
pmattione-nvidia committed Aug 27, 2024
1 parent 2ca9618 commit 4b5f91a
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 207 deletions.
14 changes: 13 additions & 1 deletion cpp/include/cudf/table/experimental/row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1429,18 +1429,30 @@ class device_row_comparator {
/**
 * @brief Compares the elements of the two rows at the given indices for equality.
 *
 * When null checking is enabled: two nulls compare equal only if
 * `nulls_are_equal == null_equality::EQUAL`; a null never compares equal to a
 * non-null element. Otherwise (or when both are non-null) the underlying
 * element comparator decides.
 *
 * @param lhs_element_index Index of the element in the left-hand row
 * @param rhs_element_index Index of the element in the right-hand row
 * @return true if the two elements are considered equal
 */
__device__ bool operator()(size_type const lhs_element_index,
                           size_type const rhs_element_index) const noexcept
{
  if (check_nulls) {
    bool const lhs_is_null{lhs.is_null(lhs_element_index)};
    bool const rhs_is_null{rhs.is_null(rhs_element_index)};
    if (lhs_is_null and rhs_is_null) {
      return nulls_are_equal == null_equality::EQUAL;
    } else if (lhs_is_null != rhs_is_null) {
      // Exactly one side is null: never equal, regardless of nulls_are_equal.
      return false;
    }
  }

  // Both elements are non-null (or null checking is disabled): defer to the
  // element-value comparator. NOTE(review): removed the `enable_print` debug
  // printf scaffolding that was dead code (constexpr-false guard) and
  // redundantly re-loaded both elements in the debug branch.
  return comparator(lhs.element<Element>(lhs_element_index),
                    rhs.element<Element>(rhs_element_index));
}

template <typename Element,
Expand Down
462 changes: 298 additions & 164 deletions cpp/src/io/parquet/decode_fixed.cu

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions cpp/src/io/parquet/page_data.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ inline __device__ void gpuStoreOutput(uint32_t* dst,
bytebuf = 0;
}
*dst = bytebuf;

static constexpr bool enable_print = false;
if constexpr (enable_print) {
if (threadIdx.x == 0) {
printf("STORE VALUE %u at %p, src8 %p, dict_pos %u, dict_size %u, ofs %u\n",
bytebuf, dst, src8, dict_pos, dict_size, ofs);
}
}
}

/**
Expand Down Expand Up @@ -328,6 +336,7 @@ inline __device__ void gpuOutputFast(page_state_s* s, state_buf* sb, int src_pos
uint8_t const* dict;
uint32_t dict_pos, dict_size = s->dict_size;

auto dict_lookup_idx = rolling_index<state_buf::dict_buf_size>(src_pos);
if (s->dict_base) {
// Dictionary
dict_pos =
Expand All @@ -339,6 +348,15 @@ inline __device__ void gpuOutputFast(page_state_s* s, state_buf* sb, int src_pos
dict = s->data_start;
}
dict_pos *= (uint32_t)s->dtype_len_in;

static constexpr bool enable_print = false;
if constexpr (enable_print) {
if (threadIdx.x == 0) {
printf("PREP OUTPUT VALUE at dst %p, dict %p, dict_pos %u, dict_size %u, dict_base %p, dict_bits %d, dict_lookup_idx %d, dtype_len_in %d\n",
dst, dict, dict_pos, dict_size, s->dict_base, s->dict_bits, dict_lookup_idx, s->dtype_len_in);
}
}

gpuStoreOutput(dst, dict, dict_pos, dict_size);
}

Expand Down
101 changes: 63 additions & 38 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,8 @@ inline __device__ void store_validity(int valid_map_offset,
if (relevant_mask == ~0) {
valid_map[word_offset] = valid_mask;
} else {
atomicAnd(valid_map + word_offset, ~(relevant_mask << bit_offset));
atomicOr(valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset);
atomicAnd(valid_map + word_offset, ~(relevant_mask << bit_offset)); //clears old bits
atomicOr(valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset); //sets valid mask
}
}
// we're going to spill over into the next word.
Expand Down Expand Up @@ -719,40 +719,51 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// track (page-relative) row index for the thread so we can compare against input bounds
// keep track of overall # of rows we've read.
int const is_new_row = start_depth == 0 ? 1 : 0;
uint32_t const warp_row_count_mask = ballot(is_new_row);
uint32_t const warp_row_count_mask = ballot(is_new_row); //how many threads are starting a new row
//t is zero through 31. the shifted bit is the 1st through the 32nd bit. then we -1: mask
//the mask we and with is querying PRIOR threads
uint32_t const prior_thread_mask = ((1 << t) - 1); //query "for all threads before me"
uint32_t const prior_new_rows_bits = warp_row_count_mask & prior_thread_mask;
int32_t const num_prior_new_rows = __popc(prior_new_rows_bits);

int32_t const thread_row_index =
input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1);
input_row_count + ((num_prior_new_rows + is_new_row) - 1);

input_row_count += __popc(warp_row_count_mask);
// is this thread within read row bounds?
int const in_row_bounds = thread_row_index >= s->row_index_lower_bound &&
thread_row_index < (s->first_row + s->num_rows)
? 1
: 0;

// if we are within the range of nesting levels we should be adding value indices for
//if list: is from/in current rep level to/in the rep level AT the depth with the def value
int in_nesting_bounds = ((0 >= start_depth && 0 <= end_depth) && in_row_bounds) ? 1 : 0;

// compute warp and thread value counts
uint32_t const warp_count_mask =
ballot((0 >= start_depth && 0 <= end_depth) && in_row_bounds ? 1 : 0);
uint32_t const warp_count_mask = ballot(in_nesting_bounds);

warp_value_count = __popc(warp_count_mask);
// Note : ((1 << t) - 1) implies "for all threads before me"
thread_value_count = __popc(warp_count_mask & ((1 << t) - 1));
// thread_value_count : # of output values from the view of this thread
// is all threads before me that start from rep level zero (new row)
thread_value_count = __popc(warp_count_mask & prior_thread_mask);

// walk from 0 to max_depth
uint32_t next_thread_value_count, next_warp_value_count;
for (int s_idx = 0; s_idx < max_depth; s_idx++) {
PageNestingDecodeInfo* nesting_info = &nesting_info_base[s_idx];

// if we are within the range of nesting levels we should be adding value indices for
int const in_nesting_bounds =
((s_idx >= start_depth && s_idx <= end_depth) && in_row_bounds) ? 1 : 0;

// everything up to the max_def_level is a non-null value
//if is NOT list, then means is-not-null, OR is-null in a CHILD node
//if IS list, also: is from/in current rep level to/in the rep level AT the depth with the def value
uint32_t const is_valid = d >= nesting_info->max_def_level && in_nesting_bounds ? 1 : 0;

// compute warp and thread valid counts
// bit is set for each thread in the warp that is_valid
//OR of all is_valid's shifted by thread_value_count
uint32_t const warp_valid_mask =
// for flat schemas, a simple ballot_sync gives us the correct count and bit positions
// because every value in the input matches to a value in the output
//If no lists: every entry is a new row, which may be null
!has_repetition
? ballot(is_valid)
:
Expand All @@ -763,8 +774,10 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// __reduce_or_sync(), but until then we have to do a warp reduce.
WarpReduceOr32(is_valid << thread_value_count);

//For this value, we save an offset at every depth (in the loop)
//# bits prior to this thread that are valid (set)
thread_valid_count = __popc(warp_valid_mask & ((1 << thread_value_count) - 1));
warp_valid_count = __popc(warp_valid_mask);
warp_valid_count = __popc(warp_valid_mask); //#set bits of all threads in warp

// if this is the value column emit an index for value decoding
if (is_valid && s_idx == max_depth - 1) {
Expand All @@ -778,10 +791,15 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// do this for nested schemas so that we can emit an offset for the -current- nesting
// level. more concretely : the offset for the current nesting level == current length of the
// next nesting level
uint32_t next_thread_value_count = 0, next_warp_value_count = 0;
int next_in_nesting_bounds = 0;
if (s_idx < max_depth - 1) {
uint32_t const next_warp_count_mask =
ballot((s_idx + 1 >= start_depth && s_idx + 1 <= end_depth && in_row_bounds) ? 1 : 0);
next_warp_value_count = __popc(next_warp_count_mask);
//mask is different between depths
next_in_nesting_bounds =
(s_idx + 1 >= start_depth && s_idx + 1 <= end_depth && in_row_bounds) ? 1 : 0;
uint32_t const next_warp_count_mask = ballot(next_in_nesting_bounds);

next_warp_value_count = __popc(next_warp_count_mask); //same for all threads, but not all depths
next_thread_value_count = __popc(next_warp_count_mask & ((1 << t) - 1));

// if we're -not- at a leaf column and we're within nesting/row bounds
Expand All @@ -792,34 +810,36 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
cudf::size_type const ofs = nesting_info_base[s_idx + 1].value_count +
next_thread_value_count +
nesting_info_base[s_idx + 1].page_start_value;
//STORE THE OFFSET FOR THE NEW LIST LOCATION
(reinterpret_cast<cudf::size_type*>(nesting_info->data_out))[idx] = ofs;
}
}

// nested schemas always read and write to the same bounds (that is, read and write positions
// are already pre-bounded by first_row/num_rows). flat schemas will start reading at the
// first value, even if that is before first_row, because we cannot trivially jump to
// the correct position to start reading. since we are about to write the validity vector here
// lists always read and write to the same bounds (that is, read and write positions
// are already pre-bounded by first_row/num_rows) how? we have pre-processed them.
// flat schemas will start reading at the first value, even if that is before first_row,
// because we cannot trivially jump to the correct position to start reading.
// why not? because we don't know how many nulls were before it (haven't preprocessed them)
// since we are about to write the validity vector here
// we need to adjust our computed mask to take into account the write row bounds.
int const in_write_row_bounds =
!has_repetition
? thread_row_index >= s->first_row && thread_row_index < (s->first_row + s->num_rows)
: in_row_bounds;
//is write_start in new
int const first_thread_in_write_range =
!has_repetition ? __ffs(ballot(in_write_row_bounds)) - 1 : 0;

// # of bits to of the validity mask to write out
int const warp_valid_mask_bit_count =
first_thread_in_write_range < 0 ? 0 : warp_value_count - first_thread_in_write_range;
!has_repetition ? __ffs(ballot(in_write_row_bounds)) - 1 : 0; //index of lowest bit set to

// increment count of valid values, count of total values, and update validity mask
if (!t) {
// # of bits to of the validity mask to write out //becomes bit_count
int const warp_valid_mask_bit_count =
first_thread_in_write_range < 0 ? 0 : warp_value_count - first_thread_in_write_range;

if (nesting_info->valid_map != nullptr && warp_valid_mask_bit_count > 0) {
uint32_t const warp_output_valid_mask = warp_valid_mask >> first_thread_in_write_range;
store_validity(nesting_info->valid_map_offset,
nesting_info->valid_map,
warp_output_valid_mask,
warp_valid_mask_bit_count);
store_validity(nesting_info->valid_map_offset, nesting_info->valid_map,
warp_output_valid_mask, warp_valid_mask_bit_count);
nesting_info->valid_map_offset += warp_valid_mask_bit_count;
nesting_info->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask);
}
Expand All @@ -830,7 +850,8 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// propagate value counts for the next level
warp_value_count = next_warp_value_count;
thread_value_count = next_thread_value_count;
}
in_nesting_bounds = next_in_nesting_bounds;
} //END OF DEPTH LOOP

input_value_count += min(32, (target_input_value_count - input_value_count));
__syncwarp();
Expand Down Expand Up @@ -1096,6 +1117,12 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->num_rows = (page_start_row + s->first_row) + max_page_rows <= max_row
? max_page_rows
: max_row - (page_start_row + s->first_row);

static constexpr bool enable_print = false;
if constexpr (enable_print) {
printf("NUM_ROWS: col.start_row %lu, page.chunk_row %d, page_start_row %lu, s->first_row %d, s->page.num_rows %d, max_row %lu, min_row %lu, num_rows %lu, s->num_rows %d\n",
s->col.start_row, s->page.chunk_row, page_start_row, s->first_row, s->page.num_rows, max_row, min_row, num_rows, s->num_rows);
}
}
}

Expand Down Expand Up @@ -1256,13 +1283,11 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}

if (s->col.column_data_base != nullptr) {
nesting_info->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);
if (s->col.column_string_base != nullptr) {
nesting_info->string_out = static_cast<uint8_t*>(s->col.column_string_base[idx]);
}

nesting_info->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);

if (nesting_info->data_out != nullptr) {
// anything below max depth with a valid data pointer must be a list, so the
// element size is the size of the offset type.
Expand All @@ -1277,8 +1302,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}
nesting_info->valid_map = s->col.valid_map_base[idx];
if (nesting_info->valid_map != nullptr) {
nesting_info->valid_map += output_offset >> 5;
nesting_info->valid_map_offset = (int32_t)(output_offset & 0x1f);
nesting_info->valid_map += output_offset >> 5; //is pointer to warp start
nesting_info->valid_map_offset = (int32_t)(output_offset & 0x1f); //is index within warp
}
}
}
Expand Down Expand Up @@ -1357,7 +1382,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_pos = 0;
s->src_pos = 0;

// for flat hierarchies, we can't know how many leaf values to skip unless we do a full
// for non-lists, we can't know how many leaf values to skip unless we do a full
// preprocess of the definition levels (since nulls will have no actual decodable value, there
// is no direct correlation between # of rows and # of decodable values). so we will start
// processing at the beginning of the value stream and disregard any indices that start
Expand All @@ -1371,15 +1396,15 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,

s->row_index_lower_bound = -1;
}
// for nested hierarchies, we have run a preprocess that lets us skip directly to the values
// for lists, we have run a preprocess that lets us skip directly to the values
// we need to start decoding at
else {
// input_row_count translates to "how many rows we have processed so far", so since we are
// skipping directly to where we want to start decoding, set it to first_row
s->input_row_count = s->first_row;

// return the lower bound to compare (page-relative) thread row index against. Explanation:
// In the case of nested schemas, rows can span page boundaries. That is to say,
// In the case of lists, rows can span page boundaries. That is to say,
// we can encounter the first value for row X on page M, but the last value for page M
// might not be the last value for row X. page M+1 (or further) may contain the last value.
//
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page,
return decode_kernel_mask::STRING;
}

if (is_list(chunk)) {
if (is_list(chunk) && !is_string_col(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) {
//if (is_list(chunk)) {
if (page.encoding == Encoding::PLAIN) {
return decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST;
} else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) {
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
// figure out which kernels to run
auto const kernel_mask = GetAggregatedDecodeKernelMask(subpass.pages, _stream);

printf("DECODE DATA PAGE, mask %d\n", kernel_mask);

// Check to see if there are any string columns present. If so, then we need to get size info
// for each string page. This size info will be used to pre-allocate memory for the column,
// allowing the page decoder to write string data directly to the column buffer, rather than
Expand Down Expand Up @@ -223,6 +221,11 @@ printf("DECODE DATA PAGE, mask %d\n", kernel_mask);
int const nkernels = std::bitset<32>(kernel_mask).count();
auto streams = cudf::detail::fork_streams(_stream, nkernels);

static constexpr bool enable_print = false;
if constexpr (enable_print) {
printf("PAGE DATA DECODE MASK: %d\n", kernel_mask);
}

// launch string decoder
int s_idx = 0;
if (BitAnd(kernel_mask, decode_kernel_mask::STRING) != 0) {
Expand Down Expand Up @@ -333,7 +336,6 @@ printf("DECODE DATA PAGE, mask %d\n", kernel_mask);

// launch fixed width type decoder for lists
if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) != 0) {
printf("LIST PAGE\n");
DecodePageDataFixed(subpass.pages,
pass.chunks,
num_rows,
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
// depth.
//

static constexpr bool enable_print = false;

// compute "X" from above
if constexpr (enable_print) {
printf("REMAPPING: max def %d, max rep %d\n", schema.max_definition_level, schema.max_repetition_level);
}
for (int s_idx = schema.max_repetition_level; s_idx >= 0; s_idx--) {
auto find_shallowest = [&](int r) {
int shallowest = -1;
Expand All @@ -148,6 +153,9 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
if (!cur_schema.is_stub()) { cur_depth--; }
schema_idx = cur_schema.parent_idx;
}
if constexpr (enable_print) {
printf("REMAPPING: s_idx / r %d, shallowest %d\n", r, shallowest);
}
return shallowest;
};
rep_depth_remap[s_idx] = find_shallowest(s_idx);
Expand Down Expand Up @@ -186,6 +194,10 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
prev_schema = cur_schema;
schema_idx = cur_schema.parent_idx;
}

if constexpr (enable_print) {
printf("REMAPPING: s_idx %d, r1 %d, end_depth %d\n", s_idx, r1, depth);
}
return depth;
};
def_depth_remap[s_idx] = find_deepest(s_idx);
Expand Down

0 comments on commit 4b5f91a

Please sign in to comment.