Skip to content

Commit

Permalink
Merge pull request #21524 from bharathv/fix-19841
Browse files Browse the repository at this point in the history
 [v24.1.x] producer_state: add request::set_error and make it idempotent
  • Loading branch information
piyushredpanda authored Jul 22, 2024
2 parents 2006379 + a113477 commit b2246ae
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 23 deletions.
67 changes: 63 additions & 4 deletions src/v/cluster/producer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,44 @@

namespace cluster {

std::ostream& operator<<(std::ostream& os, request_state state) {
switch (state) {
case request_state::initialized:
return os << "initialized";
case request_state::in_progress:
return os << "in_progress";
case request_state::completed:
return os << "completed";
}
}

result_promise_t::future_type request::result() const {
return _result.get_shared_future();
}

// Completes this request with the value side of request_result_t.
//
// Precondition (checked by the vassert below): the request state is at most
// request_state::in_progress, i.e. not yet completed, and the shared promise
// does not already hold a result.
// NOTE(review): vassert is defined outside this file; presumably it logs the
// formatted message and aborts when the condition is false - confirm.
void request::set_value(request_result_t::value_type value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during result set: {}",
*this);
// Publish the value through the shared promise, then record completion so
// later callers observe request_state::completed.
_result.set_value(value);
_state = request_state::completed;
}

/// Completes this request with `error`, tolerating repeated calls.
///
/// Different fibers can mark the result as errored at different times in
/// some edge cases, so calling this on an already completed request is
/// allowed: the stored result is left untouched and the new `error` is
/// dropped. The only requirement in that case is that the stored result is
/// available and is itself an error.
void request::set_error(request_result_t::error_type error) {
    if (_state == request_state::completed) {
        // Repeat call: nothing to publish, only validate what is stored.
        vassert(
          _result.available() && result().get0().has_error(),
          "Invalid result state, expected to be available and errored out: {}",
          *this);
        return;
    }

    // First completion: publish the error and record the terminal state.
    _result.set_value(error);
    _state = request_state::completed;
}

bool request::operator==(const request& other) const {
bool compare = _first_sequence == other._first_sequence
&& _last_sequence == other._last_sequence
Expand Down Expand Up @@ -108,7 +142,7 @@ result<request_ptr> requests::try_emplace(
// checks for sequence tracking.
while (!_inflight_requests.empty()) {
if (!_inflight_requests.front()->has_completed()) {
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
Expand All @@ -122,7 +156,7 @@ result<request_ptr> requests::try_emplace(
if (!_inflight_requests.front()->has_completed()) {
// Here we know for sure the term change, these in flight
// requests are going to fail anyway, mark them so.
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
Expand Down Expand Up @@ -203,7 +237,7 @@ bool requests::stm_apply(
void requests::shutdown() {
for (auto& request : _inflight_requests) {
if (!request->has_completed()) {
request->_result.set_value(errc::shutting_down);
request->set_error(errc::shutting_down);
}
}
_inflight_requests.clear();
Expand Down Expand Up @@ -233,6 +267,18 @@ bool producer_state::operator==(const producer_state& other) const {
&& _requests == other._requests;
}

std::ostream& operator<<(std::ostream& o, const request& request) {
fmt::print(
o,
"{{ first: {}, last: {}, term: {}, result_available: {}, state: {} }}",
request._first_sequence,
request._last_sequence,
request._term,
request._result.available(),
request._state);
return o;
}

std::ostream& operator<<(std::ostream& o, const requests& requests) {
fmt::print(
o,
Expand Down Expand Up @@ -286,8 +332,21 @@ result<request_ptr> producer_state::try_emplace_request(
current_term,
reset,
_requests);
return _requests.try_emplace(

auto result = _requests.try_emplace(
bid.first_seq, bid.last_seq, current_term, reset);

if (unlikely(result.has_error())) {
vlog(
clusterlog.warn,
"[{}] error {} processing request {}, term: {}, reset: {}",
*this,
result.error(),
bid,
current_term,
reset);
}
return result;
}

bool producer_state::update(
Expand Down
20 changes: 8 additions & 12 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ using producer_ptr = ss::lw_shared_ptr<producer_state>;
// right after set_value(), this is an implementation quirk, be
// mindful of that behavior when using it. We have a test for
// it in expiring_promise_test
using result_promise_t = ss::shared_promise<result<kafka_result>>;
using request_result_t = result<kafka_result>;
using result_promise_t = ss::shared_promise<request_result_t>;
using request_ptr = ss::lw_shared_ptr<request>;
using seq_t = int32_t;

Expand All @@ -56,6 +57,8 @@ enum class request_state : uint8_t {
completed = 2
};

std::ostream& operator<<(std::ostream&, request_state);

/// A request for a given sequence range, both inclusive.
/// The sequence numbers are stamped by the client and are a part
/// of batch header. A request can either be in progress or completed
Expand All @@ -73,23 +76,16 @@ class request {
}
}

// Completes the request with `value`, perfectly forwarded into the result
// promise.
//
// The vassert requires that the request has not reached
// request_state::completed and that the promise holds no result yet; on
// failure its message reports the state as its underlying integer together
// with the promise's availability flag.
template<class ValueType>
void set_value(ValueType&& value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during set: state: {}, result available: "
"{}",
static_cast<std::underlying_type_t<request_state>>(_state),
_result.available());
// Publish the result first, then record completion.
_result.set_value(std::forward<ValueType>(value));
_state = request_state::completed;
}
void set_value(request_result_t::value_type);
void set_error(request_result_t::error_type);
void mark_request_in_progress() { _state = request_state::in_progress; }
request_state state() const { return _state; }
result_promise_t::future_type result() const;

bool operator==(const request&) const;

friend std::ostream& operator<<(std::ostream&, const request&);

private:
request_state _state{request_state::initialized};
seq_t _first_sequence;
Expand Down
13 changes: 6 additions & 7 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
auto expiration_it = _log_state.expiration.find(bid.pid);
if (expiration_it == _log_state.expiration.end()) {
vlog(_ctx_log.warn, "Can not find expiration info for pid:{}", bid.pid);
req_ptr->set_value(errc::generic_tx_error);
req_ptr->set_error(errc::generic_tx_error);
co_return errc::generic_tx_error;
}
expiration_it->second.last_update = clock_type::now();
Expand All @@ -1079,7 +1079,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
"got {} on replicating tx data batch for pid:{}",
r.error(),
bid.pid);
req_ptr->set_value(r.error());
req_ptr->set_error(r.error());
co_return r.error();
}
if (!co_await wait_no_throw(
Expand All @@ -1089,7 +1089,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
_ctx_log.warn,
"application of the replicated tx batch has timed out pid:{}",
bid.pid);
req_ptr->set_value(errc::timeout);
req_ptr->set_error(errc::timeout);
co_return tx_errc::timeout;
}
_mem_state.estimated.erase(bid.pid);
Expand Down Expand Up @@ -1179,7 +1179,6 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued,
ssx::semaphore_units& units) {
using ret_t = result<kafka_result>;
auto request = producer->try_emplace_request(bid, synced_term);
if (!request) {
co_return request.error();
Expand All @@ -1199,7 +1198,7 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
_ctx_log.warn,
"replication failed, request enqueue returned error: {}",
req_enqueued.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
units.return_all();
Expand All @@ -1209,13 +1208,13 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
if (replicated.failed()) {
vlog(
_ctx_log.warn, "replication failed: {}", replicated.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
auto result = replicated.get0();
if (result.has_error()) {
vlog(_ctx_log.warn, "replication failed: {}", result.error());
req_ptr->set_value<ret_t>(result.error());
req_ptr->set_error(result.error());
co_return result.error();
}
// translate to kafka offset.
Expand Down

0 comments on commit b2246ae

Please sign in to comment.