Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] cluster: invoke config_frontend methods on controller shard #17211

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/v/cluster/config_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ ss::future<config_frontend::patch_result> config_frontend::patch(
.errc = errc::no_leader_controller, .version = config_version_unset};
}
if (leader == _self) {
co_return co_await do_patch(std::move(update), timeout);
co_return co_await container().invoke_on(
cluster::config_frontend::version_shard,
[update{std::move(update)},
timeout](cluster::config_frontend& cfg_frontend) mutable {
return cfg_frontend.do_patch(std::move(update), timeout);
});
} else {
auto res = co_await _connections.local()
.with_node_client<cluster::controller_client_protocol>(
Expand Down Expand Up @@ -130,11 +135,16 @@ ss::future<std::error_code> config_frontend::set_status(
/**
* For config_manager to notify the frontend of what the next version
* number should be.
*
* Must be called on version_shard (the shard where writes are
* serialized to generate version numbers).
*/
void config_frontend::set_next_version(config_version v) {
ss::future<> config_frontend::set_next_version(config_version v) {
co_return co_await container().invoke_on(
cluster::config_frontend::version_shard,
[v](cluster::config_frontend& cfg_frontend) mutable {
return cfg_frontend.do_set_next_version(v);
});
}

void config_frontend::do_set_next_version(config_version v) {
vassert(
ss::this_shard_id() == version_shard, "Must be called on version_shard");
vlog(clusterlog.trace, "set_next_version: {}", v);
Expand Down
15 changes: 10 additions & 5 deletions src/v/cluster/config_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

#include "cluster/controller_stm.h"

#include <seastar/core/sharded.hh>

namespace cluster {

class config_frontend final {
class config_frontend final
: public ss::peering_sharded_service<config_frontend> {
public:
// Shard ID that will track the next available version and serialize
// writes to hand out sequential versions.
Expand All @@ -39,15 +42,17 @@ class config_frontend final {
ss::future<patch_result>
patch(config_update_request, model::timeout_clock::time_point);

ss::future<patch_result>
do_patch(config_update_request&&, model::timeout_clock::time_point);

ss::future<std::error_code>
set_status(config_status&, model::timeout_clock::time_point);

void set_next_version(config_version v);
ss::future<> set_next_version(config_version v);

private:
ss::future<patch_result>
do_patch(config_update_request&&, model::timeout_clock::time_point);

void do_set_next_version(config_version v);

ss::sharded<controller_stm>& _stm;
ss::sharded<rpc::connection_cache>& _connections;
ss::sharded<partition_leaders_table>& _leaders;
Expand Down
10 changes: 6 additions & 4 deletions src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ ss::future<> config_manager::do_bootstrap() {
});

// Version of the first write
_frontend.local().set_next_version(config_version{1});
co_await _frontend.local().set_next_version(config_version{1});

try {
auto patch_result = co_await _frontend.local().patch(
Expand Down Expand Up @@ -206,7 +206,8 @@ ss::future<> config_manager::start() {
clusterlog.trace,
"Starting config_manager... (seen version {})",
_seen_version);
_frontend.local().set_next_version(_seen_version + config_version{1});
co_await _frontend.local().set_next_version(
_seen_version + config_version{1});
}

vlog(clusterlog.trace, "Starting reconcile_status...");
Expand All @@ -227,7 +228,7 @@ ss::future<> config_manager::start() {
_reconcile_wait.signal();
});

return ss::now();
co_return co_await ss::now();
}
void config_manager::handle_cluster_members_update(
model::node_id id, model::membership_state new_state) {
Expand Down Expand Up @@ -891,7 +892,8 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) {
vassert(
ss::this_shard_id() == config_frontend::version_shard,
"Must be called on frontend version_shard");
_frontend.local().set_next_version(_seen_version + config_version{1});
co_await _frontend.local().set_next_version(
_seen_version + config_version{1});

const cluster_config_delta_cmd_data& data = cmd.value;
vlog(
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/metrics_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ ss::future<> metrics_reporter::propagate_cluster_id() {
co_return;
}

auto result = co_await _config_frontend.local().do_patch(
auto result = co_await _config_frontend.local().patch(
config_update_request{.upsert = {{"cluster_id", _cluster_info.uuid}}},
model::timeout_clock::now() + 5s);
if (result.errc) {
Expand Down
12 changes: 4 additions & 8 deletions src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,10 @@ service::config_status(config_status_request&& req, rpc::streaming_context&) {

ss::future<config_update_reply>
service::config_update(config_update_request&& req, rpc::streaming_context&) {
auto patch_result = co_await _config_frontend.invoke_on(
config_frontend::version_shard,
[req = std::move(req)](config_frontend& fe) mutable {
return fe.patch(
std::move(req),
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());
});
auto patch_result = co_await _config_frontend.local().patch(
std::move(req),
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());

if (patch_result.errc.category() == error_category()) {
co_return config_update_reply{
Expand Down
14 changes: 5 additions & 9 deletions src/v/kafka/server/handlers/incremental_alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,11 @@ static ss::future<std::vector<resp_resource_t>> alter_broker_configuartion(

auto resp
= co_await ctx.config_frontend()
.invoke_on(
cluster::config_frontend::version_shard,
[req = std::move(req)](cluster::config_frontend& fe) mutable {
return fe.patch(
std::move(req),
model::timeout_clock::now()
+ config::shard_local_cfg()
.alter_topic_cfg_timeout_ms());
})
.local()
.patch(
std::move(req),
model::timeout_clock::now()
+ config::shard_local_cfg().alter_topic_cfg_timeout_ms())
.then([resource](cluster::config_frontend::patch_result pr) {
std::error_code& ec = pr.errc;
error_code kec = error_code::none;
Expand Down
12 changes: 6 additions & 6 deletions src/v/kafka/server/tests/alter_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,13 @@ FIXTURE_TEST(
FIXTURE_TEST(
test_topic_describe_configs_requested_properties, alter_config_test_fixture) {
wait_for_controller_leadership().get();

cluster::config_update_request r{
.upsert = {{"enable_schema_id_validation", "compat"}}};
app.controller->get_config_frontend()
.invoke_on_all([](cluster::config_frontend& cfg_frontend) {
cluster::config_update_request r{
.upsert = {{"enable_schema_id_validation", "compat"}}};
return cfg_frontend.patch(r, model::timeout_clock::now() + 1s)
.discard_result();
})
.local()
.patch(r, model::timeout_clock::now() + 1s)
.discard_result()
.get();

model::topic test_tp{"topic-1"};
Expand Down
8 changes: 3 additions & 5 deletions src/v/redpanda/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1678,11 +1678,9 @@ admin_server::patch_cluster_config_handler(
update.upsert.size(),
update.remove.size());

auto patch_result = co_await _controller->get_config_frontend().invoke_on(
cluster::config_frontend::version_shard,
[update = std::move(update)](cluster::config_frontend& fe) mutable {
return fe.patch(std::move(update), model::timeout_clock::now() + 5s);
});
auto patch_result
= co_await _controller->get_config_frontend().local().patch(
std::move(update), model::timeout_clock::now() + 5s);

co_await throw_on_error(*req, patch_result.errc, model::controller_ntp);

Expand Down
Loading