Skip to content

Commit

Permalink
Merge pull request #24228 from rockwotj/flush
Browse files Browse the repository at this point in the history
serde/parquet: support creating multiple row groups
  • Loading branch information
rockwotj authored Nov 21, 2024
2 parents fc191eb + 7bccba6 commit d0a03e6
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 43 deletions.
30 changes: 26 additions & 4 deletions src/v/serde/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class column_writer::impl {
impl& operator=(impl&&) noexcept = default;
virtual ~impl() noexcept = default;

virtual void add(value, rep_level, def_level) = 0;
virtual incremental_column_stats add(value, rep_level, def_level) = 0;
virtual ss::future<data_page> flush_page() = 0;
};

Expand Down Expand Up @@ -62,16 +62,27 @@ class buffered_column_writer final : public column_writer::impl {
, _max_def_level(max_def_level)
, _opts(opts) {}

void add(value val, rep_level rl, def_level dl) override {
incremental_column_stats
add(value val, rep_level rl, def_level dl) override {
++_num_values;
// A repetition level of zero means that it's the start of a new row and
// not a repeated value within the same row.
if (rl == rep_level(0)) {
++_num_rows;
}

uint64_t value_memory_usage = 0;

ss::visit(
std::move(val),
[this](value_type& v) { _value_buffer.push_back(std::move(v)); },
[this, &value_memory_usage](value_type& v) {
if constexpr (!std::is_trivially_copyable_v<value_type>) {
value_memory_usage = v.val.size_bytes();
} else {
value_memory_usage = sizeof(value_type);
}
_value_buffer.push_back(std::move(v));
},
[this](null_value&) {
// null values are valid, but are not encoded in the actual data,
// they are encoded in the definition levels.
Expand All @@ -83,6 +94,16 @@ class buffered_column_writer final : public column_writer::impl {
});
_rep_levels.push_back(rl);
_def_levels.push_back(dl);

// NOTE: This does not account for the underlying buffer memory,
// but we don't want to account for the capacity here. Ideally we
// always use the full capacity in our value buffer, and eagerly
// accounting for that usage might cause callers to overaggressively
// flush pages/row groups.
return {
.memory_usage = value_memory_usage + sizeof(rep_level)
+ sizeof(def_level),
};
}

ss::future<data_page> flush_page() override {
Expand Down Expand Up @@ -216,7 +237,8 @@ column_writer::column_writer(column_writer&&) noexcept = default;
column_writer& column_writer::operator=(column_writer&&) noexcept = default;
column_writer::~column_writer() noexcept = default;

void column_writer::add(value val, rep_level rep_level, def_level def_level) {
// Hands the value and its repetition/definition levels to the implementation
// and returns the stats delta that the implementation reports for it.
incremental_column_stats
column_writer::add(value val, rep_level rep_level, def_level def_level) {
    auto delta = _impl->add(std::move(val), rep_level, def_level);
    return delta;
}

Expand Down
9 changes: 8 additions & 1 deletion src/v/serde/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ struct data_page {
iobuf serialized;
};

// The delta in stats when a value is written to a column.
//
// This is used to account memory usage at the row group level.
struct incremental_column_stats {
    // Bytes of memory attributed to the value that was just written.
    //
    // Defaults to zero so that a default-constructed delta is well defined and
    // contributes nothing when it is accumulated into row group totals.
    uint64_t memory_usage = 0;
};

// A writer for a single column of parquet data.
class column_writer {
public:
Expand All @@ -54,7 +61,7 @@ class column_writer {
// nodes.
//
// Use `shred_record` to get the value and levels from an arbitrary value.
void add(value, rep_level, def_level);
incremental_column_stats add(value, rep_level, def_level);

// Flush the currently buffered values to a page.
//
Expand Down
4 changes: 4 additions & 0 deletions src/v/serde/parquet/tests/generate_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ ss::future<iobuf> serialize_testcase(size_t test_case) {
auto v = generate_value(schema);
rows.push_back(copy(v));
co_await w.write_row(std::get<group_value>(std::move(v)));
// Create multiple row groups and make sure it works
if (i % 32 == 0) {
co_await w.flush_row_group();
}
}
co_await w.close();
co_return json(testcase{
Expand Down
87 changes: 51 additions & 36 deletions src/v/serde/parquet/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,56 @@ class writer::impl {
_opts.schema, std::move(row), [this](shredded_value sv) {
return write_value(std::move(sv));
});
// TODO: periodically flush the row_group if we're using enough memory
// or at least flush the pages if we don't want to create too many row
// groups.
++_current_row_group_stats.rows;
}

row_group_stats current_row_group_stats() const {
return _current_row_group_stats;
}

// Flushes one page per column, writes those pages to the output and records
// the resulting row group in `_row_groups`.
//
// Does nothing when no rows have been counted since the stats were last reset.
ss::future<> flush_row_group() {
    const bool nothing_buffered = _current_row_group_stats.rows == 0;
    if (nothing_buffered) {
        co_return;
    }
    row_group group{};
    // The row group starts at the current end of the written output.
    group.file_offset = static_cast<int64_t>(_offset);
    for (auto& [position, entry] : _columns) {
        auto flushed = co_await entry.writer.flush_page();
        const auto& header = std::get<data_page_header>(flushed.header.type);
        // Overwritten on every iteration; the last column's value is kept.
        group.num_rows = header.num_rows;
        group.total_byte_size += static_cast<int64_t>(
          flushed.serialized.size_bytes());
        const auto codec = _opts.compress ? compression_codec::zstd
                                          : compression_codec::uncompressed;
        const auto header_size = flushed.serialized_header_size;
        group.columns.push_back(column_chunk{
          .meta_data = column_meta_data{
            .type = entry.leaf->type,
            .encodings = {header.data_encoding},
            .path_in_schema = path_in_schema(*entry.leaf),
            .codec = codec,
            .num_values = header.num_values,
            .total_uncompressed_size = flushed.header.uncompressed_page_size + header_size,
            .total_compressed_size = flushed.header.compressed_page_size + header_size,
            .key_value_metadata = {},
            // Read before the page is written, so this is where the page starts.
            .data_page_offset = static_cast<int64_t>(_offset),
          },
        });
        co_await write_iobuf(std::move(flushed.serialized));
    }
    // Start the next row group with clean counters.
    _current_row_group_stats = {};
    _row_groups.push_back(std::move(group));
}

ss::future<> close() {
chunked_vector<row_group> row_groups;
row_groups.push_back(co_await flush_row_group());
co_await flush_row_group();
int64_t num_rows = 0;
for (const auto& rg : row_groups) {
for (const auto& rg : _row_groups) {
num_rows += rg.num_rows;
}
auto encoded_footer = encode(file_metadata{
.version = 2,
.schema = flatten(_opts.schema),
.num_rows = num_rows,
.row_groups = std::move(row_groups),
.row_groups = std::move(_row_groups),
.key_value_metadata = std::move(_opts.metadata),
.created_by = fmt::format(
"Redpanda version {} (build {})", _opts.version, _opts.build),
Expand All @@ -106,38 +139,12 @@ class writer::impl {

ss::future<> write_value(shredded_value sv) {
auto& col = _columns.at(sv.schema_element_position);
col.writer.add(std::move(sv.val), sv.rep_level, sv.def_level);
auto stats = col.writer.add(
std::move(sv.val), sv.rep_level, sv.def_level);
_current_row_group_stats.memory_usage += stats.memory_usage;
return ss::now();
}

// Flushes one page per column, writes each page to the output and returns the
// row_group metadata describing the pages that were written.
//
// NOTE(review): this looks like the pre-change variant of flush_row_group
// (returning the row_group instead of storing it) shown by the diff view;
// confirm whether it still exists in the file before relying on it.
ss::future<row_group> flush_row_group() {
row_group rg{};
// The row group starts at the current end of the written output.
rg.file_offset = static_cast<int64_t>(_offset);
for (auto& [pos, col] : _columns) {
auto page = co_await col.writer.flush_page();
const auto& data_header = std::get<data_page_header>(
page.header.type);
// Overwritten on every iteration; the last column's value is kept.
rg.num_rows = data_header.num_rows;
auto page_size = static_cast<int64_t>(page.serialized.size_bytes());
rg.total_byte_size += page_size;
rg.columns.push_back(column_chunk{
.meta_data = column_meta_data{
.type = col.leaf->type,
.encodings = {data_header.data_encoding},
.path_in_schema = path_in_schema(*col.leaf),
.codec = _opts.compress ? compression_codec::zstd : compression_codec::uncompressed,
.num_values = data_header.num_values,
.total_uncompressed_size = page.header.uncompressed_page_size + page.serialized_header_size,
.total_compressed_size = page.header.compressed_page_size + page.serialized_header_size,
.key_value_metadata = {},
// Read before the page is written, so this is where the page starts.
.data_page_offset = static_cast<int64_t>(_offset),
},
});
co_await write_iobuf(std::move(page.serialized));
}
co_return rg;
}

ss::future<> write_iobuf(iobuf b) {
_offset += b.size_bytes();
co_await write_iobuf_to_output_stream(std::move(b), _output);
Expand All @@ -152,6 +159,8 @@ class writer::impl {
ss::output_stream<char> _output;
size_t _offset = 0; // offset written to the stream
contiguous_range_map<int32_t, column> _columns;
chunked_vector<row_group> _row_groups;
row_group_stats _current_row_group_stats;
};

writer::writer(options opts, ss::output_stream<char> output)
Expand All @@ -167,6 +176,12 @@ ss::future<> writer::write_row(group_value row) {
return _impl->write_row(std::move(row));
}

// Returns a copy of the stats the implementation is tracking for the row
// group that is currently being buffered.
row_group_stats writer::current_row_group_stats() const {
    row_group_stats snapshot = _impl->current_row_group_stats();
    return snapshot;
}

// Forwards to the implementation, which flushes the buffered row group.
ss::future<> writer::flush_row_group() {
    return _impl->flush_row_group();
}

// Forwards to the implementation's close().
ss::future<> writer::close() {
    return _impl->close();
}

} // namespace serde::parquet
33 changes: 31 additions & 2 deletions src/v/serde/parquet/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

namespace serde::parquet {

// Running totals for the row group that is currently being buffered.
//
// Exposed mainly so that callers can keep memory usage bounded.
struct row_group_stats {
    // Number of rows counted for the row group so far.
    uint64_t rows{0};
    // Sum of the per-value memory usage reported for the row group so far.
    uint64_t memory_usage{0};
};

// A parquet file writer for seastar.
class writer {
public:
Expand Down Expand Up @@ -54,13 +62,34 @@ class writer {
// Write the record as a row to the parquet file. This value must exactly
// match the provided schema.
//
// The returned future must be awaited *before* calling write_row again with
// another value.
// This method may not be called concurrently with other methods on this
// class.
ss::future<> write_row(group_value);

// The current stats on the buffered row group.
//
// This can be used to account for memory usage and flush a row group
// when the memory usage is over some limit.
row_group_stats current_row_group_stats() const;

// Flush the current row group to the output stream, creating a new row
// group.
//
// This may only be called if there is at least a single row in the group.
//
// This method may not be called concurrently with other methods on this
// class.
ss::future<> flush_row_group();

// Close the writer by writing the parquet file footer. Then flush/close the
// underlying stream that is being written to.
//
// This method will automatically flush the current row group if there are
// any rows buffered in it.
//
// This method may not be called concurrently with other methods on this
// class.
//
// The resulting future must be awaited before destroying this object.
ss::future<> close();

Expand Down

0 comments on commit d0a03e6

Please sign in to comment.