From b4f9ca67880e31d98247dc712f7f817d3b5729a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 29 Mar 2024 11:25:00 +0100 Subject: [PATCH 1/6] Revert "r/consensus: do not allow follower to truncate log that is consumable" This reverts commit 3d9a794c2fa339f540d71f5b212e2a8c500a4dd9. (cherry picked from commit eff61b3a29b77f5ba3474ee96834afb18ecd0c86) --- src/v/raft/consensus.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 139644a88077f..80bef8ff544bc 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1925,11 +1925,11 @@ consensus::do_append_entries(append_entries_request&& r) { // section 3 if (request_metadata.prev_log_index < last_log_offset) { - if (unlikely(request_metadata.prev_log_index < last_visible_index())) { + if (unlikely(request_metadata.prev_log_index < _commit_index)) { reply.result = append_entries_reply::status::success; // clamp dirty offset to the current commit index not to allow // leader reasoning about follower log beyond that point - reply.last_dirty_log_index = last_visible_index(); + reply.last_dirty_log_index = _commit_index; reply.last_flushed_log_index = _commit_index; vlog( _ctxlog.info, From f09ab54f869e344f2343aaca0b386d5fe01cbe48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 29 Mar 2024 13:12:29 +0100 Subject: [PATCH 2/6] r/consensus: changed order of truncate and offset updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When truncation happens all the offsets should be updated before it finishes as the truncated data should not be accessed during and immediately after truncation. Updating offsets before truncation guarantees the state is up to data right after the truncation finishes. Signed-off-by: Michał Maślanka (cherry picked from commit a1097d435d8da11728d3c0e2e876d1c165d7e78e) --- src/v/raft/consensus.cc | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 80bef8ff544bc..d0f66bd14e587 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1955,19 +1955,26 @@ consensus::do_append_entries(append_entries_request&& r) { truncate_at); _probe->log_truncated(); + _majority_replicated_index = std::min( + model::prev_offset(truncate_at), _majority_replicated_index); + _last_quorum_replicated_index = std::min( + model::prev_offset(truncate_at), _last_quorum_replicated_index); + // update flushed offset since truncation may happen to already + // flushed entries + _flushed_offset = std::min( + model::prev_offset(truncate_at), _flushed_offset); // We are truncating the offset translator before truncating the log // because if saving offset translator state fails, we will retry and // eventually log and offset translator will become consistent. OTOH if // log truncation were first and saving offset translator state failed, // we wouldn't retry and log and offset translator could diverge. + return _offset_translator.truncate(truncate_at) .then([this, truncate_at] { return _log->truncate(storage::truncate_config( truncate_at, _scheduling.default_iopc)); }) .then([this, truncate_at] { - _last_quorum_replicated_index = std::min( - model::prev_offset(truncate_at), _last_quorum_replicated_index); // update flushed offset since truncation may happen to already // flushed entries _flushed_offset = std::min( From 9ea3059b5afcd722008908a91878028bf63d4acd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 29 Mar 2024 13:16:24 +0100 Subject: [PATCH 3/6] k/replicated_partition: do not return offset out of range from follower MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When reading from the follower the offset information may not be up to date. In Redpanda we implemented the behavior that is described in `KIP-392`. The described mechanism assumes that consumer will reconcile with the leader after they receive the offset out of range error from the follower. This is not implemented by the clients hence returning out of range error resulted in offset reset. With the fix in this commit we will never return the out of range error from a node not being a leader. This way consumers will only use leader to validate its offset and we will avoid spurious resets. Signed-off-by: Michał Maślanka (cherry picked from commit 0c6354cc3363bd049d17e6bf39e6290019aa27ed) --- src/v/kafka/server/replicated_partition.cc | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 6627f7fffca6d..441509e425485 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -461,13 +461,10 @@ ss::future replicated_partition::validate_fetch_offset( if (reading_from_follower && !_partition->is_leader()) { auto ec = error_code::none; - const std::pair bounds = std::minmax( + const auto available_to_read = std::min( leader_high_watermark(), log_end_offset()); - const auto effective_log_end_offset = bounds.second; - const auto available_to_read = bounds.first; - if ( - fetch_offset < start_offset() - || fetch_offset > effective_log_end_offset) { + + if (fetch_offset < start_offset()) { ec = error_code::offset_out_of_range; } else if (fetch_offset > available_to_read) { /** From 2d313edbc471cb124b156486b2dfe25cb0ce8cad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 5 Apr 2024 14:31:44 +0200 Subject: [PATCH 4/6] k/group: abort recovery when the read is incomplete MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If storage layer will return early from the reader the group stm is not fully recovered. In this case a consumer offset coordinator can not start replying to client requests as the in memory consumer group representation may be incomplete. Added a code preventing this situation from happening. When incomplete read happened the group manager requests the underlying partition leader to step down. This will trigger another leader election and will trigger leadership notification leading to group recovery. Signed-off-by: Michał Maślanka (cherry picked from commit 9d75795e4000cb50a33627fe871bc0d1d513dd39) --- src/v/kafka/server/group_manager.cc | 21 ++++++++++++++++--- src/v/kafka/server/group_recovery_consumer.cc | 1 + src/v/kafka/server/group_recovery_consumer.h | 1 + 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index be336115d8340..4f802de3e8a94 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -665,16 +665,31 @@ ss::future<> group_manager::handle_partition_leader_change( std::nullopt, std::nullopt, std::nullopt); - + auto expected_to_read = model::prev_offset( + p->partition->high_watermark()); return p->partition->make_reader(reader_config) - .then([this, term, p, timeout]( + .then([this, term, p, timeout, expected_to_read]( model::record_batch_reader reader) { return std::move(reader) .consume( group_recovery_consumer(_serializer_factory(), p->as), timeout) - .then([this, term, p]( + .then([this, term, p, expected_to_read]( group_recovery_consumer_state state) { + if (state.last_read_offset < expected_to_read) { + vlog( + klog.error, + "error recovering group state from {}. " + "Expected to read up to {} but last offset " + "consumed is equal to {}", + p->partition->ntp(), + expected_to_read, + state.last_read_offset); + // force step down to allow other node to + // recover group + return p->partition->raft()->step_down( + "unable to recover group, short read"); + } // avoid trying to recover if we stopped the // reader because an abort was requested if (p->as.abort_requested()) { diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index ee44c150f6b09..ae9bb621036b0 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -73,6 +73,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { if (_as.abort_requested()) { co_return ss::stop_iteration::yes; } + _state.last_read_offset = batch.last_offset(); if (batch.header().type == model::record_batch_type::raft_data) { _batch_base_offset = batch.base_offset(); co_await model::for_each_record(batch, [this](model::record& r) { diff --git a/src/v/kafka/server/group_recovery_consumer.h b/src/v/kafka/server/group_recovery_consumer.h index 9038533254af5..e4cd3d69988f5 100644 --- a/src/v/kafka/server/group_recovery_consumer.h +++ b/src/v/kafka/server/group_recovery_consumer.h @@ -31,6 +31,7 @@ struct group_recovery_consumer_state { * retention feature is activated. see group::offset_metadata for more info. */ bool has_offset_retention_feature_fence{false}; + model::offset last_read_offset; }; class group_recovery_consumer { From 77ce3c96593ec3c437a126b90e8ab92aa7ea5acc Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 9 Feb 2024 11:04:18 +0100 Subject: [PATCH 5/6] r/consensus: extend confirmed term semantics to apply for the follower A confirmed term is used to determine if the state of a replica is up to date after the leader election. Only after the confirmed term is equal to the current term one can reason about the Raft group state. On the leader the confirmed term is updated after first successful replication of a batch subsequent to a leader election. After the replication succeed leader is guaranteed to have up to date committed and visible offsets. On the follower the confirmed term is updated only when an append entries request from the current leader may be accepted and follower may return success. Signed-off-by: Michal Maslanka (cherry picked from commit 6037462dfa080c23df509db0d766d5338eccef21) --- src/v/raft/consensus.cc | 2 ++ src/v/raft/consensus.h | 26 +++++++++++++++----------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index d0f66bd14e587..ff37006ad11c6 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1912,6 +1912,7 @@ consensus::do_append_entries(append_entries_request&& r) { maybe_update_last_visible_index(last_visible); _last_leader_visible_offset = std::max( request_metadata.last_visible_index, _last_leader_visible_offset); + _confirmed_term = _term; return f.then([this, reply, request_metadata] { return maybe_update_follower_commit_idx( model::offset(request_metadata.commit_index)) @@ -2019,6 +2020,7 @@ consensus::do_append_entries(append_entries_request&& r) { maybe_update_last_visible_index(last_visible); _last_leader_visible_offset = std::max( m.last_visible_index, _last_leader_visible_offset); + _confirmed_term = _term; return maybe_update_follower_commit_idx(model::offset(m.commit_index)) .then([this, ofs, target] { return make_append_entries_reply(target, ofs); diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index c05e9285da927..b47ccaf60db2b 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -711,17 +711,21 @@ class consensus { // consensus state model::offset _commit_index; model::term_id _term; - // It's common to use raft log as a foundation for state machines: - // when a node becomes a leader it replays the log, reconstructs - // the state and becomes ready to serve the requests. However it is - // not enough for a node to become a leader, it should successfully - // replicate a new record to be sure that older records stored in - // the local log were actually replicated and do not constitute an - // artifact of the previously crashed leader. Redpanda uses a confi- - // guration batch for the initial replication to gain certainty. When - // commit index moves past the configuration batch _confirmed_term - // gets updated. So when _term==_confirmed_term it's safe to use - // local log to reconstruct the state. + + /** + * A confirmed term is used to determine if the state of a replica is up to + * date after the leader election. Only after the confirmed term is equal to + * the current term one can reason about the Raft group state. + * + * On the leader the confirmed term is updated after first successful + * replication of a batch subsequent to a leader election. After the + * replication succeed leader is guaranteed to have up to date committed and + * visible offsets. + * + * On the follower the confirmed term is updated only when an append entries + * request from the current leader may be accepted and follower may return + * success. + */ model::term_id _confirmed_term; model::offset _flushed_offset{}; From 5042dfbdcbbc18963fe71a5f831798cefc4df816 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 9 Feb 2024 11:05:51 +0100 Subject: [PATCH 6/6] k/replicated_partition: use confirmed term as a source of leader epoch A leader epoch is used by Kafka clients to determine if a replica is up to date with the leader and to detect truncation. The leader epoch differs from Raft term as the term is updated when leader election starts. Whereas the leader epoch is updated after the state of the replica is determined. Therefore the leader epoch uses confirmed term instead of the simple term which is incremented every time the leader election starts. Signed-off-by: Michal Maslanka (cherry picked from commit e746f79bd85a288b378a1dcbcdb6d0c9cf25f853) --- src/v/kafka/server/replicated_partition.h | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/replicated_partition.h b/src/v/kafka/server/replicated_partition.h index 3916e6992ffab..7905cbbd83168 100644 --- a/src/v/kafka/server/replicated_partition.h +++ b/src/v/kafka/server/replicated_partition.h @@ -173,9 +173,18 @@ class replicated_partition final : public kafka::partition_proxy::impl { ss::future> get_leader_epoch_last_offset(kafka::leader_epoch) const final; - + /** + * A leader epoch is used by Kafka clients to determine if a replica is up + * to date with the leader and to detect truncation. + * + * The leader epoch differs from Raft term as the term is updated when + * leader election starts. Whereas the leader epoch is updated after the + * state of the replica is determined. Therefore the leader epoch uses + * confirmed term instead of the simple term which is incremented every time + * the leader election starts. + */ kafka::leader_epoch leader_epoch() const final { - return leader_epoch_from_term(_partition->term()); + return leader_epoch_from_term(_partition->raft()->confirmed_term()); } ss::future validate_fetch_offset(