Skip to content

Commit

Permalink
rm_stm: fix a race condition between async cleanup and reset
Browse files Browse the repository at this point in the history
This is a classic iterator invalidation caught by the test added in the
previous commit. Cleanup could race with reset thus invalidating the
iterator used in max_concurrent_for_each().
  • Loading branch information
bharathv committed Dec 10, 2024
1 parent 68137e7 commit b645c31
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 14 deletions.
59 changes: 46 additions & 13 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ rm_stm::rm_stm(
, _feature_table(feature_table)
, _ctx_log(txlog, ssx::sformat("[{}]", c->ntp()))
, _producer_state_manager(producer_state_manager)
, _vcluster_id(vcluster_id) {
, _vcluster_id(vcluster_id)
, _producers_pending_cleanup(std::numeric_limits<size_t>::max()) {
setup_metrics();
if (!_is_tx_enabled) {
_is_autoabort_enabled = false;
Expand All @@ -114,6 +115,18 @@ rm_stm::rm_stm(
e);
});
});

// Start the background loop that runs cleanup_evicted_producers().
// Judging by the helper's name, the callback is presumably re-invoked until
// _gate is closed or _as is aborted -- confirm against ssx. Failures that are
// not shutdown exceptions are logged as warnings instead of being propagated.
ssx::repeat_until_gate_closed_or_aborted(_gate, _as, [this] {
    return cleanup_evicted_producers().handle_exception(
      [h = _gate.hold(), this](const std::exception_ptr& ex) {
          if (!ssx::is_shutdown_exception(ex)) {
              // The "{}" placeholder is required: without it the formatter
              // ignores the extra argument and the exception is never
              // rendered in the log line.
              vlog(
                _ctx_log.warn,
                "encountered an exception while cleaning producers: {}",
                ex);
          }
      });
});
}

ss::future<model::offset> rm_stm::bootstrap_committed_offset() {
Expand All @@ -129,6 +142,8 @@ ss::future<model::offset> rm_stm::bootstrap_committed_offset() {

std::pair<producer_ptr, rm_stm::producer_previously_known>
rm_stm::maybe_create_producer(model::producer_identity pid) {
// note: must be called under state_lock in shared/read mode.

// Double lookup because of two reasons
// 1. we are forced to use a ptr as map value_type because producer_state is
// not movable
Expand All @@ -148,23 +163,37 @@ rm_stm::maybe_create_producer(model::producer_identity pid) {
return std::make_pair(producer, producer_previously_known::no);
}

void rm_stm::cleanup_producer_state(model::producer_identity pid) {
auto it = _producers.find(pid.get_id());
if (it != _producers.end() && it->second->id() == pid) {
const auto& producer = *(it->second);
if (producer._active_transaction_hook.is_linked()) {
vlog(
_ctx_log.error,
"Ignoring cleanup request of producer {} due to in progress "
"transaction.",
producer);
return;
ss::future<> rm_stm::cleanup_evicted_producers() {
while (!_as.abort_requested() && !_gate.is_closed()) {
auto pid = co_await _producers_pending_cleanup.pop_eventually();
auto units = co_await _state_lock.hold_read_lock();
auto it = _producers.find(pid.get_id());
if (it != _producers.end() && it->second->id() == pid) {
const auto& producer = *(it->second);
if (producer._active_transaction_hook.is_linked()) {
vlog(
_ctx_log.error,
"Ignoring cleanup request of producer {} due to in progress "
"transaction.",
producer);
co_return;
}
_producers.erase(it);
vlog(_ctx_log.trace, "removed producer: {}", pid);
}
_producers.erase(it);
}
}

/// Requests removal of the producer identified by `pid`.
///
/// This function only enqueues the request on _producers_pending_cleanup;
/// the erase from _producers is done by cleanup_evicted_producers(), which
/// pops from that queue. Requests made after abort has been requested or
/// after the gate has been closed are dropped.
void rm_stm::cleanup_producer_state(model::producer_identity pid) noexcept {
    if (_as.abort_requested() || _gate.is_closed()) {
        return;
    }
    _producers_pending_cleanup.push(std::move(pid));
}

ss::future<> rm_stm::reset_producers() {
// note: must always be called under exclusive write lock to
// avoid concurrent state changes to _producers.
co_await ss::max_concurrent_for_each(
_producers.begin(), _producers.end(), 32, [this](auto& it) {
auto& producer = it.second;
Expand Down Expand Up @@ -738,6 +767,8 @@ ss::future<result<kafka_result>> rm_stm::do_replicate(

ss::future<> rm_stm::stop() {
_as.request_abort();
_producers_pending_cleanup.abort(
std::make_exception_ptr(ss::abort_requested_exception{}));
auto_abort_timer.cancel();
co_await _gate.close();
co_await reset_producers();
Expand Down Expand Up @@ -1655,6 +1686,8 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {

ss::future<>
rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
auto units = co_await _state_lock.hold_write_lock();

vlog(
_ctx_log.trace,
"applying snapshot with last included offset: {}",
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class rm_stm final : public raft::persisted_stm<> {
= ss::bool_class<struct new_producer_created_tag>;
std::pair<tx::producer_ptr, producer_previously_known>
maybe_create_producer(model::producer_identity);
void cleanup_producer_state(model::producer_identity);
void cleanup_producer_state(model::producer_identity) noexcept;
ss::future<> cleanup_evicted_producers();
ss::future<> reset_producers();
ss::future<checked<model::term_id, tx::errc>> do_begin_tx(
model::term_id,
Expand Down Expand Up @@ -414,6 +415,8 @@ class rm_stm final : public raft::persisted_stm<> {
// producers because epoch is unused.
producers_t _producers;

ss::queue<model::producer_identity> _producers_pending_cleanup;

// All the producers with open transactions in this partition.
// The list is sorted by the open transaction begin offset, so
// the first entry in the list is the earliest open transaction
Expand Down

0 comments on commit b645c31

Please sign in to comment.