Merge pull request #18744 from ballard26/1282-fix

Change even topic constraint indexes to per shard

piyushredpanda authored Jun 6, 2024
2 parents 4e0436d + 1ee81f1 commit 66dd490

Showing 9 changed files with 119 additions and 133 deletions.
71 changes: 30 additions & 41 deletions src/v/cluster/scheduling/leader_balancer_constraints.cc
@@ -17,9 +17,9 @@ namespace cluster::leader_balancer_types {

even_topic_distributon_constraint::even_topic_distributon_constraint(
group_id_to_topic_revision_t group_to_topic_rev,
shard_index si,
const shard_index& si,
const muted_index& mi)
: _si(std::move(si))
: _si(si)
, _mi(mi)
, _group_to_topic_rev(std::move(group_to_topic_rev)) {
rebuild_indexes();
@@ -41,13 +41,9 @@ void even_topic_distributon_constraint::update_index(const reassignment& r) {
skew = adjusted_error(skew, topic_id, r.from, r.to);
_error += skew;

// Update _topic_node_index
_topic_node_index.at(topic_id).at(r.from.node_id) -= 1;
_topic_node_index.at(topic_id).at(r.to.node_id) += 1;

// Update _si

_si.update_index(r);
// Update _topic_shard_index
_topic_shard_index.at(topic_id).at(r.from) -= 1;
_topic_shard_index.at(topic_id).at(r.to) += 1;
}

std::optional<reassignment>
@@ -60,23 +56,22 @@ even_topic_distributon_constraint::recommended_reassignment() {
}

void even_topic_distributon_constraint::rebuild_indexes() {
_topic_node_index.clear();
_topic_shard_index.clear();
_topic_replica_index.clear();
_topic_partition_index.clear();

for (const auto& broker_shard : si().shards()) {
for (const auto& group_p : broker_shard.second) {
auto topic_id = group_to_topic_id().at(group_p.first);
const auto& node_id = broker_shard.first.node_id;

_topic_node_index[topic_id][node_id] += 1;
_topic_shard_index[topic_id][broker_shard.first] += 1;
_topic_partition_index[topic_id] += 1;

// Some of the replicas may not have leadership. So add
// all replica nodes here.
// all replica shards here.
for (const auto& replica_bs : group_p.second) {
_topic_replica_index[topic_id].insert(replica_bs.node_id);
_topic_node_index[topic_id].try_emplace(replica_bs.node_id);
_topic_replica_index[topic_id].insert(replica_bs);
_topic_shard_index[topic_id].try_emplace(replica_bs);
}
}
}
@@ -86,20 +81,20 @@ void even_topic_distributon_constraint::rebuild_indexes() {
* Used to calculate the initial values for the error this constraint
* is trying to minimize. The goal here is to calculate a per topic
* error(or skew) where the error is zero if the leaders of a topic's
* partitions are evenly distributed on every node. And where the error
* partitions are evenly distributed on every shard. And where the error
* grows to +infinity the more skewed the leadership assignment is to a
* subset of nodes. The equations used can be summarized as;
* subset of shards. The equations used can be summarized as;
*
* skew[topic_i] = SUM(leaders[node_i, topic_i] - opt[topic_i])^2
* opt[topic_i] = total_partitions[topic_i] / total_nodes[topic_i]
* where total_nodes is the number of nodes a topic has replicas on.
* skew[topic_i] = SUM(leaders[shard_i, topic_i] - opt[topic_i])^2
* opt[topic_i] = total_partitions[topic_i] / total_shards[topic_i]
* where total_shards is the number of shards a topic has replicas on.
* total_partitions is the number of partitions the topic has.
*/
void even_topic_distributon_constraint::calc_topic_skew() {
_topic_skew.clear();
_topic_opt_leaders.clear();

for (const auto& topic : _topic_node_index) {
for (const auto& topic : _topic_shard_index) {
auto topic_partitions = static_cast<double>(
_topic_partition_index.at(topic.first));
auto topic_replicas = static_cast<double>(
@@ -111,8 +106,8 @@ void even_topic_distributon_constraint::calc_topic_skew() {
auto& skew = _topic_skew[topic.first];
skew = 0;

for (const auto& node : topic.second) {
auto leaders = static_cast<double>(node.second);
for (const auto& shard : topic.second) {
auto leaders = static_cast<double>(shard.second);

skew += pow(leaders - opt_leaders, 2);
}
@@ -127,39 +122,33 @@ double even_topic_distributon_constraint::adjusted_error(
const topic_id_t& topic_id,
const model::broker_shard& from,
const model::broker_shard& to) const {
// Moving leadership between shards on the same node doesn't change
// error for this constraint.
if (from.node_id == to.node_id) {
return current_error;
}

auto opt_leaders = _topic_opt_leaders.at(topic_id);

const auto& topic_leaders = _topic_node_index.at(topic_id);
const auto& topic_leaders = _topic_shard_index.at(topic_id);

double from_node_leaders = 0;
const auto from_it = topic_leaders.find(from.node_id);
double from_shard_leaders = 0;
const auto from_it = topic_leaders.find(from);
if (from_it != topic_leaders.cend()) {
from_node_leaders = static_cast<double>(from_it->second);
from_shard_leaders = static_cast<double>(from_it->second);
} else {
// If there are no leaders for the topic on the from node
// If there are no leaders for the topic on the from shard
// then there is nothing to move and no change to the error.
return current_error;
}

double to_node_leaders = 0;
const auto to_it = topic_leaders.find(to.node_id);
double to_shard_leaders = 0;
const auto to_it = topic_leaders.find(to);
if (to_it != topic_leaders.cend()) {
to_node_leaders = static_cast<double>(to_it->second);
to_shard_leaders = static_cast<double>(to_it->second);
}

// Subtract old weights
current_error -= pow(from_node_leaders - opt_leaders, 2);
current_error -= pow(to_node_leaders - opt_leaders, 2);
current_error -= pow(from_shard_leaders - opt_leaders, 2);
current_error -= pow(to_shard_leaders - opt_leaders, 2);

// Add new weights
current_error += pow((from_node_leaders - 1) - opt_leaders, 2);
current_error += pow((to_node_leaders + 1) - opt_leaders, 2);
current_error += pow((from_shard_leaders - 1) - opt_leaders, 2);
current_error += pow((to_shard_leaders + 1) - opt_leaders, 2);

return current_error;
}
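For reference, below is a minimal standalone sketch (not part of this PR) of the per-shard skew metric described in the comment block above, together with the incremental update that adjusted_error() performs. std::unordered_map and a plain int shard id stand in for Redpanda's chunked_hash_map and model::broker_shard; all names are illustrative.

```cpp
// Illustration of skew[topic] = SUM((leaders[shard] - opt)^2) with
// opt = total_partitions / total_shards_with_replicas.
#include <cmath>
#include <cstddef>
#include <cstdio>
#include <unordered_map>

using shard_id = int; // stand-in for model::broker_shard

double topic_skew(
  const std::unordered_map<shard_id, std::size_t>& leaders_per_shard,
  std::size_t total_partitions,
  std::size_t total_replica_shards) {
    double opt = static_cast<double>(total_partitions)
                 / static_cast<double>(total_replica_shards);
    double skew = 0;
    for (const auto& [shard, leaders] : leaders_per_shard) {
        skew += std::pow(static_cast<double>(leaders) - opt, 2);
    }
    return skew;
}

// Incremental form used by adjusted_error(): drop the two old squared
// terms and re-add them with one leader moved from 'from' to 'to'.
double adjusted_skew(
  double current, double opt, double from_leaders, double to_leaders) {
    current -= std::pow(from_leaders - opt, 2);
    current -= std::pow(to_leaders - opt, 2);
    current += std::pow((from_leaders - 1) - opt, 2);
    current += std::pow((to_leaders + 1) - opt, 2);
    return current;
}

int main() {
    // One topic with 4 partitions, replicas on shards {0,1,2,3}, and all
    // leaders currently packed onto shard 0: opt = 1, skew = 9+1+1+1 = 12.
    std::unordered_map<shard_id, std::size_t> leaders{{0, 4}, {1, 0}, {2, 0}, {3, 0}};
    double skew = topic_skew(leaders, 4, 4);
    std::printf("skew before: %.1f\n", skew);

    // Move one leader from shard 0 to shard 1: skew becomes 4+0+1+1 = 6.
    skew = adjusted_skew(skew, 1.0, 4.0, 0.0);
    std::printf("skew after one move: %.1f\n", skew);
}
```

Packing all four leaders onto one shard gives a skew of 12; moving a single leader drops it to 6, which is the kind of reduction the hill-climbing balancer looks for when comparing candidate reassignments.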
23 changes: 11 additions & 12 deletions src/v/cluster/scheduling/leader_balancer_constraints.h
@@ -11,13 +11,13 @@
#pragma once

#include "cluster/scheduling/leader_balancer_types.h"
#include "container/chunked_hash_map.h"
#include "model/metadata.h"
#include "raft/fundamental.h"

#include <seastar/core/metrics.hh>
#include <seastar/core/sstring.hh>

#include <absl/container/btree_map.h>
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <boost/range/adaptor/reversed.hpp>
@@ -110,12 +110,12 @@ class even_topic_distributon_constraint final
using topic_id_t = model::revision_id::type;

template<typename ValueType>
using topic_map = absl::btree_map<topic_id_t, ValueType>;
using topic_map = chunked_hash_map<topic_id_t, ValueType>;

public:
even_topic_distributon_constraint(
group_id_to_topic_revision_t group_to_topic_rev,
shard_index si,
const shard_index& si,
const muted_index& mi);

even_topic_distributon_constraint(
@@ -152,19 +152,19 @@ class even_topic_distributon_constraint final
std::optional<reassignment> recommended_reassignment() override;

private:
shard_index _si;
std::reference_wrapper<const shard_index> _si;
std::reference_wrapper<const muted_index> _mi;
group_id_to_topic_revision_t _group_to_topic_rev;
double _error{0};

// Stores the number of leaders on a given shard per topic.
topic_map<absl::flat_hash_map<model::node_id, size_t>> _topic_node_index;
topic_map<chunked_hash_map<model::broker_shard, size_t>> _topic_shard_index;
topic_map<size_t> _topic_partition_index;
topic_map<absl::flat_hash_set<model::node_id>> _topic_replica_index;
topic_map<chunked_hash_set<model::broker_shard>> _topic_replica_index;
topic_map<double> _topic_skew;
topic_map<double> _topic_opt_leaders;

const shard_index& si() const { return _si; }
const shard_index& si() const { return _si.get(); }
const muted_index& mi() const { return _mi.get(); }
const group_id_to_topic_revision_t& group_to_topic_id() const {
return _group_to_topic_rev;
@@ -214,8 +214,8 @@ class even_shard_load_constraint final
static constexpr double error_jitter = 0.000001;

public:
even_shard_load_constraint(shard_index si, const muted_index& mi)
: _si(std::move(si))
even_shard_load_constraint(const shard_index& si, const muted_index& mi)
: _si(si)
, _mi(mi)
, _num_cores(num_cores())
, _num_groups(num_groups()) {
@@ -235,7 +235,6 @@ class even_shard_load_constraint final
double error() const { return _error; }
void update_index(const reassignment& r) override {
_error = adjusted_error(_error, r.from, r.to);
_si.update_index(r);
}

/*
@@ -260,13 +259,13 @@ class even_shard_load_constraint final
double calc_target_load() const;

private:
shard_index _si;
std::reference_wrapper<const shard_index> _si;
std::reference_wrapper<const muted_index> _mi;
size_t _num_cores;
size_t _num_groups;
double _error{0};

const shard_index& si() const { return _si; }
const shard_index& si() const { return _si.get(); }
const muted_index& mi() const { return _mi.get(); }

/*
7 changes: 5 additions & 2 deletions src/v/cluster/scheduling/leader_balancer_greedy.h
@@ -34,8 +34,9 @@ class greedy_balanced_shards final : public leader_balancer_strategy {
index_type cores, absl::flat_hash_set<model::node_id> muted_nodes)
: _mi(std::make_unique<leader_balancer_types::muted_index>(
std::move(muted_nodes), leader_balancer_types::muted_groups_t{}))
, _even_shard_load_c(
leader_balancer_types::shard_index{std::move(cores)}, *_mi) {}
, _si(std::make_unique<leader_balancer_types::shard_index>(
std::move(cores)))
, _even_shard_load_c(*_si, *_mi) {}

double error() const final { return _even_shard_load_c.error(); }

@@ -62,6 +63,7 @@ class greedy_balanced_shards final : public leader_balancer_strategy {

void apply_movement(const reassignment& r) final {
_even_shard_load_c.update_index(r);
_si->update_index(r);
}

std::vector<shard_load> stats() const final {
@@ -70,6 +72,7 @@ class greedy_balanced_shards final : public leader_balancer_strategy {

private:
std::unique_ptr<leader_balancer_types::muted_index> _mi;
std::unique_ptr<leader_balancer_types::shard_index> _si;
leader_balancer_types::even_shard_load_constraint _even_shard_load_c;
};
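The two headers above and the strategy classes in this PR share the same ownership change: a constraint now holds a std::reference_wrapper<const shard_index> to one index owned by the strategy, instead of owning (and separately updating) its own copy, and apply_movement() mutates that shared index exactly once. Below is a simplified sketch of the pattern, with hypothetical toy_* types standing in for the real ones.

```cpp
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins for shard_index and a balancer constraint;
// not Redpanda's real types.
struct toy_index {
    std::unordered_map<int, int> leaders_per_shard;

    void update(int from, int to) {
        // Illustrative only: assumes both shards are already present.
        leaders_per_shard.at(from) -= 1;
        leaders_per_shard.at(to) += 1;
    }
};

class toy_constraint {
public:
    explicit toy_constraint(const toy_index& idx)
      : _idx(idx) {}

    // Reads always observe the single, strategy-owned index.
    int leaders_on(int shard) const {
        auto it = _idx.get().leaders_per_shard.find(shard);
        return it == _idx.get().leaders_per_shard.end() ? 0 : it->second;
    }

private:
    // std::reference_wrapper keeps the constraint copyable/assignable
    // while referring to state it does not own.
    std::reference_wrapper<const toy_index> _idx;
};

class toy_strategy {
public:
    explicit toy_strategy(toy_index idx)
      : _idx(std::make_unique<toy_index>(std::move(idx)))
      , _constraint(*_idx) {}

    // The strategy owns the index and mutates it exactly once per move;
    // constraints no longer each keep (and update) a private copy.
    void apply_movement(int from, int to) { _idx->update(from, to); }

    int leaders_on(int shard) const { return _constraint.leaders_on(shard); }

private:
    std::unique_ptr<toy_index> _idx; // heap allocation gives a stable address
    toy_constraint _constraint;
};
```

Using std::reference_wrapper rather than a raw reference member keeps the constraint copyable and assignable, and holding the index behind a unique_ptr gives it a stable address for those references.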

9 changes: 6 additions & 3 deletions src/v/cluster/scheduling/leader_balancer_random.h
@@ -104,9 +104,10 @@ class random_hill_climbing_strategy final : public leader_balancer_strategy {
random_hill_climbing_strategy(
index_type index, group_id_to_topic_revision_t g_to_ntp, muted_index mi)
: _mi(std::make_unique<muted_index>(std::move(mi)))
, _reassignments(index)
, _etdc(std::move(g_to_ntp), shard_index(index), *_mi)
, _eslc(shard_index(std::move(index)), *_mi) {}
, _si(std::make_unique<shard_index>(std::move(index)))
, _reassignments(_si->shards())
, _etdc(std::move(g_to_ntp), *_si, *_mi)
, _eslc(*_si, *_mi) {}

double error() const override { return _eslc.error() + _etdc.error(); }

@@ -147,6 +148,7 @@ class random_hill_climbing_strategy final : public leader_balancer_strategy {
_etdc.update_index(reassignment);
_eslc.update_index(reassignment);
_mi->update_index(reassignment);
_si->update_index(reassignment);
_reassignments.update_index(reassignment);
}

@@ -159,6 +161,7 @@ class random_hill_climbing_strategy final : public leader_balancer_strategy {
static constexpr double error_jitter = 0.000001;

std::unique_ptr<muted_index> _mi;
std::unique_ptr<shard_index> _si;
random_reassignments _reassignments;

even_topic_distributon_constraint _etdc;
11 changes: 4 additions & 7 deletions src/v/cluster/scheduling/leader_balancer_types.h
@@ -10,13 +10,10 @@
*/
#pragma once

#include "container/chunked_hash_map.h"
#include "model/metadata.h"
#include "raft/fundamental.h"

#include <absl/container/btree_map.h>
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/node_hash_map.h>
#include <roaring/roaring64map.hh>

namespace cluster::leader_balancer_types {
@@ -37,12 +34,12 @@ struct reassignment {
reassignment() = default;
};

using index_type = absl::node_hash_map<
using index_type = chunked_hash_map<
model::broker_shard,
absl::btree_map<raft::group_id, std::vector<model::broker_shard>>>;
chunked_hash_map<raft::group_id, std::vector<model::broker_shard>>>;

using group_id_to_topic_revision_t
= absl::btree_map<raft::group_id, model::revision_id>;
= chunked_hash_map<raft::group_id, model::revision_id>;

using muted_groups_t = roaring::Roaring64Map;
/*
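As a rough, non-authoritative sketch of the index shape defined here (standard-library containers stand in for chunked_hash_map, and simplified structs for model::broker_shard and raft::group_id): index_type maps each leader shard to the raft groups it leads, and each group to the shards holding its replicas. The helper below mirrors the double loop that rebuild_indexes() runs over si().shards().

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Stand-ins for model::broker_shard and raft::group_id; the real code
// uses chunked_hash_map instead of std::unordered_map.
struct broker_shard {
    int32_t node_id;
    uint32_t shard;
    bool operator==(const broker_shard&) const = default;
};

struct broker_shard_hash {
    std::size_t operator()(const broker_shard& bs) const {
        uint64_t key
          = (static_cast<uint64_t>(static_cast<uint32_t>(bs.node_id)) << 32)
            | bs.shard;
        return std::hash<uint64_t>{}(key);
    }
};

using group_id = int64_t;

// Leadership index: leader shard -> (raft group -> replica shards).
using index_type = std::unordered_map<
  broker_shard,
  std::unordered_map<group_id, std::vector<broker_shard>>,
  broker_shard_hash>;

// Per-shard leader counts for the whole index, mirroring what
// rebuild_indexes() accumulates.
std::unordered_map<broker_shard, std::size_t, broker_shard_hash>
leaders_per_shard(const index_type& index) {
    std::unordered_map<broker_shard, std::size_t, broker_shard_hash> counts;
    for (const auto& [leader_shard, groups] : index) {
        for (const auto& [group, replicas] : groups) {
            counts[leader_shard] += 1;        // this shard leads 'group'
            for (const auto& replica : replicas) {
                counts.try_emplace(replica);  // replica shards may lead nothing
            }
        }
    }
    return counts;
}
```

The real rebuild_indexes() performs the same accumulation, but bucketed per topic via group_to_topic_id().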