Skip to content

Commit

Permalink
rm_stm/tests: add a high concurrency test for producer eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Dec 10, 2024
1 parent 48371f4 commit 6e800c2
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 5 deletions.
50 changes: 45 additions & 5 deletions src/v/cluster/tests/rm_stm_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#pragma once
#include "cluster/producer_state_manager.h"
#include "cluster/rm_stm.h"
#include "config/property.h"
#include "config/mock_property.h"
#include "raft/tests/simple_raft_fixture.h"

#include <seastar/core/sharded.hh>
Expand All @@ -23,12 +23,14 @@ static prefix_logger ctx_logger{logger, ""};
struct rm_stm_test_fixture : simple_raft_fixture {
void create_stm_and_start_raft(
storage::ntp_config::default_overrides overrides = {}) {
max_concurent_producers.start(std::numeric_limits<size_t>::max()).get();
producer_expiration_ms.start(std::chrono::milliseconds::max()).get();
producer_state_manager
.start(
config::mock_binding(std::numeric_limits<uint64_t>::max()),
config::mock_binding(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds::max())),
ss::sharded_parameter(
[this] { return max_concurent_producers.local().bind(); }),
ss::sharded_parameter(
[this] { return producer_expiration_ms.local().bind(); }),
config::mock_binding(std::numeric_limits<uint64_t>::max()))
.get();
producer_state_manager
Expand All @@ -55,6 +57,8 @@ struct rm_stm_test_fixture : simple_raft_fixture {
if (_started) {
stop_all();
producer_state_manager.stop().get();
producer_expiration_ms.stop().get();
max_concurent_producers.stop().get();
}
}

Expand All @@ -66,6 +70,17 @@ struct rm_stm_test_fixture : simple_raft_fixture {
return _stm->do_take_local_snapshot(version, {});
}

/// Pushes a new producer expiration value into every instance of the sharded
/// `producer_expiration_ms` mock property and blocks until all of them have
/// been updated.
void update_producer_expiration(std::chrono::milliseconds value) {
    // The captured copy is handed to update() as an rvalue; `mutable` is
    // required so the capture can be moved from.
    auto apply_to_property = [value](auto& property) mutable {
        property.update(std::move(value));
    };
    producer_expiration_ms.invoke_on_all(std::move(apply_to_property)).get();
}

/// Test hook: forwards the given raft snapshot buffer to the state machine
/// under test and returns whatever the state machine returns.
auto apply_raft_snapshot(const iobuf& buf) {
    auto& state_machine = *_stm;
    return state_machine.apply_raft_snapshot(buf);
}

/// Test hook: applies a local snapshot (header + payload) to the state
/// machine under test. The payload buffer is moved into the call.
auto apply_snapshot(raft::stm_snapshot_header hdr, iobuf buf) {
    auto& state_machine = *_stm;
    return state_machine.apply_local_snapshot(hdr, std::move(buf));
}
Expand All @@ -77,6 +92,31 @@ struct rm_stm_test_fixture : simple_raft_fixture {

/// Test hook: returns the state machine's current set of expired producers.
auto get_expired_producers() const {
    auto& state_machine = *_stm;
    return state_machine.get_expired_producers();
}

/// Acquires the state machine's `_state_lock` in read mode and returns the
/// result of `hold_read_lock()` to the caller.
auto stm_read_lock() {
    auto& state_lock = _stm->_state_lock;
    return state_lock.hold_read_lock();
}

/// Calls `rm_stm::maybe_create_producer(pid)` while holding the state
/// machine's read lock.
///
/// The lock units are received by value in the continuation, so they are
/// released as soon as that continuation returns.
auto maybe_create_producer(model::producer_identity pid) {
    auto create_under_read_lock = [this, pid](auto /*read_units*/) {
        return _stm->maybe_create_producer(pid);
    };
    return stm_read_lock().then(std::move(create_under_read_lock));
}

/// Resets all producers of the state machine under the write side of
/// `_state_lock`.
///
/// The write-lock units are parked in the final continuation so that they
/// stay alive until `rm_stm::reset_producers()` has completed.
auto reset_producers() {
    auto& state_lock = _stm->_state_lock;
    return state_lock.hold_write_lock().then([this](auto write_units) {
        auto release_after_reset = [held = std::move(write_units)] {};
        return _stm->reset_producers().then(std::move(release_after_reset));
    });
}

/// Re-arms the eviction timer of every `producer_state_manager` instance with
/// the given period (via `rearm_eviction_timer_for_testing`) and blocks until
/// all instances have been updated.
auto rearm_eviction_timer(std::chrono::milliseconds period) {
    auto rearm_on_instance = [period](auto& manager) {
        return manager.rearm_eviction_timer_for_testing(period);
    };
    return producer_state_manager.invoke_on_all(std::move(rearm_on_instance))
      .get();
}

// Sharded mock config properties. Their per-shard bindings are passed to
// producer_state_manager when it is started in create_stm_and_start_raft(),
// which lets tests change the values at runtime.
// NOTE: "concurent" (sic) - the identifier is also referenced from other test
// sources, so the spelling is kept as is.
ss::sharded<config::mock_property<size_t>> max_concurent_producers;
ss::sharded<config::mock_property<std::chrono::milliseconds>>
producer_expiration_ms;
ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
ss::sharded<cluster::tx::producer_state_manager> producer_state_manager;
// State machine under test; the fixture's helper methods forward to it.
ss::shared_ptr<cluster::rm_stm> _stm;
Expand Down
100 changes: 100 additions & 0 deletions src/v/cluster/tests/rm_stm_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "cluster/tests/randoms.h"
#include "cluster/tests/rm_stm_test_fixture.h"
#include "finjector/hbadger.h"
#include "finjector/stress_fiber.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record.h"
Expand Down Expand Up @@ -931,3 +932,102 @@ FIXTURE_TEST(test_tx_expiration_without_data_batches, rm_stm_test_fixture) {
!= expired.end();
}).get();
}

/*
 * This test ensures concurrent evictions can happen in the presence of
 * replication operations and operations that reset the state (snapshots,
 * partition stop).
 *
 * Three activities run side by side until the abort source fires (20s):
 *  - the eviction timer, re-armed to 1ms with a 0ms producer expiration,
 *  - a loop that keeps creating producers and, for a random subset, spawns a
 *    background fiber that holds the producer lock for a short while,
 *  - a loop that repeatedly resets all producers under the write lock.
 */
FIXTURE_TEST(test_concurrent_producer_evictions, rm_stm_test_fixture) {
create_stm_and_start_raft();
auto& stm = *_stm;
stm.start().get0();
stm.testing_only_disable_auto_abort();

wait_for_confirmed_leader();
wait_for_meta_initialized();

// Ensure eviction runs with higher frequency
// and evicts everything possible.
update_producer_expiration(0ms);
rearm_eviction_timer(1ms);

// Start a randomized set of stress fibers for the duration of the test;
// they are stopped by the deferred action when this scope exits.
// NOTE(review): presumably these exist to perturb task scheduling - confirm
// against finjector/stress_fiber.h.
stress_fiber_manager stress_mgr;
stress_mgr.start(
{.min_spins_per_scheduling_point = random_generators::get_int(50, 100),
.max_spins_per_scheduling_point = random_generators::get_int(500, 1000),
.num_fibers = random_generators::get_int<size_t>(5, 10)});
auto stop = ss::defer([&stress_mgr] { stress_mgr.stop().get(); });

// Monotonic source of producer ids: every created producer gets a fresh id.
int64_t counter= 0;
// Signals both background loops below to stop.
ss::abort_source as;
// Tracks the detached "replication" fibers so they can be drained at the end.
ss::gate gate;
// Upper bound on the number of in-flight fibers tracked by the gate.
size_t max_replication_fibers = 1000;

// Simulates replication.
// In each iteration of the loop, we create some producers and randomly
// hold the producer state lock on some of them (thus preventing eviction).
// This is roughly the lifecycle of replicate requests using a producer
// state. This creates a stream of producer states in a tight loop, some
// evictable and some non-evictable, while eviction constantly runs in the
// background.
auto replicate_f = ss::do_until(
[&as] { return as.abort_requested(); },
[&, this] {
std::vector<ss::future<>> spawn_replicate_futures;
for (int i = 0; i < 5; i++) {
auto maybe_replicate_f
= maybe_create_producer(model::producer_identity{counter++, 0})
.then([&, this](auto result) {
auto producer = result.first;
// Only spawn a background fiber while below the in-flight cap,
// and then only for a random subset of producers.
if (
gate.get_count() < max_replication_fibers
&& tests::random_bool()) {
// Simulates replication: take the stm read lock, then the
// producer lock, and hold both across a short random sleep.
ssx::spawn_with_gate(gate, [this, producer] {
return stm_read_lock().then([producer](
auto stm_units) {
return producer
->run_with_lock([](auto units) {
auto sleep_ms
= std::chrono::milliseconds{
random_generators::get_int(3)};
// The producer lock units are released once
// the sleep finishes.
return ss::sleep(sleep_ms).finally(
[units = std::move(units)] {});
})
// A closed gate is treated as "producer was evicted in
// the meantime" and is only logged.
.handle_exception_type(
[producer](
const ss::gate_closed_exception&) {
vlog(
logger.info,
"producer {} already evicted, "
"ignoring",
producer->id());
})
// Keep the producer and the stm read lock units alive
// until the whole chain has completed.
.finally(
[producer,
stm_units = std::move(stm_units)] {});
});
});
}
});
spawn_replicate_futures.push_back(std::move(maybe_replicate_f));
}

// Wait for this batch of producer creations, then pause briefly before
// the next iteration.
return ss::when_all_succeed(std::move(spawn_replicate_futures))
.then([]() { return ss::sleep(1ms); });
});

// Simulates raft snapshot application / partition shutdown.
// Applying a snapshot is a stop-the-world operation that resets
// all the producers.
auto reset_f = ss::do_until(
[&as] { return as.abort_requested(); },
[&, this] {
return reset_producers().then([] { return ss::sleep(3ms); });
});

// Let everything run for 20 seconds, then stop both loops, wait for them to
// finish and finally drain the detached replication fibers.
ss::sleep(20s).finally([&as] { as.request_abort(); }).get();
ss::when_all_succeed(std::move(replicate_f), std::move(reset_f)).get();
gate.close().get();
}
2 changes: 2 additions & 0 deletions src/v/cluster/tests/tx_compaction_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ using cluster::tx_executor;
_data_dir = "test_dir_" + random_generators::gen_alphanum_string(6); \
stop_all(); \
producer_state_manager.stop().get(); \
producer_expiration_ms.stop().get(); \
max_concurent_producers.stop().get(); \
_stm = nullptr; \
}); \
wait_for_confirmed_leader(); \
Expand Down

0 comments on commit 6e800c2

Please sign in to comment.