From 6e800c242ded8cff95f4ce08225b76049ee1c0a1 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sun, 8 Dec 2024 19:41:00 -0800 Subject: [PATCH] rm_stm/tests: add a high concurrency test for producer eviction --- src/v/cluster/tests/rm_stm_test_fixture.h | 50 +++++++++-- src/v/cluster/tests/rm_stm_tests.cc | 100 +++++++++++++++++++++ src/v/cluster/tests/tx_compaction_tests.cc | 2 + 3 files changed, 147 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/tests/rm_stm_test_fixture.h b/src/v/cluster/tests/rm_stm_test_fixture.h index fc86cdc53e137..2637de1a83e6e 100644 --- a/src/v/cluster/tests/rm_stm_test_fixture.h +++ b/src/v/cluster/tests/rm_stm_test_fixture.h @@ -10,7 +10,7 @@ #pragma once #include "cluster/producer_state_manager.h" #include "cluster/rm_stm.h" -#include "config/property.h" +#include "config/mock_property.h" #include "raft/tests/simple_raft_fixture.h" #include @@ -23,12 +23,14 @@ static prefix_logger ctx_logger{logger, ""}; struct rm_stm_test_fixture : simple_raft_fixture { void create_stm_and_start_raft( storage::ntp_config::default_overrides overrides = {}) { + max_concurent_producers.start(std::numeric_limits::max()).get(); + producer_expiration_ms.start(std::chrono::milliseconds::max()).get(); producer_state_manager .start( - config::mock_binding(std::numeric_limits::max()), - config::mock_binding( - std::chrono::duration_cast( - std::chrono::microseconds::max())), + ss::sharded_parameter( + [this] { return max_concurent_producers.local().bind(); }), + ss::sharded_parameter( + [this] { return producer_expiration_ms.local().bind(); }), config::mock_binding(std::numeric_limits::max())) .get(); producer_state_manager @@ -55,6 +57,8 @@ struct rm_stm_test_fixture : simple_raft_fixture { if (_started) { stop_all(); producer_state_manager.stop().get(); + producer_expiration_ms.stop().get(); + max_concurent_producers.stop().get(); } } @@ -66,6 +70,17 @@ struct rm_stm_test_fixture : simple_raft_fixture { return 
_stm->do_take_local_snapshot(version, {}); } + void update_producer_expiration(std::chrono::milliseconds value) { + producer_expiration_ms + .invoke_on_all( + [value](auto& local) mutable { local.update(std::move(value)); }) + .get(); + } + + auto apply_raft_snapshot(const iobuf& buf) { + return _stm->apply_raft_snapshot(buf); + } + auto apply_snapshot(raft::stm_snapshot_header hdr, iobuf buf) { return _stm->apply_local_snapshot(hdr, std::move(buf)); } @@ -77,6 +92,31 @@ struct rm_stm_test_fixture : simple_raft_fixture { auto get_expired_producers() const { return _stm->get_expired_producers(); } + auto stm_read_lock() { return _stm->_state_lock.hold_read_lock(); } + + auto maybe_create_producer(model::producer_identity pid) { + return stm_read_lock().then([pid, this](auto /*units*/) { + return _stm->maybe_create_producer(pid); + }); + } + + auto reset_producers() { + return _stm->_state_lock.hold_write_lock().then([this](auto units) { + return _stm->reset_producers().then([units = std::move(units)] {}); + }); + } + + auto rearm_eviction_timer(std::chrono::milliseconds period) { + return producer_state_manager + .invoke_on_all([period](auto& mgr) { + return mgr.rearm_eviction_timer_for_testing(period); + }) + .get(); + } + + ss::sharded> max_concurent_producers; + ss::sharded> + producer_expiration_ms; ss::sharded tx_gateway_frontend; ss::sharded producer_state_manager; ss::shared_ptr _stm; diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc index 8477c91ce4ff4..36ab2d94c32e1 100644 --- a/src/v/cluster/tests/rm_stm_tests.cc +++ b/src/v/cluster/tests/rm_stm_tests.cc @@ -12,6 +12,7 @@ #include "cluster/tests/randoms.h" #include "cluster/tests/rm_stm_test_fixture.h" #include "finjector/hbadger.h" +#include "finjector/stress_fiber.h" #include "model/fundamental.h" #include "model/metadata.h" #include "model/record.h" @@ -931,3 +932,102 @@ FIXTURE_TEST(test_tx_expiration_without_data_batches, rm_stm_test_fixture) { != expired.end(); 
}).get(); } + +/* + * This test ensures concurrent evictions can happen in the presence of + * replication operations and operations that reset the state (snapshots, + * partition stop). + */ +FIXTURE_TEST(test_concurrent_producer_evictions, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; + stm.start().get0(); + stm.testing_only_disable_auto_abort(); + + wait_for_confirmed_leader(); + wait_for_meta_initialized(); + + // Ensure eviction runs with higher frequency + // and evicts everything possible. + update_producer_expiration(0ms); + rearm_eviction_timer(1ms); + + stress_fiber_manager stress_mgr; + stress_mgr.start( + {.min_spins_per_scheduling_point = random_generators::get_int(50, 100), + .max_spins_per_scheduling_point = random_generators::get_int(500, 1000), + .num_fibers = random_generators::get_int(5, 10)}); + auto stop = ss::defer([&stress_mgr] { stress_mgr.stop().get(); }); + + int64_t counter= 0; + ss::abort_source as; + ss::gate gate; + size_t max_replication_fibers = 1000; + + // simulates replication. + // In each iteration of the loop, we create some producers and randomly + // hold the producer state lock on some of them (thus preventing eviction). + // This is roughly the lifecycle of replicate requests using a producer + // state. This creates a stream of producer states in a tight loop, some + // evictable and some non-evictable while eviction constantly runs in the + // background. + auto replicate_f = ss::do_until( + [&as] { return as.abort_requested(); }, + [&, this] { + std::vector> spawn_replicate_futures; + for (int i = 0; i < 5; i++) { + auto maybe_replicate_f + = maybe_create_producer(model::producer_identity{counter++, 0}) + .then([&, this](auto result) { + auto producer = result.first; + if ( + gate.get_count() < max_replication_fibers + && tests::random_bool()) { + // simulates replication. 
+ ssx::spawn_with_gate(gate, [this, producer] { + return stm_read_lock().then([producer]( + auto stm_units) { + return producer + ->run_with_lock([](auto units) { + auto sleep_ms + = std::chrono::milliseconds{ + random_generators::get_int(3)}; + return ss::sleep(sleep_ms).finally( + [units = std::move(units)] {}); + }) + .handle_exception_type( + [producer]( + const ss::gate_closed_exception&) { + vlog( + logger.info, + "producer {} already evicted, " + "ignoring", + producer->id()); + }) + .finally( + [producer, + stm_units = std::move(stm_units)] {}); + }); + }); + } + }); + spawn_replicate_futures.push_back(std::move(maybe_replicate_f)); + } + + return ss::when_all_succeed(std::move(spawn_replicate_futures)) + .then([]() { return ss::sleep(1ms); }); + }); + + // simulates raft snapshot application / partition shutdown + // applying a snapshot is a stop-the-world operation that resets + // all the producers. + auto reset_f = ss::do_until( + [&as] { return as.abort_requested(); }, + [&, this] { + return reset_producers().then([] { return ss::sleep(3ms); }); + }); + + ss::sleep(20s).finally([&as] { as.request_abort(); }).get(); + ss::when_all_succeed(std::move(replicate_f), std::move(reset_f)).get(); + gate.close().get(); +} diff --git a/src/v/cluster/tests/tx_compaction_tests.cc b/src/v/cluster/tests/tx_compaction_tests.cc index 41c230b3ab05e..3f91805e4577a 100644 --- a/src/v/cluster/tests/tx_compaction_tests.cc +++ b/src/v/cluster/tests/tx_compaction_tests.cc @@ -25,6 +25,8 @@ using cluster::tx_executor; _data_dir = "test_dir_" + random_generators::gen_alphanum_string(6); \ stop_all(); \ producer_state_manager.stop().get(); \ + producer_expiration_ms.stop().get(); \ + max_concurent_producers.stop().get(); \ _stm = nullptr; \ }); \ wait_for_confirmed_leader(); \