From d1ecf0e4debae17fb500a8708f38ae5c0314baa5 Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Fri, 20 Sep 2024 08:49:12 +0100 Subject: [PATCH 1/2] tests/gtest_raft_rpunit: monitor_test_fixture honour leadership changes 1) added a utility function to raft_fixture to execute a testing coro in retry_with_leader 2) made both monitor_test_fixture tests use it --- src/v/raft/tests/BUILD | 2 + src/v/raft/tests/raft_fixture.h | 21 +++ src/v/raft/tests/replication_monitor_tests.cc | 150 ++++++++++-------- 3 files changed, 109 insertions(+), 64 deletions(-) diff --git a/src/v/raft/tests/BUILD b/src/v/raft/tests/BUILD index 02b517e87c0a..2ffcb1643a66 100644 --- a/src/v/raft/tests/BUILD +++ b/src/v/raft/tests/BUILD @@ -658,11 +658,13 @@ redpanda_cc_gtest( ], tags = ["exclusive"], deps = [ + "//src/v/model", "//src/v/raft/tests:raft_fixture", "//src/v/raft/tests:stm_raft_fixture_gtest", "//src/v/test_utils:fixture", "//src/v/test_utils:gtest", "@googletest//:gtest", + "@seastar", "@seastar//:testing", ], ) diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index 39ff3ae27f38..99e92cf6238e 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -19,6 +19,7 @@ #include "model/timeout_clock.h" #include "raft/consensus_client_protocol.h" #include "raft/coordinated_recovery_throttle.h" +#include "raft/errc.h" #include "raft/fwd.h" #include "raft/heartbeat_manager.h" #include "raft/recovery_memory_quota.h" @@ -37,6 +38,7 @@ #include #include +#include namespace raft { inline constexpr raft::group_id test_group(123); @@ -495,6 +497,7 @@ class raft_fixture .then([&state] { return state.result; }); }); } + template auto retry_with_leader(model::timeout_clock::time_point deadline, Func&& f) { @@ -523,6 +526,24 @@ class raft_fixture _heartbeat_interval.update(std::move(timeout)); } +protected: + class raft_not_leader_exception : std::exception {}; + + template Subclass> + ss::future<> test_with_leader( + model::timeout_clock::duration timeout, + ss::future<> (Subclass::*method)(raft_node_instance& leader)) { + co_await retry_with_leader( + model::timeout_clock::now() + timeout, + [this, method](raft_node_instance& leader) { + return ((static_cast(this)->*method)(leader)) + .then([] { return errc::success; }) + .handle_exception_type([](const raft_not_leader_exception&) { + return errc::not_leader; + }); + }); + } + private: void validate_leaders(); diff --git a/src/v/raft/tests/replication_monitor_tests.cc b/src/v/raft/tests/replication_monitor_tests.cc index 8d0c40d7c6ad..30b73aaab641 100644 --- a/src/v/raft/tests/replication_monitor_tests.cc +++ b/src/v/raft/tests/replication_monitor_tests.cc @@ -7,19 +7,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "model/fundamental.h" #include "raft/tests/raft_fixture.h" -#include "raft/tests/raft_group_fixture.h" #include "test_utils/async.h" -using namespace raft; +#include -class monitor_test_fixture - : public raft_fixture - , public ::testing::WithParamInterface> { -public: - static bool write_caching() { return std::get<0>(GetParam()); } - static size_t num_waiters() { return std::get<1>(GetParam()); } -}; +using namespace raft; namespace { @@ -49,75 +43,103 @@ auto populate_waiters( return ss::when_all_succeed(futures.begin(), futures.end()); } +class monitor_test_fixture + : public raft_fixture + , public ::testing::WithParamInterface> { +public: + static bool write_caching() { return std::get<0>(GetParam()); } + static size_t num_waiters() { return std::get<1>(GetParam()); } + + ss::future<> truncation_detection_test(raft_node_instance& leader) { + auto raft = leader.raft(); + auto all = ::populate_waiters(raft, write_caching(), num_waiters()); + co_await ss::sleep(500ms); + ASSERT_FALSE_CORO(all.available()); + + for (auto& [id, node] : nodes()) { + if (id == leader.get_vnode().id()) { + node->on_dispatch( + [](model::node_id, raft::msg_type) { return ss::sleep(3s); }); + } + } + + std::vector>> replicate_f; + replicate_f.reserve(num_waiters()); + for (size_t i = 0; i < num_waiters(); i++) { + replicate_f.push_back(raft->replicate( + make_batches({{"k", "v"}}), + replicate_options{raft::consistency_level::quorum_ack})); + } + auto results = co_await ss::when_all( + replicate_f.begin(), replicate_f.end()); + for (auto& r : results) { + auto res = r.get(); + ASSERT_TRUE_CORO(res.has_error()); + if (res.error() == errc::not_leader) { + throw raft_not_leader_exception(); + } + ASSERT_EQ_CORO(res.error(), errc::replicated_entry_truncated); + } + + co_await tests::cooperative_spin_wait_with_timeout( + 2s, [&] { return all.available() && !all.failed(); }); + + auto result = all.get(); + for (size_t i = 0; i < num_waiters(); i++) { + if (result.at(i) == errc::not_leader) { + throw raft_not_leader_exception(); + } + ASSERT_EQ_CORO(result.at(i), errc::replicated_entry_truncated); + } + } + ss::future<> replication_monitor_wait_test(raft_node_instance& leader) { + auto raft = leader.raft(); + + auto all = ::populate_waiters(raft, write_caching(), num_waiters()); + co_await ss::sleep(500ms); + ASSERT_FALSE_CORO(all.available()); + + for (size_t i = 0; i < num_waiters(); i++) { + auto result = co_await raft->replicate( + make_batches({{"k", "v"}}), + replicate_options{raft::consistency_level::quorum_ack}); + if (result.has_error() && result.error() == errc::not_leader) { + throw raft_not_leader_exception(); + } + ASSERT_TRUE_CORO(result.has_value()) << result.error(); + } + + co_await tests::cooperative_spin_wait_with_timeout( + 2s, [&] { return all.available() && !all.failed(); }); + + auto result = all.get(); + for (size_t i = 0; i < num_waiters(); i++) { + if (result.at(i) == errc::not_leader) { + throw raft_not_leader_exception(); + } + ASSERT_EQ_CORO(result.at(i), errc::success); + } + } +}; } // namespace TEST_P_CORO(monitor_test_fixture, replication_monitor_wait) { co_await create_simple_group(5); co_await set_write_caching(write_caching()); - auto leader = co_await wait_for_leader(10s); - auto raft = node(leader).raft(); - - auto all = ::populate_waiters(raft, write_caching(), num_waiters()); - co_await ss::sleep(500ms); - ASSERT_FALSE_CORO(all.available()); - - for (size_t i = 0; i < num_waiters(); i++) { - auto result = co_await raft->replicate( - make_batches({{"k", "v"}}), - replicate_options{raft::consistency_level::quorum_ack}); - ASSERT_TRUE_CORO(result.has_value()) << result.error(); - } - co_await tests::cooperative_spin_wait_with_timeout( - 2s, [&] { return all.available() && !all.failed(); }); - - auto result = all.get(); - for (size_t i = 0; i < num_waiters(); i++) { - ASSERT_EQ_CORO(result.at(i), errc::success); - } + co_await test_with_leader( + 60s, &monitor_test_fixture::replication_monitor_wait_test); } TEST_P_CORO(monitor_test_fixture, truncation_detection) { set_enable_longest_log_detection(false); co_await create_simple_group(3); - auto leader = co_await wait_for_leader(10s); - co_await set_write_caching(write_caching()); - - auto raft = node(leader).raft(); - auto all = ::populate_waiters(raft, write_caching(), num_waiters()); - co_await ss::sleep(500ms); - ASSERT_FALSE_CORO(all.available()); - - for (auto& [id, node] : nodes()) { - if (id == leader) { - node->on_dispatch( - [](model::node_id, raft::msg_type) { return ss::sleep(3s); }); - } - } - std::vector>> replicate_f; - replicate_f.reserve(num_waiters()); - for (size_t i = 0; i < num_waiters(); i++) { - replicate_f.push_back(raft->replicate( - make_batches({{"k", "v"}}), - replicate_options{raft::consistency_level::quorum_ack})); - } - auto results = co_await ss::when_all( - replicate_f.begin(), replicate_f.end()); - for (auto& r : results) { - auto res = r.get(); - ASSERT_TRUE_CORO(res.has_error()); - ASSERT_EQ_CORO(res.error(), errc::replicated_entry_truncated); - } - - co_await tests::cooperative_spin_wait_with_timeout( - 2s, [&] { return all.available() && !all.failed(); }); + co_await set_write_caching(write_caching()); - auto result = all.get(); - for (size_t i = 0; i < num_waiters(); i++) { - ASSERT_EQ_CORO(result.at(i), errc::replicated_entry_truncated); - } + co_await test_with_leader( + 60s, &monitor_test_fixture::truncation_detection_test); } INSTANTIATE_TEST_SUITE_P( From 796e6d62f9002fd8d8c55c2d20069e09dc49634e Mon Sep 17 00:00:00 2001 From: Alexey Bashtanov Date: Tue, 12 Nov 2024 14:54:50 +0000 Subject: [PATCH 2/2] raft/tests/replication_monitor: rename confusing identifiers --- src/v/raft/tests/replication_monitor_tests.cc | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/src/v/raft/tests/replication_monitor_tests.cc b/src/v/raft/tests/replication_monitor_tests.cc index 30b73aaab641..768705e3e037 100644 --- a/src/v/raft/tests/replication_monitor_tests.cc +++ b/src/v/raft/tests/replication_monitor_tests.cc @@ -52,9 +52,10 @@ class monitor_test_fixture ss::future<> truncation_detection_test(raft_node_instance& leader) { auto raft = leader.raft(); - auto all = ::populate_waiters(raft, write_caching(), num_waiters()); + auto wait_futures = ::populate_waiters( + raft, write_caching(), num_waiters()); co_await ss::sleep(500ms); - ASSERT_FALSE_CORO(all.available()); + ASSERT_FALSE_CORO(wait_futures.available()); for (auto& [id, node] : nodes()) { if (id == leader.get_vnode().id()) { @@ -70,9 +71,9 @@ class monitor_test_fixture make_batches({{"k", "v"}}), replicate_options{raft::consistency_level::quorum_ack})); } - auto results = co_await ss::when_all( + auto repl_results = co_await ss::when_all( replicate_f.begin(), replicate_f.end()); - for (auto& r : results) { + for (auto& r : repl_results) { auto res = r.get(); ASSERT_TRUE_CORO(res.has_error()); if (res.error() == errc::not_leader) { @@ -81,43 +82,49 @@ class monitor_test_fixture ASSERT_EQ_CORO(res.error(), errc::replicated_entry_truncated); } - co_await tests::cooperative_spin_wait_with_timeout( - 2s, [&] { return all.available() && !all.failed(); }); + co_await tests::cooperative_spin_wait_with_timeout(2s, [&] { + return wait_futures.available() && !wait_futures.failed(); + }); - auto result = all.get(); + auto wait_results = wait_futures.get(); for (size_t i = 0; i < num_waiters(); i++) { - if (result.at(i) == errc::not_leader) { + if (wait_results.at(i) == errc::not_leader) { throw raft_not_leader_exception(); } - ASSERT_EQ_CORO(result.at(i), errc::replicated_entry_truncated); + ASSERT_EQ_CORO( + wait_results.at(i), errc::replicated_entry_truncated); } } ss::future<> replication_monitor_wait_test(raft_node_instance& leader) { auto raft = leader.raft(); - auto all = ::populate_waiters(raft, write_caching(), num_waiters()); + auto wait_futures = ::populate_waiters( + raft, write_caching(), num_waiters()); co_await ss::sleep(500ms); - ASSERT_FALSE_CORO(all.available()); + ASSERT_FALSE_CORO(wait_futures.available()); for (size_t i = 0; i < num_waiters(); i++) { - auto result = co_await raft->replicate( + auto repl_result = co_await raft->replicate( make_batches({{"k", "v"}}), replicate_options{raft::consistency_level::quorum_ack}); - if (result.has_error() && result.error() == errc::not_leader) { + if ( + repl_result.has_error() + && repl_result.error() == errc::not_leader) { throw raft_not_leader_exception(); } - ASSERT_TRUE_CORO(result.has_value()) << result.error(); + ASSERT_TRUE_CORO(repl_result.has_value()) << repl_result.error(); } - co_await tests::cooperative_spin_wait_with_timeout( - 2s, [&] { return all.available() && !all.failed(); }); + co_await tests::cooperative_spin_wait_with_timeout(2s, [&] { + return wait_futures.available() && !wait_futures.failed(); + }); - auto result = all.get(); + auto wait_results = wait_futures.get(); for (size_t i = 0; i < num_waiters(); i++) { - if (result.at(i) == errc::not_leader) { + if (wait_results.at(i) == errc::not_leader) { throw raft_not_leader_exception(); } - ASSERT_EQ_CORO(result.at(i), errc::success); + ASSERT_EQ_CORO(wait_results.at(i), errc::success); } } };