Skip to content

Commit

Permalink
Merge pull request #23919 from Lazin/ct/write-pipeline-and-throttler
Browse files Browse the repository at this point in the history
ct: Add `write_pipeline` and `throttler` components
  • Loading branch information
Lazin authored Dec 2, 2024
2 parents 8ebd54a + afb40e8 commit 38801dd
Show file tree
Hide file tree
Showing 30 changed files with 2,011 additions and 358 deletions.
4 changes: 4 additions & 0 deletions src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ load("//bazel:build.bzl", "redpanda_cc_library")
# Package-wide default visibility. Unless a target sets its own `visibility`,
# targets declared in this package may be depended on only by the
# cloud_topics sub-packages listed below (batcher, core, dl_stm, reader,
# throttler), by each of their `tests` packages, and by cloud_topics/tests.
package(default_visibility = [
"//src/v/cloud_topics/batcher:__pkg__",
"//src/v/cloud_topics/batcher/tests:__pkg__",
"//src/v/cloud_topics/core:__pkg__",
"//src/v/cloud_topics/core/tests:__pkg__",
"//src/v/cloud_topics/dl_stm:__pkg__",
"//src/v/cloud_topics/dl_stm/tests:__pkg__",
"//src/v/cloud_topics/reader:__pkg__",
"//src/v/cloud_topics/reader/tests:__pkg__",
"//src/v/cloud_topics/tests:__pkg__",
"//src/v/cloud_topics/throttler:__pkg__",
"//src/v/cloud_topics/throttler/tests:__pkg__",
])

redpanda_cc_library(
Expand Down
52 changes: 7 additions & 45 deletions src/v/cloud_topics/batcher/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,6 @@ load("//bazel:build.bzl", "redpanda_cc_library")

# By default, targets in the batcher package are visible only to the batcher
# tests package; anything else needs an explicit `visibility` on the target.
package(default_visibility = ["//src/v/cloud_topics/batcher/tests:__pkg__"])

# C++ library target `serializer`: builds serializer.cc and exposes
# serializer.h, which consumers include as "cloud_topics/batcher/serializer.h"
# (see `include_prefix`).
# NOTE(review): `redpanda_cc_library` is a project macro loaded from
# //bazel:build.bzl; presumably it forwards these attributes to `cc_library`
# -- confirm against the macro definition.
redpanda_cc_library(
name = "serializer",
srcs = [
"serializer.cc",
],
hdrs = [
"serializer.h",
],
# Listed separately from `deps`: presumably needed only by the .cc file and
# not exposed through the public header -- TODO confirm.
implementation_deps = [
"//src/v/storage:record_batch_utils",
],
include_prefix = "cloud_topics/batcher",
# Dependencies listed as regular (non-implementation) deps.
deps = [
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/container:fragmented_vector",
"//src/v/model",
],
)

# C++ library target `write_request`: builds write_request.cc and exposes
# write_request.h, which consumers include as
# "cloud_topics/batcher/write_request.h" (see `include_prefix`).
# NOTE(review): `redpanda_cc_library` is a project macro loaded from
# //bazel:build.bzl; presumably it forwards these attributes to `cc_library`
# -- confirm against the macro definition.
redpanda_cc_library(
name = "write_request",
srcs = [
"write_request.cc",
],
hdrs = [
"write_request.h",
],
# Listed separately from `deps`: presumably the logger is used only by the
# .cc file and not by the public header -- TODO confirm.
implementation_deps = [
"//src/v/cloud_topics:logger",
],
include_prefix = "cloud_topics/batcher",
# Regular deps: the sibling `:serializer` target in this package, other
# in-repo libraries, and the external `@seastar` repository.
deps = [
":serializer",
"//src/v/base",
"//src/v/cloud_topics:types",
"//src/v/model",
"@seastar",
],
)

redpanda_cc_library(
name = "aggregator",
srcs = [
Expand All @@ -52,16 +11,16 @@ redpanda_cc_library(
"aggregator.h",
],
implementation_deps = [
":serializer",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics:placeholder",
"//src/v/cloud_topics/core:serializer",
"//src/v/storage:record_batch_builder",
],
include_prefix = "cloud_topics/batcher",
deps = [
":write_request",
"//src/v/base",
"//src/v/cloud_topics:types",
"//src/v/cloud_topics/core:write_request",
"//src/v/container:fragmented_vector",
"//src/v/model",
"@abseil-cpp//absl/container:btree",
Expand All @@ -81,7 +40,7 @@ redpanda_cc_library(
"//src/v/cloud_io:remote",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics/batcher:aggregator",
"//src/v/cloud_topics/batcher:serializer",
"//src/v/cloud_topics/core:serializer",
"//src/v/ssx:sformat",
"//src/v/utils:human",
],
Expand All @@ -91,7 +50,10 @@ redpanda_cc_library(
"//src/v/bytes",
"//src/v/bytes:iobuf",
"//src/v/cloud_topics:types",
"//src/v/cloud_topics/batcher:write_request",
"//src/v/cloud_topics/core:event_filter",
"//src/v/cloud_topics/core:pipeline_stage",
"//src/v/cloud_topics/core:write_pipeline",
"//src/v/cloud_topics/core:write_request",
"//src/v/config",
"//src/v/model",
"//src/v/utils:retry_chain_node",
Expand Down
17 changes: 9 additions & 8 deletions src/v/cloud_topics/batcher/aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

#include "cloud_topics/batcher/aggregator.h"

#include "cloud_topics/batcher/serializer.h"
#include "cloud_topics/batcher/write_request.h"
#include "cloud_topics/core/serializer.h"
#include "cloud_topics/core/write_request.h"
#include "cloud_topics/dl_placeholder.h"
#include "storage/record_batch_builder.h"

#include <seastar/core/future.hh>
#include <seastar/util/defer.hh>

namespace experimental::cloud_topics::details {
namespace experimental::cloud_topics {

template<class Clock>
aggregator<Clock>::aggregator(object_id id)
Expand Down Expand Up @@ -53,8 +53,8 @@ namespace {
template<class Clock>
void make_dl_placeholder_batches(
prepared_placeholder_batches<Clock>& ctx,
write_request<Clock>& req,
const serialized_chunk& chunk) {
core::write_request<Clock>& req,
const core::serialized_chunk& chunk) {
auto result = std::make_unique<batches_for_req<Clock>>();
for (const auto& b : chunk.batches) {
dl_placeholder placeholder{
Expand Down Expand Up @@ -164,10 +164,11 @@ void aggregator<Clock>::ack_error(errc e) {
}

template<class Clock>
void aggregator<Clock>::add(write_request<Clock>& req) {
void aggregator<Clock>::add(core::write_request<Clock>& req) {
auto it = _staging.find(req.ntp);
if (it == _staging.end()) {
it = _staging.emplace_hint(it, req.ntp, write_request_list<Clock>());
it = _staging.emplace_hint(
it, req.ntp, core::write_request_list<Clock>());
}
req._hook.unlink();
it->second.push_back(req);
Expand All @@ -181,4 +182,4 @@ size_t aggregator<Clock>::size_bytes() const noexcept {

template class aggregator<ss::lowres_clock>;
template class aggregator<ss::manual_clock>;
} // namespace experimental::cloud_topics::details
} // namespace experimental::cloud_topics
13 changes: 7 additions & 6 deletions src/v/cloud_topics/batcher/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#pragma once

#include "base/seastarx.h"
#include "cloud_topics/batcher/write_request.h"
#include "cloud_topics/core/write_request.h"
#include "cloud_topics/errc.h"
#include "cloud_topics/types.h"
#include "container/fragmented_vector.h"
Expand All @@ -22,7 +22,7 @@

#include <absl/container/btree_map.h>

namespace experimental::cloud_topics::details {
namespace experimental::cloud_topics {

/// List of placeholder batches that have to be propagated
/// to a particular write request.
Expand All @@ -31,7 +31,7 @@ struct batches_for_req {
/// Generated placeholder batches
ss::circular_buffer<model::record_batch> placeholders;
/// Source write request
ss::weak_ptr<write_request<Clock>> ref;
ss::weak_ptr<core::write_request<Clock>> ref;
};

// This component aggregates a bunch of write
Expand All @@ -53,7 +53,7 @@ class aggregator {
/// included into L0 object. The size value returned by
/// the 'size_bytes' call will not match the actual size
/// of the object.
void add(write_request<Clock>& req);
void add(core::write_request<Clock>& req);

/// Estimate L0 object size
size_t size_bytes() const noexcept;
Expand All @@ -77,11 +77,12 @@ class aggregator {
iobuf get_stream();

object_id _id;

/// Source data for the aggregator
absl::btree_map<model::ntp, write_request_list<Clock>> _staging;
absl::btree_map<model::ntp, core::write_request_list<Clock>> _staging;
/// Prepared placeholders
chunked_vector<std::unique_ptr<batches_for_req<Clock>>> _aggregated;
size_t _size_bytes{0};
};

} // namespace experimental::cloud_topics::details
} // namespace experimental::cloud_topics
Loading

0 comments on commit 38801dd

Please sign in to comment.