From 6be29e6b0ac427390b4fbc9033ee112d7edd5861 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Mon, 11 Nov 2024 14:54:48 -0500 Subject: [PATCH] ct: Add request_balancer component The request balancer is used to move write requests between shards. The current implementation is very basic. The intention is to show how this could be implemented. The balancing policy can be swapped but shards do not exchange any information to make informed decisions. The request_balancer only implements a basic data flow. The write_request instance is extracted from the write_pipeline and then submitted to another shard (through a proxy object). The result is propagated back to the original write request and eventually to the caller. The fast path is a no-op. Initially, we can just use a basic balancing policy that disables actual balancing and leaves all write requests on the original shard. In the future we need to add code that propagates the information needed to make decisions (by utilizing the existing mechanism in the cloud_storage::remote) and use it to make more optimal decisions.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- src/v/cloud_topics/BUILD | 2 + src/v/cloud_topics/core/BUILD | 2 + src/v/cloud_topics/request_balancer/BUILD | 26 +++ src/v/cloud_topics/request_balancer/README.md | 11 + .../cloud_topics/request_balancer/tests/BUILD | 22 ++ .../tests/write_request_balancer_test.cc | 166 +++++++++++++++ .../write_request_balancer.cc | 198 ++++++++++++++++++ .../request_balancer/write_request_balancer.h | 106 ++++++++++ 8 files changed, 533 insertions(+) create mode 100644 src/v/cloud_topics/request_balancer/BUILD create mode 100644 src/v/cloud_topics/request_balancer/README.md create mode 100644 src/v/cloud_topics/request_balancer/tests/BUILD create mode 100644 src/v/cloud_topics/request_balancer/tests/write_request_balancer_test.cc create mode 100644 src/v/cloud_topics/request_balancer/write_request_balancer.cc create mode 100644 src/v/cloud_topics/request_balancer/write_request_balancer.h diff --git a/src/v/cloud_topics/BUILD b/src/v/cloud_topics/BUILD index 5372da6e90a10..4ee975fc891e7 100644 --- a/src/v/cloud_topics/BUILD +++ b/src/v/cloud_topics/BUILD @@ -9,6 +9,8 @@ package(default_visibility = [ "//src/v/cloud_topics/dl_stm/tests:__pkg__", "//src/v/cloud_topics/reader:__pkg__", "//src/v/cloud_topics/reader/tests:__pkg__", + "//src/v/cloud_topics/request_balancer:__pkg__", + "//src/v/cloud_topics/request_balancer/tests:__pkg__", "//src/v/cloud_topics/tests:__pkg__", "//src/v/cloud_topics/throttler:__pkg__", "//src/v/cloud_topics/throttler/tests:__pkg__", diff --git a/src/v/cloud_topics/core/BUILD b/src/v/cloud_topics/core/BUILD index e2848809a8693..c1b55226f1d8f 100644 --- a/src/v/cloud_topics/core/BUILD +++ b/src/v/cloud_topics/core/BUILD @@ -4,6 +4,8 @@ package(default_visibility = [ "//src/v/cloud_topics/batcher:__pkg__", "//src/v/cloud_topics/batcher/tests:__pkg__", "//src/v/cloud_topics/core/tests:__pkg__", + "//src/v/cloud_topics/request_balancer:__pkg__", + "//src/v/cloud_topics/request_balancer/tests:__pkg__", 
"//src/v/cloud_topics/throttler:__pkg__", "//src/v/cloud_topics/throttler/tests:__pkg__", ]) diff --git a/src/v/cloud_topics/request_balancer/BUILD b/src/v/cloud_topics/request_balancer/BUILD new file mode 100644 index 0000000000000..f14b6b1841a5c --- /dev/null +++ b/src/v/cloud_topics/request_balancer/BUILD @@ -0,0 +1,26 @@ +load("//bazel:build.bzl", "redpanda_cc_library") + +package(default_visibility = ["//src/v/cloud_topics/request_balancer/tests:__pkg__"]) + +redpanda_cc_library( + name = "write_request_balancer", + srcs = [ + "write_request_balancer.cc", + ], + hdrs = [ + "write_request_balancer.h", + ], + include_prefix = "cloud_topics/request_balancer", + deps = [ + "//src/v/base", + "//src/v/bytes", + "//src/v/bytes:iobuf", + "//src/v/cloud_topics:logger", + "//src/v/cloud_topics:types", + "//src/v/cloud_topics/core:event_filter", + "//src/v/cloud_topics/core:write_pipeline", + "//src/v/cloud_topics/core:write_request", + "//src/v/ssx:future_util", + "@seastar", + ], +) diff --git a/src/v/cloud_topics/request_balancer/README.md b/src/v/cloud_topics/request_balancer/README.md new file mode 100644 index 0000000000000..ea5d912f525f9 --- /dev/null +++ b/src/v/cloud_topics/request_balancer/README.md @@ -0,0 +1,11 @@ +### Intro + +Request balancer is responsible for distributing the load between the shards +and for limiting resource usage. + +Our previous approach was to measure the actual resource usage by every shard. +This complicates the design quite a lot. + +The alternative to this approach is to implement load balancing as an external +service. The request balancer uses policy based approach. Currently, only the +simplest possible policy is implemented. 
\ No newline at end of file diff --git a/src/v/cloud_topics/request_balancer/tests/BUILD b/src/v/cloud_topics/request_balancer/tests/BUILD new file mode 100644 index 0000000000000..f9617a646e23f --- /dev/null +++ b/src/v/cloud_topics/request_balancer/tests/BUILD @@ -0,0 +1,22 @@ +load("//bazel:test.bzl", "redpanda_cc_gtest") + +redpanda_cc_gtest( + name = "write_request_balancer_test", + timeout = "short", + srcs = [ + "write_request_balancer_test.cc", + ], + deps = [ + "//src/v/base", + "//src/v/cloud_topics/core:event_filter", + "//src/v/cloud_topics/core:pipeline_stage", + "//src/v/cloud_topics/core:write_pipeline", + "//src/v/cloud_topics/core:write_request", + "//src/v/cloud_topics/request_balancer:write_request_balancer", + "//src/v/model", + "//src/v/model/tests:random", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/cloud_topics/request_balancer/tests/write_request_balancer_test.cc b/src/v/cloud_topics/request_balancer/tests/write_request_balancer_test.cc new file mode 100644 index 0000000000000..1af3957edad75 --- /dev/null +++ b/src/v/cloud_topics/request_balancer/tests/write_request_balancer_test.cc @@ -0,0 +1,166 @@ +// Copyright 2024 Redpanda Data, Inc. 
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#include "cloud_topics/core/event_filter.h"
+#include "cloud_topics/core/pipeline_stage.h"
+#include "cloud_topics/request_balancer/write_request_balancer.h"
+#include "model/fundamental.h"
+#include "model/namespace.h"
+#include "model/record.h"
+#include "model/record_batch_reader.h"
+#include "model/tests/random_batch.h"
+#include "test_utils/test.h"
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+inline ss::logger test_log("balancer_gtest");
+
+namespace cloud_topics = experimental::cloud_topics;
+using namespace std::chrono_literals;
+
+namespace experimental::cloud_topics {
+
+// Test sink: registers its own pipeline stage and completes every write
+// request it pulls from the pipeline with an empty batch buffer.
+struct pipeline_sink {
+    explicit pipeline_sink(core::write_pipeline<>& p)
+      : pipeline(p)
+      , _id(pipeline.register_pipeline_stage()) {}
+
+    ss::future<> start() {
+        ssx::background = bg_run(); // detached; bg_run() holds _gate
+        co_return;
+    }
+
+    ss::future<> stop() {
+        _as.request_abort();
+        co_await _gate.close(); // waits for bg_run() to release the gate
+    }
+
+    ss::future<> bg_run() {
+        auto h = _gate.hold();
+        while (!_as.abort_requested()) {
+            vlog(test_log.debug, "pipeline_sink subscribe, stage id {}", _id);
+            core::event_filter<> flt(core::event_type::new_write_request, _id);
+            auto sub = _as.subscribe(
+              [&flt](const std::optional&) noexcept {
+                  flt.cancel(); // abort requested: cancel the pending filter
+              });
+            auto fut = co_await ss::coroutine::as_future(
+              pipeline.subscribe(flt));
+            vlog(test_log.debug, "pipeline_sink event");
+            if (fut.failed()) {
+                vlog(
+                  test_log.error,
+                  "Event subscription failed: {}",
+                  fut.get_exception());
+                continue; // retry; the loop exits once abort is requested
+            }
+            auto event = fut.get();
+            if (event.type != core::event_type::new_write_request) {
+                co_return; // any other event type ends the loop
+            }
+            // Vacuum all write requests
+            auto result = pipeline.get_write_requests(
+              std::numeric_limits::max(), _id);
+            for (auto& r : result.ready) {
+                // Set empty result to unblock the caller
+                r.set_value(
+                  ss::circular_buffer<
+                    model::record_batch>{}); // TODO: return some random batch
+                write_requests_acked++;
+            }
+        }
+    }
+
+    core::write_pipeline<>& pipeline;
+    core::pipeline_stage _id;
+    ss::gate _gate;
+    ss::abort_source _as;
+    size_t write_requests_acked{0};
+};
+
+/// Test policy: sets the shard id of the first entry to 1. The balancer
+/// reads the first entry as its target, so every request goes to shard 1.
+class shard_one_balancing_policy : public balancing_policy {
+public:
+    void rebalance(std::vector& shards) override {
+        shards.front().shard = 1;
+    }
+};
+
+} // namespace experimental::cloud_topics
+
+class write_request_balancer_fixture : public seastar_test {
+public:
+    ss::future<> start() {
+        vlog(test_log.info, "Creating pipeline");
+        co_await pipeline.start();
+
+        // Start the balancer
+        vlog(test_log.info, "Creating balancer");
+        co_await balancer.start(
+          ss::sharded_parameter([this] { return std::ref(pipeline.local()); }),
+          ss::sharded_parameter([] {
+              return std::make_unique<
+                cloud_topics::shard_one_balancing_policy>();
+          }));
+        vlog(test_log.info, "Starting balancer");
+        co_await balancer.invoke_on_all(
+          [](cloud_topics::write_request_balancer& bal) {
+              return bal.start();
+          });
+
+        // Start the sink
+        vlog(test_log.info, "Creating request_sink");
+        co_await request_sink.start(
+          ss::sharded_parameter([this] { return std::ref(pipeline.local()); }));
+        vlog(test_log.info, "Starting request_sink");
+        co_await request_sink.invoke_on_all(
+          [](cloud_topics::pipeline_sink& sink) { return sink.start(); });
+    }
+
+    ss::future<> stop() {
+        vlog(test_log.info, "Stopping request_sink");
+        co_await request_sink.stop();
+        vlog(test_log.info, "Stopping balancer");
+        co_await balancer.stop();
+        vlog(test_log.info, "Stopping pipeline");
+        co_await pipeline.stop();
+    }
+
+    ss::sharded> pipeline;
+    ss::sharded balancer;
+    ss::sharded request_sink;
+};
+
+static const model::ntp test_ntp(
+  model::kafka_namespace,
+  model::topic_partition(model::topic("tapioca"), model::partition_id(42)));
+
+TEST_F_CORO(write_request_balancer_fixture, smoke_test) {
+    ASSERT_TRUE_CORO(ss::smp::count > 1); // the test policy targets shard 1
+
+    co_await start();
+
+    auto buf = co_await model::test::make_random_batches();
+    auto reader = model::make_memory_record_batch_reader(std::move(buf));
+
+    auto placeholders = co_await pipeline.local().write_and_debounce(
+      test_ntp, std::move(reader), 1s); // NOTE(review): result is not checked
+
+    co_await stop();
+}
diff --git a/src/v/cloud_topics/request_balancer/write_request_balancer.cc b/src/v/cloud_topics/request_balancer/write_request_balancer.cc
new file mode 100644
index 0000000000000..e6f3adb43fa2d
--- /dev/null
+++ b/src/v/cloud_topics/request_balancer/write_request_balancer.cc
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#include "cloud_topics/request_balancer/write_request_balancer.h"
+
+#include "base/unreachable.h"
+#include "cloud_topics/core/write_pipeline.h"
+#include "cloud_topics/core/write_request.h"
+#include "cloud_topics/logger.h"
+#include "ssx/future-util.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+namespace experimental::cloud_topics {
+
+void dummy_balancing_policy::rebalance(
+  std::vector& shards) {
+    std::sort( // ascending shard id, so the lowest id ends up in front
+      shards.begin(),
+      shards.end(),
+      [](
+        const shard_resource_utilization& lhs,
+        const shard_resource_utilization& rhs) {
+          return lhs.shard < rhs.shard;
+      });
+}
+
+write_request_balancer::write_request_balancer(
+  core::write_pipeline<>& pipeline, std::unique_ptr policy)
+  : _pipeline(pipeline)
+  , _my_stage(_pipeline.register_pipeline_stage())
+  , _policy(std::move(policy)) {
+    for (auto cpu : ss::smp::all_cpus()) { // one entry per shard
+        _shards.push_back({.shard = cpu});
+    }
+}
+
+ss::future<> write_request_balancer::start() {
+    ssx::repeat_until_gate_closed_or_aborted( // not awaited: runs detached
+      _gate, _as, [this] { return run_bg(); });
+    co_return;
+}
+
+ss::future<> write_request_balancer::stop() {
+    _as.request_abort();
+    return _gate.close();
+}
+
+ss::future<> write_request_balancer::run_bg() noexcept {
+    core::event_filter filter(
+      core::event_type::new_write_request, _my_stage);
+    auto event = co_await _pipeline.subscribe(filter, _as);
+    switch (event.type) {
+    case core::event_type::shutting_down:
+        co_return;
+    case core::event_type::err_timedout:
+    case core::event_type::new_read_request:
+    case core::event_type::none:
+        unreachable();
+    case core::event_type::new_write_request:
+        break;
+    }
+    auto fut = co_await ss::coroutine::as_future(run_once());
+    if (fut.failed()) {
+        auto ep = fut.get_exception();
+        if (!ssx::is_shutdown_exception(ep)) {
+            vlog(cd_log.error, "Failed to proxy write request: {}", ep);
+        }
+    } else if (auto res = fut.get(); res.has_failure()) { // get() only once
+        vlog(
+          cd_log.error, "Failed to proxy write request: {}", res.error());
+    }
+}
+
+static inline core::serialized_chunk
+deep_copy(const core::serialized_chunk& chunk) {
+    core::serialized_chunk copy;
+    copy.batches = chunk.batches.copy();
+    copy.payload = chunk.payload.copy();
+    return copy;
+}
+
+static inline ss::circular_buffer
+deep_copy(const ss::circular_buffer& batches) {
+    ss::circular_buffer res;
+    for (const auto& b : batches) {
+        res.push_back(b.copy());
+    }
+    return res;
+}
+
+ss::future> write_request_balancer::run_once() noexcept {
+    _policy->rebalance(_shards);
+    if (_shards.front().shard == ss::this_shard_id()) {
+        // Fast path, no need to rebalance requests.
+        _pipeline.process_stage(
+          [](core::write_request<>&) noexcept
+            -> checked {
+              return core::get_write_requests_result{
+                .stop_iteration = ss::stop_iteration::no,
+                .advance_next_stage = true,
+              };
+          },
+          _my_stage);
+        co_return false;
+    }
+    auto requests = _pipeline.get_write_requests(
+      std::numeric_limits::max(), _my_stage);
+    // For every request:
+    // - make a proxy on a target shard
+    // - submit it to the pipeline on that shard bypassing
+    // the load balancer
+    // - extract result from the proxy request and put it
+    // into the original one
+    for (auto& request : requests.ready) {
+        // The request is stored on the stack of one of the fibers and its
+        // lifetime is defined by the promise that it contains. The request will
+        // be alive until the promise is set. After it is set all bets are off.
+        // Since the request is extracted out of the pipeline nothing can set
+        // the promise except the write_request_balancer.
+        auto target_shard = _shards.front().shard;
+        // TODO: use zero copy mechanism
+        // NOTE: it's OK to background because the pipeline has throttler which
+        // handles memory pressure. NOTE(review): roundtrip() doesn't hold _gate
+        ssx::background = roundtrip(target_shard, request);
+    }
+    co_return true;
+}
+
+ss::future<> write_request_balancer::roundtrip(
+  ss::shard_id shard, core::write_request<>& req) {
+    vassert(shard != ss::this_shard_id(), "Can't roundtrip using one shard");
+    auto resp = co_await container().invoke_on(
+      shard, [&req](write_request_balancer& balancer) {
+          return balancer.proxy_write_request(&req);
+      });
+    ack_write_response(&req, std::move(resp));
+}
+
+ss::future>
+write_request_balancer::proxy_write_request(const core::write_request<>* req) {
+    // The request was created on another shard.
+    auto timeout = std::chrono::duration_cast(
+      req->expiration_time - req->ingestion_time);
+    core::write_request<> proxy(
+      req->ntp, deep_copy(req->data_chunk), timeout, req->stage);
+    auto fut = proxy.response.get_future();
+    _pipeline.reenqueue(proxy);
+    auto batches_fut = co_await ss::coroutine::as_future(std::move(fut));
+    if (batches_fut.failed()) {
+        vlog(
+          cd_log.error,
+          "Proxy write request failed: {}",
+          batches_fut.get_exception());
+        co_return outcome::failure(errc::upload_failure);
+    }
+    auto batches = batches_fut.get();
+    if (batches.has_error()) {
+        // Normal errors (S3 upload failure or timeout)
+        // are handled here
+        vlog(cd_log.error, "Proxy write request failed: {}", batches.error());
+        co_return outcome::failure(batches.error());
+    }
+    auto ptr = ss::make_lw_shared>(
+      std::move(batches.value()));
+    co_return outcome::success(ss::make_foreign(std::move(ptr)));
+}
+
+void write_request_balancer::ack_write_response(
+  core::write_request<>* req, checked resp) {
+    // The response was created on another shard.
+    // The req was created on this shard.
+    // TODO: use zero copy
+    if (resp.has_error()) {
+        req->set_value(resp.error());
+    } else {
+        auto batches = deep_copy(*resp.value());
+        req->set_value(std::move(batches));
+    }
+}
+
+} // namespace experimental::cloud_topics
diff --git a/src/v/cloud_topics/request_balancer/write_request_balancer.h b/src/v/cloud_topics/request_balancer/write_request_balancer.h
new file mode 100644
index 0000000000000..129abf478c9b2
--- /dev/null
+++ b/src/v/cloud_topics/request_balancer/write_request_balancer.h
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#pragma once
+
+#include "base/seastarx.h"
+#include "cloud_topics/core/write_pipeline.h"
+#include "cloud_topics/core/write_request.h"
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace experimental::cloud_topics {
+
+/// Current resource utilization of the shard.
+/// The instances of this struct are exchanged by all shards.
+struct shard_resource_utilization {
+    unsigned shard{ss::this_shard_id()};
+    // TODO: implement
+    // NOTE: currently, the monitoring of the 'remote' is implemented by
+    // the 'cloud_storage::remote' and not available through the
+    // 'cloud_io::remote'. We should move this mechanism to the 'cloud_io'
+    // subsystem and then use it here to collect resource utilization
+    // information.
+};
+
+class balancing_policy {
+public:
+    balancing_policy() = default;
+    balancing_policy(const balancing_policy&) = default;
+    balancing_policy(balancing_policy&&) = default;
+    balancing_policy& operator=(const balancing_policy&) = default;
+    balancing_policy& operator=(balancing_policy&&) = default;
+    virtual ~balancing_policy() = default;
+
+    /// Reorder the vector so the shard that should be chosen first
+    /// goes first etc.
+    virtual void rebalance(std::vector&) = 0;
+};
+
+/// Balancing policy which chooses shards based on shard ids (prefers pushing
+/// write requests to shard 0).
+class dummy_balancing_policy : public balancing_policy {
+public:
+    void rebalance(std::vector&) override;
+};
+
+struct resource_balancer_accessor;
+
+/// The write request balancer monitors the write_pipeline and redirects
+/// write requests to the correct shard.
+class write_request_balancer
+  : public ss::peering_sharded_service {
+    using request_ptr = ss::weak_ptr>;
+
+    friend struct resource_balancer_accessor;
+
+public:
+    write_request_balancer(
+      core::write_pipeline<>& pipeline,
+      std::unique_ptr policy);
+
+    ss::future<> start();
+
+    ss::future<> stop();
+
+private:
+    /// Run load balancing once: false = fast path (local), true = proxy path
+    ss::future> run_once() noexcept;
+
+    /// One background iteration: wait for a pipeline event, then run_once()
+    ss::future<> run_bg() noexcept;
+
+    using foreign_resp_ptr = ss::foreign_ptr<
+      ss::lw_shared_ptr>>;
+
+    /// On the target shard: copy the request, enqueue it, await its result
+    ss::future>
+    proxy_write_request(const core::write_request<>* req);
+
+    void ack_write_response(
+      core::write_request<>* req, checked resp);
+
+    ss::future<> roundtrip(ss::shard_id shard, core::write_request<>& req);
+
+    std::vector _shards;
+    core::write_pipeline<>& _pipeline;
+    core::pipeline_stage _my_stage;
+    std::unique_ptr _policy;
+    ss::gate _gate;
+    ss::abort_source _as;
+};
+
+} // namespace experimental::cloud_topics