From 92e14e6805bd9c4455cf1990336343d6e5d897d0 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 8 May 2024 18:59:49 +0200 Subject: [PATCH 1/3] c/topic_table: bump _topics_map_revision in more places Partition balancer relies on _topics_map_revision checks to safely iterate over topic table collections with partition granularity (i.e. references to partition data and replica sets are stored and accessed across yield points). To make this safe, increment _topics_map_revision every time _topics, _updates_in_progress, _disabled_partitions or nested collections are modified in a way that invalidates references or iterators. (cherry picked from commit 30bbfb1f24f7e289c6a02956e9718bff4e2f6754) Conflicts: src/v/cluster/topic_table.cc --- src/v/cluster/topic_table.cc | 31 ++++++++++++++++++++++++------- src/v/cluster/topic_table.h | 9 +++++++-- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 2ae45f1541170..38b72d0c01763 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -332,6 +332,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) { _updates_in_progress.erase(it); + _topics_map_revision++; + partition_assignment delta_assignment{ current_assignment_it->group, current_assignment_it->id, @@ -406,6 +408,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) { current_assignment_it->replicas = in_progress_it->second.get_previous_replicas(); + _topics_map_revision++; + /** * Cancel/force abort delta contains two assignments new_assignment is set * to the one the partition is currently being moved from. Previous @@ -471,6 +475,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { co_return errc::no_update_in_progress; } + auto p_meta_it = tp->second.partitions.find(ntp.tp.partition); + if (p_meta_it == tp->second.partitions.end()) { + co_return errc::partition_not_exists; + } + // revert replica set update current_assignment_it->replicas = in_progress_it->second.get_target_replicas(); @@ -481,11 +490,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { current_assignment_it->replicas, }; - // update partition_meta object - auto p_meta_it = tp->second.partitions.find(ntp.tp.partition); - if (p_meta_it == tp->second.partitions.end()) { - co_return errc::partition_not_exists; - } + // update partition_meta object: // the cancellation was reverted and update went through, we must // update replicas_revisions. p_meta_it->second.replicas_revisions = update_replicas_revisions( @@ -497,6 +502,8 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { /// Since the update is already finished we drop in_progress state _updates_in_progress.erase(in_progress_it); + _topics_map_revision++; + // notify backend about finished update _pending_deltas.emplace_back( ntp, std::move(delta_assignment), o, delta::op_type::update_finished); @@ -832,12 +839,14 @@ class topic_table::snapshot_applier { updates_t& _updates_in_progress; fragmented_vector& _pending_deltas; topic_table_probe& _probe; + model::revision_id& _topics_map_revision; public: explicit snapshot_applier(topic_table& parent) : _updates_in_progress(parent._updates_in_progress) , _pending_deltas(parent._pending_deltas) - , _probe(parent._probe) {} + , _probe(parent._probe) + , _topics_map_revision(parent._topics_map_revision) {} void delete_ntp( const model::topic_namespace& ns_tp, @@ -846,7 +855,9 @@ class topic_table::snapshot_applier { auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id); vlog( clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp); - _updates_in_progress.erase(ntp); + if (_updates_in_progress.erase(ntp)) { + _topics_map_revision++; + }; _pending_deltas.emplace_back( std::move(ntp), @@ -883,6 +894,9 @@ class topic_table::snapshot_applier { vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp); size_t pending_deltas_start_idx = _pending_deltas.size(); + // we are going to modify md_item so increment the revision right away. + _topics_map_revision++; + const model::partition_id p_id = ntp.tp.partition; // 1. reconcile the _topics state (the md_item object) and generate @@ -1121,6 +1135,7 @@ ss::future<> topic_table::apply_snapshot( co_await applier.delete_topic( ns_tp, md_item, topic_snapshot.metadata.revision); md_item = co_await applier.create_topic(ns_tp, topic_snapshot); + _topics_map_revision++; } else { // The topic was present in the previous set, now we need to // reconcile individual partitions. @@ -1156,6 +1171,7 @@ ss::future<> topic_table::apply_snapshot( if (!topic_snapshot.partitions.contains(as_it_copy->id)) { applier.delete_ntp(ns_tp, *as_it_copy, snap_revision); md_item.get_assignments().erase(as_it_copy); + _topics_map_revision++; } co_await ss::coroutine::maybe_yield(); } @@ -1513,6 +1529,7 @@ void topic_table::change_partition_replicas( auto previous_assignment = current_assignment.replicas; // replace partition replica set current_assignment.replicas = new_assignment; + _topics_map_revision++; // calculate deleta for backend diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index c94bd495c47fc..31474141315b4 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -532,8 +532,13 @@ class topic_table { updates_t _updates_in_progress; model::revision_id _last_applied_revision_id; - // Monotonic counter that is bumped for every addition/deletion to topics - // map. Unlike other revisions this does not correspond to the command + + // Monotonic counter that is bumped each time _topics, _disabled_partitions, + // or _updates_in_progress are modified in a way that makes iteration over + // them unsafe (i.e. invalidates iterators or references, including + // for nested collections like partition sets and replica sets). + // + // Unlike other revisions this does not correspond to the command // revision that updated the map. model::revision_id _topics_map_revision{0}; From 7b054e823022b3bd77dcb62262430726db2042e3 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 10 May 2024 15:35:27 +0200 Subject: [PATCH 2/3] utils: introduce concurrent_modification_error exception It is handy to have a base class for all instances of concurrent modifications. (cherry picked from commit 627db4d751bc2e583f0800be0856a3cfb2aaf67b) Conflicts: src/v/cluster/topic_table.h src/v/cluster/topics_frontend.cc src/v/utils/stable_iterator_adaptor.h --- src/v/cluster/topic_table.h | 14 ++++-------- src/v/utils/exceptions.h | 33 +++++++++++++++++++++++++++ src/v/utils/stable_iterator_adaptor.h | 10 ++++---- 3 files changed, 43 insertions(+), 14 deletions(-) create mode 100644 src/v/utils/exceptions.h diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 31474141315b4..94eabfcafc0f8 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -96,21 +96,17 @@ class topic_table { // * partition::get_revision_id() // * raft::group_configuration::revision_id() - class concurrent_modification_error final : public std::exception { + class concurrent_modification_error final + : public ::concurrent_modification_error { public: concurrent_modification_error( model::revision_id initial_revision, model::revision_id current_revision) - : _msg(ssx::sformat( - "Topic table was modified by concurrent fiber. (initial_revision: " - "{}, current_revision: {}) ", + : ::concurrent_modification_error(ssx::sformat( + "Topic table was modified by concurrent fiber. " + "(initial_revision: {}, current_revision: {}) ", initial_revision, current_revision)) {} - - const char* what() const noexcept final { return _msg.c_str(); } - - private: - ss::sstring _msg; }; class in_progress_update { diff --git a/src/v/utils/exceptions.h b/src/v/utils/exceptions.h new file mode 100644 index 0000000000000..a70b8dd549bd5 --- /dev/null +++ b/src/v/utils/exceptions.h @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include "seastarx.h" + +#include + +#include + +/// Some objects reference state that changes comparatively rarely (e.g. +/// topic_table state) across yield points and expect these references to remain +/// valid. In case these references are invalidated by a concurrent fiber, this +/// exception is thrown. This is a signal for the caller to restart the +/// computation with up-to-date state. +class concurrent_modification_error : public std::exception { +public: + explicit concurrent_modification_error(ss::sstring s) + : _msg(std::move(s)) {} + + const char* what() const noexcept override { return _msg.c_str(); } + +private: + ss::sstring _msg; +}; diff --git a/src/v/utils/stable_iterator_adaptor.h b/src/v/utils/stable_iterator_adaptor.h index 424bcb82fd32b..fe39fbdbc8785 100644 --- a/src/v/utils/stable_iterator_adaptor.h +++ b/src/v/utils/stable_iterator_adaptor.h @@ -11,20 +11,20 @@ #pragma once #include "seastarx.h" +#include "utils/exceptions.h" #include #include #include -#include -#include #include -class iterator_stability_violation : public std::runtime_error { +class iterator_stability_violation final + : public concurrent_modification_error { public: - explicit iterator_stability_violation(const std::string& why) - : std::runtime_error(why){}; + explicit iterator_stability_violation(ss::sstring why) + : concurrent_modification_error(std::move(why)){}; }; /* From 940931168fe72fd9cc1aeec89937244216fe011c Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 10 May 2024 15:52:07 +0200 Subject: [PATCH 3/3] c/scheduling: detect concurrent allocation_state replacements Shard-local allocation_state object is replaced when we are applying a controller snapshot. After this happens, all live allocated_partition objects become invalid. Detect this and throw concurrent_modification_error in case these objects are still used and make destructors no-op. (cherry picked from commit a57a5757e278ac6b7ca0c58de0f1b23075a93402) Conflicts: src/v/cluster/scheduling/types.cc --- src/v/cluster/scheduling/types.cc | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index 1becc229fea4d..a20ecdd03827c 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -13,6 +13,7 @@ #include "cluster/logger.h" #include "cluster/scheduling/allocation_state.h" +#include "utils/exceptions.h" #include "utils/to_string.h" #include @@ -76,6 +77,9 @@ allocation_units::allocation_units( allocation_units::~allocation_units() { oncore_debug_verify(_oncore); + if (unlikely(!_state)) { + return; + } for (auto& pas : _assignments) { for (auto& replica : pas.replicas) { _state->remove_allocation(replica, _domain); @@ -96,6 +100,11 @@ allocated_partition::allocated_partition( std::optional allocated_partition::prepare_move(model::node_id prev_node) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + previous_replica prev; auto it = std::find_if( _replicas.begin(), _replicas.end(), [prev_node](const auto& bs) { @@ -149,6 +158,11 @@ allocated_partition::prepare_move(model::node_id prev_node) { model::broker_shard allocated_partition::add_replica( model::node_id node, const std::optional& prev) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + if (!_original_node2shard) { _original_node2shard.emplace(); for (const auto& bs : _replicas) { @@ -225,7 +239,12 @@ bool allocated_partition::is_original(model::node_id node) const { } errc allocated_partition::try_revert(const reallocation_step& step) { - if (!_original_node2shard || !_state) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + + if (!_original_node2shard) { return errc::no_update_in_progress; }