diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index d4440c95a56f..4813708aad7d 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -129,8 +129,12 @@ model::offset group_tx_tracker_stm::max_collectible_offset() { ss::future group_tx_tracker_stm::apply_local_snapshot( - raft::stm_snapshot_header, iobuf&& snap_buf) { + raft::stm_snapshot_header header, iobuf&& snap_buf) { auto holder = _gate.hold(); + if (header.version != supported_local_snapshot_version) { + // fall back to applying from the log + co_return raft::local_snapshot_applied::no; + } iobuf_parser parser(std::move(snap_buf)); auto snap = co_await serde::read_async(parser); _all_txs = std::move(snap.transactions); @@ -147,8 +151,8 @@ group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) { iobuf snap_buf; apply_units.return_all(); co_await serde::write_async(snap_buf, snap); - // snapshot versioning handled via serde. - co_return raft::stm_snapshot::create(0, offset, std::move(snap_buf)); + co_return raft::stm_snapshot::create( + supported_local_snapshot_version, offset, std::move(snap_buf)); } ss::future<> group_tx_tracker_stm::apply_raft_snapshot(const iobuf&) { diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index 3e394a1b7e79..bde9ce1f8eb0 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -126,6 +126,7 @@ class group_tx_tracker_stm final const all_txs_t& inflight_transactions() const { return _all_txs; } private: + static constexpr int8_t supported_local_snapshot_version = 1; struct snapshot : serde::envelope, serde::compat_version<0>> { all_txs_t transactions;