Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datalake: Convert protobuf repeated fields into Arrow lists #3

Draft
wants to merge 2 commits into
base: jcipar/proto-to-arrow
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions install-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ deb_deps=(
cmake
git
golang
libarrow-dev
libboost-all-dev
libc-ares-dev
libgssapi-krb5-2
libkrb5-dev
liblz4-dev
libparquet-dev
libprotobuf-dev
libprotoc-dev
libre2-dev
Expand Down Expand Up @@ -71,6 +73,7 @@ fedora_deps=(
golang
hwloc-devel
krb5-devel
libarrow-devel
libxml2-devel
libzstd-devel
lksctp-tools-devel
Expand All @@ -82,6 +85,7 @@ fedora_deps=(
numactl-devel
openssl
openssl-devel
parquet-libs-devel
procps
protobuf-devel
python3
Expand Down Expand Up @@ -124,8 +128,14 @@ arch_deps=(

case "$ID" in
ubuntu | debian | pop)
apt-get update
DEBIAN_FRONTEND=noninteractive apt-get install -y "${deb_deps[@]}"
export DEBIAN_FRONTEND=noninteractive
apt update
apt install -y -V ca-certificates lsb-release wget
wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
rm apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
apt update
apt-get install -y "${deb_deps[@]}"
if [[ $CLEAN_PKG_CACHE == true ]]; then
rm -rf /var/lib/apt/lists/*
fi
Expand Down
1 change: 1 addition & 0 deletions licenses/third_party.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ please keep this up to date with every new library use.
| :---------- | :------------ |
| abseil | Apache License 2 |
| ada | Apache License 2 / MIT |
| arrow | Apache License 2 / MIT / Boost / BSD 2 & 3 clause / ZPL / LLVM / <https://github.com/apache/arrow/blob/main/LICENSE.txt> |
| avro | Apache License 2 |
| base64 | BSD 2 |
| boost libraries | Boost Software License Version 1.0 |
Expand Down
1 change: 1 addition & 0 deletions src/v/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ add_subdirectory(compat)
add_subdirectory(rp_util)
add_subdirectory(resource_mgmt)
add_subdirectory(migrations)
add_subdirectory(datalake)

option(ENABLE_GIT_VERSION "Build with Git metadata" OFF)

Expand Down
1 change: 1 addition & 0 deletions src/v/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ compression | utilities supporting compression/decompression of many
config | Redpanda cluster level and node level configuration options |
container | Generic Redpanda specific containers and data structures |
crypto | Middleware library used to perform cryptographic operations |
datalake | Writing Redpanda data to Iceberg |
features | Cluster feature flags for rolling upgrades |
finjector | Failure injector framework for testing and correctness |
hashing | hashing utility adaptors often used in cryptography or checksumming |
Expand Down
21 changes: 21 additions & 0 deletions src/v/datalake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
find_package(Arrow REQUIRED)
find_package(Parquet REQUIRED)
find_package(Protobuf REQUIRED)


v_cc_library(
NAME datalake
SRCS
protobuf_to_arrow_converter.cc
proto_to_arrow_repeated.cc
proto_to_arrow_scalar.cc
proto_to_arrow_struct.cc
DEPS
v::storage
Seastar::seastar
Arrow::arrow_shared
Parquet::parquet_shared
protobuf::libprotobuf
)

add_subdirectory(tests)
32 changes: 32 additions & 0 deletions src/v/datalake/errors.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include <stdexcept>
namespace datalake {

// TODO: Make an std::error_category instance for this
enum class arrow_converter_status {
ok,

// User errors
parse_error,

// System Errors
internal_error,
};

class initialization_error : public std::runtime_error {
public:
explicit initialization_error(const std::string& what_arg)
: std::runtime_error(what_arg) {}
};

} // namespace datalake
19 changes: 19 additions & 0 deletions src/v/datalake/logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "base/seastarx.h"

#include <seastar/util/log.hh>

namespace datalake {
inline ss::logger datalake_log("datalake");
} // namespace datalake
73 changes: 73 additions & 0 deletions src/v/datalake/proto_to_arrow_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include <arrow/api.h>
#include <arrow/array/array_base.h>
#include <arrow/status.h>
#include <arrow/type_fwd.h>

#include <memory>
#include <stdexcept>

namespace google::protobuf {
class Message;
}

namespace datalake::detail {

// This interface is used to convert a set of Protobuf messages to an Arrow
// Array. Proto messages are passed one-by-one along to this interface, and it
// builds internal state representing the Arrow array. Finally, the `finish`
// method returns the completed array.
class proto_to_arrow_interface {
public:
proto_to_arrow_interface(const proto_to_arrow_interface&) = delete;
proto_to_arrow_interface(proto_to_arrow_interface&&) = delete;
proto_to_arrow_interface& operator=(const proto_to_arrow_interface&)
= delete;
proto_to_arrow_interface& operator=(proto_to_arrow_interface&&) = delete;

proto_to_arrow_interface() = default;
virtual ~proto_to_arrow_interface() = default;

// Called on a struct message to parse an individual child field.
// We expect the given child field to match the type of the column
// represented by this converter. E.g. a proto_to_arrow_scalar<int32> would
// expect the column referred to by field_idx to be an int32 column.
virtual arrow::Status
add_child_value(const google::protobuf::Message*, int field_idx)
= 0;

/// Return an Arrow field descriptor for this Array. Used for building
/// A schema.
// The Arrow API is built around shared_ptr: the creation functions return
// shared pointers, and other expect them as arguments.
// TODO: investigate if we can change the shared_ptr type in Arrow to use
// ss::shared_ptr
virtual std::shared_ptr<arrow::Field> field(const std::string& name) = 0;

/// Return the underlying ArrayBuilder. Used when this is a child of another
/// Builder
virtual std::shared_ptr<arrow::ArrayBuilder> builder() = 0;

virtual arrow::Status finish_batch() = 0;

// Methods with defaults
std::shared_ptr<arrow::ChunkedArray> finish() {
return std::make_shared<arrow::ChunkedArray>(std::move(_values));
}

protected:
arrow::Status _arrow_status;
arrow::ArrayVector _values;
};

} // namespace datalake::detail
90 changes: 90 additions & 0 deletions src/v/datalake/proto_to_arrow_repeated.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "datalake/proto_to_arrow_repeated.h"

#include "datalake/logger.h"
#include "datalake/proto_to_arrow_struct.h"

#include <arrow/array/builder_base.h>
#include <arrow/array/builder_nested.h>
#include <arrow/scalar.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <google/protobuf/message.h>

#include <memory>

namespace datalake::detail {
proto_to_arrow_repeated::proto_to_arrow_repeated(
const google::protobuf::FieldDescriptor* desc) {
// The documentation for ListBuilder is not very detailed. From the linked
// StackOverflow and some trial-and-error, it looks like it's used like
// this:
// 1. A ListBuilder is created with a shared pointer to another
// arrow::ArrayBuilder as a child.
// 2. *Before* adding elements to the child, call Append() on the
// ListBuilder.
// 3. Call Append(val) on the child builder in the normal way.
// 4. *Do not* call Finish() onthe child builder.
// 5. Call Finish() on the ListBuilder to get the array.
//
// https://stackoverflow.com/questions/78277111/is-there-a-way-to-nest-an-arrowarray-in-apache-arrow
_child_converter = make_converter(desc, true);
_builder = std::make_shared<arrow::ListBuilder>(
arrow::default_memory_pool(), _child_converter->builder());
}

arrow::Status proto_to_arrow_repeated::add_child_value(
const google::protobuf::Message* msg, int field_idx) {
if (!_arrow_status.ok()) {
return _arrow_status;
}
_arrow_status = _builder->Append();
if (!_arrow_status.ok()) {
return _arrow_status;
}
_arrow_status = _child_converter->add_child_value(msg, field_idx);

return _arrow_status;
}

std::shared_ptr<arrow::Field>
proto_to_arrow_repeated::field(const std::string& name) {
std::shared_ptr<arrow::DataType> arrow_type = arrow::list(arrow::int32());
return arrow::field(name, arrow_type);
};

std::shared_ptr<arrow::ArrayBuilder> proto_to_arrow_repeated::builder() {
return _builder;
};

arrow::Status proto_to_arrow_repeated::finish_batch() {
std::cerr << "finish_batch with " << _builder->length() << " items\n";

if (!_arrow_status.ok()) {
return _arrow_status;
}
auto builder_result = _builder->Finish();
_arrow_status = builder_result.status();
std::shared_ptr<arrow::Array> array;
if (!_arrow_status.ok()) {
return _arrow_status;
}

// Safe because we validated the status after calling `Finish`
array = std::move(builder_result).ValueUnsafe();
std::cerr << "adding array with " << array->length()
<< " elements to _values\n";
_values.push_back(array);
return _arrow_status;
}
} // namespace datalake::detail
40 changes: 40 additions & 0 deletions src/v/datalake/proto_to_arrow_repeated.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "datalake/proto_to_arrow_interface.h"

#include <arrow/array/builder_base.h>
#include <arrow/array/builder_nested.h>
#include <arrow/status.h>
#include <google/protobuf/descriptor.h>

#include <memory>

namespace datalake::detail {

class proto_to_arrow_repeated : public proto_to_arrow_interface {
public:
proto_to_arrow_repeated(const google::protobuf::FieldDescriptor*);
arrow::Status
add_child_value(const google::protobuf::Message*, int) override;

std::shared_ptr<arrow::Field> field(const std::string&) override;

std::shared_ptr<arrow::ArrayBuilder> builder() override;

arrow::Status finish_batch() override;

private:
std::shared_ptr<arrow::ListBuilder> _builder;
std::unique_ptr<proto_to_arrow_interface> _child_converter;
};

} // namespace datalake::detail
Loading