From 206a512bf7309347a73b89ccc5a67993343c1b9c Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 23 Dec 2024 10:34:04 -0800 Subject: [PATCH] tx/group_tx_stm: invalidate existing snapshots for the stm Bumps the supported snapshot version so the existing snapshots are invalidated as they may contain stale max_collectible_offset. This forces the stm to reconstruct the state form the log and recompute correct max_collectible_offset. (cherry picked from commit 0051463f23316562812a88e77efb1598bfe87463) --- src/v/kafka/server/group_tx_tracker_stm.cc | 10 +++++++--- src/v/kafka/server/group_tx_tracker_stm.h | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index 7be610c55aa8..4f622974dce6 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -132,8 +132,12 @@ model::offset group_tx_tracker_stm::max_collectible_offset() { ss::future group_tx_tracker_stm::apply_local_snapshot( - raft::stm_snapshot_header, iobuf&& snap_buf) { + raft::stm_snapshot_header header, iobuf&& snap_buf) { auto holder = _gate.hold(); + if (header.version != supported_local_snapshot_version) { + // fall back to applying from the log + co_return raft::local_snapshot_applied::no; + } iobuf_parser parser(std::move(snap_buf)); auto snap = co_await serde::read_async(parser); _all_txs = std::move(snap.transactions); @@ -150,8 +154,8 @@ group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) { iobuf snap_buf; apply_units.return_all(); co_await serde::write_async(snap_buf, snap); - // snapshot versioning handled via serde. - co_return raft::stm_snapshot::create(0, offset, std::move(snap_buf)); + co_return raft::stm_snapshot::create( + supported_local_snapshot_version, offset, std::move(snap_buf)); } ss::future<> group_tx_tracker_stm::apply_raft_snapshot(const iobuf&) { diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index 35d88c0d5130..9082d29bf984 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -123,6 +123,7 @@ class group_tx_tracker_stm final const all_txs_t& inflight_transactions() const { return _all_txs; } private: + static constexpr int8_t supported_local_snapshot_version = 1; struct snapshot : serde::envelope, serde::compat_version<0>> { all_txs_t transactions;