Skip to content

Commit

Permalink
Merge branch 'branch-24.08' into arrow-schema-support-pq-writer
Browse files Browse the repository at this point in the history
  • Loading branch information
mhaseeb123 authored Jul 9, 2024
2 parents 1ceca42 + 7cc01be commit db54c0b
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 31 deletions.
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/decode_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

namespace cudf::io::parquet::detail {

namespace cg = cooperative_groups;

namespace {

// # of threads we're decoding with
Expand Down Expand Up @@ -163,7 +165,8 @@ __device__ size_type gpuDecodeTotalPageStringSize(page_state_s* s, int t)
// For V1, the choice is an overestimate (s->dict_size), or an exact number that's
// expensive to compute. For now we're going with the latter.
else {
str_len = gpuInitStringDescriptors<true, unused_state_buf>(s, nullptr, target_pos, t);
str_len = gpuInitStringDescriptors<true, unused_state_buf>(
s, nullptr, target_pos, cg::this_thread_block());
}
break;

Expand Down
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

namespace cudf::io::parquet::detail {

namespace cg = cooperative_groups;

namespace {

constexpr int decode_block_size = 128;
Expand Down Expand Up @@ -277,6 +279,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
}
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();
auto const tile_warp = cg::tiled_partition<cudf::detail::warp_size>(cg::this_thread_block());
if (t < 32) {
// decode repetition and definition levels.
// - update validity vectors
Expand All @@ -298,9 +301,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f);
} else if (s->col.physical_type == BYTE_ARRAY or
s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
gpuInitStringDescriptors<false>(s, sb, src_target_pos, t & 0x1f);
gpuInitStringDescriptors<false>(s, sb, src_target_pos, tile_warp);
}
if (t == 32) { s->dict_pos = src_target_pos; }
if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; }
} else {
// WARP1..WARP3: Decode values
int const dtype = s->col.physical_type;
Expand Down
69 changes: 43 additions & 26 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "parquet_gpu.hpp"
#include "rle_stream.cuh"

#include <cooperative_groups.h>
#include <cuda/atomic>
#include <cuda/std/tuple>

Expand Down Expand Up @@ -420,46 +421,62 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s* s, state_buf* sb, int t
* @param[in,out] s Page state input/output
* @param[out] sb Page state buffer output
* @param[in] target_pos Target output position
* @param[in] t Thread ID
* @param[in] g Cooperative group (thread block or tile)
* @tparam sizes_only True if only sizes are to be calculated
* @tparam state_buf Typename of the `state_buf` (usually inferred)
* @tparam thread_group Typename of the cooperative group (inferred)
*
* @return Total length of strings processed
*/
template <bool sizes_only, typename state_buf>
__device__ size_type
gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int target_pos, int t)
template <bool sizes_only, typename state_buf, typename thread_group>
__device__ size_type gpuInitStringDescriptors(page_state_s* s,
[[maybe_unused]] state_buf* sb,
int target_pos,
thread_group const& g)
{
int pos = s->dict_pos;
int total_len = 0;
int const t = g.thread_rank();
int const dict_size = s->dict_size;
int k = s->dict_val;
int pos = s->dict_pos;
int total_len = 0;

// All group threads can participate for fixed len byte arrays.
if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
int const dtype_len_in = s->dtype_len_in;
total_len = min((target_pos - pos) * dtype_len_in, dict_size - s->dict_val);
if constexpr (!sizes_only) {
for (pos += t, k += t * dtype_len_in; pos < target_pos; pos += g.size()) {
sb->str_len[rolling_index<state_buf::str_buf_size>(pos)] =
(k < dict_size) ? dtype_len_in : 0;
// dict_idx is upperbounded by dict_size.
sb->dict_idx[rolling_index<state_buf::dict_buf_size>(pos)] = k;
// Increment k if needed.
if (k < dict_size) { k = min(k + (g.size() * dtype_len_in), dict_size); }
}
}
// Only thread_rank = 0 updates the s->dict_val
if (!t) { s->dict_val += total_len; }
}
// This step is purely serial for byte arrays
else {
if (!t) {
uint8_t const* cur = s->data_start;

// This step is purely serial
if (!t) {
uint8_t const* cur = s->data_start;
int dict_size = s->dict_size;
int k = s->dict_val;

while (pos < target_pos) {
int len = 0;
if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
if (k < dict_size) { len = s->dtype_len_in; }
} else {
for (int len = 0; pos < target_pos; pos++, len = 0) {
if (k + 4 <= dict_size) {
len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24);
k += 4;
if (k + len > dict_size) { len = 0; }
}
if constexpr (!sizes_only) {
sb->dict_idx[rolling_index<state_buf::dict_buf_size>(pos)] = k;
sb->str_len[rolling_index<state_buf::str_buf_size>(pos)] = len;
}
k += len;
total_len += len;
}
if constexpr (!sizes_only) {
sb->dict_idx[rolling_index<state_buf::dict_buf_size>(pos)] = k;
sb->str_len[rolling_index<state_buf::str_buf_size>(pos)] = len;
}
k += len;
total_len += len;
pos++;
s->dict_val = k;
}
s->dict_val = k;
__threadfence_block();
}

return total_len;
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

namespace cudf::io::parquet::detail {

namespace cg = cooperative_groups;

namespace {

constexpr int preprocess_block_size = 512;
Expand Down Expand Up @@ -1006,6 +1008,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
}
// this needs to be here to prevent warp 1/2 modifying src_pos before all threads have read it
__syncthreads();

// Create a warp sized thread block tile
auto const tile_warp = cg::tiled_partition<cudf::detail::warp_size>(cg::this_thread_block());

if (t < 32) {
// decode repetition and definition levels.
// - update validity vectors
Expand All @@ -1020,9 +1026,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
if (s->dict_base) {
src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, lane_id).first;
} else {
gpuInitStringDescriptors<false>(s, sb, src_target_pos, lane_id);
gpuInitStringDescriptors<false>(s, sb, src_target_pos, tile_warp);
}
if (t == 32) { s->dict_pos = src_target_pos; }
if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; }
} else {
int const me = t - out_thread0;

Expand Down
6 changes: 6 additions & 0 deletions python/cudf/cudf/pandas/_wrappers/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,12 @@ def Index__new__(cls, *args, **kwargs):

_eval_func = _FunctionProxy(_Unusable(), pd.eval)

register_proxy_func(pd.read_pickle)(
_FunctionProxy(_Unusable(), pd.read_pickle)
)

register_proxy_func(pd.to_pickle)(_FunctionProxy(_Unusable(), pd.to_pickle))


def _get_eval_locals_and_globals(level, local_dict=None, global_dict=None):
frame = sys._getframe(level + 3)
Expand Down
7 changes: 7 additions & 0 deletions python/cudf/cudf_pandas_tests/test_cudf_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,13 @@ def test_pickle(obj):

tm.assert_equal(obj, copy)

with tempfile.TemporaryFile() as f:
xpd.to_pickle(obj, f)
f.seek(0)
copy = xpd.read_pickle(f)

tm.assert_equal(obj, copy)


def test_dataframe_query():
cudf_pandas_df = xpd.DataFrame({"foo": [1, 2, 3], "bar": [4, 5, 6]})
Expand Down

0 comments on commit db54c0b

Please sign in to comment.