Skip to content

Commit

Permalink
Merge pull request #19875 from pgellert/quotas/watch-quota-store-changes
Browse files Browse the repository at this point in the history
CORE-4200 quotas: watch quota store changes
  • Loading branch information
michael-redpanda authored Jun 18, 2024
2 parents 7ad4bdf + 0620382 commit e7f0cb6
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 10 deletions.
30 changes: 26 additions & 4 deletions src/v/cluster/client_quota_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@

namespace cluster::client_quota {

void store::set_quota(const entity_key& key, const entity_value& value) {
void store::set_quota(
const entity_key& key, const entity_value& value, bool trigger_notify) {
if (!value.is_empty()) {
_quotas.insert_or_assign(key, value);
} else {
_quotas.erase(key);
}

if (trigger_notify) {
notify_watchers();
}
}

void store::remove_quota(const entity_key& key) { _quotas.erase(key); }
void store::remove_quota(const entity_key& key) {
_quotas.erase(key);
notify_watchers();
}

std::optional<entity_value> store::get_quota(const entity_key& key) const {
auto it = _quotas.find(key);
Expand All @@ -42,7 +50,10 @@ store::range_container_type store::range(

store::container_type::size_type store::size() const { return _quotas.size(); }

void store::clear() { _quotas.clear(); }
void store::clear() {
_quotas.clear();
notify_watchers();
}

const store::container_type& store::all_quotas() const { return _quotas; }

Expand All @@ -69,9 +80,20 @@ void store::apply_delta(const alter_delta_cmd_data& data) {
break;
}
}
set_quota(key, q);
set_quota(key, q, false);
}
_quotas.rehash(0);
notify_watchers();
}

void store::watch(on_change_callback_type&& f) {
_on_change_watchers.push_back(std::move(f));
}

void store::notify_watchers() const {
for (auto& f : _on_change_watchers) {
f();
}
}

} // namespace cluster::client_quota
15 changes: 11 additions & 4 deletions src/v/cluster/client_quota_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class store final {
= chunked_vector<std::pair<entity_key, entity_value>>;
using range_callback_type
= std::function<bool(const std::pair<entity_key, entity_value>&)>;
using on_change_callback_type = std::function<void()>;

/// Constructs an empty store
store() = default;
Expand All @@ -36,7 +37,10 @@ class store final {
/// All quota types are overwritten with the given entity_value, so on alter
/// operations we need to read the current state of the quota and merge it
/// with the alterations
void set_quota(const entity_key&, const entity_value&);
/// Optionally the call to notify_watchers can be avoided by setting the
/// trigger_notify optional parameter to false
void set_quota(
const entity_key&, const entity_value&, bool trigger_notify = true);

/// Removes the configured quota at the given entity key
void remove_quota(const entity_key&);
Expand All @@ -59,6 +63,9 @@ class store final {
/// Applies the given alter controller command to the store
void apply_delta(const alter_delta_cmd_data&);

/// Call the callback whenever the quotas change in the store
void watch(on_change_callback_type&& f);

static constexpr auto entity_part_filter =
[](
const std::pair<entity_key, entity_value>& kv,
Expand All @@ -84,11 +91,11 @@ class store final {
};
}

// TODO: provide an observer mechanism so that quota_manager can listen to
// quota changes and update its state accordingly

private:
void notify_watchers() const;

container_type _quotas;
std::vector<on_change_callback_type> _on_change_watchers;
};

} // namespace cluster::client_quota
1 change: 1 addition & 0 deletions src/v/kafka/server/client_quota_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ void client_quota_translator::watch(on_change_fn&& fn) {
_target_partition_mutation_quota.watch(watcher);
_default_target_produce_tp_rate.watch(watcher);
_default_target_fetch_tp_rate.watch(watcher);
_quota_store.local().watch(watcher);
}

const client_quota_translator::quota_config&
Expand Down
5 changes: 3 additions & 2 deletions src/v/kafka/server/quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ quota_manager::quota_manager(
, _max_delay(config::shard_local_cfg().max_kafka_throttle_delay_ms.bind()) {
if (seastar::this_shard_id() == _client_quotas.shard_id()) {
_gc_timer.set_callback([this]() { gc(); });
auto update_quotas = [this]() { update_client_quotas(); };
_translator.watch(update_quotas);
}
}

Expand All @@ -66,6 +64,9 @@ ss::future<> quota_manager::start() {
if (ss::this_shard_id() == _client_quotas.shard_id()) {
co_await _client_quotas.reset(client_quotas_map_t{});
_gc_timer.arm_periodic(_gc_freq);

auto update_quotas = [this]() { update_client_quotas(); };
_translator.watch(update_quotas);
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/v/kafka/server/tests/quota_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "cluster/client_quota_serde.h"
#include "cluster/client_quota_store.h"
#include "config/configuration.h"
#include "kafka/server/client_quota_translator.h"
Expand Down Expand Up @@ -329,4 +330,24 @@ SEASTAR_THREAD_TEST_CASE(update_test) {
BOOST_CHECK_EQUAL(
client_it->second->tp_produce_rate->rate(), scale_to_smp_count(1024));
}

{
using cluster::client_quota::entity_key;
using cluster::client_quota::entity_value;

// Update fetch config again using the quota store
ss::sstring client_id = "franz-go";
auto key = entity_key{entity_key::client_id_match{client_id}};
auto value = entity_value{.consumer_byte_rate = 16384};
f.quota_store.local().set_quota(key, value);

// Wait for the quota update to propagate
ss::sleep(std::chrono::milliseconds(1)).get();

// Check the rate has been updated
auto it = f.buckets_map.local()->find(k_client_id{client_id});
BOOST_REQUIRE(it != f.buckets_map.local()->end());
BOOST_REQUIRE(it->second->tp_fetch_rate.has_value());
BOOST_CHECK_EQUAL(it->second->tp_fetch_rate->rate(), 16384);
}
}

0 comments on commit e7f0cb6

Please sign in to comment.