Skip to content

Commit

Permalink
cluster: invoke set_next_version on controller shard
Browse files Browse the repository at this point in the history
Just like patch, we also have to call do_set_next_version on the same
shard because config_frontend::set_next_version might be called from a
background fiber.

(cherry picked from commit ed04fc3)
  • Loading branch information
pgellert committed Mar 20, 2024
1 parent 6f97e1f commit 36c11ae
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
13 changes: 9 additions & 4 deletions src/v/cluster/config_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,16 @@ ss::future<std::error_code> config_frontend::set_status(
/**
* For config_manager to notify the frontend of what the next version
* number should be.
*
* Must be called on version_shard (the shard where writes are
* serialized to generate version numbers).
*/
void config_frontend::set_next_version(config_version v) {
ss::future<> config_frontend::set_next_version(config_version v) {
co_return co_await container().invoke_on(
cluster::config_frontend::version_shard,
[v](cluster::config_frontend& cfg_frontend) mutable {
return cfg_frontend.do_set_next_version(v);
});
}

void config_frontend::do_set_next_version(config_version v) {
vassert(
ss::this_shard_id() == version_shard, "Must be called on version_shard");
vlog(clusterlog.trace, "set_next_version: {}", v);
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/config_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ class config_frontend final
ss::future<std::error_code>
set_status(config_status&, model::timeout_clock::time_point);

void set_next_version(config_version v);
ss::future<> set_next_version(config_version v);

private:
ss::future<patch_result>
do_patch(config_update_request&&, model::timeout_clock::time_point);

void do_set_next_version(config_version v);

ss::sharded<controller_stm>& _stm;
ss::sharded<rpc::connection_cache>& _connections;
ss::sharded<partition_leaders_table>& _leaders;
Expand Down
10 changes: 6 additions & 4 deletions src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ ss::future<> config_manager::do_bootstrap() {
});

// Version of the first write
_frontend.local().set_next_version(config_version{1});
co_await _frontend.local().set_next_version(config_version{1});

try {
auto patch_result = co_await _frontend.local().patch(
Expand Down Expand Up @@ -206,7 +206,8 @@ ss::future<> config_manager::start() {
clusterlog.trace,
"Starting config_manager... (seen version {})",
_seen_version);
_frontend.local().set_next_version(_seen_version + config_version{1});
co_await _frontend.local().set_next_version(
_seen_version + config_version{1});
}

vlog(clusterlog.trace, "Starting reconcile_status...");
Expand All @@ -227,7 +228,7 @@ ss::future<> config_manager::start() {
_reconcile_wait.signal();
});

return ss::now();
co_return co_await ss::now();
}
void config_manager::handle_cluster_members_update(
model::node_id id, model::membership_state new_state) {
Expand Down Expand Up @@ -891,7 +892,8 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) {
vassert(
ss::this_shard_id() == config_frontend::version_shard,
"Must be called on frontend version_shard");
_frontend.local().set_next_version(_seen_version + config_version{1});
co_await _frontend.local().set_next_version(
_seen_version + config_version{1});

const cluster_config_delta_cmd_data& data = cmd.value;
vlog(
Expand Down

0 comments on commit 36c11ae

Please sign in to comment.