Skip to content

Commit

Permalink
producer_state: add request::set_error and make it idempotent
Browse files Browse the repository at this point in the history
A request can be marked as errored multiple times, consider the example
below.

replicate_f : waiting for replication
term change
requests from old terms gc-ed: set_err(ec)
replicate_f: set_err(ec)

Current assert assumes that a request can be set only once, which is
true for setting a successful result but not for errors. This commit
splits set_value() into set_value() and set_error() and adjusts
assert conditions accordingly.

(cherry picked from commit 7719140)
  • Loading branch information
bharathv committed Jul 19, 2024
1 parent 12f46b2 commit a113477
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
29 changes: 26 additions & 3 deletions src/v/cluster/producer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,29 @@ result_promise_t::future_type request::result() const {
return _result.get_shared_future();
}

void request::set_value(request_result_t::value_type value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during result set: {}",
*this);
_result.set_value(value);
_state = request_state::completed;
}

void request::set_error(request_result_t::error_type error) {
// This is idempotent as different fibers can mark the result error
// at different times in some edge cases.
if (_state != request_state::completed) {
_result.set_value(error);
_state = request_state::completed;
return;
}
vassert(
_result.available() && result().get0().has_error(),
"Invalid result state, expected to be available and errored out: {}",
*this);
}

bool request::operator==(const request& other) const {
bool compare = _first_sequence == other._first_sequence
&& _last_sequence == other._last_sequence
Expand Down Expand Up @@ -119,7 +142,7 @@ result<request_ptr> requests::try_emplace(
// checks for sequence tracking.
while (!_inflight_requests.empty()) {
if (!_inflight_requests.front()->has_completed()) {
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
Expand All @@ -133,7 +156,7 @@ result<request_ptr> requests::try_emplace(
if (!_inflight_requests.front()->has_completed()) {
// Here we know for sure the term change, these in flight
// requests are going to fail anyway, mark them so.
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
Expand Down Expand Up @@ -214,7 +237,7 @@ bool requests::stm_apply(
void requests::shutdown() {
for (auto& request : _inflight_requests) {
if (!request->has_completed()) {
request->_result.set_value(errc::shutting_down);
request->set_error(errc::shutting_down);
}
}
_inflight_requests.clear();
Expand Down
14 changes: 4 additions & 10 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ using producer_ptr = ss::lw_shared_ptr<producer_state>;
// right after set_value(), this is an implementation quirk, be
// mindful of that behavior when using it. We have a test for
// it in expiring_promise_test
using result_promise_t = ss::shared_promise<result<kafka_result>>;
using request_result_t = result<kafka_result>;
using result_promise_t = ss::shared_promise<request_result_t>;
using request_ptr = ss::lw_shared_ptr<request>;
using seq_t = int32_t;

Expand Down Expand Up @@ -75,15 +76,8 @@ class request {
}
}

template<class ValueType>
void set_value(ValueType&& value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during result set: {}",
*this);
_result.set_value(std::forward<ValueType>(value));
_state = request_state::completed;
}
void set_value(request_result_t::value_type);
void set_error(request_result_t::error_type);
void mark_request_in_progress() { _state = request_state::in_progress; }
request_state state() const { return _state; }
result_promise_t::future_type result() const;
Expand Down
13 changes: 6 additions & 7 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
auto expiration_it = _log_state.expiration.find(bid.pid);
if (expiration_it == _log_state.expiration.end()) {
vlog(_ctx_log.warn, "Can not find expiration info for pid:{}", bid.pid);
req_ptr->set_value(errc::generic_tx_error);
req_ptr->set_error(errc::generic_tx_error);
co_return errc::generic_tx_error;
}
expiration_it->second.last_update = clock_type::now();
Expand All @@ -1079,7 +1079,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
"got {} on replicating tx data batch for pid:{}",
r.error(),
bid.pid);
req_ptr->set_value(r.error());
req_ptr->set_error(r.error());
co_return r.error();
}
if (!co_await wait_no_throw(
Expand All @@ -1089,7 +1089,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
_ctx_log.warn,
"application of the replicated tx batch has timed out pid:{}",
bid.pid);
req_ptr->set_value(errc::timeout);
req_ptr->set_error(errc::timeout);
co_return tx_errc::timeout;
}
_mem_state.estimated.erase(bid.pid);
Expand Down Expand Up @@ -1179,7 +1179,6 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued,
ssx::semaphore_units& units) {
using ret_t = result<kafka_result>;
auto request = producer->try_emplace_request(bid, synced_term);
if (!request) {
co_return request.error();
Expand All @@ -1199,7 +1198,7 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
_ctx_log.warn,
"replication failed, request enqueue returned error: {}",
req_enqueued.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
units.return_all();
Expand All @@ -1209,13 +1208,13 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
if (replicated.failed()) {
vlog(
_ctx_log.warn, "replication failed: {}", replicated.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
auto result = replicated.get0();
if (result.has_error()) {
vlog(_ctx_log.warn, "replication failed: {}", result.error());
req_ptr->set_value<ret_t>(result.error());
req_ptr->set_error(result.error());
co_return result.error();
}
// translate to kafka offset.
Expand Down

0 comments on commit a113477

Please sign in to comment.