diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index aecb81d61d8f6..51234fddf4910 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -90,7 +90,8 @@ rm_stm::rm_stm(
   , _feature_table(feature_table)
   , _ctx_log(txlog, ssx::sformat("[{}]", c->ntp()))
   , _producer_state_manager(producer_state_manager)
-  , _vcluster_id(vcluster_id) {
+  , _vcluster_id(vcluster_id)
+  , _producers_pending_cleanup(std::numeric_limits<size_t>::max()) {
     setup_metrics();
     if (!_is_tx_enabled) {
         _is_autoabort_enabled = false;
@@ -114,6 +115,18 @@ rm_stm::rm_stm(
               e);
         });
     });
+
+    ssx::repeat_until_gate_closed_or_aborted(_gate, _as, [this] {
+        return cleanup_evicted_producers().handle_exception(
+          [h = _gate.hold(), this](const std::exception_ptr& ex) {
+              if (!ssx::is_shutdown_exception(ex)) {
+                  vlog(
+                    _ctx_log.warn,
+                    "encountered an exception while cleaning producers: {}",
+                    ex);
+              }
+          });
+    });
 }
 
 ss::future rm_stm::bootstrap_committed_offset() {
@@ -129,6 +142,8 @@ ss::future rm_stm::bootstrap_committed_offset() {
 
 std::pair
 rm_stm::maybe_create_producer(model::producer_identity pid) {
+    // note: must be called under state_lock in shared/read mode.
+
     // Double lookup because of two reasons
     // 1. 
we are forced to use a ptr as map value_type because producer_state is
     // not movable
@@ -148,23 +163,37 @@ rm_stm::maybe_create_producer(model::producer_identity pid) {
     return std::make_pair(producer, producer_previously_known::no);
 }
 
-void rm_stm::cleanup_producer_state(model::producer_identity pid) {
-    auto it = _producers.find(pid.get_id());
-    if (it != _producers.end() && it->second->id() == pid) {
-        const auto& producer = *(it->second);
-        if (producer._active_transaction_hook.is_linked()) {
-            vlog(
-              _ctx_log.error,
-              "Ignoring cleanup request of producer {} due to in progress "
-              "transaction.",
-              producer);
-            return;
+ss::future<> rm_stm::cleanup_evicted_producers() {
+    while (!_as.abort_requested() && !_gate.is_closed()) {
+        auto pid = co_await _producers_pending_cleanup.pop_eventually();
+        auto units = co_await _state_lock.hold_read_lock();
+        auto it = _producers.find(pid.get_id());
+        if (it != _producers.end() && it->second->id() == pid) {
+            const auto& producer = *(it->second);
+            if (producer._active_transaction_hook.is_linked()) {
+                vlog(
+                  _ctx_log.error,
+                  "Ignoring cleanup request of producer {} due to in progress "
+                  "transaction.",
+                  producer);
+                continue;
+            }
+            _producers.erase(it);
+            vlog(_ctx_log.trace, "removed producer: {}", pid);
         }
-        _producers.erase(it);
     }
+}
+
+void rm_stm::cleanup_producer_state(model::producer_identity pid) noexcept {
+    if (_as.abort_requested() || _gate.is_closed()) {
+        return;
+    }
+    _producers_pending_cleanup.push(std::move(pid));
 };
 
 ss::future<> rm_stm::reset_producers() {
+    // note: must always be called under exclusive write lock to
+    // avoid concurrent state changes to _producers.
co_await ss::max_concurrent_for_each(
       _producers.begin(), _producers.end(), 32, [this](auto& it) {
           auto& producer = it.second;
@@ -738,6 +767,8 @@ ss::future> rm_stm::do_replicate(
 
 ss::future<> rm_stm::stop() {
     _as.request_abort();
+    _producers_pending_cleanup.abort(
+      std::make_exception_ptr(ss::abort_requested_exception{}));
     auto_abort_timer.cancel();
     co_await _gate.close();
     co_await reset_producers();
@@ -1655,6 +1686,8 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {
 
 ss::future<>
 rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
+    auto units = co_await _state_lock.hold_write_lock();
+
     vlog(
       _ctx_log.trace,
       "applying snapshot with last included offset: {}",
diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h
index f31cc1e97b6e8..4fe2cd6ee0677 100644
--- a/src/v/cluster/rm_stm.h
+++ b/src/v/cluster/rm_stm.h
@@ -242,7 +242,8 @@ class rm_stm final : public raft::persisted_stm<> {
       = ss::bool_class;
     std::pair
     maybe_create_producer(model::producer_identity);
-    void cleanup_producer_state(model::producer_identity);
+    void cleanup_producer_state(model::producer_identity) noexcept;
+    ss::future<> cleanup_evicted_producers();
     ss::future<> reset_producers();
     ss::future> do_begin_tx(
       model::term_id,
@@ -414,6 +415,8 @@ class rm_stm final : public raft::persisted_stm<> {
     // producers because epoch is unused.
     producers_t _producers;
 
+    ss::queue<model::producer_identity> _producers_pending_cleanup;
+
     // All the producers with open transactions in this partition.
     // The list is sorted by the open transaction begin offset, so
     // the first entry in the list is the earliest open transaction