From 940931168fe72fd9cc1aeec89937244216fe011c Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 10 May 2024 15:52:07 +0200 Subject: [PATCH] c/scheduling: detect concurrent allocation_state replacements Shard-local allocation_state object is replaced when we are applying a controller snapshot. After this happens, all live allocated_partition objects become invalid. Detect this and throw concurrent_modification_error in case these objects are still used and make destructors no-op. (cherry picked from commit a57a5757e278ac6b7ca0c58de0f1b23075a93402) Conflicts: src/v/cluster/scheduling/types.cc --- src/v/cluster/scheduling/types.cc | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index 1becc229fea4d..a20ecdd03827c 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -13,6 +13,7 @@ #include "cluster/logger.h" #include "cluster/scheduling/allocation_state.h" +#include "utils/exceptions.h" #include "utils/to_string.h" #include @@ -76,6 +77,9 @@ allocation_units::allocation_units( allocation_units::~allocation_units() { oncore_debug_verify(_oncore); + if (unlikely(!_state)) { + return; + } for (auto& pas : _assignments) { for (auto& replica : pas.replicas) { _state->remove_allocation(replica, _domain); @@ -96,6 +100,11 @@ allocated_partition::allocated_partition( std::optional allocated_partition::prepare_move(model::node_id prev_node) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + previous_replica prev; auto it = std::find_if( _replicas.begin(), _replicas.end(), [prev_node](const auto& bs) { @@ -149,6 +158,11 @@ allocated_partition::prepare_move(model::node_id prev_node) { model::broker_shard allocated_partition::add_replica( model::node_id node, const std::optional& prev) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + if (!_original_node2shard) { _original_node2shard.emplace(); for (const auto& bs : _replicas) { @@ -225,7 +239,12 @@ bool allocated_partition::is_original(model::node_id node) const { } errc allocated_partition::try_revert(const reallocation_step& step) { - if (!_original_node2shard || !_state) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + + if (!_original_node2shard) { return errc::no_update_in_progress; }