Skip to content

Commit

Permalink
UT improvement
Browse files Browse the repository at this point in the history
When using write_on_leader() for long-running tests, it is very
possible that a leader switch happens during the test. In the previous
implementation, the new leader (the previous follower) was not
aware of the role change and did not pick up the remaining writes.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Jul 29, 2024
1 parent 742c85b commit cca15ad
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/tests/test_common/hs_repl_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class HSReplTestHelper : public HSTestHelper {
void setup() {
replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >();
sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_));
sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v");
sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%n] [%t] %v");
auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();

boost::uuids::string_generator gen;
Expand Down
70 changes: 59 additions & 11 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
/// Rollback callback for this test DB.
///
/// Logs the rolled-back lsn, increments rollback_count_ while holding db_mtx_
/// exclusively, and, if this replica proposed the request, advances the test
/// runner to its next task.
///
/// @param lsn    log sequence number being rolled back (only logged here)
/// @param header unused by this implementation
/// @param key    unused by this implementation
/// @param ctx    request context; consulted only for is_proposer()
void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
                 cintrusive< repl_req_ctx >& ctx) override {
    LOGINFOMOD(replication, "[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn);

    {
        // Exclusive lock is scoped to the counter update only.
        std::unique_lock write_guard(db_mtx_);
        ++rollback_count_;
    }

    // Let the test continue: nothing more to do unless this replica proposed the request.
    if (!ctx->is_proposer()) { return; }
    g_helper->runner().next_task();
}

void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
Expand Down Expand Up @@ -224,6 +230,11 @@ class TestReplicatedDB : public homestore::ReplDevListener {
return commit_count_;
}

/// Returns the current value of rollback_count_, read while holding a shared lock on db_mtx_.
uint64_t db_rollback_count() const {
    std::shared_lock read_guard(db_mtx_);
    auto const rollbacks_seen = rollback_count_;
    return rollbacks_seen;
}

uint64_t db_size() const {
std::shared_lock lk(db_mtx_);
return inmem_db_.size();
Expand All @@ -232,6 +243,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
private:
std::map< Key, Value > inmem_db_;
uint64_t commit_count_{0};
uint64_t rollback_count_{0};
std::shared_mutex db_mtx_;
};

Expand Down Expand Up @@ -273,6 +285,22 @@ class RaftReplDevTest : public testing::Test {

/// Waits for commits using the running written_entries_ total as the expected write count.
void wait_for_all_commits() {
    auto const expected_writes = written_entries_;
    wait_for_commits(expected_writes);
}

uint64_t total_committed_cnt() {
uint64_t total_writes{0};
for (auto const& db : dbs_) {
total_writes += db->db_commit_count();
}
return total_writes;
}

uint64_t total_rollback_cnt() {
uint64_t total_rollback{0};
for (auto const& db : dbs_) {
total_rollback += db->db_rollback_count();
}
return total_rollback;
}

void wait_for_commits(uint64_t exp_writes) {
uint64_t total_writes{0};
while (true) {
Expand Down Expand Up @@ -346,27 +374,47 @@ class RaftReplDevTest : public testing::Test {
LOGINFO("Waiting for leader to be elected");
std::this_thread::sleep_for(std::chrono::milliseconds{500});
} else if (leader_uuid == g_helper->my_replica_id()) {
LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries,
boost::uuids::to_string(g_helper->my_replica_id()));
// LEADER ROLE
auto batch_size = wait_for_commit ? g_helper->runner().qdepth_ * 10 : num_entries;
// cap batch_size but should be larger than QD.
// It is possible after leader switch the writes run on previous leader will fail
// so we need to do more IOs to have num_entries committed.
if (batch_size > num_entries - written_entries_)
batch_size = std::max(num_entries - written_entries_, g_helper->runner().qdepth_);
LOGINFO("Writing {} entries since I am the leader my_uuid={}, target_total {}, written {}", batch_size,
boost::uuids::to_string(g_helper->my_replica_id()), num_entries, written_entries_);
auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
g_helper->runner().set_num_tasks(num_entries);

g_helper->runner().set_num_tasks(batch_size);
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size, db]() {
static std::normal_distribution<> num_blks_gen{3.0, 2.0};
this->generate_writes(std::abs(std::round(num_blks_gen(g_re))) * block_size, block_size, db);
});
if (wait_for_commit) { g_helper->runner().execute().get(); }
break;
written_entries_ += batch_size;
if (wait_for_commit) {
g_helper->runner().execute().get();
if (total_committed_cnt() >= num_entries) { break; }
} else {
if (written_entries_ >= num_entries) { break; }
}
} else {
LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries,
// FOLLOWER ROLE
LOGINFO("{} entries are expected to be written on the leader_uuid={}, my_uuid={}", num_entries,
boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id()));
break;
if (wait_for_commit) {
LOGINFO("{} entries are expected to be written, now I committed {}, my_uuid={}", num_entries,
total_committed_cnt(), boost::uuids::to_string(leader_uuid),
boost::uuids::to_string(g_helper->my_replica_id()));
if (total_committed_cnt() >= num_entries) { break; }
std::this_thread::sleep_for(std::chrono::milliseconds{5000});
} else {
break;
}
}
} while (true);

written_entries_ += num_entries;
if (wait_for_commit) { this->wait_for_all_commits(); }
LOGINFO("my_uuid={}, {} entries are expected to be written, I wrote {}, committed {}, rollback {}",
boost::uuids::to_string(g_helper->my_replica_id()), num_entries, written_entries_,
total_committed_cnt(), total_rollback_cnt());
}

void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) {
Expand Down

0 comments on commit cca15ad

Please sign in to comment.