Skip to content

Commit

Permalink
ct: Add request_balancer component
Browse files Browse the repository at this point in the history
The request balancer is used to move write requests betwen shards.
Current implementation is very basic. The intention is to show how this
could be implemented. The balancing policy can be swapped but shards do
not exchange any information to make informed decisions.

The request_balancer only implements a basic data flow. The
write_request instance is extracted from the write_pipeline and then
submitted to another shard (through a proxy object). The result is
propagated back to the original write request and eventually to the
caller.

The fast path in no op. Initially, we can just use basic balancing
policy that disables actual balancing and leaves all write requests on
the original shard. In the future we need to add code that propagates
the information needed to make descisions (by utilizing existing
mechanism in the cloud_storage::remote) and use it to make more optimal
decisions.

Signed-off-by: Evgeny Lazin <[email protected]>
  • Loading branch information
Lazin committed Nov 13, 2024
1 parent 9a06509 commit 6be29e6
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package(default_visibility = [
"//src/v/cloud_topics/dl_stm/tests:__pkg__",
"//src/v/cloud_topics/reader:__pkg__",
"//src/v/cloud_topics/reader/tests:__pkg__",
"//src/v/cloud_topics/request_balancer:__pkg__",
"//src/v/cloud_topics/request_balancer/tests:__pkg__",
"//src/v/cloud_topics/tests:__pkg__",
"//src/v/cloud_topics/throttler:__pkg__",
"//src/v/cloud_topics/throttler/tests:__pkg__",
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_topics/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package(default_visibility = [
"//src/v/cloud_topics/batcher:__pkg__",
"//src/v/cloud_topics/batcher/tests:__pkg__",
"//src/v/cloud_topics/core/tests:__pkg__",
"//src/v/cloud_topics/request_balancer:__pkg__",
"//src/v/cloud_topics/request_balancer/tests:__pkg__",
"//src/v/cloud_topics/throttler:__pkg__",
"//src/v/cloud_topics/throttler/tests:__pkg__",
])
Expand Down
26 changes: 26 additions & 0 deletions src/v/cloud_topics/request_balancer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
load("//bazel:build.bzl", "redpanda_cc_library")

package(default_visibility = ["//src/v/cloud_topics/request_balancer/tests:__pkg__"])

redpanda_cc_library(
name = "write_request_balancer",
srcs = [
"write_request_balancer.cc",
],
hdrs = [
"write_request_balancer.h",
],
include_prefix = "cloud_topics/request_balancer",
deps = [
"//src/v/base",
"//src/v/bytes",
"//src/v/bytes:iobuf",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics:types",
"//src/v/cloud_topics/core:event_filter",
"//src/v/cloud_topics/core:write_pipeline",
"//src/v/cloud_topics/core:write_request",
"//src/v/ssx:future_util",
"@seastar",
],
)
11 changes: 11 additions & 0 deletions src/v/cloud_topics/request_balancer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
### Intro

Request balancer is responsible for distributing the load between the shards
and for limiting resource usage.

Our previous approach was to measure the actual resource usage by every shard.
This complicates the design quite a lot.

The alternative to this approach is to implement load balancing as an external
service. The request balancer uses policy based approach. Currently, only the
simplest possible policy is implemented.
22 changes: 22 additions & 0 deletions src/v/cloud_topics/request_balancer/tests/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("//bazel:test.bzl", "redpanda_cc_gtest")

redpanda_cc_gtest(
name = "write_request_balancer_test",
timeout = "short",
srcs = [
"write_request_balancer_test.cc",
],
deps = [
"//src/v/base",
"//src/v/cloud_topics/core:event_filter",
"//src/v/cloud_topics/core:pipeline_stage",
"//src/v/cloud_topics/core:write_pipeline",
"//src/v/cloud_topics/core:write_request",
"//src/v/cloud_topics/request_balancer:write_request_balancer",
"//src/v/model",
"//src/v/model/tests:random",
"//src/v/test_utils:gtest",
"@googletest//:gtest",
"@seastar",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "cloud_topics/core/event_filter.h"
#include "cloud_topics/core/pipeline_stage.h"
#include "cloud_topics/request_balancer/write_request_balancer.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "model/record.h"
#include "model/record_batch_reader.h"
#include "model/tests/random_batch.h"
#include "test_utils/test.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/coroutine/as_future.hh>

#include <chrono>
#include <exception>
#include <limits>

inline ss::logger test_log("balancer_gtest");

namespace cloud_topics = experimental::cloud_topics;
using namespace std::chrono_literals;

namespace experimental::cloud_topics {

// Sink consumes and acknowledges all write requests
// in the pipeline.
struct pipeline_sink {
explicit pipeline_sink(core::write_pipeline<>& p)
: pipeline(p)
, _id(pipeline.register_pipeline_stage()) {}

ss::future<> start() {
ssx::background = bg_run();
co_return;
}

ss::future<> stop() {
_as.request_abort();
co_await _gate.close();
}

ss::future<> bg_run() {
auto h = _gate.hold();
while (!_as.abort_requested()) {
vlog(test_log.debug, "pipeline_sink subscribe, stage id {}", _id);
core::event_filter<> flt(core::event_type::new_write_request, _id);
auto sub = _as.subscribe(
[&flt](const std::optional<std::exception_ptr>&) noexcept {
flt.cancel();
});
auto fut = co_await ss::coroutine::as_future(
pipeline.subscribe(flt));
vlog(test_log.debug, "pipeline_sink event");
if (fut.failed()) {
vlog(
test_log.error,
"Event subscription failed: {}",
fut.get_exception());
continue;
}
auto event = fut.get();
if (event.type != core::event_type::new_write_request) {
co_return;
}
// Vacuum all write requests
auto result = pipeline.get_write_requests(
std::numeric_limits<size_t>::max(), _id);
for (auto& r : result.ready) {
// Set empty result to unblock the caller
r.set_value(
ss::circular_buffer<
model::record_batch>{}); // TODO: return some random batch
write_requests_acked++;
}
}
}

core::write_pipeline<>& pipeline;
core::pipeline_stage _id;
ss::gate _gate;
ss::abort_source _as;
size_t write_requests_acked{0};
};

/// Balancing policy that redirects all write requests
/// to shard 1
class shard_one_balancing_policy : public balancing_policy {
public:
void rebalance(std::vector<shard_resource_utilization>& shards) override {
shards.front().shard = 1;
}
};

} // namespace experimental::cloud_topics

class write_request_balancer_fixture : public seastar_test {
public:
ss::future<> start() {
vlog(test_log.info, "Creating pipeline");
co_await pipeline.start();

// Start the balancer
vlog(test_log.info, "Creating balancer");
co_await balancer.start(
ss::sharded_parameter([this] { return std::ref(pipeline.local()); }),
ss::sharded_parameter([] {
return std::make_unique<
cloud_topics::shard_one_balancing_policy>();
}));
vlog(test_log.info, "Starting balancer");
co_await balancer.invoke_on_all(
[](cloud_topics::write_request_balancer& bal) {
return bal.start();
});

// Start the sink
vlog(test_log.info, "Creating request_sink");
co_await request_sink.start(
ss::sharded_parameter([this] { return std::ref(pipeline.local()); }));
vlog(test_log.info, "Starting request_sink");
co_await request_sink.invoke_on_all(
[](cloud_topics::pipeline_sink& sink) { return sink.start(); });
}

ss::future<> stop() {
vlog(test_log.info, "Stopping request_sink");
co_await request_sink.stop();
vlog(test_log.info, "Stopping balancer");
co_await balancer.stop();
vlog(test_log.info, "Stopping pipeline");
co_await pipeline.stop();
}

ss::sharded<cloud_topics::core::write_pipeline<>> pipeline;
ss::sharded<cloud_topics::write_request_balancer> balancer;
ss::sharded<cloud_topics::pipeline_sink> request_sink;
};

static const model::ntp test_ntp(
model::kafka_namespace,
model::topic_partition(model::topic("tapioca"), model::partition_id(42)));

TEST_F_CORO(write_request_balancer_fixture, smoke_test) {
ASSERT_TRUE_CORO(ss::smp::count > 1);

co_await start();

auto buf = co_await model::test::make_random_batches();
auto reader = model::make_memory_record_batch_reader(std::move(buf));

auto placeholders = co_await pipeline.local().write_and_debounce(
test_ntp, std::move(reader), 1s);

co_await stop();
}
Loading

0 comments on commit 6be29e6

Please sign in to comment.