From b8e40d0d15f93548bcd0a573cc96faaff814e18f Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 23 Dec 2024 10:34:04 -0800 Subject: [PATCH] tx/group_tx_stm: invalidate existing snapshots for the stm Bumps the supported snapshot version so the existing snapshots are invalidated as they may contain stale max_collectible_offset. This forces the stm to reconstruct the state form the log and recompute correct max_collectible_offset. --- src/v/kafka/server/group_tx_tracker_stm.cc | 10 +++++++--- src/v/kafka/server/group_tx_tracker_stm.h | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index 0b06c5ede3112..42f3bd83ef53a 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -62,8 +62,12 @@ model::offset group_tx_tracker_stm::max_collectible_offset() { ss::future group_tx_tracker_stm::apply_local_snapshot( - raft::stm_snapshot_header, iobuf&& snap_buf) { + raft::stm_snapshot_header header, iobuf&& snap_buf) { auto holder = _gate.hold(); + if (header.version != supported_local_snapshot_version) { + // fall back to applying from the log + co_return raft::local_snapshot_applied::no; + } iobuf_parser parser(std::move(snap_buf)); auto snap = co_await serde::read_async(parser); _all_txs = std::move(snap.transactions); @@ -80,8 +84,8 @@ group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) { iobuf snap_buf; apply_units.return_all(); co_await serde::write_async(snap_buf, snap); - // snapshot versioning handled via serde. - co_return raft::stm_snapshot::create(0, offset, std::move(snap_buf)); + co_return raft::stm_snapshot::create( + supported_local_snapshot_version, offset, std::move(snap_buf)); } ss::future<> group_tx_tracker_stm::apply_raft_snapshot(const iobuf&) { diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index dbcb342810d5d..a0107d0305c2e 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -99,6 +99,7 @@ class group_tx_tracker_stm final const all_txs_t& inflight_transactions() const { return _all_txs; } private: + static constexpr int8_t supported_local_snapshot_version = 1; struct snapshot : serde::envelope, serde::compat_version<0>> { all_txs_t transactions;