Skip to content

Commit

Permalink
Merge pull request #24637 from bharathv/fix_co
Browse files Browse the repository at this point in the history
tx/group compaction fixes
  • Loading branch information
bharathv authored Jan 3, 2025
2 parents dbb6265 + 0051463 commit 0eadf28
Show file tree
Hide file tree
Showing 62 changed files with 2,070 additions and 172 deletions.
5 changes: 3 additions & 2 deletions src/v/cloud_topics/dl_stm/dl_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ ss::future<> dl_stm::do_apply(const model::record_batch& batch) {
co_return;
}

ss::future<> dl_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) {
co_return;
ss::future<raft::local_snapshot_applied>
dl_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) {
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_topics/dl_stm/dl_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class dl_stm final : public raft::persisted_stm<> {
private:
ss::future<> do_apply(const model::record_batch& batch) override;

ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units u) override;
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,8 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
get_last_offset());
}

ss::future<> archival_metadata_stm::apply_local_snapshot(
ss::future<raft::local_snapshot_applied>
archival_metadata_stm::apply_local_snapshot(
raft::stm_snapshot_header header, iobuf&& data) {
auto snap = serde::from_iobuf<snapshot>(std::move(data));

Expand Down Expand Up @@ -1310,7 +1311,7 @@ ss::future<> archival_metadata_stm::apply_local_snapshot(
} else {
_last_clean_at = header.offset;
}
co_return;
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/archival/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
ss::future<> do_apply(const model::record_batch& batch) override;
ss::future<> apply_raft_snapshot(const iobuf&) override;

ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/distributed_kv_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
});
}

ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&& bytes) override {
auto holder = _gate.hold();
iobuf_parser parser(std::move(bytes));
Expand All @@ -144,6 +144,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
}
}
_kvs = std::move(snap.kv_data);
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ ss::future<> id_allocator_stm::write_snapshot() {
.finally([this] { _is_writing_snapshot = false; });
}

ss::future<>
ss::future<raft::local_snapshot_applied>
id_allocator_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) {
return ss::make_exception_future<>(
return ss::make_exception_future<raft::local_snapshot_applied>(
std::logic_error("id_allocator_stm doesn't support snapshots"));
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/id_allocator_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class id_allocator_stm final : public raft::persisted_stm<> {
advance_state(int64_t, model::timeout_clock::duration);

ss::future<> write_snapshot();
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/log_eviction_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,14 @@ ss::future<> log_eviction_stm::apply_raft_snapshot(const iobuf&) {
co_return;
}

ss::future<> log_eviction_stm::apply_local_snapshot(
ss::future<raft::local_snapshot_applied> log_eviction_stm::apply_local_snapshot(
raft::stm_snapshot_header header, iobuf&& data) {
auto snapshot = serde::from_iobuf<snapshot_data>(std::move(data));
vlog(
_log.info, "Applying snapshot {} at offset: {}", snapshot, header.offset);

_delete_records_eviction_offset = snapshot.effective_start_offset;
return ss::now();
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/log_eviction_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class log_eviction_stm
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

protected:
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;

ss::future<raft::stm_snapshot>
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/partition_properties_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ ss::future<iobuf> partition_properties_stm::take_snapshot(model::offset o) {
raft_snapshot{.writes_disabled = it->writes_disabled});
}

ss::future<> partition_properties_stm::apply_local_snapshot(
ss::future<raft::local_snapshot_applied>
partition_properties_stm::apply_local_snapshot(
raft::stm_snapshot_header header, iobuf&& buffer) {
vlog(_log.debug, "Applying local snapshot with offset {}", header.offset);
auto snapshot = serde::from_iobuf<local_snapshot>(std::move(buffer));
_state_snapshots = std::move(snapshot.state_updates);
co_return;
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot> partition_properties_stm::take_local_snapshot(
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/partition_properties_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class partition_properties_stm
writes_disabled are_writes_disabled() const;

protected:
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;

ss::future<raft::stm_snapshot>
Expand Down
30 changes: 24 additions & 6 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class request {
}
}

seq_t first_sequence() const { return _first_sequence; }
seq_t last_sequence() const { return _last_sequence; }
model::term_id term() const { return _term; }

void set_value(request_result_t::value_type);
void set_error(request_result_t::error_type);
void mark_request_in_progress() { _state = request_state::in_progress; }
Expand Down Expand Up @@ -106,6 +110,14 @@ class request {
// Kafka clients only issue requests in batches of 5, the queue is fairly small
// at all times.
class requests {
private:
static constexpr int32_t requests_cached_max = 5;
// chunk size of the request containers to avoid wastage.
static constexpr size_t chunk_size = std::bit_ceil(
static_cast<unsigned long>(requests_cached_max));

using request_queue = ss::chunked_fifo<request_ptr, chunk_size>;

public:
result<request_ptr> try_emplace(
seq_t first, seq_t last, model::term_id current, bool reset_sequences);
Expand All @@ -118,17 +130,21 @@ class requests {
bool operator==(const requests&) const;
friend std::ostream& operator<<(std::ostream&, const requests&);

const request_queue& inflight_requests() const {
return _inflight_requests;
}

const request_queue& finished_requests() const {
return _finished_requests;
}

private:
static constexpr int32_t requests_cached_max = 5;
// chunk size of the request containers to avoid wastage.
static constexpr size_t chunk_size = std::bit_ceil(
static_cast<unsigned long>(requests_cached_max));
bool is_valid_sequence(seq_t incoming) const;
std::optional<request_ptr> last_request() const;
void gc_requests_from_older_terms(model::term_id current);
void reset(request_result_t::error_type);
ss::chunked_fifo<request_ptr, chunk_size> _inflight_requests;
ss::chunked_fifo<request_ptr, chunk_size> _finished_requests;
request_queue _inflight_requests;
request_queue _finished_requests;
friend producer_state;
};

Expand Down Expand Up @@ -271,6 +287,8 @@ class producer_state {
// progress transactions with older epoch.
void reset_with_new_epoch(model::producer_epoch new_epoch);

const requests& idempotent_request_state() const { return _requests; }

private:
prefix_logger& _logger;

Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/rm_group_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ class rm_group_proxy {

virtual ss::future<abort_group_tx_reply>
abort_group_tx_locally(abort_group_tx_request) = 0;

virtual ss::future<get_producers_reply>
get_group_producers_locally(get_producers_request) = 0;
};
} // namespace cluster
56 changes: 56 additions & 0 deletions src/v/cluster/rm_partition_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,60 @@ ss::future<abort_tx_reply> rm_partition_frontend::do_abort_tx(
});
}

ss::future<get_producers_reply>
rm_partition_frontend::get_producers_locally(get_producers_request request) {
get_producers_reply reply;
auto partition = _partition_manager.local().get(request.ntp);
if (!partition || !partition->is_leader()) {
reply.error_code = tx::errc::not_coordinator;
co_return reply;
}
reply.error_code = tx::errc::none;
auto stm = partition->raft()->stm_manager()->get<rm_stm>();
if (!stm) {
// maybe an internal (non data) partition
co_return reply;
}
const auto& producers = stm->get_producers();
reply.producer_count = producers.size();
for (const auto& [pid, state] : producers) {
producer_state_info producer_info;
producer_info.pid = state->id();
// fill in the idempotent producer state.
const auto& requests = state->idempotent_request_state();
for (const auto& request : requests.inflight_requests()) {
idempotent_request_info request_info;
request_info.first_sequence = request->first_sequence();
request_info.last_sequence = request->last_sequence();
request_info.term = request->term();
producer_info.inflight_requests.push_back(std::move(request_info));
}

for (const auto& request : requests.finished_requests()) {
idempotent_request_info request_info;
request_info.first_sequence = request->first_sequence();
request_info.last_sequence = request->last_sequence();
request_info.term = request->term();
producer_info.finished_requests.push_back(std::move(request_info));
}
producer_info.last_update = state->last_update_timestamp();

// Fill in transactional producer state, if any.
const auto& tx_state = state->transaction_state();
if (state->has_transaction_in_progress() && tx_state) {
producer_info.tx_begin_offset = tx_state->first;
producer_info.tx_end_offset = tx_state->last;
producer_info.tx_seq = tx_state->sequence;
producer_info.tx_timeout = tx_state->timeout;
producer_info.coordinator_partition
= tx_state->coordinator_partition;
}
reply.producers.push_back(std::move(producer_info));
if (reply.producers.size() > request.max_producers_to_include) {
break;
}
}
co_return reply;
}

} // namespace cluster
2 changes: 2 additions & 0 deletions src/v/cluster/rm_partition_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class rm_partition_frontend {
model::producer_identity,
model::tx_seq,
model::timeout_clock::duration);
ss::future<get_producers_reply>
get_producers_locally(get_producers_request);
ss::future<> stop() {
_as.request_abort();
return ss::make_ready_future<>();
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1684,7 +1684,7 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {
return model::offset(k_offset);
}

ss::future<>
ss::future<raft::local_snapshot_applied>
rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
auto units = co_await _state_lock.hold_write_lock();

Expand All @@ -1707,7 +1707,7 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
data = co_await serde::read_async<tx_snapshot_v6>(data_parser);
} else {
vlog(_ctx_log.error, "Ignored snapshot version {}", hdr.version);
co_return;
co_return raft::local_snapshot_applied::no;
}

_highest_producer_id = std::max(
Expand Down Expand Up @@ -1792,6 +1792,7 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
snapshot_opt.value());
}
}
co_return raft::local_snapshot_applied::yes;
}

uint8_t rm_stm::active_snapshot_version() {
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class rm_stm final : public raft::persisted_stm<> {
tx::producer_ptr,
std::optional<model::tx_seq>,
model::timeout_clock::duration);
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ fragmented_vector<tx_metadata> tm_stm::get_transactions_list() const {
return ret;
}

ss::future<>
ss::future<raft::local_snapshot_applied>
tm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tm_ss_buf) {
vassert(
hdr.version >= tm_snapshot_v0::version
Expand All @@ -625,7 +625,7 @@ tm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tm_ss_buf) {
vlog(_ctx_log.trace, "Applied snapshot at offset: {}", hdr.offset);
}

return ss::now();
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class tm_stm final : public raft::persisted_stm<> {

private:
std::optional<tx_metadata> find_tx(const kafka::transactional_id&);
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/tx_gateway.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,10 @@ ss::future<find_coordinator_reply> tx_gateway::find_coordinator(
co_return co_await _tx_gateway_frontend.local().find_coordinator(r.tid);
}

ss::future<get_producers_reply> tx_gateway::get_producers(
get_producers_request request, rpc::streaming_context&) {
co_return co_await _tx_gateway_frontend.local().get_producers(
std::move(request));
}

} // namespace cluster
3 changes: 3 additions & 0 deletions src/v/cluster/tx_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class tx_gateway final : public tx_gateway_service {
ss::future<find_coordinator_reply> find_coordinator(
find_coordinator_request, rpc::streaming_context&) override;

ss::future<get_producers_reply>
get_producers(get_producers_request, rpc::streaming_context&) override;

private:
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
rm_group_proxy* _rm_group_proxy;
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/tx_gateway.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
"name": "find_coordinator",
"input_type": "find_coordinator_request",
"output_type": "find_coordinator_reply"
},
{
"name": "get_producers",
"input_type": "get_producers_request",
"output_type": "get_producers_reply"
}
]
}
Loading

0 comments on commit 0eadf28

Please sign in to comment.