diff --git a/src/v/serde/parquet/column_writer.cc b/src/v/serde/parquet/column_writer.cc index 15af3e7170708..5d4471ef46f7a 100644 --- a/src/v/serde/parquet/column_writer.cc +++ b/src/v/serde/parquet/column_writer.cc @@ -34,7 +34,7 @@ class column_writer::impl { impl& operator=(impl&&) noexcept = default; virtual ~impl() noexcept = default; - virtual void add(value, rep_level, def_level) = 0; + virtual incremental_column_stats add(value, rep_level, def_level) = 0; virtual ss::future flush_page() = 0; }; @@ -62,16 +62,27 @@ class buffered_column_writer final : public column_writer::impl { , _max_def_level(max_def_level) , _opts(opts) {} - void add(value val, rep_level rl, def_level dl) override { + incremental_column_stats + add(value val, rep_level rl, def_level dl) override { ++_num_values; // A repetition level of zero means that it's the start of a new row and // not a repeated value within the same row. if (rl == rep_level(0)) { ++_num_rows; } + + uint64_t value_memory_usage = 0; + ss::visit( std::move(val), - [this](value_type& v) { _value_buffer.push_back(std::move(v)); }, + [this, &value_memory_usage](value_type& v) { + if constexpr (!std::is_trivially_copyable_v) { + value_memory_usage = v.val.size_bytes(); + } else { + value_memory_usage = sizeof(value_type); + } + _value_buffer.push_back(std::move(v)); + }, [this](null_value&) { // null values are valid, but are not encoded in the actual data, // they are encoded in the defintion levels. @@ -83,6 +94,16 @@ class buffered_column_writer final : public column_writer::impl { }); _rep_levels.push_back(rl); _def_levels.push_back(dl); + + // NOTE: This does not account for the underlying buffer memory + // but we don't want account for the capacity here, ideally we + // always use the full capacity in our value buffer, and eagerly + // accounting that usage might cause callers to overagressively + // flush pages/row groups. + return { + .memory_usage = value_memory_usage + sizeof(rep_level) + + sizeof(def_level), + }; } ss::future flush_page() override { @@ -216,7 +237,8 @@ column_writer::column_writer(column_writer&&) noexcept = default; column_writer& column_writer::operator=(column_writer&&) noexcept = default; column_writer::~column_writer() noexcept = default; -void column_writer::add(value val, rep_level rep_level, def_level def_level) { +incremental_column_stats +column_writer::add(value val, rep_level rep_level, def_level def_level) { return _impl->add(std::move(val), rep_level, def_level); } diff --git a/src/v/serde/parquet/column_writer.h b/src/v/serde/parquet/column_writer.h index d17e55c6404a1..3b52af19fc5bd 100644 --- a/src/v/serde/parquet/column_writer.h +++ b/src/v/serde/parquet/column_writer.h @@ -28,6 +28,13 @@ struct data_page { iobuf serialized; }; +// The delta in stats when a value is written to a column. +// +// This is used to account memory usage at the row group level. +struct incremental_column_stats { + uint64_t memory_usage; +}; + // A writer for a single column of parquet data. class column_writer { public: @@ -54,7 +61,7 @@ class column_writer { // nodes. // // Use `shred_record` to get the value and levels from an arbitrary value. - void add(value, rep_level, def_level); + incremental_column_stats add(value, rep_level, def_level); // Flush the currently buffered values to a page. // diff --git a/src/v/serde/parquet/tests/generate_file.cc b/src/v/serde/parquet/tests/generate_file.cc index 7c699cfa0a1d7..bc531b6e337cd 100644 --- a/src/v/serde/parquet/tests/generate_file.cc +++ b/src/v/serde/parquet/tests/generate_file.cc @@ -350,6 +350,10 @@ ss::future serialize_testcase(size_t test_case) { auto v = generate_value(schema); rows.push_back(copy(v)); co_await w.write_row(std::get(std::move(v))); + // Create multiple row groups and make sure it works + if (i % 32 == 0) { + co_await w.flush_row_group(); + } } co_await w.close(); co_return json(testcase{ diff --git a/src/v/serde/parquet/writer.cc b/src/v/serde/parquet/writer.cc index cb92a801193d1..8d9fa3eb2232e 100644 --- a/src/v/serde/parquet/writer.cc +++ b/src/v/serde/parquet/writer.cc @@ -67,23 +67,56 @@ class writer::impl { _opts.schema, std::move(row), [this](shredded_value sv) { return write_value(std::move(sv)); }); - // TODO: periodically flush the row_group if we're using enough memory - // or at least flush the pages if we don't want to create too many row - // groups. + ++_current_row_group_stats.rows; + } + + row_group_stats current_row_group_stats() const { + return _current_row_group_stats; + } + + ss::future<> flush_row_group() { + if (_current_row_group_stats.rows == 0) { + co_return; + } + row_group rg{}; + rg.file_offset = static_cast(_offset); + for (auto& [pos, col] : _columns) { + auto page = co_await col.writer.flush_page(); + const auto& data_header = std::get( + page.header.type); + rg.num_rows = data_header.num_rows; + auto page_size = static_cast(page.serialized.size_bytes()); + rg.total_byte_size += page_size; + rg.columns.push_back(column_chunk{ + .meta_data = column_meta_data{ + .type = col.leaf->type, + .encodings = {data_header.data_encoding}, + .path_in_schema = path_in_schema(*col.leaf), + .codec = _opts.compress ? compression_codec::zstd : compression_codec::uncompressed, + .num_values = data_header.num_values, + .total_uncompressed_size = page.header.uncompressed_page_size + page.serialized_header_size, + .total_compressed_size = page.header.compressed_page_size + page.serialized_header_size, + .key_value_metadata = {}, + .data_page_offset = static_cast(_offset), + }, + }); + co_await write_iobuf(std::move(page.serialized)); + } + _current_row_group_stats = {}; + _row_groups.push_back(std::move(rg)); } ss::future<> close() { - chunked_vector row_groups; - row_groups.push_back(co_await flush_row_group()); + co_await flush_row_group(); int64_t num_rows = 0; - for (const auto& rg : row_groups) { + for (const auto& rg : _row_groups) { num_rows += rg.num_rows; } auto encoded_footer = encode(file_metadata{ .version = 2, .schema = flatten(_opts.schema), .num_rows = num_rows, - .row_groups = std::move(row_groups), + .row_groups = std::move(_row_groups), .key_value_metadata = std::move(_opts.metadata), .created_by = fmt::format( "Redpanda version {} (build {})", _opts.version, _opts.build), @@ -106,38 +139,12 @@ class writer::impl { ss::future<> write_value(shredded_value sv) { auto& col = _columns.at(sv.schema_element_position); - col.writer.add(std::move(sv.val), sv.rep_level, sv.def_level); + auto stats = col.writer.add( + std::move(sv.val), sv.rep_level, sv.def_level); + _current_row_group_stats.memory_usage += stats.memory_usage; return ss::now(); } - ss::future flush_row_group() { - row_group rg{}; - rg.file_offset = static_cast(_offset); - for (auto& [pos, col] : _columns) { - auto page = co_await col.writer.flush_page(); - const auto& data_header = std::get( - page.header.type); - rg.num_rows = data_header.num_rows; - auto page_size = static_cast(page.serialized.size_bytes()); - rg.total_byte_size += page_size; - rg.columns.push_back(column_chunk{ - .meta_data = column_meta_data{ - .type = col.leaf->type, - .encodings = {data_header.data_encoding}, - .path_in_schema = path_in_schema(*col.leaf), - .codec = _opts.compress ? compression_codec::zstd : compression_codec::uncompressed, - .num_values = data_header.num_values, - .total_uncompressed_size = page.header.uncompressed_page_size + page.serialized_header_size, - .total_compressed_size = page.header.compressed_page_size + page.serialized_header_size, - .key_value_metadata = {}, - .data_page_offset = static_cast(_offset), - }, - }); - co_await write_iobuf(std::move(page.serialized)); - } - co_return rg; - } - ss::future<> write_iobuf(iobuf b) { _offset += b.size_bytes(); co_await write_iobuf_to_output_stream(std::move(b), _output); @@ -152,6 +159,8 @@ class writer::impl { ss::output_stream _output; size_t _offset = 0; // offset written to the stream contiguous_range_map _columns; + chunked_vector _row_groups; + row_group_stats _current_row_group_stats; }; writer::writer(options opts, ss::output_stream output) @@ -167,6 +176,12 @@ ss::future<> writer::write_row(group_value row) { return _impl->write_row(std::move(row)); } +row_group_stats writer::current_row_group_stats() const { + return _impl->current_row_group_stats(); +} + +ss::future<> writer::flush_row_group() { return _impl->flush_row_group(); } + ss::future<> writer::close() { return _impl->close(); } } // namespace serde::parquet diff --git a/src/v/serde/parquet/writer.h b/src/v/serde/parquet/writer.h index f529ee75b4b89..cbdfd8c52ea35 100644 --- a/src/v/serde/parquet/writer.h +++ b/src/v/serde/parquet/writer.h @@ -19,6 +19,14 @@ namespace serde::parquet { +// Statistics about the current row group. +// +// These are mainly provided to limit memory usage. +struct row_group_stats { + uint64_t rows = 0; + uint64_t memory_usage = 0; +}; + // A parquet file writer for seastar. class writer { public: @@ -54,13 +62,34 @@ class writer { // Write the record as a row to the parquet file. This value must exactly // match the provided schema. // - // The returned future must be awaited *before* calling write_row again with - // another value. + // This method may not be called concurrently with other methods on this + // class. ss::future<> write_row(group_value); + // The current stats on the buffered row group. + // + // This can be used to account for memory usage and flush a row group + // when the memory usage is over some limit. + row_group_stats current_row_group_stats() const; + + // Flush the current row group to the output stream, creating a new row + // group. + // + // This may only be called if there is at least a single row in the group. + // + // This method may not be called concurrently with other methods on this + // class. + ss::future<> flush_row_group(); + // Close the writer by writing the parquet file footer. Then flush/close the // underlying stream that is being written too. // + // This method will automatically flush the current row group if there are + // any rows buffered in it. + // + // This method may not be called concurrently with other methods on this + // class. + // // The resulting future must be awaited before destroying this object. ss::future<> close();