diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index 623bc9cbb042..bbcc2598528e 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -31,6 +31,29 @@ result_promise_t::future_type request::result() const { return _result.get_shared_future(); } +void request::set_value(request_result_t::value_type value) { + vassert( + _state <= request_state::in_progress && !_result.available(), + "unexpected request state during result set: {}", + *this); + _result.set_value(value); + _state = request_state::completed; +} + +void request::set_error(request_result_t::error_type error) { + // This is idempotent as different fibers can mark the result error + // at different times in some edge cases. + if (_state != request_state::completed) { + _result.set_value(error); + _state = request_state::completed; + return; + } + vassert( + _result.available() && result().get0().has_error(), + "Invalid result state, expected to be available and errored out: {}", + *this); +} + bool request::operator==(const request& other) const { bool compare = _first_sequence == other._first_sequence && _last_sequence == other._last_sequence @@ -119,7 +142,7 @@ result requests::try_emplace( // checks for sequence tracking. while (!_inflight_requests.empty()) { if (!_inflight_requests.front()->has_completed()) { - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } @@ -133,7 +156,7 @@ result requests::try_emplace( if (!_inflight_requests.front()->has_completed()) { // Here we know for sure the term change, these in flight // requests are going to fail anyway, mark them so. - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } @@ -214,7 +237,7 @@ bool requests::stm_apply( void requests::shutdown() { for (auto& request : _inflight_requests) { if (!request->has_completed()) { - request->_result.set_value(errc::shutting_down); + request->set_error(errc::shutting_down); } } _inflight_requests.clear(); diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index 993d6d844b19..d5f710c3e053 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -46,7 +46,8 @@ using producer_ptr = ss::lw_shared_ptr; // right after set_value(), this is an implementation quirk, be // mindful of that behavior when using it. We have a test for // it in expiring_promise_test -using result_promise_t = ss::shared_promise>; +using request_result_t = result; +using result_promise_t = ss::shared_promise; using request_ptr = ss::lw_shared_ptr; using seq_t = int32_t; @@ -75,15 +76,8 @@ class request { } } - template - void set_value(ValueType&& value) { - vassert( - _state <= request_state::in_progress && !_result.available(), - "unexpected request state during result set: {}", - *this); - _result.set_value(std::forward(value)); - _state = request_state::completed; - } + void set_value(request_result_t::value_type); + void set_error(request_result_t::error_type); void mark_request_in_progress() { _state = request_state::in_progress; } request_state state() const { return _state; } result_promise_t::future_type result() const; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 5ff5ee9036d3..7a538389bcd7 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1065,7 +1065,7 @@ ss::future> rm_stm::do_transactional_replicate( auto expiration_it = _log_state.expiration.find(bid.pid); if (expiration_it == _log_state.expiration.end()) { vlog(_ctx_log.warn, "Can not find expiration info for pid:{}", bid.pid); - req_ptr->set_value(errc::generic_tx_error); + req_ptr->set_error(errc::generic_tx_error); co_return errc::generic_tx_error; } expiration_it->second.last_update = clock_type::now(); @@ -1079,7 +1079,7 @@ ss::future> rm_stm::do_transactional_replicate( "got {} on replicating tx data batch for pid:{}", r.error(), bid.pid); - req_ptr->set_value(r.error()); + req_ptr->set_error(r.error()); co_return r.error(); } if (!co_await wait_no_throw( @@ -1089,7 +1089,7 @@ ss::future> rm_stm::do_transactional_replicate( _ctx_log.warn, "application of the replicated tx batch has timed out pid:{}", bid.pid); - req_ptr->set_value(errc::timeout); + req_ptr->set_error(errc::timeout); co_return tx_errc::timeout; } _mem_state.estimated.erase(bid.pid); @@ -1179,7 +1179,6 @@ ss::future> rm_stm::do_idempotent_replicate( raft::replicate_options opts, ss::lw_shared_ptr> enqueued, ssx::semaphore_units& units) { - using ret_t = result; auto request = producer->try_emplace_request(bid, synced_term); if (!request) { co_return request.error(); @@ -1199,7 +1198,7 @@ ss::future> rm_stm::do_idempotent_replicate( _ctx_log.warn, "replication failed, request enqueue returned error: {}", req_enqueued.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } units.return_all(); @@ -1209,13 +1208,13 @@ ss::future> rm_stm::do_idempotent_replicate( if (replicated.failed()) { vlog( _ctx_log.warn, "replication failed: {}", replicated.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } auto result = replicated.get0(); if (result.has_error()) { vlog(_ctx_log.warn, "replication failed: {}", result.error()); - req_ptr->set_value(result.error()); + req_ptr->set_error(result.error()); co_return result.error(); } // translate to kafka offset.