Skip to content

Commit

Permalink
producer_state_mgr: limit number of evictions per tick
Browse files Browse the repository at this point in the history
.. to avoid reactor stalls.
  • Loading branch information
bharathv committed May 22, 2024
1 parent ddca893 commit 60868ed
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/v/cluster/producer_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ void producer_state_manager::do_evict_excess_producers() {
}
vlog(clusterlog.debug, "producer eviction tick");
auto it = _lru_producers.begin();
while (it != _lru_producers.end() && can_evict_producer(*it)) {
// to avoid reactor stalls.
static constexpr auto max_evictions_per_tick = 10000;
int evicted_so_far = 0;
while (evicted_so_far++ < max_evictions_per_tick
&& it != _lru_producers.end() && can_evict_producer(*it)) {
auto it_copy = it;
++it;
auto& state = *it_copy;
Expand Down
33 changes: 33 additions & 0 deletions src/v/cluster/tests/producer_state_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ struct test_fixture {
producers.clear();
}

void local_eviction_tick() { manager().do_evict_excess_producers(); }

long _counter = 0;
ss::sharded<config::mock_property<uint64_t>> _max_producers;
ss::sharded<cluster::producer_state_manager> _psm;
Expand Down Expand Up @@ -197,3 +199,34 @@ FIXTURE_TEST(test_eviction_expired_pids, test_fixture) {
10s, [&] { return evicted_so_far == total_producers; });
clean(producers);
}

FIXTURE_TEST(test_evict_many_producers_at_once, test_fixture) {
// disable eviction timer.
// we manually invoke it below.
_psm
.invoke_on_all([](cluster::producer_state_manager& local) {
local.rearm_timer_for_testing(std::chrono::milliseconds{1h});
})
.get();

auto total_producers = 1000000;
int evicted_so_far = 0;
std::vector<cluster::producer_ptr> producers;
producers.reserve(total_producers);
for (int i = 0; i < total_producers; i++) {
producers.push_back(ss::make_lw_shared<cluster::producer_state>(
_psm.local(),
model::random_producer_identity(),
raft::group_id{i},
[&] { evicted_so_far++; }));
}
check_producers(total_producers, total_producers);
BOOST_REQUIRE_EQUAL(evicted_so_far, 0);

// allow producer eviction
_max_producers.invoke_on_all([](auto& local) { local.update(0UL); }).get();
local_eviction_tick();
BOOST_REQUIRE_EQUAL(evicted_so_far, 10000);

clean(producers);
}

0 comments on commit 60868ed

Please sign in to comment.