Skip to content

Commit

Permalink
tx/group_tx_stm: invalidate existing snapshots for the stm
Browse files Browse the repository at this point in the history
Bumps the supported snapshot version so the existing snapshots are
invalidated as they may contain stale max_collectible_offset. This forces
the stm to reconstruct the state form the log and recompute correct
max_collectible_offset.

(cherry picked from commit 0051463)
  • Loading branch information
bharathv committed Jan 4, 2025
1 parent 09a3bcf commit 206a512
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/v/kafka/server/group_tx_tracker_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,12 @@ model::offset group_tx_tracker_stm::max_collectible_offset() {

ss::future<raft::local_snapshot_applied>
group_tx_tracker_stm::apply_local_snapshot(
raft::stm_snapshot_header, iobuf&& snap_buf) {
raft::stm_snapshot_header header, iobuf&& snap_buf) {
auto holder = _gate.hold();
if (header.version != supported_local_snapshot_version) {
// fall back to applying from the log
co_return raft::local_snapshot_applied::no;
}
iobuf_parser parser(std::move(snap_buf));
auto snap = co_await serde::read_async<snapshot>(parser);
_all_txs = std::move(snap.transactions);
Expand All @@ -150,8 +154,8 @@ group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
iobuf snap_buf;
apply_units.return_all();
co_await serde::write_async(snap_buf, snap);
// snapshot versioning handled via serde.
co_return raft::stm_snapshot::create(0, offset, std::move(snap_buf));
co_return raft::stm_snapshot::create(
supported_local_snapshot_version, offset, std::move(snap_buf));
}

ss::future<> group_tx_tracker_stm::apply_raft_snapshot(const iobuf&) {
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/group_tx_tracker_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class group_tx_tracker_stm final
const all_txs_t& inflight_transactions() const { return _all_txs; }

private:
static constexpr int8_t supported_local_snapshot_version = 1;
struct snapshot
: serde::envelope<snapshot, serde::version<0>, serde::compat_version<0>> {
all_txs_t transactions;
Expand Down

0 comments on commit 206a512

Please sign in to comment.