Skip to content

Commit

Permalink
Merge pull request #23398 from bashtanov/gtest_raft_rpunit-replicatio…
Browse files Browse the repository at this point in the history
…n_monitor_basic-not_leader

tests/gtest_raft_rpunit: monitor_test_fixture to honour leadership changes
  • Loading branch information
bashtanov authored Nov 28, 2024
2 parents 9b42aaf + 796e6d6 commit cf3e9ed
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 64 deletions.
2 changes: 2 additions & 0 deletions src/v/raft/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -666,11 +666,13 @@ redpanda_cc_gtest(
],
tags = ["exclusive"],
deps = [
"//src/v/model",
"//src/v/raft/tests:raft_fixture",
"//src/v/raft/tests:stm_raft_fixture_gtest",
"//src/v/test_utils:fixture",
"//src/v/test_utils:gtest",
"@googletest//:gtest",
"@seastar",
"@seastar//:testing",
],
)
Expand Down
21 changes: 21 additions & 0 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "model/timeout_clock.h"
#include "raft/consensus_client_protocol.h"
#include "raft/coordinated_recovery_throttle.h"
#include "raft/errc.h"
#include "raft/fwd.h"
#include "raft/heartbeat_manager.h"
#include "raft/recovery_memory_quota.h"
Expand All @@ -37,6 +38,7 @@
#include <boost/range/irange.hpp>

#include <ranges>
#include <system_error>
namespace raft {

inline constexpr raft::group_id test_group(123);
Expand Down Expand Up @@ -495,6 +497,7 @@ class raft_fixture
.then([&state] { return state.result; });
});
}

template<typename Func>
auto
retry_with_leader(model::timeout_clock::time_point deadline, Func&& f) {
Expand Down Expand Up @@ -523,6 +526,24 @@ class raft_fixture
_heartbeat_interval.update(std::move(timeout));
}

protected:
class raft_not_leader_exception : std::exception {};

template<std::derived_from<raft_fixture> Subclass>
ss::future<> test_with_leader(
model::timeout_clock::duration timeout,
ss::future<> (Subclass::*method)(raft_node_instance& leader)) {
co_await retry_with_leader(
model::timeout_clock::now() + timeout,
[this, method](raft_node_instance& leader) {
return ((static_cast<Subclass*>(this)->*method)(leader))
.then([] { return errc::success; })
.handle_exception_type([](const raft_not_leader_exception&) {
return errc::not_leader;
});
});
}

private:
void validate_leaders();

Expand Down
157 changes: 93 additions & 64 deletions src/v/raft/tests/replication_monitor_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "model/fundamental.h"
#include "raft/tests/raft_fixture.h"
#include "raft/tests/raft_group_fixture.h"
#include "test_utils/async.h"

using namespace raft;
#include <seastar/core/lowres_clock.hh>

class monitor_test_fixture
: public raft_fixture
, public ::testing::WithParamInterface<std::tuple<bool, size_t>> {
public:
static bool write_caching() { return std::get<0>(GetParam()); }
static size_t num_waiters() { return std::get<1>(GetParam()); }
};
using namespace raft;

namespace {

Expand Down Expand Up @@ -49,75 +43,110 @@ auto populate_waiters(
return ss::when_all_succeed(futures.begin(), futures.end());
}

class monitor_test_fixture
: public raft_fixture
, public ::testing::WithParamInterface<std::tuple<bool, size_t>> {
public:
static bool write_caching() { return std::get<0>(GetParam()); }
static size_t num_waiters() { return std::get<1>(GetParam()); }

ss::future<> truncation_detection_test(raft_node_instance& leader) {
auto raft = leader.raft();
auto wait_futures = ::populate_waiters(
raft, write_caching(), num_waiters());
co_await ss::sleep(500ms);
ASSERT_FALSE_CORO(wait_futures.available());

for (auto& [id, node] : nodes()) {
if (id == leader.get_vnode().id()) {
node->on_dispatch(
[](model::node_id, raft::msg_type) { return ss::sleep(3s); });
}
}

std::vector<ss::future<result<replicate_result>>> replicate_f;
replicate_f.reserve(num_waiters());
for (size_t i = 0; i < num_waiters(); i++) {
replicate_f.push_back(raft->replicate(
make_batches({{"k", "v"}}),
replicate_options{raft::consistency_level::quorum_ack}));
}
auto repl_results = co_await ss::when_all(
replicate_f.begin(), replicate_f.end());
for (auto& r : repl_results) {
auto res = r.get();
ASSERT_TRUE_CORO(res.has_error());
if (res.error() == errc::not_leader) {
throw raft_not_leader_exception();
}
ASSERT_EQ_CORO(res.error(), errc::replicated_entry_truncated);
}

co_await tests::cooperative_spin_wait_with_timeout(2s, [&] {
return wait_futures.available() && !wait_futures.failed();
});

auto wait_results = wait_futures.get();
for (size_t i = 0; i < num_waiters(); i++) {
if (wait_results.at(i) == errc::not_leader) {
throw raft_not_leader_exception();
}
ASSERT_EQ_CORO(
wait_results.at(i), errc::replicated_entry_truncated);
}
}
ss::future<> replication_monitor_wait_test(raft_node_instance& leader) {
auto raft = leader.raft();

auto wait_futures = ::populate_waiters(
raft, write_caching(), num_waiters());
co_await ss::sleep(500ms);
ASSERT_FALSE_CORO(wait_futures.available());

for (size_t i = 0; i < num_waiters(); i++) {
auto repl_result = co_await raft->replicate(
make_batches({{"k", "v"}}),
replicate_options{raft::consistency_level::quorum_ack});
if (
repl_result.has_error()
&& repl_result.error() == errc::not_leader) {
throw raft_not_leader_exception();
}
ASSERT_TRUE_CORO(repl_result.has_value()) << repl_result.error();
}

co_await tests::cooperative_spin_wait_with_timeout(2s, [&] {
return wait_futures.available() && !wait_futures.failed();
});

auto wait_results = wait_futures.get();
for (size_t i = 0; i < num_waiters(); i++) {
if (wait_results.at(i) == errc::not_leader) {
throw raft_not_leader_exception();
}
ASSERT_EQ_CORO(wait_results.at(i), errc::success);
}
}
};
} // namespace

TEST_P_CORO(monitor_test_fixture, replication_monitor_wait) {
co_await create_simple_group(5);

co_await set_write_caching(write_caching());
auto leader = co_await wait_for_leader(10s);
auto raft = node(leader).raft();

auto all = ::populate_waiters(raft, write_caching(), num_waiters());
co_await ss::sleep(500ms);
ASSERT_FALSE_CORO(all.available());

for (size_t i = 0; i < num_waiters(); i++) {
auto result = co_await raft->replicate(
make_batches({{"k", "v"}}),
replicate_options{raft::consistency_level::quorum_ack});
ASSERT_TRUE_CORO(result.has_value()) << result.error();
}

co_await tests::cooperative_spin_wait_with_timeout(
2s, [&] { return all.available() && !all.failed(); });

auto result = all.get();
for (size_t i = 0; i < num_waiters(); i++) {
ASSERT_EQ_CORO(result.at(i), errc::success);
}
co_await test_with_leader(
60s, &monitor_test_fixture::replication_monitor_wait_test);
}

TEST_P_CORO(monitor_test_fixture, truncation_detection) {
set_enable_longest_log_detection(false);
co_await create_simple_group(3);
auto leader = co_await wait_for_leader(10s);
co_await set_write_caching(write_caching());

auto raft = node(leader).raft();
auto all = ::populate_waiters(raft, write_caching(), num_waiters());
co_await ss::sleep(500ms);
ASSERT_FALSE_CORO(all.available());

for (auto& [id, node] : nodes()) {
if (id == leader) {
node->on_dispatch(
[](model::node_id, raft::msg_type) { return ss::sleep(3s); });
}
}

std::vector<ss::future<result<replicate_result>>> replicate_f;
replicate_f.reserve(num_waiters());
for (size_t i = 0; i < num_waiters(); i++) {
replicate_f.push_back(raft->replicate(
make_batches({{"k", "v"}}),
replicate_options{raft::consistency_level::quorum_ack}));
}
auto results = co_await ss::when_all(
replicate_f.begin(), replicate_f.end());
for (auto& r : results) {
auto res = r.get();
ASSERT_TRUE_CORO(res.has_error());
ASSERT_EQ_CORO(res.error(), errc::replicated_entry_truncated);
}

co_await tests::cooperative_spin_wait_with_timeout(
2s, [&] { return all.available() && !all.failed(); });
co_await set_write_caching(write_caching());

auto result = all.get();
for (size_t i = 0; i < num_waiters(); i++) {
ASSERT_EQ_CORO(result.at(i), errc::replicated_entry_truncated);
}
co_await test_with_leader(
60s, &monitor_test_fixture::truncation_detection_test);
}

INSTANTIATE_TEST_SUITE_P(
Expand Down

0 comments on commit cf3e9ed

Please sign in to comment.