diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc
index be336115d8340..4f802de3e8a94 100644
--- a/src/v/kafka/server/group_manager.cc
+++ b/src/v/kafka/server/group_manager.cc
@@ -665,16 +665,31 @@ ss::future<> group_manager::handle_partition_leader_change(
               std::nullopt,
               std::nullopt,
               std::nullopt);
-
+            auto expected_to_read = model::prev_offset(
+              p->partition->high_watermark());
             return p->partition->make_reader(reader_config)
-              .then([this, term, p, timeout](
+              .then([this, term, p, timeout, expected_to_read](
                       model::record_batch_reader reader) {
                   return std::move(reader)
                     .consume(
                       group_recovery_consumer(_serializer_factory(), p->as),
                       timeout)
-                    .then([this, term, p](
+                    .then([this, term, p, expected_to_read](
                             group_recovery_consumer_state state) {
+                        if (state.last_read_offset < expected_to_read) {
+                            vlog(
+                              klog.error,
+                              "error recovering group state from {}. "
+                              "Expected to read up to {} but last offset "
+                              "consumed is equal to {}",
+                              p->partition->ntp(),
+                              expected_to_read,
+                              state.last_read_offset);
+                            // force step down to allow other node to
+                            // recover group
+                            return p->partition->raft()->step_down(
+                              "unable to recover group, short read");
+                        }
                         // avoid trying to recover if we stopped the
                         // reader because an abort was requested
                         if (p->as.abort_requested()) {
diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc
index ee44c150f6b09..ae9bb621036b0 100644
--- a/src/v/kafka/server/group_recovery_consumer.cc
+++ b/src/v/kafka/server/group_recovery_consumer.cc
@@ -73,6 +73,7 @@ group_recovery_consumer::operator()(model::record_batch batch) {
     if (_as.abort_requested()) {
         co_return ss::stop_iteration::yes;
    }
+    _state.last_read_offset = batch.last_offset();
     if (batch.header().type == model::record_batch_type::raft_data) {
         _batch_base_offset = batch.base_offset();
         co_await model::for_each_record(batch, [this](model::record& r) {
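For reference, the short-read guard added to handle_partition_leader_change can be reduced to a standalone sketch. It compiles on its own and uses plain int64_t offsets and hypothetical names (prev_offset, finish_recovery, recovery_result) instead of the real model::offset, group_recovery_consumer_state and consensus::step_down APIs; it only illustrates the decision: if the recovery reader stopped before the offset just below the high watermark, the partially recovered state is not applied and leadership is given up so another replica can retry.

#include <cstdint>
#include <iostream>

// Illustrative stand-ins for model::offset helpers (assumption, not the real API).
using offset_t = int64_t;
constexpr offset_t prev_offset(offset_t o) { return o < 0 ? -1 : o - 1; }

enum class recovery_result { applied, stepped_down };

// Mirrors the patch: everything below the high watermark must have been
// consumed before the recovered group state can be trusted.
recovery_result finish_recovery(offset_t high_watermark, offset_t last_read_offset) {
    const offset_t expected_to_read = prev_offset(high_watermark);
    if (last_read_offset < expected_to_read) {
        // Short read: do not apply a partial state, let another node recover.
        std::cerr << "short read: expected " << expected_to_read << ", got "
                  << last_read_offset << "\n";
        return recovery_result::stepped_down;
    }
    return recovery_result::applied;
}

int main() {
    // Full read: high watermark 10, consumer reached offset 9.
    std::cout << (finish_recovery(10, 9) == recovery_result::applied) << "\n";
    // Short read: reader stopped at offset 5.
    std::cout << (finish_recovery(10, 5) == recovery_result::stepped_down) << "\n";
}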
diff --git a/src/v/kafka/server/group_recovery_consumer.h b/src/v/kafka/server/group_recovery_consumer.h
index 9038533254af5..e4cd3d69988f5 100644
--- a/src/v/kafka/server/group_recovery_consumer.h
+++ b/src/v/kafka/server/group_recovery_consumer.h
@@ -31,6 +31,7 @@ struct group_recovery_consumer_state {
      * retention feature is activated. see group::offset_metadata for more info.
      */
     bool has_offset_retention_feature_fence{false};
+    model::offset last_read_offset;
 };
 
 class group_recovery_consumer {
diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc
index 6627f7fffca6d..441509e425485 100644
--- a/src/v/kafka/server/replicated_partition.cc
+++ b/src/v/kafka/server/replicated_partition.cc
@@ -461,13 +461,10 @@ ss::future<error_code> replicated_partition::validate_fetch_offset(
     if (reading_from_follower && !_partition->is_leader()) {
         auto ec = error_code::none;
-        const std::pair<model::offset, model::offset> bounds = std::minmax(
+        const auto available_to_read = std::min(
           leader_high_watermark(), log_end_offset());
-        const auto effective_log_end_offset = bounds.second;
-        const auto available_to_read = bounds.first;
-        if (
-          fetch_offset < start_offset()
-          || fetch_offset > effective_log_end_offset) {
+
+        if (fetch_offset < start_offset()) {
             ec = error_code::offset_out_of_range;
         } else if (fetch_offset > available_to_read) {
             /**
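The validate_fetch_offset change above distinguishes a hard out-of-range error from data that is merely not yet available on the follower. The sketch below restates that rule in isolation; it is an illustration under assumptions, with plain int64_t offsets and a made-up fetch_error enum rather than the real kafka::error_code values.

#include <algorithm>
#include <cstdint>
#include <iostream>

using offset_t = int64_t;
enum class fetch_error { none, offset_out_of_range, not_yet_available };

// After the patch, only offsets below the log start are a hard out-of-range
// error; offsets beyond what the follower can currently serve (min of leader
// high watermark and local log end) are a transient condition to retry.
fetch_error validate_follower_fetch(
  offset_t fetch_offset,
  offset_t start_offset,
  offset_t leader_high_watermark,
  offset_t log_end_offset) {
    const offset_t available_to_read = std::min(
      leader_high_watermark, log_end_offset);
    if (fetch_offset < start_offset) {
        return fetch_error::offset_out_of_range;
    }
    if (fetch_offset > available_to_read) {
        return fetch_error::not_yet_available;
    }
    return fetch_error::none;
}

int main() {
    // Follower log ends at 100 but the leader high watermark is 90.
    std::cout << (validate_follower_fetch(95, 0, 90, 100)
                  == fetch_error::not_yet_available) << "\n";
    std::cout << (validate_follower_fetch(50, 0, 90, 100)
                  == fetch_error::none) << "\n";
}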
diff --git a/src/v/kafka/server/replicated_partition.h b/src/v/kafka/server/replicated_partition.h
index 3916e6992ffab..7905cbbd83168 100644
--- a/src/v/kafka/server/replicated_partition.h
+++ b/src/v/kafka/server/replicated_partition.h
@@ -173,9 +173,18 @@ class replicated_partition final : public kafka::partition_proxy::impl {
     ss::future<std::optional<model::offset>>
     get_leader_epoch_last_offset(kafka::leader_epoch) const final;
-
+    /**
+     * A leader epoch is used by Kafka clients to determine if a replica is up
+     * to date with the leader and to detect truncation.
+     *
+     * The leader epoch differs from the Raft term: the term is updated when a
+     * leader election starts, whereas the leader epoch is updated only after
+     * the state of the replica is determined. Therefore the leader epoch uses
+     * the confirmed term instead of the plain term, which is incremented
+     * every time a leader election starts.
+     */
     kafka::leader_epoch leader_epoch() const final {
-        return leader_epoch_from_term(_partition->term());
+        return leader_epoch_from_term(_partition->raft()->confirmed_term());
     }
 
     ss::future<error_code> validate_fetch_offset(
diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index 139644a88077f..ff37006ad11c6 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1912,6 +1912,7 @@ consensus::do_append_entries(append_entries_request&& r) {
         maybe_update_last_visible_index(last_visible);
         _last_leader_visible_offset = std::max(
           request_metadata.last_visible_index, _last_leader_visible_offset);
+        _confirmed_term = _term;
         return f.then([this, reply, request_metadata] {
             return maybe_update_follower_commit_idx(
                      model::offset(request_metadata.commit_index))
@@ -1925,11 +1926,11 @@ consensus::do_append_entries(append_entries_request&& r) {
     // section 3
     if (request_metadata.prev_log_index < last_log_offset) {
-        if (unlikely(request_metadata.prev_log_index < last_visible_index())) {
+        if (unlikely(request_metadata.prev_log_index < _commit_index)) {
             reply.result = append_entries_reply::status::success;
             // clamp dirty offset to the current commit index not to allow
             // leader reasoning about follower log beyond that point
-            reply.last_dirty_log_index = last_visible_index();
+            reply.last_dirty_log_index = _commit_index;
             reply.last_flushed_log_index = _commit_index;
             vlog(
               _ctxlog.info,
@@ -1955,19 +1956,26 @@ consensus::do_append_entries(append_entries_request&& r) {
       truncate_at);
     _probe->log_truncated();
+    _majority_replicated_index = std::min(
+      model::prev_offset(truncate_at), _majority_replicated_index);
+    _last_quorum_replicated_index = std::min(
+      model::prev_offset(truncate_at), _last_quorum_replicated_index);
+    // update flushed offset since truncation may happen to already
+    // flushed entries
+    _flushed_offset = std::min(
+      model::prev_offset(truncate_at), _flushed_offset);
     // We are truncating the offset translator before truncating the log
     // because if saving offset translator state fails, we will retry and
     // eventually log and offset translator will become consistent. OTOH if
     // log truncation were first and saving offset translator state failed,
     // we wouldn't retry and log and offset translator could diverge.
+
     return _offset_translator.truncate(truncate_at)
       .then([this, truncate_at] {
           return _log->truncate(storage::truncate_config(
             truncate_at, _scheduling.default_iopc));
       })
       .then([this, truncate_at] {
-          _last_quorum_replicated_index = std::min(
-            model::prev_offset(truncate_at), _last_quorum_replicated_index);
          // update flushed offset since truncation may happen to already
          // flushed entries
          _flushed_offset = std::min(
@@ -2012,6 +2020,7 @@ consensus::do_append_entries(append_entries_request&& r) {
     maybe_update_last_visible_index(last_visible);
     _last_leader_visible_offset = std::max(
       m.last_visible_index, _last_leader_visible_offset);
+    _confirmed_term = _term;
     return maybe_update_follower_commit_idx(model::offset(m.commit_index))
       .then([this, ofs, target] {
           return make_append_entries_reply(target, ofs);
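The follower-side truncation change clamps the cached replication indices before the log itself is truncated. Below is a minimal standalone sketch of that bookkeeping, assuming plain int64_t offsets and a hypothetical follower_state struct in place of raft::consensus.

#include <algorithm>
#include <cstdint>

// Illustrative follower bookkeeping; field names mirror the patch, but this
// is not the real raft::consensus class.
using offset_t = int64_t;
constexpr offset_t prev_offset(offset_t o) { return o < 0 ? -1 : o - 1; }

struct follower_state {
    offset_t majority_replicated_index;
    offset_t last_quorum_replicated_index;
    offset_t flushed_offset;

    // Clamp every cached index to the last offset that survives truncation,
    // before the log is truncated, so no index can point past the new log
    // end even if the truncation has to be retried.
    void clamp_for_truncation(offset_t truncate_at) {
        const offset_t last_kept = prev_offset(truncate_at);
        majority_replicated_index = std::min(last_kept, majority_replicated_index);
        last_quorum_replicated_index = std::min(last_kept, last_quorum_replicated_index);
        flushed_offset = std::min(last_kept, flushed_offset);
    }
};

int main() {
    follower_state st{.majority_replicated_index = 120,
                      .last_quorum_replicated_index = 118,
                      .flushed_offset = 115};
    st.clamp_for_truncation(100); // leader asks to truncate from offset 100
    // all three indices are now at most 99
    return st.majority_replicated_index == 99 ? 0 : 1;
}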
diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h
index c05e9285da927..b47ccaf60db2b 100644
--- a/src/v/raft/consensus.h
+++ b/src/v/raft/consensus.h
@@ -711,17 +711,21 @@ class consensus {
     // consensus state
     model::offset _commit_index;
     model::term_id _term;
-    // It's common to use raft log as a foundation for state machines:
-    // when a node becomes a leader it replays the log, reconstructs
-    // the state and becomes ready to serve the requests. However it is
-    // not enough for a node to become a leader, it should successfully
-    // replicate a new record to be sure that older records stored in
-    // the local log were actually replicated and do not constitute an
-    // artifact of the previously crashed leader. Redpanda uses a confi-
-    // guration batch for the initial replication to gain certainty. When
-    // commit index moves past the configuration batch _confirmed_term
-    // gets updated. So when _term==_confirmed_term it's safe to use
-    // local log to reconstruct the state.
+
+    /**
+     * The confirmed term is used to determine if the state of a replica is
+     * up to date after a leader election. Only when the confirmed term is
+     * equal to the current term can one reason about the Raft group state.
+     *
+     * On the leader the confirmed term is updated after the first successful
+     * replication of a batch following a leader election. Once that
+     * replication succeeds the leader is guaranteed to have up-to-date
+     * committed and visible offsets.
+     *
+     * On a follower the confirmed term is updated only when an append
+     * entries request from the current leader can be accepted and the
+     * follower can reply with success.
+     */
     model::term_id _confirmed_term;
 
     model::offset _flushed_offset{};
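Taken together, the confirmed-term comment above and the leader_epoch() change earlier in the diff amount to the rule sketched below. This is a standalone illustration with assumed names (raft_state, leader_epoch, usable_local_state) rather than the real consensus API: the Kafka leader epoch is derived from the confirmed term, and locally reconstructed state is only trusted once the current term has been confirmed.

#include <cstdint>
#include <optional>

using term_t = int64_t;

// Illustrative replica state, not the redpanda types.
struct raft_state {
    term_t term;            // incremented as soon as an election starts
    term_t confirmed_term;  // catches up once the replica state is settled
    bool is_leader;
};

// A Kafka-style leader epoch should only reflect terms whose state has been
// confirmed, so it is derived from confirmed_term rather than term.
term_t leader_epoch(const raft_state& s) { return s.confirmed_term; }

// Local state (e.g. a replayed log) may only be trusted once the current term
// has been confirmed; otherwise the caller should retry or redirect.
std::optional<term_t> usable_local_state(const raft_state& s) {
    if (s.is_leader && s.term == s.confirmed_term) {
        return s.term;
    }
    return std::nullopt;
}

int main() {
    raft_state just_elected{.term = 7, .confirmed_term = 6, .is_leader = true};
    raft_state settled{.term = 7, .confirmed_term = 7, .is_leader = true};
    return (!usable_local_state(just_elected) && usable_local_state(settled)) ? 0 : 1;
}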