Merge pull request #24055 from bashtanov/feature-cleanup
Feature cleanup
bashtanov authored Nov 22, 2024
2 parents 9a9a4ae + 40e7af3 commit ee98e07
Showing 25 changed files with 61 additions and 417 deletions.
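Note: the common theme of these changes is deleting runtime feature gates for features that are by now unconditionally active. Below is a minimal sketch of the gate pattern being removed, assuming a simplified stand-in feature_table; the real cluster/features API differs.

#include <cstdint>
#include <iostream>

// Simplified stand-ins for features::feature and the feature table.
enum class feature : std::uint64_t {
    node_id_assignment = 1ULL << 0,
    partition_move_revert_cancel = 1ULL << 1,
};

class feature_table {
public:
    bool is_active(feature f) const {
        return (_active_mask & static_cast<std::uint64_t>(f)) != 0;
    }
    void activate(feature f) { _active_mask |= static_cast<std::uint64_t>(f); }

private:
    std::uint64_t _active_mask{0};
};

int main() {
    feature_table ft;
    // Before cleanup: callers branch on activation state and keep a legacy
    // fallback for mixed-version clusters.
    if (!ft.is_active(feature::node_id_assignment)) {
        std::cout << "legacy path: feature not yet active\n";
    }
    ft.activate(feature::node_id_assignment);
    // After cleanup: the feature is assumed always active, so the check and
    // the fallback path are dead code and can be deleted outright.
    std::cout << ft.is_active(feature::node_id_assignment) << '\n';
}

Once every supported upgrade path guarantees a feature is active, both the check and its fallback branch are unreachable, which is what each hunk below deletes.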
11 changes: 3 additions & 8 deletions src/v/cluster/controller.cc
@@ -1036,15 +1036,10 @@ ss::future<> controller::cluster_creation_hook(cluster_discovery& discovery) {
// upgraded from a version before 22.3. Replicate just a cluster UUID so we
// can advertise that a cluster has already been bootstrapped to nodes
// trying to discover existing clusters.
bootstrap_cluster_cmd_data cmd_data;
cmd_data.uuid = model::cluster_uuid(uuid_t::create());
ssx::background
= _feature_table.local()
.await_feature(
features::feature::seeds_driven_bootstrap_capable, _as.local())
.then([this] {
bootstrap_cluster_cmd_data cmd_data;
cmd_data.uuid = model::cluster_uuid(uuid_t::create());
return create_cluster(std::move(cmd_data));
})
= create_cluster(std::move(cmd_data))
.handle_exception([](const std::exception_ptr e) {
vlog(clusterlog.warn, "Error creating cluster UUID. {}", e);
});
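Note: the hunk above no longer waits for seeds_driven_bootstrap_capable; cluster creation now starts immediately as a fire-and-forget background task. A rough plain-C++ sketch of that shape, with a detached thread standing in for ssx::background (this is not the Seastar API):

#include <chrono>
#include <exception>
#include <iostream>
#include <thread>

void create_cluster() {
    // Stand-in for replicating bootstrap_cluster_cmd_data with a fresh UUID.
    std::cout << "replicating cluster bootstrap command\n";
}

int main() {
    std::thread([] {
        try {
            create_cluster();
        } catch (const std::exception& e) {
            // Mirrors .handle_exception(...): log a warning and swallow.
            std::cerr << "Error creating cluster UUID. " << e.what() << '\n';
        }
    }).detach();
    // Give the background task time to run before the process exits.
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
}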
113 changes: 18 additions & 95 deletions src/v/cluster/controller_backend.cc
@@ -529,30 +529,6 @@ ss::future<std::error_code> do_update_replica_set(
co_return co_await p->update_replica_set(std::move(brokers), cmd_revision);
}

ss::future<std::error_code> revert_configuration_update(
ss::lw_shared_ptr<partition> p,
const replicas_t& replicas,
const replicas_revision_map& replica_revisions,
model::revision_id cmd_revision,
members_table& members,
bool command_based_members_update) {
vlog(
clusterlog.debug,
"[{}] reverting already finished reconfiguration. Revision: {}, replica "
"set: {}",
p->ntp(),
cmd_revision,
replicas);
return do_update_replica_set(
std::move(p),
replicas,
replica_revisions,
cmd_revision,
members,
command_based_members_update,
std::nullopt);
}

/**
* Retrieve topic property based on the following logic
*
@@ -1270,7 +1246,6 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step(
replicas_view.last_cmd_revision(), op_type, replicas_view.assignment);

co_return co_await reconcile_partition_reconfiguration(
rs,
std::move(partition),
*replicas_view.update,
replicas_view.revisions());
@@ -1281,7 +1256,6 @@

ss::future<result<ss::stop_iteration>>
controller_backend::reconcile_partition_reconfiguration(
ntp_reconciliation_state& rs,
ss::lw_shared_ptr<partition> partition,
const topic_table::in_progress_update& update,
const replicas_revision_map& replicas_revisions) {
@@ -1305,19 +1279,14 @@ controller_backend::reconcile_partition_reconfiguration(
_self, partition, update.get_resulting_replicas(), cmd_revision);
if (!update_ec) {
auto leader = partition->get_leader_id();
size_t retries = (rs.cur_operation ? rs.cur_operation->retries : 0);
vlog(
clusterlog.trace,
"[{}] update complete, checking if our node can finish it "
"(leader: {}, retry: {})",
"(leader: {})",
partition->ntp(),
leader,
retries);
leader);
if (can_finish_update(
leader,
retries,
update.get_state(),
update.get_resulting_replicas())) {
leader, update.get_state(), update.get_resulting_replicas())) {
auto ec = co_await dispatch_update_finished(
partition->ntp(), update.get_resulting_replicas());
if (ec) {
@@ -1372,7 +1341,6 @@ controller_backend::reconcile_partition_reconfiguration(

bool controller_backend::can_finish_update(
std::optional<model::node_id> current_leader,
uint64_t current_retry,
reconfiguration_state state,
const replicas_t& current_replicas) {
if (
@@ -1382,23 +1350,9 @@ bool controller_backend::can_finish_update(
return current_leader == _self;
}
/**
* If the revert feature is active we use current leader to dispatch
* partition move
*/
if (_features.local().is_active(
features::feature::partition_move_revert_cancel)) {
return current_leader == _self
&& contains_node(current_replicas, _self);
}
/**
* Use retry count to determine which node is eligible to dispatch update
* finished. Using modulo division allow us to round robin between
* candidates
* Use current leader to dispatch partition move
*/
const model::broker_shard& candidate
= current_replicas[current_retry % current_replicas.size()];

return candidate.node_id == _self;
return current_leader == _self && contains_node(current_replicas, _self);
}

ss::future<std::error_code> controller_backend::create_partition(
@@ -1613,28 +1567,14 @@ controller_backend::cancel_replica_set_update(
return result<ss::stop_iteration>{ec};
});
} else if (already_moved) {
if (likely(_features.local().is_active(
features::feature::partition_move_revert_cancel))) {
return dispatch_revert_cancel_move(p->ntp()).then(
[](std::error_code ec) -> result<ss::stop_iteration> {
if (ec) {
return ec;
}
// revert_cancel is dispatched, nothing else to do,
// but wait for the topic table update.
return ss::stop_iteration::yes;
});
}

return revert_configuration_update(
std::move(p),
replicas,
replicas_revisions,
cmd_revision,
_members_table.local(),
command_based_membership_active())
.then([](std::error_code ec) {
return result<ss::stop_iteration>{ec};
return dispatch_revert_cancel_move(p->ntp()).then(
[](std::error_code ec) -> result<ss::stop_iteration> {
if (ec) {
return ec;
}
// revert_cancel is dispatched, nothing else to do,
// but wait for the topic table update.
return ss::stop_iteration::yes;
});
}
return ss::make_ready_future<result<ss::stop_iteration>>(
@@ -1704,29 +1644,12 @@ controller_backend::force_abort_replica_set_update(
cmd_revision,
reconfiguration_policy::full_local_retention);
} else if (already_moved) {
if (likely(_features.local().is_active(
features::feature::partition_move_revert_cancel))) {
std::error_code ec = co_await dispatch_revert_cancel_move(
partition->ntp());
if (ec) {
co_return ec;
}
co_return ss::stop_iteration::yes;
std::error_code ec = co_await dispatch_revert_cancel_move(
partition->ntp());
if (ec) {
co_return ec;
}

co_return co_await apply_configuration_change_on_leader(
std::move(partition),
replicas,
cmd_revision,
[&](ss::lw_shared_ptr<cluster::partition> p) {
return revert_configuration_update(
std::move(p),
replicas,
replicas_revisions,
cmd_revision,
_members_table.local(),
command_based_membership_active());
});
co_return ss::stop_iteration::yes;
}
co_return errc::waiting_for_recovery;
} else {
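Note: can_finish_update previously fell back to a retry-count round robin, current_replicas[current_retry % current_replicas.size()], when partition_move_revert_cancel was inactive; now only the current leader, and only while it is still in the replica set, may finish the update. A self-contained sketch of both selection rules, with broker_shard reduced to a stand-in struct:

#include <cstddef>
#include <iostream>
#include <vector>

// Stand-in for model::broker_shard.
struct broker_shard {
    int node_id;
};

// Old rule: rotate through the replica set by retry count, so every node
// eventually becomes eligible to dispatch update_finished.
bool can_finish_update_round_robin(
  const std::vector<broker_shard>& replicas, std::size_t retry, int self) {
    return replicas[retry % replicas.size()].node_id == self;
}

// New rule: only the current leader finishes, and only while it is still
// a member of the replica set.
bool can_finish_update_leader_only(
  const std::vector<broker_shard>& replicas, int leader, int self) {
    if (leader != self) {
        return false;
    }
    for (const auto& bs : replicas) {
        if (bs.node_id == self) {
            return true;
        }
    }
    return false;
}

int main() {
    std::vector<broker_shard> replicas{{1}, {2}, {3}};
    for (std::size_t retry = 0; retry < 4; ++retry) {
        std::cout << "retry " << retry << " -> candidate node "
                  << replicas[retry % replicas.size()].node_id << '\n';
    }
    std::cout << can_finish_update_leader_only(replicas, 2, 2) << '\n';
}

The leader-only rule gives up the round robin's eventual coverage in exchange for determinism: exactly one node is eligible at a time, and leadership changes provide the rotation.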
2 changes: 0 additions & 2 deletions src/v/cluster/controller_backend.h
@@ -327,7 +327,6 @@ class controller_backend
shard_placement_table&);

ss::future<result<ss::stop_iteration>> reconcile_partition_reconfiguration(
ntp_reconciliation_state&,
ss::lw_shared_ptr<partition>,
const topic_table::in_progress_update&,
const replicas_revision_map& replicas_revisions);
@@ -365,7 +364,6 @@

bool can_finish_update(
std::optional<model::node_id> current_leader,
uint64_t current_retry,
reconfiguration_state,
const replicas_t& requested_replicas);

9 changes: 0 additions & 9 deletions src/v/cluster/ephemeral_credential_frontend.cc
@@ -79,15 +79,6 @@ ephemeral_credential_frontend::get(const security::acl_principal& principal) {
auto guard = _gate.hold();
get_return res;

if (!_feature_table.local().is_active(
features::feature::ephemeral_secrets)) {
vlog(
clusterlog.info,
"Ephemeral credentials feature is not active (upgrade in progress?)");
res.err = errc::invalid_node_operation;
co_return res;
}

if (auto it = _e_store.local().find(principal); !_e_store.local().has(it)) {
res.credential = make_ephemeral_credential(principal);
co_await put(res.credential);
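Note: with the ephemeral_secrets gate gone, get() proceeds straight to the find-or-create step shown above. A sketch of that C++17 if-with-initializer pattern, with a plain std::map standing in for the credential store:

#include <iostream>
#include <map>
#include <string>

int main() {
    // A plain map stands in for the sharded ephemeral credential store.
    std::map<std::string, std::string> store;
    std::string principal = "sasl_user";
    // C++17 if-with-initializer: look up, and mint a credential on a miss
    // (mirrors make_ephemeral_credential + put in the real code).
    if (auto it = store.find(principal); it == store.end()) {
        store.emplace(principal, "freshly-minted-credential");
    }
    std::cout << store[principal] << '\n';
}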
25 changes: 11 additions & 14 deletions src/v/cluster/feature_manager.cc
@@ -299,20 +299,17 @@ ss::future<> feature_manager::maybe_log_license_check_info() {
// Shutting down - next iteration will drop out
co_return;
}
if (_feature_table.local().is_active(features::feature::license)) {
auto enterprise_features = report_enterprise_features();
if (enterprise_features.any()) {
if (_feature_table.local().should_sanction()) {
vlog(
clusterlog.warn,
"A Redpanda Enterprise Edition license is required to use "
"enterprise features: ([{}]). Enter an active license key "
"(for example, rpk cluster license set <key>). To request a "
"license, see https://redpanda.com/license-request. For more "
"information, see "
"https://docs.redpanda.com/current/get-started/licenses.",
fmt::join(enterprise_features.enabled(), ", "));
}
auto enterprise_features = report_enterprise_features();
if (enterprise_features.any()) {
if (_feature_table.local().should_sanction()) {
vlog(
clusterlog.warn,
"A Redpanda Enterprise Edition license is required to use "
"enterprise features: ([{}]). Enter an active license key (for "
"example, rpk cluster license set <key>). To request a license, "
"see https://redpanda.com/license-request. For more information, "
"see https://docs.redpanda.com/current/get-started/licenses.",
fmt::join(enterprise_features.enabled(), ", "));
}
}
}
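Note: maybe_log_license_check_info() now builds the enterprise-feature report unconditionally instead of first checking features::feature::license. A simplified sketch of the remaining flow; feature_report, its contents, and should_sanction here are modeled, not the real API:

#include <iostream>
#include <string>
#include <vector>

// Modeled result of report_enterprise_features(); not the real type.
struct feature_report {
    std::vector<std::string> enabled;
    bool any() const { return !enabled.empty(); }
};

int main() {
    feature_report report{{"tiered_storage", "audit_logging"}};
    bool should_sanction = true; // e.g. no valid license installed
    if (report.any() && should_sanction) {
        std::cout << "warn: enterprise license required for:";
        for (const auto& f : report.enabled) {
            std::cout << ' ' << f;
        }
        std::cout << '\n';
    }
}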
41 changes: 2 additions & 39 deletions src/v/cluster/members_manager.cc
@@ -1224,27 +1224,13 @@ ss::future<result<join_node_reply>> members_manager::replicate_new_node_uuid(
co_await make_join_node_success_reply(get_node_id(node_uuid)));
}

static bool contains_address(
const net::unresolved_address& address,
const members_table::cache_t& brokers) {
return std::find_if(
brokers.begin(),
brokers.end(),
[&address](const auto& p) {
return p.second.broker.rpc_address() == address;
})
!= brokers.end();
}

ss::future<result<join_node_reply>>
members_manager::handle_join_request(const join_node_request req) {
using ret_t = result<join_node_reply>;
using status_t = join_node_reply::status_code;

bool node_id_assignment_supported = _feature_table.local().is_active(
features::feature::node_id_assignment);
bool req_has_node_uuid = !req.node_uuid.empty();
if (node_id_assignment_supported && !req_has_node_uuid) {
if (!req_has_node_uuid) {
vlog(
clusterlog.warn,
"Invalid join request for node ID {}, node UUID is required",
@@ -1255,13 +1241,6 @@ members_manager::handle_join_request(const join_node_request req) {
if (req.node.id() >= 0) {
req_node_id = req.node.id();
}
if (!node_id_assignment_supported && !req_node_id) {
vlog(
clusterlog.warn,
"Got request to assign node ID, but feature not active",
req.node.id());
co_return errc::invalid_request;
}
if (
req_has_node_uuid
&& req.node_uuid.size() != model::node_uuid::type::length) {
@@ -1324,7 +1303,7 @@
join_node_reply{status_t::not_ready, model::unassigned_node_id});
}

if (likely(node_id_assignment_supported && req_has_node_uuid)) {
if (likely(req_has_node_uuid)) {
const auto it = _id_by_uuid.find(node_uuid);
if (!req_node_id) {
if (it == _id_by_uuid.end()) {
@@ -1403,22 +1382,6 @@ members_manager::handle_join_request(const join_node_request req) {
});
}

// Older versions of Redpanda don't support having multiple servers pointed
// at the same address.
if (
!node_id_assignment_supported
&& contains_address(
req.node.rpc_address(), _members_table.local().nodes())) {
vlog(
clusterlog.info,
"Broker {} address ({}) conflicts with the address of another "
"node",
req.node.id(),
req.node.rpc_address());
co_return ret_t(
join_node_reply{status_t::conflict, model::unassigned_node_id});
}

if (req.node.id() != _self.id()) {
co_await update_broker_client(
_self.id(),
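Note: with node_id_assignment assumed active, a join request must carry a node UUID, and the _id_by_uuid table becomes the single source of node-ID assignment; the contains_address fallback for older brokers is deleted. A minimal in-memory sketch of UUID-keyed assignment:

#include <iostream>
#include <map>
#include <string>

class id_allocator {
public:
    // Return the node ID already bound to this UUID, or bind the next free
    // one, mirroring the _id_by_uuid lookup in handle_join_request.
    int assign(const std::string& node_uuid) {
        if (auto it = _id_by_uuid.find(node_uuid); it != _id_by_uuid.end()) {
            return it->second;
        }
        return _id_by_uuid.emplace(node_uuid, _next_id++).first->second;
    }

private:
    std::map<std::string, int> _id_by_uuid;
    int _next_id{0};
};

int main() {
    id_allocator alloc;
    std::cout << alloc.assign("uuid-a") << '\n'; // 0
    std::cout << alloc.assign("uuid-b") << '\n'; // 1
    std::cout << alloc.assign("uuid-a") << '\n'; // 0 again: mapping is stable
}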
5 changes: 0 additions & 5 deletions src/v/cluster/node_status_backend.cc
@@ -172,11 +172,6 @@ void node_status_backend::tick() {
}

ss::future<> node_status_backend::collect_and_store_updates() {
if (!_feature_table.local().is_active(
features::feature::raftless_node_status)) {
co_return;
}

auto updates = co_await collect_updates_from_peers();
co_return co_await _node_status_table.invoke_on_all(
[updates = std::move(updates)](auto& table) {
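Note: collect_and_store_updates() now always runs (the raftless_node_status gate is gone) and pushes the collected peer updates to every shard's node_status_table. A single-process sketch of that fan-out, with invoke_on_all modeled as a loop over per-shard tables rather than the Seastar primitive:

#include <iostream>
#include <map>
#include <vector>

// Stand-in for a node_status_table entry.
struct node_status {
    int node_id;
    long last_seen_ms;
};

int main() {
    // One table per shard, as with ss::sharded<node_status_table>.
    std::vector<std::map<int, node_status>> per_shard_tables(4);
    std::vector<node_status> updates{{1, 1000}, {2, 995}};
    // "invoke_on_all": apply the same update batch to every shard's copy.
    for (auto& table : per_shard_tables) {
        for (const auto& u : updates) {
            table[u.node_id] = u;
        }
    }
    std::cout << per_shard_tables[3].at(2).last_seen_ms << '\n';
}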
4 changes: 0 additions & 4 deletions src/v/cluster/rm_stm.cc
@@ -91,10 +91,6 @@ rm_stm::rm_stm(
, _ctx_log(txlog, ssx::sformat("[{}]", c->ntp()))
, _producer_state_manager(producer_state_manager)
, _vcluster_id(vcluster_id) {
vassert(
_feature_table.local().is_active(features::feature::transaction_ga),
"unexpected state for transactions support. skipped a few "
"versions during upgrade?");
setup_metrics();
if (!_is_tx_enabled) {
_is_autoabort_enabled = false;
5 changes: 0 additions & 5 deletions src/v/cluster/tests/ephemeral_credential_test.cc
@@ -41,11 +41,6 @@ FIXTURE_TEST(test_ephemeral_credential_frontend, cluster_test_fixture) {

wait_for_all_members(3s).get();

tests::cooperative_spin_wait_with_timeout(10s, [&apps] {
return apps.front()->controller->get_feature_table().local().is_active(
features::feature::ephemeral_secrets);
}).get();

auto& fe_0
= apps[0]->controller->get_ephemeral_credential_frontend().local();
auto& fe_1
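Note: the test no longer needs to spin-wait for ephemeral_secrets to activate. For reference, a generic sketch of the poll-until-timeout helper it used, assuming a simple std::chrono/std::thread implementation rather than the real tests::cooperative_spin_wait_with_timeout:

#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

// Poll cond until it returns true or timeout elapses; true on success.
bool spin_wait_with_timeout(
  std::chrono::milliseconds timeout, const std::function<bool()>& cond) {
    auto deadline = std::chrono::steady_clock::now() + timeout;
    while (std::chrono::steady_clock::now() < deadline) {
        if (cond()) {
            return true;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return false;
}

int main() {
    int polls = 0;
    bool ok = spin_wait_with_timeout(
      std::chrono::milliseconds(500), [&] { return ++polls > 5; });
    std::cout << (ok ? "condition met" : "timed out") << '\n';
}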
5 changes: 0 additions & 5 deletions src/v/cluster/topics_frontend.cc
@@ -479,11 +479,6 @@ ss::future<std::error_code> topics_frontend::do_update_replication_factor(
topic_properties_update& update, model::timeout_clock::time_point timeout) {
switch (update.custom_properties.replication_factor.op) {
case incremental_update_operation::set: {
if (!_features.local().is_active(
features::feature::replication_factor_change)) {
co_return cluster::errc::feature_disabled;
}

if (_topics.local().is_fully_disabled(update.tp_ns)) {
co_return errc::topic_disabled;
}
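Note: in do_update_replication_factor the set branch no longer short-circuits with errc::feature_disabled; the first remaining check is whether the topic is fully disabled. A condensed sketch of the resulting control flow, with the enums reduced to stand-ins:

#include <iostream>

enum class errc { success, topic_disabled };
enum class op_t { none, set, remove };

errc do_update_replication_factor(op_t op, bool topic_fully_disabled) {
    switch (op) {
    case op_t::set:
        // The feature gate used to short-circuit here with feature_disabled;
        // validation now starts with the disabled-topic check.
        if (topic_fully_disabled) {
            return errc::topic_disabled;
        }
        return errc::success; // proceed to validate and apply the new factor
    default:
        return errc::success; // no-op for other incremental operations
    }
}

int main() {
    bool disabled = do_update_replication_factor(op_t::set, true)
                    == errc::topic_disabled;
    std::cout << disabled << '\n';
}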
8 changes: 0 additions & 8 deletions src/v/cluster/tx_gateway_frontend.h
@@ -97,14 +97,6 @@ class tx_gateway_frontend final
bool _transactions_enabled;
config::binding<uint64_t> _max_transactions_per_coordinator;

// Transaction GA includes: KIP_447, KIP-360, fix for compaction tx_group*
// records, perf fix#1(Do not writing preparing state on disk in tm_stn),
// perf fix#2(Do not writeing prepared marker in rm_stm)
bool is_transaction_ga() {
return _feature_table.local().is_active(
features::feature::transaction_ga);
}

void start_expire_timer();

void rearm_expire_timer(bool force = false);