Merge pull request #17882 from mmaslankaprv/v23.2.x-multi-bp
Backport of #17673, #17498, #16560
mmaslankaprv authored Apr 17, 2024
2 parents cfbb2a5 + 5042dfb commit c2fc38c
Showing 7 changed files with 62 additions and 26 deletions.
21 changes: 18 additions & 3 deletions src/v/kafka/server/group_manager.cc
@@ -665,16 +665,31 @@ ss::future<> group_manager::handle_partition_leader_change(
              std::nullopt,
              std::nullopt,
              std::nullopt);
-
+           auto expected_to_read = model::prev_offset(
+             p->partition->high_watermark());
            return p->partition->make_reader(reader_config)
-             .then([this, term, p, timeout](
+             .then([this, term, p, timeout, expected_to_read](
                      model::record_batch_reader reader) {
                  return std::move(reader)
                    .consume(
                      group_recovery_consumer(_serializer_factory(), p->as),
                      timeout)
-                   .then([this, term, p](
+                   .then([this, term, p, expected_to_read](
                            group_recovery_consumer_state state) {
+                       if (state.last_read_offset < expected_to_read) {
+                           vlog(
+                             klog.error,
+                             "error recovering group state from {}. "
+                             "Expected to read up to {} but last offset "
+                             "consumed is equal to {}",
+                             p->partition->ntp(),
+                             expected_to_read,
+                             state.last_read_offset);
+                           // force step down to allow other node to
+                           // recover group
+                           return p->partition->raft()->step_down(
+                             "unable to recover group, short read");
+                       }
                        // avoid trying to recover if we stopped the
                        // reader because an abort was requested
                        if (p->as.abort_requested()) {
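
The guard added above compares the last offset consumed during group recovery against the partition's high watermark and forces the leader to step down on a short read, so another node can retry recovery. Below is a minimal standalone sketch of that check, using plain integers in place of model::offset and a boolean return in place of the Seastar future returned by step_down; names such as recovery_complete are illustrative, not the Redpanda API.

    #include <cstdint>
    #include <iostream>

    using offset_t = int64_t;

    struct recovery_state {
        offset_t last_read_offset = -1; // highest offset consumed by the recovery reader
    };

    // Stand-in for model::prev_offset(): the last offset expected below the
    // high watermark.
    constexpr offset_t prev_offset(offset_t o) { return o - 1; }

    // Returns true when recovery consumed everything up to the expected
    // offset, false when the read was short and leadership should be given up.
    bool recovery_complete(const recovery_state& st, offset_t high_watermark) {
        const offset_t expected_to_read = prev_offset(high_watermark);
        if (st.last_read_offset < expected_to_read) {
            std::cerr << "short read: expected to read up to " << expected_to_read
                      << " but last consumed offset is " << st.last_read_offset
                      << ", stepping down\n";
            return false; // in Redpanda: p->partition->raft()->step_down(...)
        }
        return true;
    }

    int main() {
        recovery_state full;
        full.last_read_offset = 41;
        recovery_state truncated;
        truncated.last_read_offset = 10;
        std::cout << recovery_complete(full, 42) << '\n';      // 1: recovery is complete
        std::cout << recovery_complete(truncated, 42) << '\n'; // 0: short read, step down
    }
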
1 change: 1 addition & 0 deletions src/v/kafka/server/group_recovery_consumer.cc
@@ -73,6 +73,7 @@ group_recovery_consumer::operator()(model::record_batch batch) {
    if (_as.abort_requested()) {
        co_return ss::stop_iteration::yes;
    }
+   _state.last_read_offset = batch.last_offset();
    if (batch.header().type == model::record_batch_type::raft_data) {
        _batch_base_offset = batch.base_offset();
        co_await model::for_each_record(batch, [this](model::record& r) {
1 change: 1 addition & 0 deletions src/v/kafka/server/group_recovery_consumer.h
@@ -31,6 +31,7 @@ struct group_recovery_consumer_state {
     * retention feature is activated. see group::offset_metadata for more info.
     */
    bool has_offset_retention_feature_fence{false};
+   model::offset last_read_offset;
};

class group_recovery_consumer {
9 changes: 3 additions & 6 deletions src/v/kafka/server/replicated_partition.cc
@@ -461,13 +461,10 @@ ss::future<error_code> replicated_partition::validate_fetch_offset(
    if (reading_from_follower && !_partition->is_leader()) {
        auto ec = error_code::none;

-       const std::pair<model::offset, model::offset> bounds = std::minmax(
+       const auto available_to_read = std::min(
          leader_high_watermark(), log_end_offset());
-       const auto effective_log_end_offset = bounds.second;
-       const auto available_to_read = bounds.first;
-       if (
-         fetch_offset < start_offset()
-         || fetch_offset > effective_log_end_offset) {
+
+       if (fetch_offset < start_offset()) {
            ec = error_code::offset_out_of_range;
        } else if (fetch_offset > available_to_read) {
            /**
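
The rewritten validation above separates two cases the old minmax-based check conflated: an offset below the log start is genuinely out of range, while an offset above min(leader high watermark, log end offset) simply is not available on this follower yet and falls through to the existing else-if branch. A simplified sketch of the same decision follows, with plain integers for offsets and a hypothetical error enum standing in for the Kafka error codes; the not_yet_available value is an assumption, since the body of the else-if branch is not visible in this diff.

    #include <algorithm>
    #include <cstdint>

    // Hypothetical, reduced error set; the real code uses kafka::error_code.
    enum class fetch_error { none, offset_out_of_range, not_yet_available };

    fetch_error validate_follower_fetch(
      int64_t fetch_offset,
      int64_t start_offset,
      int64_t leader_high_watermark,
      int64_t log_end_offset) {
        // A follower can only serve data that the leader has exposed and that
        // it has locally appended.
        const int64_t available_to_read
          = std::min(leader_high_watermark, log_end_offset);

        if (fetch_offset < start_offset) {
            // Truly out of range: below the beginning of the log.
            return fetch_error::offset_out_of_range;
        }
        if (fetch_offset > available_to_read) {
            // Not an out-of-range error: this replica may simply be lagging.
            return fetch_error::not_yet_available;
        }
        return fetch_error::none;
    }

    int main() {
        // start=0, leader HWM=100, local log end=90 -> offsets up to 90 are servable.
        return validate_follower_fetch(95, 0, 100, 90) == fetch_error::not_yet_available
          ? 0
          : 1;
    }
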
13 changes: 11 additions & 2 deletions src/v/kafka/server/replicated_partition.h
@@ -173,9 +173,18 @@ class replicated_partition final : public kafka::partition_proxy::impl {

    ss::future<std::optional<model::offset>>
    get_leader_epoch_last_offset(kafka::leader_epoch) const final;

+   /**
+    * A leader epoch is used by Kafka clients to determine whether a replica
+    * is up to date with the leader and to detect truncation.
+    *
+    * The leader epoch differs from the Raft term: the term is updated as
+    * soon as a leader election starts, whereas the leader epoch is updated
+    * only after the state of the replica is determined. The leader epoch is
+    * therefore derived from the confirmed term rather than the raw term,
+    * which is incremented every time a leader election starts.
+    */
    kafka::leader_epoch leader_epoch() const final {
-       return leader_epoch_from_term(_partition->term());
+       return leader_epoch_from_term(_partition->raft()->confirmed_term());
    }

    ss::future<error_code> validate_fetch_offset(
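
The comment and the one-line change above tie the Kafka leader epoch to the Raft confirmed term rather than the raw term. A toy illustration of the difference follows; the struct and conversion helper are stand-ins, not Redpanda types. The raw term is bumped as soon as an election starts, while the confirmed term only catches up once the replica's state is settled, so deriving the epoch from the confirmed term keeps clients from observing an epoch whose state is not yet established.

    #include <cassert>
    #include <cstdint>

    using leader_epoch = int64_t;

    // Illustrative stand-in for the Raft state relevant to this change.
    struct raft_terms {
        int64_t term = 0;           // incremented as soon as an election starts
        int64_t confirmed_term = 0; // updated once the replica's state is settled
    };

    // Stand-in for kafka::leader_epoch_from_term().
    leader_epoch leader_epoch_from_term(int64_t term) { return term; }

    int main() {
        raft_terms r;
        r.term = 5;
        r.confirmed_term = 5;

        // An election starts: the raw term moves ahead immediately.
        r.term = 6;

        // Old behaviour: the epoch jumps before the new leadership is confirmed.
        assert(leader_epoch_from_term(r.term) == 6);

        // New behaviour: the epoch reported to Kafka clients stays at the last
        // confirmed term until the replica's state is established.
        assert(leader_epoch_from_term(r.confirmed_term) == 5);

        // Leadership settles (e.g. first successful replication): the confirmed
        // term catches up and the epoch advances.
        r.confirmed_term = r.term;
        assert(leader_epoch_from_term(r.confirmed_term) == 6);
        return 0;
    }
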
17 changes: 13 additions & 4 deletions src/v/raft/consensus.cc
@@ -1912,6 +1912,7 @@ consensus::do_append_entries(append_entries_request&& r) {
    maybe_update_last_visible_index(last_visible);
    _last_leader_visible_offset = std::max(
      request_metadata.last_visible_index, _last_leader_visible_offset);
+   _confirmed_term = _term;
    return f.then([this, reply, request_metadata] {
        return maybe_update_follower_commit_idx(
          model::offset(request_metadata.commit_index))
@@ -1925,11 +1926,11 @@

    // section 3
    if (request_metadata.prev_log_index < last_log_offset) {
-       if (unlikely(request_metadata.prev_log_index < last_visible_index())) {
+       if (unlikely(request_metadata.prev_log_index < _commit_index)) {
            reply.result = append_entries_reply::status::success;
            // clamp dirty offset to the current commit index not to allow
            // leader reasoning about follower log beyond that point
-           reply.last_dirty_log_index = last_visible_index();
+           reply.last_dirty_log_index = _commit_index;
            reply.last_flushed_log_index = _commit_index;
            vlog(
              _ctxlog.info,
@@ -1955,19 +1956,26 @@
              truncate_at);
            _probe->log_truncated();

+           _majority_replicated_index = std::min(
+             model::prev_offset(truncate_at), _majority_replicated_index);
+           _last_quorum_replicated_index = std::min(
+             model::prev_offset(truncate_at), _last_quorum_replicated_index);
+           // update flushed offset since truncation may happen to already
+           // flushed entries
+           _flushed_offset = std::min(
+             model::prev_offset(truncate_at), _flushed_offset);
            // We are truncating the offset translator before truncating the log
            // because if saving offset translator state fails, we will retry and
            // eventually log and offset translator will become consistent. OTOH if
            // log truncation were first and saving offset translator state failed,
            // we wouldn't retry and log and offset translator could diverge.
+
            return _offset_translator.truncate(truncate_at)
              .then([this, truncate_at] {
                  return _log->truncate(storage::truncate_config(
                    truncate_at, _scheduling.default_iopc));
              })
              .then([this, truncate_at] {
-                 _last_quorum_replicated_index = std::min(
-                   model::prev_offset(truncate_at), _last_quorum_replicated_index);
                  // update flushed offset since truncation may happen to already
                  // flushed entries
                  _flushed_offset = std::min(
@@ -2012,6 +2020,7 @@
    maybe_update_last_visible_index(last_visible);
    _last_leader_visible_offset = std::max(
      m.last_visible_index, _last_leader_visible_offset);
+   _confirmed_term = _term;
    return maybe_update_follower_commit_idx(model::offset(m.commit_index))
      .then([this, ofs, target] {
          return make_append_entries_reply(target, ofs);
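
Two things change in do_append_entries above: the follower now records _confirmed_term = _term whenever it accepts entries from the current leader, and on truncation the cached replication indexes are clamped to the offset just before the truncation point, before the offset translator and log are truncated. Below is a reduced sketch of the clamping step, with plain integers for offsets and a struct standing in for the relevant consensus members; it is an illustration of the bookkeeping, not the Redpanda API.

    #include <algorithm>
    #include <cstdint>

    constexpr int64_t prev_offset(int64_t o) { return o - 1; }

    // Illustrative subset of the consensus bookkeeping touched by the change.
    struct consensus_indexes {
        int64_t majority_replicated_index = 100;
        int64_t last_quorum_replicated_index = 100;
        int64_t flushed_offset = 100;
    };

    // Clamp every cached index that could point past the surviving log prefix
    // before the log itself is truncated, so nothing reasons about offsets
    // beyond truncate_at - 1 while the truncation is in flight.
    void clamp_on_truncation(consensus_indexes& c, int64_t truncate_at) {
        c.majority_replicated_index
          = std::min(prev_offset(truncate_at), c.majority_replicated_index);
        c.last_quorum_replicated_index
          = std::min(prev_offset(truncate_at), c.last_quorum_replicated_index);
        // Truncation may remove entries that were already flushed.
        c.flushed_offset = std::min(prev_offset(truncate_at), c.flushed_offset);
    }

    int main() {
        consensus_indexes c;
        clamp_on_truncation(c, 80); // everything from offset 80 onwards is dropped
        return (c.majority_replicated_index == 79 && c.flushed_offset == 79) ? 0 : 1;
    }
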
26 changes: 15 additions & 11 deletions src/v/raft/consensus.h
@@ -711,17 +711,21 @@ class consensus {
    // consensus state
    model::offset _commit_index;
    model::term_id _term;
-   // It's common to use raft log as a foundation for state machines:
-   // when a node becomes a leader it replays the log, reconstructs
-   // the state and becomes ready to serve the requests. However it is
-   // not enough for a node to become a leader, it should successfully
-   // replicate a new record to be sure that older records stored in
-   // the local log were actually replicated and do not constitute an
-   // artifact of the previously crashed leader. Redpanda uses a confi-
-   // guration batch for the initial replication to gain certainty. When
-   // commit index moves past the configuration batch _confirmed_term
-   // gets updated. So when _term==_confirmed_term it's safe to use
-   // local log to reconstruct the state.
+
+   /**
+    * The confirmed term is used to determine whether the state of a replica
+    * is up to date after a leader election. Only when the confirmed term is
+    * equal to the current term can one reason about the Raft group state.
+    *
+    * On the leader, the confirmed term is updated after the first successful
+    * replication of a batch following a leader election. Once that
+    * replication succeeds, the leader is guaranteed to have up-to-date
+    * committed and visible offsets.
+    *
+    * On the follower, the confirmed term is updated only when an append
+    * entries request from the current leader can be accepted and the
+    * follower can return success.
+    */
    model::term_id _confirmed_term;
    model::offset _flushed_offset{};

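
The rewritten comment above describes when it is safe to reason about replicated state. A minimal sketch of the intended usage follows; the names are illustrative, not consensus members or methods. Consumers of the log, such as the group recovery path changed earlier in this commit, should only trust state rebuilt from the local log once the current term has been confirmed.

    #include <cstdint>
    #include <iostream>

    // Illustrative stand-in for the two term fields documented above.
    struct raft_terms {
        int64_t term = 0;
        int64_t confirmed_term = 0;
    };

    // Leader: called after the first successful replication following an
    // election. Follower: called when an append entries request from the
    // current leader is accepted.
    void confirm_current_term(raft_terms& t) { t.confirmed_term = t.term; }

    // Only when both terms match is the locally reconstructed state known to
    // be up to date rather than a leftover of a previous, possibly crashed,
    // leader.
    bool state_settled(const raft_terms& t) { return t.confirmed_term == t.term; }

    int main() {
        raft_terms t;
        t.term = 7;
        t.confirmed_term = 6;
        std::cout << state_settled(t) << '\n'; // 0: election won but not yet confirmed
        confirm_current_term(t);
        std::cout << state_settled(t) << '\n'; // 1: safe to serve state built from the log
    }
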
