Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid decoding long runs in a single thread #16304

Draft
wants to merge 27 commits into
base: branch-24.10
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
50f8ab8
rle_stream with dictionary support + micro kernels for fixed and fixed
abellina Feb 20, 2024
990a849
load balancing experiment
abellina Feb 29, 2024
46e8294
parquet process example
abellina Mar 1, 2024
03f202c
rebase 24.06
gerashegalov May 15, 2024
b11b74f
suggestions 1
gerashegalov May 17, 2024
35b00ee
suggestions
gerashegalov May 17, 2024
1a655fd
Merge remote-tracking branch 'origin/branch-24.06' into gerashegalov/…
gerashegalov May 20, 2024
6b0f067
Merge remote-tracking branch 'origin/branch-24.08' into gerashegalov/…
gerashegalov May 20, 2024
d6cd603
Merge remote-tracking branch 'gerashegalov/gerashegalov/fixed_ukernel…
gerashegalov May 21, 2024
0072f73
Merge remote-tracking branch 'origin/branch-24.08' into gerashegalov/…
gerashegalov May 31, 2024
06b85aa
Merge remote-tracking branch 'upstream/branch-24.08' into gerashegalo…
gerashegalov Jun 4, 2024
00a38f6
Merge remote-tracking branch 'upstream/branch-24.08' into gerashegalo…
gerashegalov Jun 5, 2024
4c9a1ca
Robert Maynard's patch
gerashegalov Jun 6, 2024
bcdf401
Merge remote-tracking branch 'origin/branch-24.08' into gerashegalov/…
gerashegalov Jul 17, 2024
9b9e37b
Merge remote-tracking branch 'origin/branch-24.08' into gerashegalov/…
gerashegalov Jul 18, 2024
2689e3f
Delete process-parquet
gerashegalov Jul 18, 2024
ed14133
Merge remote-tracking branch 'origin/branch-24.08' into gerashegalov/…
gerashegalov Aug 3, 2024
861f9f7
review
gerashegalov Aug 3, 2024
0691b6e
Merge remote-tracking branch 'origin/branch-24.08' into gerashegalov/…
gerashegalov Aug 10, 2024
b331766
Merge remote-tracking branch 'origin/branch-24.10' into gerashegalov/…
gerashegalov Aug 10, 2024
772d652
Merge remote-tracking branch 'origin/branch-24.10' into gerashegalov/…
gerashegalov Sep 5, 2024
ce5b9e3
Merge commit '478406740a500ce74d8cd4b4bea07fd163256796' into gerasheg…
gerashegalov Sep 8, 2024
f1aa5ec
Merge remote-tracking branch 'origin/branch-24.10' into gerashegalov/…
gerashegalov Sep 16, 2024
e5441ee
Merge remote-tracking branch 'origin/branch-24.10' into gerashegalov/…
gerashegalov Sep 21, 2024
90fa87b
GERA_DEBUG log
gerashegalov Sep 21, 2024
edfed2e
DEBUG LOG warp thread batch
gerashegalov Sep 22, 2024
09dd99e
Accommodate the output offset
gerashegalov Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,15 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t)
}

// Exit early if we have no work to do in this page (e.g., in a skip_rows/num_rows case).
//
// Corner case: in the case of lists, a page can contain "0" rows if the current row
// starts before this page and ends after this page:
//       P0        P1        P2
//  |---------|---------|----------|
//        ^------------------^
//    row start           row end
// P1 will contain 0 rows.
//
if (s->num_rows == 0) { return; }

DecodeValuesFunc<decode_block_size_t, state_buf_t> decode_values;
Expand Down
42 changes: 42 additions & 0 deletions cpp/src/io/parquet/decode_fixed.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "parquet_gpu.hpp"

namespace cudf::io::parquet::detail {

/**
 * @brief Host-side entry point declared for the "fixed" page-decode path.
 *
 * NOTE(review): only the declaration is visible in this header; the parameter descriptions
 * below are inferred from names and types and should be confirmed against the definition.
 *
 * @param[in,out] pages Page descriptors; taken by non-const reference, so the callee may
 * modify them
 * @param[in] chunks Column chunk descriptors (read-only)
 * @param[in] num_rows Presumably the total number of rows to read -- TODO confirm
 * @param[in] min_row Presumably the minimum (first) row index to read -- TODO confirm
 * @param[in] level_type_size Presumably the size in bytes of the type used for
 * repetition/definition levels -- TODO confirm
 * @param[in] stream CUDA stream passed through to the implementation
 */
void DecodePageDataFixed(cudf::detail::hostdevice_vector<PageInfo>& pages,
                         cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
                         std::size_t num_rows,
                         std::size_t min_row,
                         int level_type_size,
                         rmm::cuda_stream_view stream);

/**
 * @brief Host-side entry point declared for the "fixed, dictionary" page-decode path.
 *
 * NOTE(review): only the declaration is visible in this header; the parameter descriptions
 * below are inferred from names and types and should be confirmed against the definition.
 *
 * @param[in,out] pages Page descriptors; taken by non-const reference, so the callee may
 * modify them
 * @param[in] chunks Column chunk descriptors (read-only)
 * @param[in] num_rows Presumably the total number of rows to read -- TODO confirm
 * @param[in] min_row Presumably the minimum (first) row index to read -- TODO confirm
 * @param[in] level_type_size Presumably the size in bytes of the type used for
 * repetition/definition levels -- TODO confirm
 * @param[in] stream CUDA stream passed through to the implementation
 */
void DecodePageDataFixedDict(cudf::detail::hostdevice_vector<PageInfo>& pages,
                             cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
                             std::size_t num_rows,
                             std::size_t min_row,
                             int level_type_size,
                             rmm::cuda_stream_view stream);

}  // namespace cudf::io::parquet::detail
5 changes: 5 additions & 0 deletions cpp/src/io/parquet/page_data.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

#include "page_decode.cuh"

#include <io/utilities/column_buffer.hpp>

#include <cudf/hashing/detail/murmurhash3_x86_32.cuh>

#include <rmm/exec_policy.hpp>
#include <thrust/reduce.h>

namespace cudf::io::parquet::detail {

/**
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page,
ColumnChunkDesc const& chunk)
{
if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return decode_kernel_mask::NONE; }

if (page.encoding == Encoding::DELTA_BINARY_PACKED) {
return decode_kernel_mask::DELTA_BINARY;
} else if (page.encoding == Encoding::DELTA_BYTE_ARRAY) {
Expand Down Expand Up @@ -581,7 +580,6 @@ void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
{
dim3 dim_block(128, 1);
dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block

gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(
chunks, chunk_pages, num_chunks, error_code);
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "reader_impl.hpp"

#include "decode_fixed.hpp"
#include "error.hpp"

#include <cudf/detail/stream_compaction.hpp>
Expand Down
75 changes: 51 additions & 24 deletions cpp/src/io/parquet/rle_stream.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ struct rle_run {
uint8_t const* start;
int level_run; // level_run header value
int remaining; // number of output items remaining to be decoded
int run_offset;
};

// a stream of rle_runs
Expand Down Expand Up @@ -189,6 +190,9 @@ struct rle_stream {
int fill_index;
int decode_index;

int last_run_bytes;
int last_run_remaining;

__device__ rle_stream(rle_run<level_t>* _runs) : runs(_runs) {}

__device__ inline bool is_last_decode_warp(int warp_id)
Expand All @@ -214,6 +218,9 @@ struct rle_stream {
cur_values = 0;
fill_index = 0;
decode_index = -1; // signals the first iteration. Nothing to decode.

last_run_bytes = 0;
last_run_remaining = 0;
}

__device__ inline void fill_run_batch()
Expand All @@ -229,30 +236,48 @@ struct rle_stream {
auto& run = runs[rolling_index<run_buffer_size>(fill_index)];

// Encoding::RLE

// bytes for the varint header
uint8_t const* _cur = cur;
int const level_run = get_vlq32(_cur, end);
// run_bytes includes the header size
int run_bytes = _cur - cur;

// literal run
if (is_literal_run(level_run)) {
// from the parquet spec: literal runs always come in multiples of 8 values.
run.size = (level_run >> 1) * 8;
run_bytes += ((run.size * level_bits) + 7) >> 3;
if (last_run_remaining == 0) {
uint8_t const* _cur = cur;
int const level_run = get_vlq32(_cur, end);
// run_bytes includes the header size
int run_bytes = _cur - cur;

// literal run
if (is_literal_run(level_run)) {
// from the parquet spec: literal runs always come in multiples of 8 values.
run.size = (level_run >> 1) * 8;
run_bytes += ((run.size * level_bits) + 7) >> 3;
}
// repeated value run
else {
run.size = (level_run >> 1);
run_bytes += ((level_bits) + 7) >> 3;
}
last_run_bytes = run_bytes;
last_run_remaining = run.size;
int const batch = min(96, last_run_remaining);
run.output_pos = output_pos;
run.start = _cur;
run.level_run = level_run;
run.remaining = batch;
run.run_offset = run.size - last_run_remaining;
last_run_remaining -= batch;
} else {
auto& prior = runs[rolling_index<run_buffer_size>(fill_index - 1)];
int const batch = min(96, last_run_remaining);
run.output_pos = prior.output_pos;
run.start = prior.start;
run.level_run = prior.level_run;
run.remaining = batch;
run.size = prior.size;
run.run_offset = run.size - last_run_remaining;
last_run_remaining -= batch;
}
// repeated value run
else {
run.size = (level_run >> 1);
run_bytes += ((level_bits) + 7) >> 3;
if (last_run_remaining == 0) {
cur += last_run_bytes;
output_pos += run.size;
}
run.output_pos = output_pos;
run.start = _cur;
run.level_run = level_run;
run.remaining = run.size;
cur += run_bytes;
output_pos += run.size;
fill_index++;
}
}
Expand Down Expand Up @@ -300,7 +325,7 @@ struct rle_stream {
if (warp_id == 0) {
// fill the next set of runs. fill_runs will generally be the bottleneck for any
// kernel that uses an rle_stream.
if (warp_lane == 0) {
if (!warp_lane) {
fill_run_batch();
if (decode_index == -1) {
// first time, set it to the beginning of the buffer (rolled)
Expand All @@ -322,9 +347,9 @@ struct rle_stream {
int const max_count = cur_values + output_count;
// run.output_pos is absolute position, we start decoding
// if it's supposed to fit in this call to `decode_next`.
if (max_count > run.output_pos) {
if (max_count > run.output_pos + run.run_offset) {
int remaining = run.remaining;
int const run_offset = run.size - remaining;
int run_offset = run.run_offset;
// last_run_pos is the absolute position of the run, including
// what was decoded last time.
int const last_run_pos = run.output_pos + run_offset;
Expand Down Expand Up @@ -358,6 +383,8 @@ struct rle_stream {
decode_index_shared = run_index + 1;
}
run.remaining = remaining;
run_offset += batch_len;
run.run_offset = run_offset;
}
}
}
Expand Down
Loading