diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD index 11b45b51604d5..3329a445a609d 100644 --- a/src/v/datalake/BUILD +++ b/src/v/datalake/BUILD @@ -159,6 +159,7 @@ redpanda_cc_library( "//src/v/base", "//src/v/iceberg:datatypes", "//src/v/iceberg:values", + "@seastar", ], ) @@ -456,3 +457,23 @@ redpanda_cc_library( "//src/v/serde/parquet:value", ], ) + +redpanda_cc_library( + name = "local_parquet_file_writer", + srcs = [ + "local_parquet_file_writer.cc", + ], + hdrs = [ + "local_parquet_file_writer.h", + ], + include_prefix = "datalake", + visibility = [":__subpackages__"], + deps = [ + ":logger", + ":writer", + "//src/v/base", + "//src/v/iceberg:datatypes", + "//src/v/iceberg:values", + "@seastar", + ], +) diff --git a/src/v/datalake/CMakeLists.txt b/src/v/datalake/CMakeLists.txt index b0c4c1638af9c..b2d9ba7f7cc48 100644 --- a/src/v/datalake/CMakeLists.txt +++ b/src/v/datalake/CMakeLists.txt @@ -34,7 +34,7 @@ v_cc_library( batching_parquet_writer.cc catalog_schema_manager.cc data_writer_interface.cc - parquet_writer.cc + arrow_writer.cc record_multiplexer.cc record_schema_resolver.cc record_translator.cc @@ -50,6 +50,7 @@ v_cc_library( translation_task.cc schema_parquet.cc values_parquet.cc + local_parquet_file_writer.cc DEPS v::cloud_io v::datalake_common diff --git a/src/v/datalake/parquet_writer.cc b/src/v/datalake/arrow_writer.cc similarity index 97% rename from src/v/datalake/parquet_writer.cc rename to src/v/datalake/arrow_writer.cc index 00363a0269ca6..2cc288c7c2703 100644 --- a/src/v/datalake/parquet_writer.cc +++ b/src/v/datalake/arrow_writer.cc @@ -7,10 +7,9 @@ * * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ -#include "datalake/parquet_writer.h" +#include "datalake/arrow_writer.h" #include "bytes/iobuf.h" -#include "datalake/data_writer_interface.h" #include #include @@ -22,7 +21,6 @@ #include #include -#include #include #include diff --git a/src/v/datalake/parquet_writer.h b/src/v/datalake/arrow_writer.h similarity index 100% rename from src/v/datalake/parquet_writer.h rename to src/v/datalake/arrow_writer.h diff --git a/src/v/datalake/batching_parquet_writer.cc b/src/v/datalake/batching_parquet_writer.cc index 7eaab253b3adc..32ab4a69de36e 100644 --- a/src/v/datalake/batching_parquet_writer.cc +++ b/src/v/datalake/batching_parquet_writer.cc @@ -23,7 +23,6 @@ #include #include -#include #include #include #include @@ -34,50 +33,15 @@ batching_parquet_writer::batching_parquet_writer( const iceberg::struct_type& schema, size_t row_count_threshold, size_t byte_count_threshold, - local_path output_file_path) + ss::output_stream output) : _iceberg_to_arrow(schema) , _arrow_to_iobuf(_iceberg_to_arrow.build_arrow_schema()) , _row_count_threshold{row_count_threshold} , _byte_count_threshold{byte_count_threshold} - , _output_file_path(std::move(output_file_path)) {} + , _output_stream(std::move(output)) {} -ss::future> -batching_parquet_writer::initialize() { - vlog(datalake_log.info, "Writing Parquet file to {}", _output_file_path); - try { - _output_file = co_await ss::open_file_dma( - _output_file_path().string(), - ss::open_flags::create | ss::open_flags::truncate - | ss::open_flags::wo); - } catch (...) { - vlog( - datalake_log.error, - "Error opening output file {} - {}", - _output_file_path, - std::current_exception()); - co_return data_writer_error::file_io_error; - } - bool error = false; - try { - _output_stream = co_await ss::make_file_output_stream(_output_file); - } catch (...) 
{ - vlog( - datalake_log.error, - "Error making output stream for file {} - {}", - _output_file_path, - std::current_exception()); - error = true; - } - if (error) { - co_await _output_file.close(); - co_return data_writer_error::file_io_error; - } - - co_return std::nullopt; -} - -ss::future batching_parquet_writer::add_data_struct( - iceberg::struct_value data, int64_t approx_size) { +ss::future batching_parquet_writer::add_data_struct( + iceberg::struct_value data, size_t approx_size) { bool error = false; try { _iceberg_to_arrow.add_data(std::move(data)); @@ -89,8 +53,7 @@ ss::future batching_parquet_writer::add_data_struct( error = true; } if (error) { - co_await abort(); - co_return data_writer_error::parquet_conversion_error; + co_return writer_error::parquet_conversion_error; } _row_count++; _byte_count += approx_size; @@ -100,22 +63,19 @@ ss::future batching_parquet_writer::add_data_struct( || _byte_count > _byte_count_threshold) { co_return co_await write_row_group(); } - co_return data_writer_error::ok; + + co_return writer_error::ok; } -ss::future> -batching_parquet_writer::finish() { - local_file_metadata file_meta; +ss::future batching_parquet_writer::finish() { auto write_result = co_await write_row_group(); - if (write_result != data_writer_error::ok) { - co_await abort(); - co_return write_result; + if (write_result != writer_error::ok) { + co_return write_result; } bool error = false; iobuf out; try { out = _arrow_to_iobuf.close_and_take_iobuf(); - _total_bytes += out.size_bytes(); } catch (...) { vlog( @@ -125,8 +85,7 @@ batching_parquet_writer::finish() { error = true; } if (error) { - co_await abort(); - co_return data_writer_error::parquet_conversion_error; + co_return writer_error::parquet_conversion_error; } try { @@ -140,32 +99,25 @@ batching_parquet_writer::finish() { error = true; } if (error) { - co_await abort(); - co_return data_writer_error::file_io_error; + co_return writer_error::file_io_error; } - co_return local_file_metadata{ - .path = _output_file_path, - .row_count = _total_row_count, - .size_bytes = _total_bytes, - }; + co_return writer_error::ok; } -ss::future batching_parquet_writer::write_row_group() { +ss::future batching_parquet_writer::write_row_group() { if (_row_count == 0) { // This can happen if finish() is called when there is no new data. - co_return data_writer_error::ok; + co_return writer_error::ok; } bool error = false; iobuf out; try { auto chunk = _iceberg_to_arrow.take_chunk(); - _total_row_count += _row_count; _row_count = 0; _byte_count = 0; _arrow_to_iobuf.add_arrow_array(chunk); out = _arrow_to_iobuf.take_iobuf(); - _total_bytes += out.size_bytes(); } catch (...)
{ vlog( datalake_log.error, @@ -174,8 +126,7 @@ ss::future batching_parquet_writer::write_row_group() { error = true; } if (error) { - co_await abort(); - co_return data_writer_error::parquet_conversion_error; + co_return writer_error::parquet_conversion_error; } try { co_await write_iobuf_to_output_stream(std::move(out), _output_stream); @@ -187,48 +138,23 @@ ss::future batching_parquet_writer::write_row_group() { error = true; } if (error) { - co_await abort(); - co_return data_writer_error::file_io_error; - } - co_return data_writer_error::ok; -} - -ss::future<> batching_parquet_writer::abort() { - co_await _output_stream.close(); - auto exists = co_await ss::file_exists(_output_file_path().string()); - if (exists) { - co_await ss::remove_file(_output_file_path().string()); + co_return writer_error::file_io_error; } + co_return writer_error::ok; } batching_parquet_writer_factory::batching_parquet_writer_factory( - local_path base_directory, - ss::sstring file_name_prefix, - size_t row_count_threshold, - size_t byte_count_threshold) - : _base_directory{std::move(base_directory)} - , _file_name_prefix{std::move(file_name_prefix)} - , _row_count_threshold{row_count_threshold} + size_t row_count_threshold, size_t byte_count_threshold) + : _row_count_threshold{row_count_threshold} , _byte_count_threshold{byte_count_threshold} {} -local_path batching_parquet_writer_factory::create_filename() const { - return local_path{ - _base_directory() - / fmt::format("{}-{}.parquet", _file_name_prefix, uuid_t::create())}; -} -ss::future, data_writer_error>> +ss::future> batching_parquet_writer_factory::create_writer( - const iceberg::struct_type& schema) { - auto ret = std::make_unique( + const iceberg::struct_type& schema, ss::output_stream output) { + co_return std::make_unique( std::move(schema), _row_count_threshold, _byte_count_threshold, - create_filename()); - - auto result = co_await ret->initialize(); - if (result.has_error()) { - co_return result.error(); - } - co_return ret; + std::move(output)); } } // namespace datalake diff --git a/src/v/datalake/batching_parquet_writer.h b/src/v/datalake/batching_parquet_writer.h index f331e8a6499ba..c2abfd33bf277 100644 --- a/src/v/datalake/batching_parquet_writer.h +++ b/src/v/datalake/batching_parquet_writer.h @@ -11,8 +11,8 @@ #pragma once #include "base/outcome.h" #include "datalake/arrow_translator.h" +#include "datalake/arrow_writer.h" #include "datalake/data_writer_interface.h" -#include "datalake/parquet_writer.h" #include "iceberg/datatypes.h" #include @@ -27,38 +27,30 @@ namespace datalake { // batching_parquet_writer ties together the low-level components for iceberg to // parquet translation to provide a high-level interface for creating parquet // files from iceberg::value. It: -// 1. Opens a ss::file to store the results -// 2. Accepts iceberg::value and collects them in an arrow_translator -// 3. Once the row count or size threshold is reached it writes data to the -// file: +// 1. Accepts iceberg::value and collects them in an arrow_translator +// 2. Once the row count or size threshold is reached it writes data to the +// output stream: // 1. takes a chunk from the arrow_translator // 2. Adds the chunk to the parquet_writer // 3. Extracts iobufs from the parquet_writer -// 4. Writes them to the open file +// 4. Writes them to the stream // 4. When finish() is called it flushes all remaining data and closes the -// files. -class batching_parquet_writer : public data_writer { +// stream. 
+class batching_parquet_writer : public parquet_ostream { public: batching_parquet_writer( const iceberg::struct_type& schema, size_t row_count_threshold, size_t byte_count_threshold, - local_path output_file_path); + ss::output_stream output_stream); - ss::future> initialize(); + ss::future + add_data_struct(iceberg::struct_value data, size_t approx_size) override; - ss::future - add_data_struct(iceberg::struct_value data, int64_t approx_size) override; - - ss::future> - finish() override; - - // Close the file handle, delete any temporary data and clean up any other - // state. - ss::future<> abort(); + ss::future finish() override; private: - ss::future write_row_group(); + ss::future write_row_group(); // translating arrow_translator _iceberg_to_arrow; @@ -69,31 +61,20 @@ class batching_parquet_writer : public data_writer { size_t _byte_count_threshold; size_t _row_count = 0; size_t _byte_count = 0; - size_t _total_row_count = 0; - size_t _total_bytes = 0; - // Output - local_path _output_file_path; - ss::file _output_file; ss::output_stream _output_stream; }; -class batching_parquet_writer_factory : public data_writer_factory { +class batching_parquet_writer_factory : public parquet_ostream_factory { public: batching_parquet_writer_factory( - local_path base_directory, - ss::sstring file_name_prefix, - size_t row_count_threshold, - size_t byte_count_threshold); + size_t row_count_threshold, size_t byte_count_threshold); - ss::future, data_writer_error>> - create_writer(const iceberg::struct_type& schema) override; + ss::future> create_writer( + const iceberg::struct_type& schema, + ss::output_stream output) override; private: - local_path create_filename() const; - - local_path _base_directory; - ss::sstring _file_name_prefix; size_t _row_count_threshold; size_t _byte_count_threshold; }; diff --git a/src/v/datalake/data_writer_interface.cc b/src/v/datalake/data_writer_interface.cc index ed43dd8b26cc0..b8f29a2094db2 100644 --- a/src/v/datalake/data_writer_interface.cc +++ b/src/v/datalake/data_writer_interface.cc @@ -13,14 +13,14 @@ #include namespace datalake { std::string data_writer_error_category::message(int ev) const { - switch (static_cast(ev)) { - case data_writer_error::ok: + switch (static_cast(ev)) { + case writer_error::ok: return "Ok"; - case data_writer_error::parquet_conversion_error: + case writer_error::parquet_conversion_error: return "Parquet Conversion Error"; - case data_writer_error::file_io_error: + case writer_error::file_io_error: return "File IO Error"; - case data_writer_error::no_data: + case writer_error::no_data: return "No data"; } } diff --git a/src/v/datalake/data_writer_interface.h b/src/v/datalake/data_writer_interface.h index ab9dd77f22796..d965d000e06aa 100644 --- a/src/v/datalake/data_writer_interface.h +++ b/src/v/datalake/data_writer_interface.h @@ -14,11 +14,13 @@ #include "iceberg/datatypes.h" #include "iceberg/values.h" +#include + #include namespace datalake { -enum class data_writer_error { +enum class writer_error { ok = 0, parquet_conversion_error, file_io_error, @@ -36,38 +38,80 @@ struct data_writer_error_category : std::error_category { } }; -inline std::error_code make_error_code(data_writer_error e) noexcept { +inline std::error_code make_error_code(writer_error e) noexcept { return {static_cast(e), data_writer_error_category::error_category()}; } -class data_writer { +/** + * Parquet writer interface. The writer should write parquet serialized data to + * the output stream provided during its creation. 
+ */ +class parquet_ostream { +public: + explicit parquet_ostream() = default; + parquet_ostream(const parquet_ostream&) = delete; + parquet_ostream(parquet_ostream&&) = default; + parquet_ostream& operator=(const parquet_ostream&) = delete; + parquet_ostream& operator=(parquet_ostream&&) = default; + virtual ~parquet_ostream() = default; + + virtual ss::future + add_data_struct(iceberg::struct_value, size_t) = 0; + + virtual ss::future finish() = 0; +}; + +class parquet_ostream_factory { +public: + parquet_ostream_factory() = default; + parquet_ostream_factory(const parquet_ostream_factory&) = default; + parquet_ostream_factory(parquet_ostream_factory&&) = delete; + parquet_ostream_factory& operator=(const parquet_ostream_factory&) + = default; + parquet_ostream_factory& operator=(parquet_ostream_factory&&) = delete; + + virtual ~parquet_ostream_factory() = default; + + virtual ss::future> + create_writer(const iceberg::struct_type&, ss::output_stream) = 0; +}; + +/** + * Interface of a parquet file writer. The file writer finishes by returning + * file metadata. In the future we may want to change the return type of this + * interface to be more generic and allow expressing that the writer can return + * either a local file path or a remote path. + */ +class parquet_file_writer { public: - data_writer() = default; - data_writer(const data_writer&) = delete; - data_writer(data_writer&&) = default; - data_writer& operator=(const data_writer&) = delete; - data_writer& operator=(data_writer&&) = delete; + parquet_file_writer() = default; + parquet_file_writer(const parquet_file_writer&) = delete; + parquet_file_writer(parquet_file_writer&&) = default; + parquet_file_writer& operator=(const parquet_file_writer&) = delete; + parquet_file_writer& operator=(parquet_file_writer&&) = delete; - virtual ~data_writer() = default; + virtual ~parquet_file_writer() = default; - virtual ss::future add_data_struct( + virtual ss::future add_data_struct( iceberg::struct_value /* data */, int64_t /* approx_size */) = 0; - virtual ss::future> - finish() = 0; + virtual ss::future> finish() = 0; }; -class data_writer_factory { +class parquet_file_writer_factory { public: - data_writer_factory() = default; - data_writer_factory(const data_writer_factory&) = delete; - data_writer_factory(data_writer_factory&&) = default; - data_writer_factory& operator=(const data_writer_factory&) = delete; - data_writer_factory& operator=(data_writer_factory&&) = default; - virtual ~data_writer_factory() = default; - - virtual ss::future, data_writer_error>> + parquet_file_writer_factory() = default; + parquet_file_writer_factory(const parquet_file_writer_factory&) = delete; + parquet_file_writer_factory(parquet_file_writer_factory&&) = default; + parquet_file_writer_factory& operator=(const parquet_file_writer_factory&) + = delete; + parquet_file_writer_factory& operator=(parquet_file_writer_factory&&) + = default; + virtual ~parquet_file_writer_factory() = default; + + virtual ss::future< + result, writer_error>> create_writer(const iceberg::struct_type& /* schema */) = 0; }; @@ -75,5 +119,5 @@ class data_writer_factory { namespace std { template<> -struct is_error_code_enum : true_type {}; +struct is_error_code_enum : true_type {}; } // namespace std diff --git a/src/v/datalake/local_parquet_file_writer.cc b/src/v/datalake/local_parquet_file_writer.cc new file mode 100644 index 0000000000000..c3f2f84ea6e53 --- /dev/null +++ b/src/v/datalake/local_parquet_file_writer.cc @@ -0,0 +1,154 @@ +/* + * Copyright 2024 Redpanda Data, Inc.
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "datalake/local_parquet_file_writer.h" + +#include "base/vlog.h" +#include "datalake/logger.h" + +#include +#include +#include + +namespace datalake { + +local_parquet_file_writer::local_parquet_file_writer( + local_path output_file_path, + ss::shared_ptr writer_factory) + : _output_file_path(std::move(output_file_path)) + , _writer_factory(std::move(writer_factory)) {} + +ss::future> +local_parquet_file_writer::initialize(const iceberg::struct_type& schema) { + vlog(datalake_log.info, "Writing Parquet file to {}", _output_file_path); + try { + _output_file = co_await ss::open_file_dma( + _output_file_path().string(), + ss::open_flags::create | ss::open_flags::truncate + | ss::open_flags::wo); + } catch (...) { + vlog( + datalake_log.error, + "Error opening output file {} - {}", + _output_file_path, + std::current_exception()); + co_return writer_error::file_io_error; + } + + auto fut = co_await ss::coroutine::as_future( + ss::make_file_output_stream(_output_file)); + + if (fut.failed()) { + vlog( + datalake_log.error, + "Error making output stream for file {} - {}", + _output_file_path, + fut.get_exception()); + co_await _output_file.close(); + co_return writer_error::file_io_error; + } + + _writer = co_await _writer_factory->create_writer( + schema, std::move(fut.get())); + _initialized = true; + co_return std::nullopt; +} + +ss::future local_parquet_file_writer::add_data_struct( + iceberg::struct_value data, int64_t sz) { + if (!_initialized) { + co_return writer_error::file_io_error; + } + auto write_result = co_await _writer->add_data_struct(std::move(data), sz); + if (write_result != writer_error::ok) { + vlog( + datalake_log.warn, + "Error writing data to file {} - {}", + _output_file_path, + write_result); + + co_await abort(); + co_return write_result; + } + _raw_bytes_count += sz; + _row_count++; + + co_return writer_error::ok; +} + +ss::future> +local_parquet_file_writer::finish() { + if (!_initialized) { + co_return writer_error::file_io_error; + } + auto result = co_await _writer->finish(); + if (result != writer_error::ok) { + co_await abort(); + co_return result; + } + _initialized = false; + try { + auto f_size = co_await ss::file_size(_output_file_path().string()); + + co_return local_file_metadata{ + .path = _output_file_path, + .row_count = _row_count, + .size_bytes = f_size, + }; + } catch (...) 
{ + vlog( + datalake_log.warn, + "Error querying parquet file {} size - {}", + _output_file_path, + std::current_exception()); + co_return writer_error::file_io_error; + } +} + +ss::future<> local_parquet_file_writer::abort() { + if (!_initialized) { + co_return; + } + co_await _output_file.close(); + auto exists = co_await ss::file_exists(_output_file_path().string()); + if (exists) { + co_await ss::remove_file(_output_file_path().string()); + } + _initialized = false; +} + +local_path local_parquet_file_writer_factory::create_filename() const { + return local_path{ + _base_directory() + / fmt::format("{}-{}.parquet", _file_name_prefix, uuid_t::create())}; +} + +local_parquet_file_writer_factory::local_parquet_file_writer_factory( + local_path base_directory, + ss::sstring file_name_prefix, + ss::shared_ptr writer_factory) + : _base_directory(std::move(base_directory)) + , _file_name_prefix(std::move(file_name_prefix)) + , _writer_factory(std::move(writer_factory)) {} + +ss::future, writer_error>> +local_parquet_file_writer_factory::create_writer( + const iceberg::struct_type& schema) { + auto writer = std::make_unique( + create_filename(), _writer_factory); + + auto res = co_await writer->initialize(schema); + if (res.has_error()) { + co_return res.error(); + } + co_return std::move(writer); +} + +} // namespace datalake diff --git a/src/v/datalake/local_parquet_file_writer.h b/src/v/datalake/local_parquet_file_writer.h new file mode 100644 index 0000000000000..2a22f7334c3fc --- /dev/null +++ b/src/v/datalake/local_parquet_file_writer.h @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "datalake/data_writer_interface.h" + +#include + +namespace datalake { + +/** + * Class wrapping a parquet writer and handling all file I/O operations + * necessary to write a temporary local parquet file. The class handles aborts + * and deletes the local file if an error occurs.
+ */ +class local_parquet_file_writer : public parquet_file_writer { +public: + local_parquet_file_writer( + local_path, ss::shared_ptr); + + ss::future> + initialize(const iceberg::struct_type&); + + ss::future add_data_struct( + iceberg::struct_value /* data */, int64_t /* approx_size */) final; + + ss::future> finish() final; + +private: + ss::future<> abort(); + + local_path _output_file_path; + ss::file _output_file; + size_t _row_count{0}; + size_t _raw_bytes_count{0}; + + std::unique_ptr _writer; + ss::shared_ptr _writer_factory; + bool _initialized{false}; +}; + +class local_parquet_file_writer_factory : public parquet_file_writer_factory { +public: + local_parquet_file_writer_factory( + local_path base_directory, + ss::sstring file_name_prefix, + ss::shared_ptr); + + ss::future, writer_error>> + create_writer(const iceberg::struct_type& schema) final; + +private: + local_path create_filename() const; + + local_path _base_directory; + ss::sstring _file_name_prefix; + ss::shared_ptr _writer_factory; +}; + +} // namespace datalake diff --git a/src/v/datalake/partitioning_writer.cc b/src/v/datalake/partitioning_writer.cc index cb70081543259..bd35c0132ec42 100644 --- a/src/v/datalake/partitioning_writer.cc +++ b/src/v/datalake/partitioning_writer.cc @@ -26,7 +26,7 @@ const auto default_accessors = iceberg::struct_accessor::from_struct_type( default_schema); } // namespace -ss::future +ss::future partitioning_writer::add_data(iceberg::struct_value val, int64_t approx_size) { iceberg::partition_key pk; try { @@ -38,7 +38,7 @@ partitioning_writer::add_data(iceberg::struct_value val, int64_t approx_size) { "Error {} while partitioning value: {}", std::current_exception(), val); - co_return data_writer_error::parquet_conversion_error; + co_return writer_error::parquet_conversion_error; } auto writer_iter = writers_.find(pk); if (writer_iter == writers_.end()) { @@ -57,17 +57,17 @@ partitioning_writer::add_data(iceberg::struct_value val, int64_t approx_size) { auto& writer = writer_iter->second; auto write_res = co_await writer->add_data_struct( std::move(val), approx_size); - if (write_res != data_writer_error::ok) { + if (write_res != writer_error::ok) { vlog(datalake_log.error, "Failed to add data: {}", write_res); co_return write_res; } co_return write_res; } -ss::future, data_writer_error>> +ss::future, writer_error>> partitioning_writer::finish() && { chunked_vector files; - auto first_error = data_writer_error::ok; + auto first_error = writer_error::ok; // TODO: parallelize me! for (auto& [pk, writer] : writers_) { int hour = std::get( @@ -79,7 +79,7 @@ partitioning_writer::finish() && { datalake_log.error, "Failed to finish writer: {}", file_res.error()); - if (first_error == data_writer_error::ok) { + if (first_error == writer_error::ok) { first_error = file_res.error(); } // Even on error, move on so that we can close all the writers. 
@@ -89,7 +89,7 @@ partitioning_writer::finish() && { file.hour = hour; files.emplace_back(std::move(file)); } - if (first_error != data_writer_error::ok) { + if (first_error != writer_error::ok) { co_return first_error; } co_return files; diff --git a/src/v/datalake/partitioning_writer.h b/src/v/datalake/partitioning_writer.h index d309c8f423993..f13d5560eaa00 100644 --- a/src/v/datalake/partitioning_writer.h +++ b/src/v/datalake/partitioning_writer.h @@ -30,7 +30,7 @@ namespace datalake { class partitioning_writer { public: explicit partitioning_writer( - data_writer_factory& factory, iceberg::struct_type type) + parquet_file_writer_factory& factory, iceberg::struct_type type) : writer_factory_(factory) , type_(std::move(type)) {} @@ -38,24 +38,26 @@ class partitioning_writer { // partition key. // // Expects that the input value abides by the schema denoted by `type_`. - ss::future + ss::future add_data(iceberg::struct_value, int64_t approx_size); // Finishes and returns the list of local files written by the underlying // writers, with the appropriate partitioning metadata filled in. - ss::future, data_writer_error>> + ss::future, writer_error>> finish() &&; private: // Factory for data writers. - data_writer_factory& writer_factory_; + parquet_file_writer_factory& writer_factory_; // The Iceberg message type for the underlying writer. Expected to include // Redpanda-specific fields, e.g. a timestamp field for partitioning. const iceberg::struct_type type_; // Map of partition keys to their corresponding data file writers. - chunked_hash_map> + chunked_hash_map< + iceberg::partition_key, + std::unique_ptr> writers_; }; diff --git a/src/v/datalake/record_multiplexer.cc b/src/v/datalake/record_multiplexer.cc index 1bbaf352c0915..82673b7cd1b6d 100644 --- a/src/v/datalake/record_multiplexer.cc +++ b/src/v/datalake/record_multiplexer.cc @@ -24,7 +24,7 @@ namespace datalake { record_multiplexer::record_multiplexer( const model::ntp& ntp, - std::unique_ptr writer_factory, + std::unique_ptr writer_factory, schema_manager& schema_mgr, type_resolver& type_resolver) : _log(datalake_log, fmt::format("{}", ntp)) @@ -121,7 +121,7 @@ record_multiplexer::operator()(model::record_batch batch) { "Error getting field IDs for record {}: {}", offset, get_ids_res.error()); - _error = data_writer_error::parquet_conversion_error; + _error = writer_error::parquet_conversion_error; } co_return ss::stop_iteration::yes; } @@ -146,7 +146,7 @@ record_multiplexer::operator()(model::record_batch batch) { auto write_result = co_await writer->add_data( std::move(record_data_res.value()), estimated_size); - if (write_result != data_writer_error::ok) { + if (write_result != writer_error::ok) { vlog( _log.warn, "Error adding data to writer for record {}: {}", @@ -161,14 +161,14 @@ record_multiplexer::operator()(model::record_batch batch) { co_return ss::stop_iteration::no; } -ss::future> +ss::future> record_multiplexer::end_of_stream() { if (_error) { co_return *_error; } if (!_result) { // no batches were processed. 
- co_return data_writer_error::no_data; + co_return writer_error::no_data; } auto writers = std::move(_writers); for (auto& [id, writer] : writers) { @@ -187,7 +187,7 @@ record_multiplexer::end_of_stream() { co_return std::move(*_result); } -ss::future> +ss::future> record_multiplexer::handle_invalid_record( kafka::offset offset, iobuf key, iobuf val, model::timestamp ts) { vlog(_log.debug, "Handling invalid record {}", offset); @@ -204,7 +204,7 @@ record_multiplexer::handle_invalid_record( "Error translating data to binary record {}: {}", offset, record_data_res.error()); - co_return data_writer_error::parquet_conversion_error; + co_return writer_error::parquet_conversion_error; } auto record_type = record_translator::build_type(std::nullopt); @@ -221,7 +221,7 @@ record_multiplexer::handle_invalid_record( "Error getting field IDs for binary record {}: {}", offset, get_ids_res.error()); - co_return data_writer_error::parquet_conversion_error; + co_return writer_error::parquet_conversion_error; } auto [iter, _] = _writers.emplace( record_type.comps, @@ -238,7 +238,7 @@ record_multiplexer::handle_invalid_record( auto& writer = writer_iter->second; auto write_result = co_await writer->add_data( std::move(record_data_res.value()), estimated_size); - if (write_result != data_writer_error::ok) { + if (write_result != writer_error::ok) { vlog( _log.error, "Error adding data to writer for binary record {}: {}", diff --git a/src/v/datalake/record_multiplexer.h b/src/v/datalake/record_multiplexer.h index 1e8a2938d91b1..473d9e2a292dc 100644 --- a/src/v/datalake/record_multiplexer.h +++ b/src/v/datalake/record_multiplexer.h @@ -40,24 +40,24 @@ class record_multiplexer { }; explicit record_multiplexer( const model::ntp& ntp, - std::unique_ptr writer, + std::unique_ptr writer, schema_manager& schema_mgr, type_resolver& type_resolver); ss::future operator()(model::record_batch batch); - ss::future> end_of_stream(); + ss::future> end_of_stream(); private: // Handles the given record components of a record that is invalid for the // target table. // TODO: this just writes to the existing table, populating internal // columns. Consider a separate table entirely. 
- ss::future> + ss::future> handle_invalid_record(kafka::offset, iobuf, iobuf, model::timestamp); prefix_logger _log; const model::ntp& _ntp; - std::unique_ptr _writer_factory; + std::unique_ptr _writer_factory; schema_manager& _schema_mgr; type_resolver& _type_resolver; chunked_hash_map< @@ -65,7 +65,7 @@ class record_multiplexer { std::unique_ptr> _writers; - std::optional _error; + std::optional _error; std::optional _result; }; diff --git a/src/v/datalake/tests/BUILD b/src/v/datalake/tests/BUILD index 4d12712bad9cb..c7cbb94e5da0d 100644 --- a/src/v/datalake/tests/BUILD +++ b/src/v/datalake/tests/BUILD @@ -65,6 +65,16 @@ redpanda_test_cc_library( ], ) +redpanda_test_cc_library( + name = "test_data", + hdrs = ["test_data.h"], + include_prefix = "datalake/tests", + visibility = ["//visibility:public"], + deps = [ + "//src/v/iceberg:datatypes", + ], +) + #TODO: this wrapper can be removed when we completely remove cmake tests support redpanda_test_cc_library( name = "proto_definitions_wrapper", @@ -292,3 +302,22 @@ redpanda_cc_gtest( "@seastar", ], ) + +redpanda_cc_gtest( + name = "local_parquet_file_writer_test", + timeout = "short", + srcs = [ + "local_parquet_file_writer_test.cc", + ], + deps = [ + "//src/v/datalake:local_parquet_file_writer", + "//src/v/datalake/tests:test_data", + "//src/v/iceberg:datatypes", + "//src/v/iceberg/tests:value_generator", + "//src/v/test_utils:gtest", + "//src/v/test_utils:seastar_boost", + "@fmt", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/datalake/tests/CMakeLists.txt b/src/v/datalake/tests/CMakeLists.txt index 63dde73fcaf6b..0bd902a797c60 100644 --- a/src/v/datalake/tests/CMakeLists.txt +++ b/src/v/datalake/tests/CMakeLists.txt @@ -175,14 +175,28 @@ target_include_directories(v_datalake_test_proto_cc_files ) rp_test( + UNIT_TEST GTEST - BINARY_NAME batching_parquet_writer_test + BINARY_NAME batching_parquet_writer SOURCES batching_parquet_writer_test.cc LIBRARIES v::gtest_main v::datalake_writer v::iceberg_test_utils - LABELS storage + LABELS datalake + ARGS "-- -c 1" +) + +rp_test( + UNIT_TEST + GTEST + BINARY_NAME local_parquet_file_writer + SOURCES local_parquet_file_writer_test.cc + LIBRARIES + v::gtest_main + v::datalake_writer + v::iceberg_test_utils + LABELS datalake ARGS "-- -c 1" ) diff --git a/src/v/datalake/tests/batching_parquet_writer_test.cc b/src/v/datalake/tests/batching_parquet_writer_test.cc index 2fdfefb35a39e..82d668e68918c 100644 --- a/src/v/datalake/tests/batching_parquet_writer_test.cc +++ b/src/v/datalake/tests/batching_parquet_writer_test.cc @@ -8,6 +8,7 @@ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #include "datalake/batching_parquet_writer.h" +#include "datalake/local_parquet_file_writer.h" #include "datalake/tests/test_data.h" #include "iceberg/tests/value_generator.h" #include "iceberg/values.h" @@ -30,23 +31,21 @@ TEST(BatchingParquetWriterTest, WritesParquetFiles) { std::filesystem::path full_path = tmp_dir.get_path() / file_path; int num_rows = 1000; - datalake::batching_parquet_writer writer( - test_schema(iceberg::field_required::no), - 500, - 1000000, - local_path(full_path)); + local_parquet_file_writer file_writer( + local_path(full_path), + ss::make_shared(500, 100000)); - writer.initialize().get(); + file_writer.initialize(test_schema(iceberg::field_required::no)).get(); for (int i = 0; i < num_rows; i++) { auto data = iceberg::tests::make_struct_value( iceberg::tests::value_spec{ .forced_fixed_val = iobuf::from("Hello world")}, 
test_schema(iceberg::field_required::no)); - writer.add_data_struct(std::move(data), 1000).get(); + file_writer.add_data_struct(std::move(data), 1000).get(); } - auto result = writer.finish().get0(); + auto result = file_writer.finish().get0(); ASSERT_TRUE(result.has_value()); EXPECT_EQ(result.value().path, full_path); EXPECT_EQ(result.value().row_count, num_rows); @@ -71,29 +70,4 @@ TEST(BatchingParquetWriterTest, WritesParquetFiles) { EXPECT_EQ(table->num_columns(), 17); } -TEST(BatchingParquetWriterTest, DeletesFileOnAbort) { - temporary_dir tmp_dir("batching_parquet_writer"); - std::filesystem::path file_path = "test_file.parquet"; - int num_rows = 1000; - - datalake::batching_parquet_writer writer( - test_schema(iceberg::field_required::no), - 500, - 1000000, - local_path(tmp_dir.get_path() / file_path)); - - writer.initialize().get(); - - for (int i = 0; i < num_rows; i++) { - auto data = iceberg::tests::make_struct_value( - iceberg::tests::value_spec{ - .forced_fixed_val = iobuf::from("Hello world")}, - test_schema(iceberg::field_required::no)); - writer.add_data_struct(std::move(data), 1000).get0(); - } - writer.abort().get(); - auto exists = ss::file_exists(file_path.c_str()).get(); - EXPECT_FALSE(exists); -} - } // namespace datalake diff --git a/src/v/datalake/tests/gtest_record_multiplexer_test.cc b/src/v/datalake/tests/gtest_record_multiplexer_test.cc index 6566ebc95241e..8b2882743b3b4 100644 --- a/src/v/datalake/tests/gtest_record_multiplexer_test.cc +++ b/src/v/datalake/tests/gtest_record_multiplexer_test.cc @@ -10,6 +10,7 @@ #include "datalake/base_types.h" #include "datalake/batching_parquet_writer.h" #include "datalake/catalog_schema_manager.h" +#include "datalake/local_parquet_file_writer.h" #include "datalake/record_multiplexer.h" #include "datalake/record_schema_resolver.h" #include "datalake/tests/catalog_and_registry_fixture.h" @@ -93,8 +94,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) { }); auto res = reader.consume(std::move(multiplexer), model::no_timeout).get0(); ASSERT_TRUE(res.has_error()); - EXPECT_EQ( - res.error(), datalake::data_writer_error::parquet_conversion_error); + EXPECT_EQ(res.error(), datalake::writer_error::parquet_conversion_error); } TEST(DatalakeMultiplexerTest, WritesDataFiles) { @@ -108,9 +108,11 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) { int batch_count = 20; int start_offset = 1005; - auto writer_factory - = std::make_unique( - datalake::local_path(tmp_dir.get_path()), "data", 100, 10000); + auto writer_factory = std::make_unique( + datalake::local_path(tmp_dir.get_path()), + "data", + ss::make_shared(100, 10000)); + datalake::record_multiplexer multiplexer( ntp, std::move(writer_factory), simple_schema_mgr, bin_resolver); @@ -243,9 +245,11 @@ TEST_F(RecordMultiplexerParquetTest, TestSimple) { auto reader = model::make_memory_record_batch_reader(std::move(batches)); temporary_dir tmp_dir("datalake_multiplexer_test"); - auto writer_factory - = std::make_unique( - datalake::local_path(tmp_dir.get_path()), "data", 100, 10000); + + auto writer_factory = std::make_unique( + datalake::local_path(tmp_dir.get_path()), + "data", + ss::make_shared(100, 10000)); record_multiplexer mux( ntp, std::move(writer_factory), schema_mgr, type_resolver); auto res = reader.consume(std::move(mux), model::no_timeout).get(); diff --git a/src/v/datalake/tests/local_parquet_file_writer_test.cc b/src/v/datalake/tests/local_parquet_file_writer_test.cc new file mode 100644 index 0000000000000..a7cc3e9b0c3c2 --- /dev/null +++ 
b/src/v/datalake/tests/local_parquet_file_writer_test.cc @@ -0,0 +1,158 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "datalake/local_parquet_file_writer.h" +#include "datalake/tests/test_data.h" +#include "iceberg/tests/value_generator.h" +#include "test_utils/test.h" +#include "test_utils/tmp_dir.h" + +#include + +#include + +#include + +namespace { +struct test_writer : datalake::parquet_ostream { + test_writer( + size_t error_after_rows, bool error_on_finish, ss::output_stream os) + : error_after_rows_(error_after_rows) + , error_on_finish_(error_on_finish) + , os_(std::move(os)) {} + ss::future + add_data_struct(iceberg::struct_value, size_t) final { + if (rows_ >= error_after_rows_) { + co_return datalake::writer_error::file_io_error; + } + rows_++; + + co_return datalake::writer_error::ok; + }; + + ss::future finish() final { + if (error_on_finish_) { + co_return datalake::writer_error::file_io_error; + } + co_await os_.close(); + co_return datalake::writer_error::ok; + } + + size_t error_after_rows_; + bool error_on_finish_; + size_t rows_{0}; + ss::output_stream os_; +}; + +struct test_writer_factory : datalake::parquet_ostream_factory { + explicit test_writer_factory( + size_t error_after_rows = std::numeric_limits::max(), + bool error_on_finish = false) + : error_after_rows_(error_after_rows) + , error_on_finish_(error_on_finish) {} + + ss::future> create_writer( + const iceberg::struct_type&, ss::output_stream os) final { + co_return std::make_unique( + error_after_rows_, error_on_finish_, std::move(os)); + }; + + size_t error_after_rows_; + bool error_on_finish_; +}; + +} // namespace + +struct LocalFileWriterTest : public testing::Test { + // Sets up the test fixture. + void SetUp() final {} + + // Tears down the test fixture. 
+ void TearDown() final {} + + temporary_dir tmp_dir = temporary_dir("batching_parquet_writer"); + std::filesystem::path file_path = "test_file.parquet"; + std::filesystem::path full_path = tmp_dir.get_path() / file_path; +}; + +TEST_F(LocalFileWriterTest, TestHappyPath) { + datalake::local_parquet_file_writer file_writer( + datalake::local_path(full_path), ss::make_shared()); + + auto schema = test_schema(iceberg::field_required::no); + file_writer.initialize(schema).get(); + + size_t rows = 1000; + for (size_t i = 0; i < rows; i++) { + auto data = iceberg::tests::make_struct_value( + iceberg::tests::value_spec{}, + test_schema(iceberg::field_required::no)); + + auto res = file_writer.add_data_struct(std::move(data), 1000).get(); + ASSERT_EQ(datalake::writer_error::ok, res); + } + + auto result = file_writer.finish().get(); + + ASSERT_TRUE(result.has_value()); + + EXPECT_EQ(result.value().path, full_path); + EXPECT_EQ(result.value().row_count, rows); + auto true_file_size = std::filesystem::file_size(full_path); + EXPECT_EQ(result.value().size_bytes, true_file_size); +} + +TEST_F(LocalFileWriterTest, TestErrorOnWrite) { + datalake::local_parquet_file_writer file_writer( + datalake::local_path(full_path), + ss::make_shared(100)); + auto schema = test_schema(iceberg::field_required::no); + file_writer.initialize(schema).get(); + + size_t rows = 1000; + for (size_t i = 0; i < rows; i++) { + auto data = iceberg::tests::make_struct_value( + iceberg::tests::value_spec{}, + test_schema(iceberg::field_required::no)); + + file_writer.add_data_struct(std::move(data), 1000).get(); + } + + auto result = file_writer.finish().get(); + + ASSERT_TRUE(result.has_error()); + + // intermediate file should be removed + ASSERT_FALSE(std::filesystem::exists(full_path)); +} + +TEST_F(LocalFileWriterTest, TestErrorOnFinish) { + datalake::local_parquet_file_writer file_writer( + datalake::local_path(full_path), + ss::make_shared(5000, true)); + auto schema = test_schema(iceberg::field_required::no); + file_writer.initialize(schema).get(); + + size_t rows = 1000; + for (size_t i = 0; i < rows; i++) { + auto data = iceberg::tests::make_struct_value( + iceberg::tests::value_spec{}, + test_schema(iceberg::field_required::no)); + + file_writer.add_data_struct(std::move(data), 1000).get(); + } + + auto result = file_writer.finish().get(); + + ASSERT_TRUE(result.has_error()); + + // intermediate file should be removed + ASSERT_FALSE(std::filesystem::exists(full_path)); +} diff --git a/src/v/datalake/tests/parquet_writer_test.cc b/src/v/datalake/tests/parquet_writer_test.cc index 84697f9cf6c10..5501fcc3d7957 100644 --- a/src/v/datalake/tests/parquet_writer_test.cc +++ b/src/v/datalake/tests/parquet_writer_test.cc @@ -10,7 +10,7 @@ #include "bytes/bytes.h" #include "datalake/arrow_translator.h" -#include "datalake/parquet_writer.h" +#include "datalake/arrow_writer.h" #include "datalake/tests/test_data.h" #include "iceberg/tests/value_generator.h" #include "utils/file_io.h" diff --git a/src/v/datalake/tests/partitioning_writer_test.cc b/src/v/datalake/tests/partitioning_writer_test.cc index f806476053957..1b8bd5134319e 100644 --- a/src/v/datalake/tests/partitioning_writer_test.cc +++ b/src/v/datalake/tests/partitioning_writer_test.cc @@ -68,7 +68,7 @@ TEST_P(PartitioningWriterExtraColumnsTest, TestSchemaHappyPath) { // Give the data to the partitioning writer.
for (auto& v : source_vals) { auto err = writer.add_data(std::move(v), /*approx_size=*/0).get(); - EXPECT_EQ(err, data_writer_error::ok); + EXPECT_EQ(err, writer_error::ok); } // The resulting files should match the number of hours the records were @@ -104,7 +104,7 @@ TEST(PartitioningWriterTest, TestWriterError) { val_with_timestamp(field, model::timestamp::now()), /*approx_size=*/0) .get(); - EXPECT_EQ(err, data_writer_error::parquet_conversion_error); + EXPECT_EQ(err, writer_error::parquet_conversion_error); } TEST(PartitioningWriterTest, TestUnexpectedSchema) { @@ -118,5 +118,5 @@ TEST(PartitioningWriterTest, TestUnexpectedSchema) { unexpected_field_type, model::timestamp::now()), /*approx_size=*/0) .get(); - EXPECT_EQ(err, data_writer_error::parquet_conversion_error); + EXPECT_EQ(err, writer_error::parquet_conversion_error); } diff --git a/src/v/datalake/tests/test_data_writer.h b/src/v/datalake/tests/test_data_writer.h index 977e043a0a493..1db0e87444ae5 100644 --- a/src/v/datalake/tests/test_data_writer.h +++ b/src/v/datalake/tests/test_data_writer.h @@ -19,7 +19,7 @@ #include namespace datalake { -class test_data_writer : public data_writer { +class test_data_writer : public parquet_file_writer { public: explicit test_data_writer( const iceberg::struct_type& schema, bool return_error) @@ -27,19 +27,18 @@ class test_data_writer : public data_writer { , _result{} , _return_error{return_error} {} - ss::future add_data_struct( + ss::future add_data_struct( iceberg::struct_value /* data */, int64_t /* approx_size */) override { _result.row_count++; - data_writer_error status - = _return_error ? data_writer_error::parquet_conversion_error - : data_writer_error::ok; - return ss::make_ready_future(status); + writer_error status = _return_error + ? writer_error::parquet_conversion_error + : writer_error::ok; + return ss::make_ready_future(status); } - ss::future> - finish() override { - return ss::make_ready_future< - result>(_result); + ss::future> finish() override { + return ss::make_ready_future>( + _result); } private: @@ -47,12 +46,12 @@ class test_data_writer : public data_writer { local_file_metadata _result; bool _return_error; }; -class test_data_writer_factory : public data_writer_factory { +class test_data_writer_factory : public parquet_file_writer_factory { public: explicit test_data_writer_factory(bool return_error) : _return_error{return_error} {} - ss::future, data_writer_error>> + ss::future, writer_error>> create_writer(const iceberg::struct_type& schema) override { co_return std::make_unique( std::move(schema), _return_error); diff --git a/src/v/datalake/tests/translation_task_test.cc b/src/v/datalake/tests/translation_task_test.cc index 5ef47a025fb71..03ab82b5fa579 100644 --- a/src/v/datalake/tests/translation_task_test.cc +++ b/src/v/datalake/tests/translation_task_test.cc @@ -13,6 +13,7 @@ #include "datalake/batching_parquet_writer.h" #include "datalake/catalog_schema_manager.h" #include "datalake/cloud_data_io.h" +#include "datalake/local_parquet_file_writer.h" #include "datalake/record_schema_resolver.h" #include "datalake/translation_task.h" #include "model/record_batch_reader.h" @@ -72,13 +73,13 @@ class TranslateTaskTest return model::make_memory_record_batch_reader(std::move(batches)); } - std::unique_ptr get_writer_factory( + std::unique_ptr get_writer_factory( size_t row_threshold = 200, size_t bytes_threshold = 4096) { - return std::make_unique( + return std::make_unique( datalake::local_path(tmp_dir.get_path()), "test-prefix", - row_threshold, - 
bytes_threshold); + ss::make_shared( + row_threshold, bytes_threshold)); } lazy_abort_source& never_abort() { diff --git a/src/v/datalake/translation/partition_translator.cc b/src/v/datalake/translation/partition_translator.cc index 350ac31f664ee..8f954d4e3a0d7 100644 --- a/src/v/datalake/translation/partition_translator.cc +++ b/src/v/datalake/translation/partition_translator.cc @@ -16,6 +16,7 @@ #include "datalake/coordinator/frontend.h" #include "datalake/coordinator/translated_offset_range.h" #include "datalake/data_writer_interface.h" +#include "datalake/local_parquet_file_writer.h" #include "datalake/logger.h" #include "datalake/record_multiplexer.h" #include "datalake/translation/state_machine.h" @@ -178,11 +179,13 @@ partition_translator::do_translation_for_range( kafka::offset begin_offset) { // This configuration only writes a single row group per file but we limit // the bytes via the reader max_bytes. - auto writer_factory = std::make_unique( + auto writer_factory = std::make_unique( local_path{_writer_scratch_space}, // storage temp files are written to fmt::format("{}", begin_offset), // file prefix - max_rows_per_row_group, // max entries per single parquet row group - max_bytes_per_row_group); // max bytes per single parquet row group + ss::make_shared( + max_rows_per_row_group, // max entries per single parquet row group + max_bytes_per_row_group)); // max bytes per single parquet row group + auto task = translation_task{**_cloud_io, *_schema_mgr, *_type_resolver}; const auto& ntp = _partition->ntp(); auto remote_path_prefix = remote_path{ diff --git a/src/v/datalake/translation_task.cc b/src/v/datalake/translation_task.cc index 3e52e2b7fa0c7..0bd5212385b87 100644 --- a/src/v/datalake/translation_task.cc +++ b/src/v/datalake/translation_task.cc @@ -46,7 +46,7 @@ ss::future< checked> translation_task::translate( const model::ntp& ntp, - std::unique_ptr writer_factory, + std::unique_ptr writer_factory, model::record_batch_reader reader, const remote_path& remote_path_prefix, retry_chain_node& rcn, diff --git a/src/v/datalake/translation_task.h b/src/v/datalake/translation_task.h index eced3f4189e9b..683e8ddfa0761 100644 --- a/src/v/datalake/translation_task.h +++ b/src/v/datalake/translation_task.h @@ -40,7 +40,7 @@ class translation_task { */ ss::future> translate( const model::ntp& ntp, - std::unique_ptr writer_factory, + std::unique_ptr writer_factory, model::record_batch_reader reader, const remote_path& remote_path_prefix, retry_chain_node& parent_rcn,
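
For orientation, a minimal usage sketch of how the split introduced in this diff composes: local_parquet_file_writer_factory owns file creation, naming and abort/cleanup, while delegating Parquet serialization to a batching_parquet_writer_factory (a parquet_ostream_factory), the same wiring used in partition_translator.cc and the tests above. This is an illustrative assumption-laden example, not part of the change: the helper write_rows_to_local_file, the "example" prefix, the thresholds and the approx_size value are invented for the sketch, and it only uses the interfaces added here.

// Illustrative sketch only -- not part of the diff.
#include "datalake/batching_parquet_writer.h"
#include "datalake/local_parquet_file_writer.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/shared_ptr.hh>

#include <utility>
#include <vector>

namespace datalake {

// Hypothetical helper: writes a batch of rows into one local parquet file and
// returns its metadata (path, row count, size) or the first writer_error seen.
ss::future<result<local_file_metadata, writer_error>> write_rows_to_local_file(
  local_path scratch_dir,
  const iceberg::struct_type& schema,
  std::vector<iceberg::struct_value> rows) {
    // File-level factory handles the local file; the ostream-level factory
    // only serializes parquet into whatever stream it is handed. The prefix
    // and thresholds are arbitrary values for the sketch.
    local_parquet_file_writer_factory factory(
      std::move(scratch_dir),
      "example",
      ss::make_shared<batching_parquet_writer_factory>(
        /*row_count_threshold=*/500, /*byte_count_threshold=*/1'000'000));

    auto writer_res = co_await factory.create_writer(schema);
    if (writer_res.has_error()) {
        co_return writer_res.error();
    }
    auto writer = std::move(writer_res.value());

    for (auto& row : rows) {
        // approx_size is only a flush hint for the row-group thresholds.
        auto err = co_await writer->add_data_struct(std::move(row), 128);
        if (err != writer_error::ok) {
            // local_parquet_file_writer has already aborted and removed the
            // partially written file.
            co_return err;
        }
    }
    // finish() flushes the last row group, closes the stream and returns the
    // local file metadata used by the upload step.
    co_return co_await writer->finish();
}

} // namespace datalake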