Merge pull request #24129 from mmaslankaprv/writer-refactoring
Datalake `data_writer` interface refactoring
mmaslankaprv authored Nov 18, 2024
2 parents bbc1d0f + ca6c90c commit 7e06f41
Showing 26 changed files with 636 additions and 259 deletions.
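The change narrows the Parquet writer to pure encoding: `batching_parquet_writer` no longer opens, names, or deletes files and instead writes into a caller-supplied `ss::output_stream<char>`, while a new `local_parquet_file_writer` library (registered in the BUILD and CMakeLists changes below, source not shown in this excerpt) presumably takes over local file handling. The sketch below reconstructs the refactored interface from the signatures visible in this diff; the real declarations are not shown here, so the includes and exact spelling are an approximation.

// Approximate shape of the refactored writer interface, inferred from this diff.
#include "iceberg/datatypes.h"
#include "iceberg/values.h"

#include <seastar/core/future.hh>
#include <seastar/core/iostream.hh>

#include <memory>

namespace ss = seastar; // Redpanda normally pulls this alias from base/seastarx.h.

namespace datalake {

// Error codes used by the stream writer (names taken from the diff).
enum class writer_error {
    ok,
    parquet_conversion_error,
    file_io_error,
};

// Encodes iceberg::struct_value rows as Parquet and emits the bytes to the
// output stream it was constructed with; file management stays with the caller.
class parquet_ostream {
public:
    virtual ~parquet_ostream() = default;

    virtual ss::future<writer_error>
    add_data_struct(iceberg::struct_value data, size_t approx_size) = 0;

    virtual ss::future<writer_error> finish() = 0;
};

// Builds a parquet_ostream for a given schema over a caller-supplied stream.
class parquet_ostream_factory {
public:
    virtual ~parquet_ostream_factory() = default;

    virtual ss::future<std::unique_ptr<parquet_ostream>> create_writer(
      const iceberg::struct_type& schema, ss::output_stream<char> output) = 0;
};

} // namespace datalake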
21 changes: 21 additions & 0 deletions src/v/datalake/BUILD
@@ -159,6 +159,7 @@ redpanda_cc_library(
"//src/v/base",
"//src/v/iceberg:datatypes",
"//src/v/iceberg:values",
"@seastar",
],
)

@@ -456,3 +457,23 @@ redpanda_cc_library(
"//src/v/serde/parquet:value",
],
)

redpanda_cc_library(
name = "local_parquet_file_writer",
srcs = [
"local_parquet_file_writer.cc",
],
hdrs = [
"local_parquet_file_writer.h",
],
include_prefix = "datalake",
visibility = [":__subpackages__"],
deps = [
":logger",
":writer",
"//src/v/base",
"//src/v/iceberg:datatypes",
"//src/v/iceberg:values",
"@seastar",
],
)
3 changes: 2 additions & 1 deletion src/v/datalake/CMakeLists.txt
@@ -34,7 +34,7 @@ v_cc_library(
batching_parquet_writer.cc
catalog_schema_manager.cc
data_writer_interface.cc
parquet_writer.cc
arrow_writer.cc
record_multiplexer.cc
record_schema_resolver.cc
record_translator.cc
@@ -50,6 +50,7 @@ v_cc_library(
translation_task.cc
schema_parquet.cc
values_parquet.cc
local_parquet_file_writer.cc
DEPS
v::cloud_io
v::datalake_common
src/v/datalake/{parquet_writer.cc → arrow_writer.cc}
@@ -7,10 +7,9 @@
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/parquet_writer.h"
#include "datalake/arrow_writer.h"

#include "bytes/iobuf.h"
#include "datalake/data_writer_interface.h"

#include <arrow/array/array_base.h>
#include <arrow/chunked_array.h>
@@ -22,7 +21,6 @@
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>

#include <iostream>
#include <memory>
#include <stdexcept>

src/v/datalake/{parquet_writer.h → arrow_writer.h}: file renamed without changes.
122 changes: 24 additions & 98 deletions src/v/datalake/batching_parquet_writer.cc
@@ -23,7 +23,6 @@
#include <seastar/core/seastar.hh>
#include <seastar/coroutine/as_future.hh>

#include <cstdint>
#include <exception>
#include <memory>
#include <utility>
@@ -34,50 +33,15 @@ batching_parquet_writer::batching_parquet_writer(
const iceberg::struct_type& schema,
size_t row_count_threshold,
size_t byte_count_threshold,
local_path output_file_path)
ss::output_stream<char> output)
: _iceberg_to_arrow(schema)
, _arrow_to_iobuf(_iceberg_to_arrow.build_arrow_schema())
, _row_count_threshold{row_count_threshold}
, _byte_count_threshold{byte_count_threshold}
, _output_file_path(std::move(output_file_path)) {}
, _output_stream(std::move(output)) {}

ss::future<checked<std::nullopt_t, data_writer_error>>
batching_parquet_writer::initialize() {
vlog(datalake_log.info, "Writing Parquet file to {}", _output_file_path);
try {
_output_file = co_await ss::open_file_dma(
_output_file_path().string(),
ss::open_flags::create | ss::open_flags::truncate
| ss::open_flags::wo);
} catch (...) {
vlog(
datalake_log.error,
"Error opening output file {} - {}",
_output_file_path,
std::current_exception());
co_return data_writer_error::file_io_error;
}
bool error = false;
try {
_output_stream = co_await ss::make_file_output_stream(_output_file);
} catch (...) {
vlog(
datalake_log.error,
"Error making output stream for file {} - {}",
_output_file_path,
std::current_exception());
error = true;
}
if (error) {
co_await _output_file.close();
co_return data_writer_error::file_io_error;
}

co_return std::nullopt;
}

ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
iceberg::struct_value data, int64_t approx_size) {
ss::future<writer_error> batching_parquet_writer::add_data_struct(
iceberg::struct_value data, size_t approx_size) {
bool error = false;
try {
_iceberg_to_arrow.add_data(std::move(data));
@@ -89,8 +53,7 @@ ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
error = true;
}
if (error) {
co_await abort();
co_return data_writer_error::parquet_conversion_error;
co_return writer_error::parquet_conversion_error;
}
_row_count++;
_byte_count += approx_size;
@@ -100,22 +63,19 @@
|| _byte_count > _byte_count_threshold) {
co_return co_await write_row_group();
}
co_return data_writer_error::ok;

co_return writer_error::ok;
}

ss::future<result<local_file_metadata, data_writer_error>>
batching_parquet_writer::finish() {
local_file_metadata file_meta;
ss::future<writer_error> batching_parquet_writer::finish() {
auto write_result = co_await write_row_group();
if (write_result != data_writer_error::ok) {
co_await abort();
co_return write_result;
if (write_result != writer_error::ok) {
co_return writer_error::ok;
}
bool error = false;
iobuf out;
try {
out = _arrow_to_iobuf.close_and_take_iobuf();
_total_bytes += out.size_bytes();

} catch (...) {
vlog(
@@ -125,8 +85,7 @@ batching_parquet_writer::finish() {
error = true;
}
if (error) {
co_await abort();
co_return data_writer_error::parquet_conversion_error;
co_return writer_error::parquet_conversion_error;
}

try {
@@ -140,32 +99,25 @@
error = true;
}
if (error) {
co_await abort();
co_return data_writer_error::file_io_error;
co_return writer_error::file_io_error;
}

co_return local_file_metadata{
.path = _output_file_path,
.row_count = _total_row_count,
.size_bytes = _total_bytes,
};
co_return writer_error::ok;
}

ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
ss::future<writer_error> batching_parquet_writer::write_row_group() {
if (_row_count == 0) {
// This can happen if finish() is called when there is no new data.
co_return data_writer_error::ok;
co_return writer_error::ok;
}
bool error = false;
iobuf out;
try {
auto chunk = _iceberg_to_arrow.take_chunk();
_total_row_count += _row_count;
_row_count = 0;
_byte_count = 0;
_arrow_to_iobuf.add_arrow_array(chunk);
out = _arrow_to_iobuf.take_iobuf();
_total_bytes += out.size_bytes();
} catch (...) {
vlog(
datalake_log.error,
@@ -174,8 +126,7 @@ ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
error = true;
}
if (error) {
co_await abort();
co_return data_writer_error::parquet_conversion_error;
co_return writer_error::parquet_conversion_error;
}
try {
co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
@@ -187,48 +138,23 @@
error = true;
}
if (error) {
co_await abort();
co_return data_writer_error::file_io_error;
}
co_return data_writer_error::ok;
}

ss::future<> batching_parquet_writer::abort() {
co_await _output_stream.close();
auto exists = co_await ss::file_exists(_output_file_path().string());
if (exists) {
co_await ss::remove_file(_output_file_path().string());
co_return writer_error::file_io_error;
}
co_return writer_error::ok;
}

batching_parquet_writer_factory::batching_parquet_writer_factory(
local_path base_directory,
ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold)
: _base_directory{std::move(base_directory)}
, _file_name_prefix{std::move(file_name_prefix)}
, _row_count_threshold{row_count_threshold}
size_t row_count_threshold, size_t byte_count_threshold)
: _row_count_threshold{row_count_threshold}
, _byte_count_threshold{byte_count_threshold} {}

local_path batching_parquet_writer_factory::create_filename() const {
return local_path{
_base_directory()
/ fmt::format("{}-{}.parquet", _file_name_prefix, uuid_t::create())};
}
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
ss::future<std::unique_ptr<parquet_ostream>>
batching_parquet_writer_factory::create_writer(
const iceberg::struct_type& schema) {
auto ret = std::make_unique<batching_parquet_writer>(
const iceberg::struct_type& schema, ss::output_stream<char> output) {
co_return std::make_unique<batching_parquet_writer>(
std::move(schema),
_row_count_threshold,
_byte_count_threshold,
create_filename());

auto result = co_await ret->initialize();
if (result.has_error()) {
co_return result.error();
}
co_return ret;
std::move(output));
}
} // namespace datalake
53 changes: 17 additions & 36 deletions src/v/datalake/batching_parquet_writer.h
@@ -11,8 +11,8 @@
#pragma once
#include "base/outcome.h"
#include "datalake/arrow_translator.h"
#include "datalake/arrow_writer.h"
#include "datalake/data_writer_interface.h"
#include "datalake/parquet_writer.h"
#include "iceberg/datatypes.h"

#include <seastar/core/file.hh>
@@ -27,38 +27,30 @@ namespace datalake {
// batching_parquet_writer ties together the low-level components for iceberg to
// parquet translation to provide a high-level interface for creating parquet
// files from iceberg::value. It:
// 1. Opens a ss::file to store the results
// 2. Accepts iceberg::value and collects them in an arrow_translator
// 3. Once the row count or size threshold is reached it writes data to the
// file:
// 1. Accepts iceberg::value and collects them in an arrow_translator
// 2. Once the row count or size threshold is reached it writes data to the
// output stream:
// 1. takes a chunk from the arrow_translator
// 2. Adds the chunk to the parquet_writer
// 3. Extracts iobufs from the parquet_writer
// 4. Writes them to the open file
// 4. Writes them to the stream
// 4. When finish() is called it flushes all remaining data and closes the
// files.
class batching_parquet_writer : public data_writer {
// stream.
class batching_parquet_writer : public parquet_ostream {
public:
batching_parquet_writer(
const iceberg::struct_type& schema,
size_t row_count_threshold,
size_t byte_count_threshold,
local_path output_file_path);
ss::output_stream<char> output_stream);

ss::future<checked<std::nullopt_t, data_writer_error>> initialize();
ss::future<writer_error>
add_data_struct(iceberg::struct_value data, size_t approx_size) override;

ss::future<data_writer_error>
add_data_struct(iceberg::struct_value data, int64_t approx_size) override;

ss::future<result<local_file_metadata, data_writer_error>>
finish() override;

// Close the file handle, delete any temporary data and clean up any other
// state.
ss::future<> abort();
ss::future<writer_error> finish() override;

private:
ss::future<data_writer_error> write_row_group();
ss::future<writer_error> write_row_group();

// translating
arrow_translator _iceberg_to_arrow;
@@ -69,31 +61,20 @@ class batching_parquet_writer : public data_writer {
size_t _byte_count_threshold;
size_t _row_count = 0;
size_t _byte_count = 0;
size_t _total_row_count = 0;
size_t _total_bytes = 0;

// Output
local_path _output_file_path;
ss::file _output_file;
ss::output_stream<char> _output_stream;
};

class batching_parquet_writer_factory : public data_writer_factory {
class batching_parquet_writer_factory : public parquet_ostream_factory {
public:
batching_parquet_writer_factory(
local_path base_directory,
ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold);
size_t row_count_threshold, size_t byte_count_threshold);

ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
create_writer(const iceberg::struct_type& schema) override;
ss::future<std::unique_ptr<parquet_ostream>> create_writer(
const iceberg::struct_type& schema,
ss::output_stream<char> output) override;

private:
local_path create_filename() const;

local_path _base_directory;
ss::sstring _file_name_prefix;
size_t _row_count_threshold;
size_t _byte_count_threshold;
};
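With this split, the caller owns the destination file: it opens the file, wraps it in an output stream, and hands that stream to the factory. In the actual PR that role belongs to the new `local_parquet_file_writer` added above, whose source is not part of this excerpt. The snippet below is only a rough usage sketch built on the interface outline near the top of this page; the helper name and the size placeholder are illustrative, and error handling and stream cleanup are elided.

// Hypothetical caller-side usage of the refactored interface.
#include <seastar/core/coroutine.hh>
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/seastar.hh>

#include <filesystem>
#include <vector>

ss::future<datalake::writer_error> write_one_parquet_file(
  datalake::parquet_ostream_factory& factory,
  const iceberg::struct_type& schema,
  std::filesystem::path path,
  std::vector<iceberg::struct_value> rows) {
    // The caller, not the writer, now opens and names the output file.
    auto file = co_await ss::open_file_dma(
      path.string(),
      ss::open_flags::create | ss::open_flags::truncate | ss::open_flags::wo);
    auto out = co_await ss::make_file_output_stream(std::move(file));

    // The factory only binds the schema and the stream; no path is involved.
    auto writer = co_await factory.create_writer(schema, std::move(out));
    for (auto& row : rows) {
        // A real caller would pass the translator's size estimate, not 1.
        auto err = co_await writer->add_data_struct(std::move(row), 1);
        if (err != datalake::writer_error::ok) {
            co_return err;
        }
    }
    // finish() writes any remaining buffered rows to the stream.
    co_return co_await writer->finish();
}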