Skip to content

Commit

Permalink
c/scheduling: detect concurrent allocation_state replacements
Browse files Browse the repository at this point in the history
Shard-local allocation_state object is replaced when we are applying
a controller snapshot. After this happens, all live allocated_partition
objects become invalid. Detect this and throw
concurrent_modification_error in case these objects are still used and
make destructors no-op.

(cherry picked from commit a57a575)

Conflicts:
	src/v/cluster/scheduling/types.cc
  • Loading branch information
ztlpn committed May 14, 2024
1 parent 7b054e8 commit 9409311
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion src/v/cluster/scheduling/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "cluster/logger.h"
#include "cluster/scheduling/allocation_state.h"
#include "utils/exceptions.h"
#include "utils/to_string.h"

#include <fmt/ostream.h>
Expand Down Expand Up @@ -76,6 +77,9 @@ allocation_units::allocation_units(

allocation_units::~allocation_units() {
oncore_debug_verify(_oncore);
if (unlikely(!_state)) {
return;
}
for (auto& pas : _assignments) {
for (auto& replica : pas.replicas) {
_state->remove_allocation(replica, _domain);
Expand All @@ -96,6 +100,11 @@ allocated_partition::allocated_partition(

std::optional<allocated_partition::previous_replica>
allocated_partition::prepare_move(model::node_id prev_node) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

previous_replica prev;
auto it = std::find_if(
_replicas.begin(), _replicas.end(), [prev_node](const auto& bs) {
Expand Down Expand Up @@ -149,6 +158,11 @@ allocated_partition::prepare_move(model::node_id prev_node) {

model::broker_shard allocated_partition::add_replica(
model::node_id node, const std::optional<previous_replica>& prev) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
_original_node2shard.emplace();
for (const auto& bs : _replicas) {
Expand Down Expand Up @@ -225,7 +239,12 @@ bool allocated_partition::is_original(model::node_id node) const {
}

errc allocated_partition::try_revert(const reallocation_step& step) {
if (!_original_node2shard || !_state) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
return errc::no_update_in_progress;
}

Expand Down

0 comments on commit 9409311

Please sign in to comment.