diff --git a/src/v/cloud_topics/BUILD b/src/v/cloud_topics/BUILD index 5372da6e90a10..4ee975fc891e7 100644 --- a/src/v/cloud_topics/BUILD +++ b/src/v/cloud_topics/BUILD @@ -9,6 +9,8 @@ package(default_visibility = [ "//src/v/cloud_topics/dl_stm/tests:__pkg__", "//src/v/cloud_topics/reader:__pkg__", "//src/v/cloud_topics/reader/tests:__pkg__", + "//src/v/cloud_topics/request_balancer:__pkg__", + "//src/v/cloud_topics/request_balancer/tests:__pkg__", "//src/v/cloud_topics/tests:__pkg__", "//src/v/cloud_topics/throttler:__pkg__", "//src/v/cloud_topics/throttler/tests:__pkg__", diff --git a/src/v/cloud_topics/core/BUILD b/src/v/cloud_topics/core/BUILD index e2848809a8693..c1b55226f1d8f 100644 --- a/src/v/cloud_topics/core/BUILD +++ b/src/v/cloud_topics/core/BUILD @@ -4,6 +4,8 @@ package(default_visibility = [ "//src/v/cloud_topics/batcher:__pkg__", "//src/v/cloud_topics/batcher/tests:__pkg__", "//src/v/cloud_topics/core/tests:__pkg__", + "//src/v/cloud_topics/request_balancer:__pkg__", + "//src/v/cloud_topics/request_balancer/tests:__pkg__", "//src/v/cloud_topics/throttler:__pkg__", "//src/v/cloud_topics/throttler/tests:__pkg__", ]) diff --git a/src/v/cloud_topics/request_balancer/BUILD b/src/v/cloud_topics/request_balancer/BUILD new file mode 100644 index 0000000000000..f14b6b1841a5c --- /dev/null +++ b/src/v/cloud_topics/request_balancer/BUILD @@ -0,0 +1,26 @@ +load("//bazel:build.bzl", "redpanda_cc_library") + +package(default_visibility = ["//src/v/cloud_topics/request_balancer/tests:__pkg__"]) + +redpanda_cc_library( + name = "write_request_balancer", + srcs = [ + "write_request_balancer.cc", + ], + hdrs = [ + "write_request_balancer.h", + ], + include_prefix = "cloud_topics/request_balancer", + deps = [ + "//src/v/base", + "//src/v/bytes", + "//src/v/bytes:iobuf", + "//src/v/cloud_topics:logger", + "//src/v/cloud_topics:types", + "//src/v/cloud_topics/core:event_filter", + "//src/v/cloud_topics/core:write_pipeline", + "//src/v/cloud_topics/core:write_request", + 
"//src/v/ssx:future_util", + "@seastar", + ], +) diff --git a/src/v/cloud_topics/request_balancer/README.md b/src/v/cloud_topics/request_balancer/README.md new file mode 100644 index 0000000000000..ea5d912f525f9 --- /dev/null +++ b/src/v/cloud_topics/request_balancer/README.md @@ -0,0 +1,11 @@ +### Intro + +The request balancer is responsible for distributing the load between the shards +and for limiting resource usage. + +Our previous approach was to measure the actual resource usage by every shard. +This complicates the design quite a lot. + +The alternative to this approach is to implement load balancing as an external +service. The request balancer uses a policy-based approach. Currently, only the +simplest possible policy is implemented. \ No newline at end of file diff --git a/src/v/cloud_topics/request_balancer/tests/BUILD b/src/v/cloud_topics/request_balancer/tests/BUILD new file mode 100644 index 0000000000000..f9617a646e23f --- /dev/null +++ b/src/v/cloud_topics/request_balancer/tests/BUILD @@ -0,0 +1,22 @@ +load("//bazel:test.bzl", "redpanda_cc_gtest") + +redpanda_cc_gtest( + name = "write_request_balancer_test", + timeout = "short", + srcs = [ + "write_request_balancer_test.cc", + ], + deps = [ + "//src/v/base", + "//src/v/cloud_topics/core:event_filter", + "//src/v/cloud_topics/core:pipeline_stage", + "//src/v/cloud_topics/core:write_pipeline", + "//src/v/cloud_topics/core:write_request", + "//src/v/cloud_topics/request_balancer:write_request_balancer", + "//src/v/model", + "//src/v/model/tests:random", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/cloud_topics/request_balancer/tests/write_request_balancer_test.cc b/src/v/cloud_topics/request_balancer/tests/write_request_balancer_test.cc new file mode 100644 index 0000000000000..1af3957edad75 --- /dev/null +++ b/src/v/cloud_topics/request_balancer/tests/write_request_balancer_test.cc @@ -0,0 +1,166 @@ +// Copyright 2024 Redpanda Data, Inc.
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cloud_topics/core/event_filter.h" +#include "cloud_topics/core/pipeline_stage.h" +#include "cloud_topics/request_balancer/write_request_balancer.h" +#include "model/fundamental.h" +#include "model/namespace.h" +#include "model/record.h" +#include "model/record_batch_reader.h" +#include "model/tests/random_batch.h" +#include "test_utils/test.h" + +#include +#include +#include +#include + +#include +#include +#include + +inline ss::logger test_log("balancer_gtest"); + +namespace cloud_topics = experimental::cloud_topics; +using namespace std::chrono_literals; + +namespace experimental::cloud_topics { + +// Sink consumes and acknowledges all write requests +// in the pipeline. Every request is completed with an empty batch buffer. +struct pipeline_sink { + explicit pipeline_sink(core::write_pipeline<>& p) + : pipeline(p) + , _id(pipeline.register_pipeline_stage()) {} + + ss::future<> start() { + ssx::background = bg_run(); + co_return; + } + + ss::future<> stop() { + _as.request_abort(); + co_await _gate.close(); + } + + ss::future<> bg_run() { + auto h = _gate.hold(); // held until the loop exits; stop() waits on _gate.close() + while (!_as.abort_requested()) { + vlog(test_log.debug, "pipeline_sink subscribe, stage id {}", _id); + core::event_filter<> flt(core::event_type::new_write_request, _id); + auto sub = _as.subscribe( + [&flt](const std::optional&) noexcept { + flt.cancel(); + }); // cancels the pending filter when abort is requested + auto fut = co_await ss::coroutine::as_future( + pipeline.subscribe(flt)); + vlog(test_log.debug, "pipeline_sink event"); + if (fut.failed()) { + vlog( + test_log.error, + "Event subscription failed: {}", + 
fut.get_exception()); + continue; // retry; the loop condition ends it once abort was requested + } + auto event = fut.get(); + if (event.type != core::event_type::new_write_request) { + co_return; // any other event type ends the loop + } + // Vacuum all write requests + auto result = 
pipeline.get_write_requests( + std::numeric_limits::max(), _id); + for (auto& r : result.ready) { + // Set empty result to unblock the caller + r.set_value( + ss::circular_buffer< + model::record_batch>{}); // TODO: return some random batch + write_requests_acked++; + } + } + } + + core::write_pipeline<>& pipeline; + core::pipeline_stage _id; + ss::gate _gate; + ss::abort_source _as; + size_t write_requests_acked{0}; +}; + +/// Balancing policy that redirects all write requests +/// to shard 1. It overwrites the shard id of the first entry rather than reordering the vector. +class shard_one_balancing_policy : public balancing_policy { +public: + void rebalance(std::vector& shards) override { + shards.front().shard = 1; + } +}; + +} // namespace experimental::cloud_topics + +class write_request_balancer_fixture : public seastar_test { +public: + ss::future<> start() { + vlog(test_log.info, "Creating pipeline"); + co_await pipeline.start(); + + // Start the balancer + vlog(test_log.info, "Creating balancer"); + co_await balancer.start( + ss::sharded_parameter([this] { return std::ref(pipeline.local()); }), + ss::sharded_parameter([] { + return std::make_unique< + cloud_topics::shard_one_balancing_policy>(); + })); + vlog(test_log.info, "Starting balancer"); + co_await balancer.invoke_on_all( + [](cloud_topics::write_request_balancer& bal) { + return bal.start(); + }); + + // Start the sink + vlog(test_log.info, "Creating request_sink"); + co_await request_sink.start( + ss::sharded_parameter([this] { return std::ref(pipeline.local()); })); + vlog(test_log.info, "Starting request_sink"); + co_await request_sink.invoke_on_all( + [](cloud_topics::pipeline_sink& sink) { return sink.start(); }); + } + + ss::future<> stop() { // stops the services in the reverse order of start() + vlog(test_log.info, "Stopping request_sink"); + co_await request_sink.stop(); + vlog(test_log.info, "Stopping balancer"); + co_await balancer.stop(); + vlog(test_log.info, "Stopping pipeline"); + 
co_await pipeline.stop(); + } + + ss::sharded> pipeline; + ss::sharded balancer; + ss::sharded request_sink; +}; + +static const 
model::ntp test_ntp( + model::kafka_namespace, + model::topic_partition(model::topic("tapioca"), model::partition_id(42))); + +TEST_F_CORO(write_request_balancer_fixture, smoke_test) { + ASSERT_TRUE_CORO(ss::smp::count > 1); + + co_await start(); + + auto buf = co_await model::test::make_random_batches(); + auto reader = model::make_memory_record_batch_reader(std::move(buf)); + + auto placeholders = co_await pipeline.local().write_and_debounce( + test_ntp, std::move(reader), 1s); + + co_await stop(); +} diff --git a/src/v/cloud_topics/request_balancer/write_request_balancer.cc b/src/v/cloud_topics/request_balancer/write_request_balancer.cc new file mode 100644 index 0000000000000..e6f3adb43fa2d --- /dev/null +++ b/src/v/cloud_topics/request_balancer/write_request_balancer.cc @@ -0,0 +1,238 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_topics/request_balancer/write_request_balancer.h" + +#include "base/unreachable.h" +#include "cloud_topics/core/write_pipeline.h" +#include "cloud_topics/core/write_request.h" +#include "cloud_topics/logger.h" +#include "ssx/future-util.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace experimental::cloud_topics { + +/// Sorts the shards by ascending shard id, so the lowest id comes first. +void dummy_balancing_policy::rebalance( + std::vector& shards) { + std::sort( + shards.begin(), + shards.end(), + []( + const shard_resource_utilization& lhs, + const shard_resource_utilization& rhs) { + return lhs.shard < rhs.shard; + }); +} + +/// Registers the balancer as a stage of the pipeline and creates one +/// utilization entry per CPU returned by ss::smp::all_cpus(). +write_request_balancer::write_request_balancer( + core::write_pipeline<>& pipeline, std::unique_ptr policy) + : _pipeline(pipeline) + , _my_stage(_pipeline.register_pipeline_stage()) + , _policy(std::move(policy)) { + for (auto cpu : ss::smp::all_cpus()) { + _shards.push_back({.shard = cpu}); + } +} + +/// Starts the background loop which repeatedly invokes run_bg(). +ss::future<> write_request_balancer::start() { + ssx::repeat_until_gate_closed_or_aborted( + _gate, _as, [this] { return run_bg(); }); + co_return; +} + +/// Requests abort and closes the gate. +ss::future<> write_request_balancer::stop() { + _as.request_abort(); + return _gate.close(); +} + +/// Waits for a new_write_request event for this stage and runs one balancing +/// pass. Failures are logged, except for exceptions raised during shutdown. 
+ss::future<> write_request_balancer::run_bg() noexcept { + core::event_filter filter( + core::event_type::new_write_request, _my_stage); + auto event = co_await _pipeline.subscribe(filter, _as); + switch (event.type) { + case core::event_type::shutting_down: + co_return; + case core::event_type::err_timedout: + case core::event_type::new_read_request: + case core::event_type::none: + unreachable(); + case core::event_type::new_write_request: + break; + } + auto fut = co_await ss::coroutine::as_future(run_once()); + if (fut.failed()) { + auto ep = fut.get_exception(); + if (!ssx::is_shutdown_exception(ep)) { + vlog(cd_log.error, "Failed to proxy write request: {}", 
ep); + } + co_return; + } + // get() consumes the future's value, so take the result exactly once. + auto res = fut.get(); + if (res.has_failure()) { + vlog(cd_log.error, "Failed to proxy write request: {}", res.error()); + } +} + +/// Returns a copy of the chunk's batches and payload. +static inline core::serialized_chunk +deep_copy(const core::serialized_chunk& chunk) { + core::serialized_chunk copy; + copy.batches = chunk.batches.copy(); + copy.payload = chunk.payload.copy(); + return copy; +} + +/// Returns a buffer holding a copy of every batch. +static inline ss::circular_buffer +deep_copy(const ss::circular_buffer& batches) { + ss::circular_buffer res; + for (const auto& b : batches) { + res.push_back(b.copy()); + } + return res; +} + +/// Runs one balancing pass. If the policy's first choice is this shard, every +/// request of this stage is advanced to the next stage and false is returned. +/// Otherwise the requests are pulled out and proxied to that shard, returning true. +ss::future> write_request_balancer::run_once() noexcept { + _policy->rebalance(_shards); + if (_shards.front().shard == ss::this_shard_id()) { + // Fast path, no need to rebalance requests. + _pipeline.process_stage( + [](core::write_request<>&) noexcept + -> checked { + return core::get_write_requests_result{ + .stop_iteration = ss::stop_iteration::no, + .advance_next_stage = true, + }; + }, + _my_stage); + co_return false; + } + auto requests = _pipeline.get_write_requests( + std::numeric_limits::max(), _my_stage); + // For every request: + // - make a proxy on a target shard + // - submit it to the pipeline on that shard bypassing + // the load balancer + // - extract result from the proxy request and put it + // into the original one + for (auto& request : requests.ready) { + // The request is stored on the stack of one of the fibers and its + // lifetime is defined by the promise that it contains. The request will + // be alive until the promise is set. After its set all bets are off. 
+ // Since the request is extracted out of the pipeline nothing can set + // the promise except the resource_balancer. 
+ auto target_shard = _shards.front().shard; + // TODO: use zero copy mechanism + // NOTE: it's OK to background because the pipeline has throttler which + // handles memory pressure + ssx::background = roundtrip(target_shard, request); + } + co_return true; +} + +/// Proxies one request through the balancer on the target shard and completes +/// the original request with the outcome. Runs as a background fiber, so it +/// must never leave the request's promise unset. +ss::future<> write_request_balancer::roundtrip( + ss::shard_id shard, core::write_request<>& req) { + vassert(shard != ss::this_shard_id(), "Can't roundtrip using one shard"); + if (_gate.is_closed()) { + // stop() is already in progress: fail the request instead of starting + // new cross-shard work. + req.set_value(errc::upload_failure); + co_return; + } + // Hold the gate so that stop() waits for this fiber; otherwise it could + // resume after the balancer has been destroyed. + auto holder = _gate.hold(); + auto fut = co_await ss::coroutine::as_future(container().invoke_on( + shard, [&req](write_request_balancer& balancer) { + return balancer.proxy_write_request(&req); + })); + if (fut.failed()) { + // Nothing else can complete the original request, so fail it + // explicitly instead of leaving the caller blocked. + vlog( + cd_log.error, + "Failed to proxy write request: {}", + fut.get_exception()); + req.set_value(errc::upload_failure); + co_return; + } + ack_write_response(&req, fut.get()); +} + +/// Runs on the target shard: copies the request's data into a local proxy +/// request, re-enqueues it into this shard's pipeline and returns the resulting +/// batches behind a foreign pointer, or the error reported for the proxy. +ss::future> +write_request_balancer::proxy_write_request(const core::write_request<>* req) { + // The request was created on another shard. 
+ auto timeout = std::chrono::duration_cast( + req->expiration_time - req->ingestion_time); + core::write_request<> proxy( + req->ntp, deep_copy(req->data_chunk), timeout, req->stage); + auto fut = proxy.response.get_future(); + _pipeline.reenqueue(proxy); + auto batches_fut = co_await ss::coroutine::as_future(std::move(fut)); + if (batches_fut.failed()) { + vlog( + cd_log.error, + "Proxy write request failed: {}", + batches_fut.get_exception()); + co_return outcome::failure(errc::upload_failure); + } + auto batches = batches_fut.get(); + if (batches.has_error()) { + // Normal errors (S3 upload failure or timeout) + // are handled here + vlog(cd_log.error, "Proxy write request failed: {}", batches.error()); + co_return outcome::failure(batches.error()); + } + auto ptr = ss::make_lw_shared>( + std::move(batches.value())); + co_return outcome::success(ss::make_foreign(std::move(ptr))); +} + +void write_request_balancer::ack_write_response( + core::write_request<>* req, checked resp) { + // The response was created on another shard. + // The req was created on this shard. 
+ // TODO: use zero copy + if (resp.has_error()) { + req->set_value(resp.error()); + } else { + auto batches = deep_copy(*resp.value()); + req->set_value(std::move(batches)); + } +} + +} // namespace experimental::cloud_topics diff --git a/src/v/cloud_topics/request_balancer/write_request_balancer.h b/src/v/cloud_topics/request_balancer/write_request_balancer.h new file mode 100644 index 0000000000000..129abf478c9b2 --- /dev/null +++ b/src/v/cloud_topics/request_balancer/write_request_balancer.h @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "base/seastarx.h" +#include "cloud_topics/core/write_pipeline.h" +#include "cloud_topics/core/write_request.h" + +#include +#include +#include +#include +#include + +#include + +namespace experimental::cloud_topics { + +/// Current resource utilization of the shard. +/// The instances of this struct are exchanged by all shards. +struct shard_resource_utilization { + unsigned shard{ss::this_shard_id()}; + // TODO: implement + // NOTE: currently, the monitoring of the 'remote' is implemented by + // the 'cloud_storage::remote' and not available through the + // 'cloud_io::remote'. We should move this mechanism to the 'cloud_io' + // subsystem and then use it here to collect resource utilization + // information. 
+}; + +class balancing_policy { +public: + balancing_policy() = default; + balancing_policy(const balancing_policy&) = default; + balancing_policy(balancing_policy&&) = default; + balancing_policy& operator=(const balancing_policy&) = default; + balancing_policy& operator=(balancing_policy&&) = default; + virtual ~balancing_policy() = default; + + /// Reorder the vector so the shard that should be chosen first + /// goes first etc. The balancer currently consults only the first entry. + virtual void rebalance(std::vector&) = 0; +}; + +/// Balancing policy which chooses shards based on shard ids (prefers pushing +/// write requests to shard 0). +class dummy_balancing_policy : public balancing_policy { +public: + void rebalance(std::vector&) override; +}; + +struct resource_balancer_accessor; + +/// The write request balancer monitors the write_pipeline and redirects +/// write requests to the shard chosen by the balancing policy. +class write_request_balancer + : public ss::peering_sharded_service { + using request_ptr = ss::weak_ptr>; + + friend struct resource_balancer_accessor; + +public: + write_request_balancer( + core::write_pipeline<>& pipeline, + std::unique_ptr policy); + + ss::future<> start(); + + ss::future<> stop(); + +private: + /// Run the load balancing for all available write requests. The result is false when the requests stay on this shard and true when they are proxied to another one. 
+ ss::future> run_once() noexcept; + + /// Run one iteration of the background loop: wait for a write request event, then call run_once() + ss::future<> run_bg() noexcept; + + using foreign_resp_ptr = ss::foreign_ptr< + ss::lw_shared_ptr>>; + + /// Make a copy of the write request and enqueue it + ss::future> + proxy_write_request(const core::write_request<>* req); + + void ack_write_response( + core::write_request<>* req, checked resp); // completes 'req' with the error or with a copy of the batches held by 'resp' + + ss::future<> roundtrip(ss::shard_id shard, core::write_request<>& req); // proxies 'req' through the balancer on 'shard' and acks it with the response + + std::vector _shards; + core::write_pipeline<>& _pipeline; + core::pipeline_stage _my_stage; + std::unique_ptr _policy; + ss::gate _gate; + ss::abort_source _as; +}; + +} // namespace experimental::cloud_topics