Skip to content

Commit

Permalink
Merge pull request #24147 from rockwotj/parquet-writer
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj authored Nov 16, 2024
2 parents 303a7a2 + cab8e0f commit 628c86c
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/v/serde/parquet/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ redpanda_cc_library(
":schema",
":value",
"//src/v/base",
"//src/v/compression",
"//src/v/hashing:crc32",
"@seastar",
],
Expand Down
1 change: 1 addition & 0 deletions src/v/serde/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ v_cc_library(
Seastar::seastar
v::bytes
v::container
v::compression
v::utils
v::serde_thrift
)
78 changes: 47 additions & 31 deletions src/v/serde/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "serde/parquet/column_writer.h"

#include "compression/compression.h"
#include "hashing/crc32.h"
#include "serde/parquet/encoding.h"

Expand All @@ -22,6 +23,8 @@

namespace serde::parquet {

using options = column_writer::options;

class column_writer::impl {
public:
impl() = default;
Expand All @@ -32,7 +35,7 @@ class column_writer::impl {
virtual ~impl() noexcept = default;

virtual void add(value, rep_level, def_level) = 0;
virtual data_page flush_page() = 0;
virtual ss::future<data_page> flush_page() = 0;
};

namespace {
Expand All @@ -53,9 +56,11 @@ crc::crc32 compute_crc32(Args&&... args) {
template<typename value_type>
class buffered_column_writer final : public column_writer::impl {
public:
buffered_column_writer(def_level max_def_level, rep_level max_rep_level)
buffered_column_writer(
def_level max_def_level, rep_level max_rep_level, options opts)
: _max_rep_level(max_rep_level)
, _max_def_level(max_def_level) {}
, _max_def_level(max_def_level)
, _opts(opts) {}

void add(value val, rep_level rl, def_level dl) override {
++_num_values;
Expand All @@ -80,7 +85,7 @@ class buffered_column_writer final : public column_writer::impl {
_def_levels.push_back(dl);
}

data_page flush_page() override {
ss::future<data_page> flush_page() override {
iobuf encoded_def_levels;
// If the max level is 0 then we don't write levels at all.
if (_max_def_level > def_level(0)) {
Expand All @@ -100,16 +105,23 @@ class buffered_column_writer final : public column_writer::impl {
} else {
encoded_data = encode_plain(std::exchange(_value_buffer, {}));
}
size_t page_size = encoded_def_levels.size_bytes()
+ encoded_rep_levels.size_bytes()
+ encoded_data.size_bytes();
if (page_size > std::numeric_limits<int32_t>::max()) {
throw std::runtime_error(
fmt::format("page size limit exceeded: {} bytes", page_size));
size_t uncompressed_page_size = encoded_def_levels.size_bytes()
+ encoded_rep_levels.size_bytes()
+ encoded_data.size_bytes();
if (uncompressed_page_size > std::numeric_limits<int32_t>::max()) {
throw std::runtime_error(fmt::format(
"page size limit exceeded: {} bytes", uncompressed_page_size));
}
if (_opts.compress) {
encoded_data = co_await compression::stream_compressor::compress(
std::move(encoded_data), compression::type::zstd);
}
size_t compressed_page_size = encoded_def_levels.size_bytes()
+ encoded_rep_levels.size_bytes()
+ encoded_data.size_bytes();
page_header header{
.uncompressed_page_size = static_cast<int32_t>(page_size),
.compressed_page_size = static_cast<int32_t>(page_size),
.uncompressed_page_size = static_cast<int32_t>(uncompressed_page_size),
.compressed_page_size = static_cast<int32_t>(compressed_page_size),
.crc = compute_crc32(encoded_rep_levels, encoded_def_levels, encoded_data),
.type = data_page_header{
.num_values = std::exchange(_num_values, 0),
Expand All @@ -118,15 +130,15 @@ class buffered_column_writer final : public column_writer::impl {
.data_encoding = encoding::plain,
.definition_levels_byte_length = static_cast<int32_t>(encoded_def_levels.size_bytes()),
.repetition_levels_byte_length = static_cast<int32_t>(encoded_rep_levels.size_bytes()),
.is_compressed = false,
.is_compressed = _opts.compress,
},
};
iobuf full_page_data = encode(header);
auto header_size = static_cast<int64_t>(full_page_data.size_bytes());
full_page_data.append(std::move(encoded_rep_levels));
full_page_data.append(std::move(encoded_def_levels));
full_page_data.append(std::move(encoded_data));
return {
co_return data_page{
.header = header,
.serialized_header_size = header_size,
.serialized = std::move(full_page_data),
Expand All @@ -143,6 +155,7 @@ class buffered_column_writer final : public column_writer::impl {
int32_t _num_values = 0;
rep_level _max_rep_level;
def_level _max_def_level;
options _opts;
};

template class buffered_column_writer<boolean_value>;
Expand All @@ -154,49 +167,50 @@ template class buffered_column_writer<byte_array_value>;
template class buffered_column_writer<fixed_byte_array_value>;

std::unique_ptr<column_writer::impl>
make_impl(const schema_element&, std::monostate) {
make_impl(const schema_element&, std::monostate, options) {
throw std::runtime_error("invariant error: cannot make a column writer "
"from an intermediate value");
}
std::unique_ptr<column_writer::impl>
make_impl(const schema_element& e, bool_type) {
make_impl(const schema_element& e, bool_type, options opts) {
return std::make_unique<buffered_column_writer<boolean_value>>(
e.max_definition_level, e.max_repetition_level);
e.max_definition_level, e.max_repetition_level, opts);
}
std::unique_ptr<column_writer::impl>
make_impl(const schema_element& e, i32_type) {
make_impl(const schema_element& e, i32_type, options opts) {
return std::make_unique<buffered_column_writer<int32_value>>(
e.max_definition_level, e.max_repetition_level);
e.max_definition_level, e.max_repetition_level, opts);
}
std::unique_ptr<column_writer::impl>
make_impl(const schema_element& e, i64_type) {
make_impl(const schema_element& e, i64_type, options opts) {
return std::make_unique<buffered_column_writer<int64_value>>(
e.max_definition_level, e.max_repetition_level);
e.max_definition_level, e.max_repetition_level, opts);
}
std::unique_ptr<column_writer::impl>
make_impl(const schema_element& e, f32_type) {
make_impl(const schema_element& e, f32_type, options opts) {
return std::make_unique<buffered_column_writer<float32_value>>(
e.max_definition_level, e.max_repetition_level);
e.max_definition_level, e.max_repetition_level, opts);
}
std::unique_ptr<column_writer::impl>
make_impl(const schema_element& e, f64_type) {
make_impl(const schema_element& e, f64_type, options opts) {
return std::make_unique<buffered_column_writer<float64_value>>(
e.max_definition_level, e.max_repetition_level);
e.max_definition_level, e.max_repetition_level, opts);
}
std::unique_ptr<column_writer::impl>
make_impl(const schema_element& e, byte_array_type t) {
make_impl(const schema_element& e, byte_array_type t, options opts) {
if (t.fixed_length.has_value()) {
return std::make_unique<buffered_column_writer<fixed_byte_array_value>>(
e.max_definition_level, e.max_repetition_level);
e.max_definition_level, e.max_repetition_level, opts);
}
return std::make_unique<buffered_column_writer<byte_array_value>>(
e.max_definition_level, e.max_repetition_level);
e.max_definition_level, e.max_repetition_level, opts);
}

} // namespace

column_writer::column_writer(const schema_element& col)
: _impl(std::visit([&col](auto x) { return make_impl(col, x); }, col.type)) {}
column_writer::column_writer(const schema_element& col, options opts)
: _impl(std::visit(
[&col, opts](auto x) { return make_impl(col, x, opts); }, col.type)) {}

column_writer::column_writer(column_writer&&) noexcept = default;
column_writer& column_writer::operator=(column_writer&&) noexcept = default;
Expand All @@ -206,6 +220,8 @@ void column_writer::add(value val, rep_level rep_level, def_level def_level) {
return _impl->add(std::move(val), rep_level, def_level);
}

data_page column_writer::flush_page() { return _impl->flush_page(); }
ss::future<data_page> column_writer::flush_page() {
return _impl->flush_page();
}

} // namespace serde::parquet
10 changes: 8 additions & 2 deletions src/v/serde/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ class column_writer {
public:
class impl;

explicit column_writer(const schema_element&);
// Options for changing how a column writer behaves.
struct options {
// If true, use zstd compression for the column data.
bool compress = false;
};

explicit column_writer(const schema_element&, options);
column_writer(const column_writer&) = delete;
column_writer& operator=(const column_writer&) = delete;
column_writer(column_writer&&) noexcept;
Expand All @@ -53,7 +59,7 @@ class column_writer {
// Flush the currently buffered values to a page.
//
// This also resets the writer to be able to start writing a new page.
data_page flush_page();
ss::future<data_page> flush_page();

private:
std::unique_ptr<impl> _impl;
Expand Down
1 change: 1 addition & 0 deletions src/v/serde/parquet/tests/generate_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ ss::future<iobuf> serialize_testcase(size_t test_case) {
{
.schema = all_types_schema(),
.metadata = {{"foo", "bar"}},
.compress = test_case % 2 == 0,
},
make_iobuf_ref_output_stream(file));
co_await w.init();
Expand Down
10 changes: 7 additions & 3 deletions src/v/serde/parquet/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ class writer::impl {
element.position,
column{
.leaf = &element,
.writer = column_writer(element),
.writer = column_writer(
element,
{
.compress = _opts.compress,
}),
});
});
// write the leading magic bytes
Expand Down Expand Up @@ -110,7 +114,7 @@ class writer::impl {
row_group rg{};
rg.file_offset = static_cast<int64_t>(_offset);
for (auto& [pos, col] : _columns) {
auto page = col.writer.flush_page();
auto page = co_await col.writer.flush_page();
const auto& data_header = std::get<data_page_header>(
page.header.type);
rg.num_rows = data_header.num_rows;
Expand All @@ -121,7 +125,7 @@ class writer::impl {
.type = col.leaf->type,
.encodings = {data_header.data_encoding},
.path_in_schema = path_in_schema(*col.leaf),
.codec = compression_codec::uncompressed,
.codec = _opts.compress ? compression_codec::zstd : compression_codec::uncompressed,
.num_values = data_header.num_values,
.total_uncompressed_size = page.header.uncompressed_page_size + page.serialized_header_size,
.total_compressed_size = page.header.compressed_page_size + page.serialized_header_size,
Expand Down
4 changes: 3 additions & 1 deletion src/v/serde/parquet/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class writer {
// Information used when filling out the `created_by` metadata.
ss::sstring version = "latest";
ss::sstring build = "dev";
// TODO(parquet): add settings around buffer settings, compression, etc.
// If true, compress the parquet column chunks using zstd compression
bool compress = false;
// TODO(parquet): add settings around buffer settings, etc.
};

// Create a new parquet file writer using the given options that
Expand Down

0 comments on commit 628c86c

Please sign in to comment.