From d5dfe5fdfddcca91606603e32811bc0c1078b529 Mon Sep 17 00:00:00 2001 From: juntaosu Date: Thu, 25 May 2023 23:18:40 +0800 Subject: [PATCH] add readme --- README.md | 244 +++++++++++++++++++++++++++++++--- README_zh-cn.md | 239 +++++++++++++++++++++++++++++++++ example/multi_mem_node.cc | 25 +++- example/single_mem_node.cc | 25 +++- src/confchange/confchange.cc | 76 +++++------ src/confchange/cquick_test.cc | 2 +- src/raft.cc | 8 +- src/raft_test.cc | 8 +- src/raft_test_util.cc | 6 +- src/rawnode.cc | 75 ++++------- src/rawnode.h | 21 +-- src/rawnode_test.cc | 124 ++++++++--------- src/tracker/tracker.cc | 22 +-- src/tracker/tracker.h | 36 ++--- 14 files changed, 680 insertions(+), 231 deletions(-) create mode 100644 README_zh-cn.md diff --git a/README.md b/README.md index 604f084..54aae61 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,237 @@ -- etcd raft -- cpp17 -- cmake?makefile? -- goggle test +# Raft +[中文介绍](./README_zh-cn.md) +## Abstract +Raft is a distributed consensus algorithm designed to manage replication of state machines in distributed systems. Its purpose is to ensure data consistency across multiple servers, despite failures and network partitions. It is widely used in modern distributed systems like etcd, cockroach, and tikv, providing a solid foundation for maintaining data consistency and reliability. -git submodule add https://xxx -git submodule update --init --recursive -git submodule update --init xxx +The library includes only the raft algorithm, leaving the network and storage layers to be implemented by the user, which makes the library more flexible and customizable. +## Build +> - bazel > 3.0 +> - c++17 -git submodule update --remote liba +### Build all +```shell +bazel build //... +``` +All binary files will be generated in the bazel-bin directory. +### Build all tests +```shell +bazel test //... +``` +### Using in your project +First add the following to your **WORKSPACE**. -git config -f .gitmodules submodule.liba.branch dev -git submodule update --remote +```shell +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") +load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") -git submodule rm xxx +git_repository( + name = "raft-cpp", + branch = "main", + remote = "https://github.com/ImSjt/raft-cpp.git", +) +``` +Since the library depends on protobuf, you also need to add the following to your **WORKSPACE**. +```shell -TODO -- 使用glog -- status抽象状态码? -- 补充测试 +# rules_cc defines rules for generating C++ code from Protocol Buffers. +http_archive( + name = "rules_cc", + sha256 = "35f2fb4ea0b3e61ad64a369de284e4fbbdcdba71836a5555abb5e194cf119509", + strip_prefix = "rules_cc-624b5d59dfb45672d4239422fa1e3de1822ee110", + urls = [ + "https://mirror.bazel.build/github.com/bazelbuild/rules_cc/archive/624b5d59dfb45672d4239422fa1e3de1822ee110.tar.gz", + "https://github.com/bazelbuild/rules_cc/archive/624b5d59dfb45672d4239422fa1e3de1822ee110.tar.gz", + ], +) +# rules_proto defines abstract rules for building Protocol Buffers. +http_archive( + name = "rules_proto", + sha256 = "2490dca4f249b8a9a3ab07bd1ba6eca085aaf8e45a734af92aad0c42d9dc7aaf", + strip_prefix = "rules_proto-218ffa7dfa5408492dc86c01ee637614f8695c45", + urls = [ + "https://mirror.bazel.build/github.com/bazelbuild/rules_proto/archive/218ffa7dfa5408492dc86c01ee637614f8695c45.tar.gz", + "https://github.com/bazelbuild/rules_proto/archive/218ffa7dfa5408492dc86c01ee637614f8695c45.tar.gz", + ], +) + +load("@rules_cc//cc:repositories.bzl", "rules_cc_dependencies") +rules_cc_dependencies() + +load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies", "rules_proto_toolchains") +rules_proto_dependencies() +rules_proto_toolchains() + +``` +Reference the library in **BUILD** file. + +```shell +package(default_visibility = ["//visibility:public"]) + +cc_binary( + name = "example", + srcs = [ + "main.cc", + ], + deps = [ + "@raft-cpp//:raft-cpp", + ], +) +``` +Start using the raft-cpp. +```c++ +#include "rawnode.h" + +int main(int argc, char* argv[]) { + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + craft::Raft::Config cfg { + .id = 1, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + auto node = craft::RawNode::Start(cfg, {craft::Peer{1}}); + + return 0; +} +``` + +## Usage +Start a node from scratch using RawNode::Start or start a node from some initial state using RawNode::Restart. + +**To start a three-node cluster** + +```c++ + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + craft::Raft::Config cfg = { + .id = 1, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + // Set peer list to the nodes in the cluster. + // Note that they need to be started separately as well. + auto rn = craft::RawNode::Start(cfg, {craft::Peer{1}, craft::Peer{2}, craft::Peer{3}}); +``` +**Adding a node to the cluster** + +First add a new node by calling `Rawnode::ProposeConfChange` to one of the nodes in the cluster, and then start an empty new node as follows. + +```c++ + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + craft::Raft::Config cfg = { + .id = 4, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + // Restart raft without peer information. + // Peer information should be synchronized from the leader. + auto rn = craft::RawNode::ReStart(cfg); +``` +**To restart a node from state** +```c++ + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + + // Recover the in-memory storage from persistent snapshot, state and entries. + storage->ApplySnapshot(snapshot); + storage->SetHardState(state); + storage->Append(entries); + + craft::Raft::Config cfg = { + .id = 1, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + // Restart raft without peer information. + // Peer information is already included in the storage. + auto rn = craft::RawNode::ReStart(cfg); +``` +**After creating the node, there is still some work to be done** + +First read ready by RawNode::GetReady and process the updates it contains. + +1. Persist Entries, HardState, Snapshot, write to Entries first, and then write to HardState and Snapshot if they are not empty. +2. Send the messages to the specified node. If there is a MsgSnap type, call the RawNode::ReportSnapshot after sending the snapshot. +3. Apply snapshot and committed_entries to the state machine, if the committed_entries have EntryConfChange type entries, then you need to call RawNode::ApplyConfChange to apply. +4. The final call to RawNode::Advance() indicates that processing is complete and the next batch of updates can be accepted. + +**Call awNode::Tick at regular intervals** +```c++ +void RawNode::Tick(); +``` +RawNode::Tick will drive Raft's heartbeat and election timeout. + +**RawNode::Step needs to be called when a message is received to process** +```c++ +void RawNode::Step(MsgPtr m); +``` + +**Send request using RawNode::Propose** + +Serialize the request into a string before sending it. +```c++ +Status RawNode::Propose(const std::string& data); +``` + +The whole process is similar to the following. +```c++ + while (1) { + auto items = queue.wait_dequeue(timeout); + for (auto item : items) { + if (request) { + n->Propose(item); + } else if (message) { + n->Step(item); + } + } + + if (heartbeat_timeout) { + n->Tick(); + } + + auto ready = n->GetReady(); + saveToStorage(ready->hard_state, ready->entries, ready->snapshot); + send(ready->messages); + if (ready->snapshot) { + processSnapshot(ready->snapshot); + } + + for (auto entry : ready->committed_entries) { + process(entry); + if (entry->type() == raftpb::EntryType::EntryConfChange) { + raftpb::ConfChange cc; + cc.ParseFromString(ent->data()); + node->rn->ApplyConfChange(craft::ConfChangeI(std::move(cc))); + } + } + + n->Advance(); + } +``` +## Thank +This project references the go implementation of etcd raft, thanks to etcd for providing such an elegant implementation. +## Reference +- [Etcd Raft](https://github.com/etcd-io/raft) +- [Raft Paper](https://raft.github.io/raft.pdf) +- [The Raft Site](https://raft.github.io/) +- [The Secret Lives of Data - Raft](https://thesecretlivesofdata.com/raft/) diff --git a/README_zh-cn.md b/README_zh-cn.md new file mode 100644 index 0000000..972c5de --- /dev/null +++ b/README_zh-cn.md @@ -0,0 +1,239 @@ +# Raft +## 简介 +Raft 是一种分布式一致性算法,旨在管理分布式系统中的复制状态机。它的目的是确保多个服务器之间的数据一致性,尽管存在故障和网络分区。这种算法在现代分布式系统(如 etcd、cockroach 和 tikv)中广泛应用,为维护数据一致性和可靠性提供了坚实的基础。 + +该库仅包括 raft 算法,将网络层和存储层留给用户自己实现,这使得该库更加灵活和可定制。 +## 编译 +> - bazel > 3.0 +> - c++17 + +### 编译所有(包括example和所有测试) +```shell +bazel build //... +``` +在bazel-bin目录下将生成所有二进制文件 +### 运行所有测试 +```shell +bazel test //... +``` +### 在你的项目中使用 + +首先在你的**WORKSPACE**添加 +```shell +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") +load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") + +git_repository( + name = "raft-cpp", + branch = "main", + remote = "https://github.com/ImSjt/raft-cpp.git", +) +``` +由于raft-cpp需要使用到protobuf,所以还需要在你的**WORKSPACE**加上下面内容 +```shell + +# rules_cc defines rules for generating C++ code from Protocol Buffers. +http_archive( + name = "rules_cc", + sha256 = "35f2fb4ea0b3e61ad64a369de284e4fbbdcdba71836a5555abb5e194cf119509", + strip_prefix = "rules_cc-624b5d59dfb45672d4239422fa1e3de1822ee110", + urls = [ + "https://mirror.bazel.build/github.com/bazelbuild/rules_cc/archive/624b5d59dfb45672d4239422fa1e3de1822ee110.tar.gz", + "https://github.com/bazelbuild/rules_cc/archive/624b5d59dfb45672d4239422fa1e3de1822ee110.tar.gz", + ], +) + +# rules_proto defines abstract rules for building Protocol Buffers. +http_archive( + name = "rules_proto", + sha256 = "2490dca4f249b8a9a3ab07bd1ba6eca085aaf8e45a734af92aad0c42d9dc7aaf", + strip_prefix = "rules_proto-218ffa7dfa5408492dc86c01ee637614f8695c45", + urls = [ + "https://mirror.bazel.build/github.com/bazelbuild/rules_proto/archive/218ffa7dfa5408492dc86c01ee637614f8695c45.tar.gz", + "https://github.com/bazelbuild/rules_proto/archive/218ffa7dfa5408492dc86c01ee637614f8695c45.tar.gz", + ], +) + +load("@rules_cc//cc:repositories.bzl", "rules_cc_dependencies") +rules_cc_dependencies() + +load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies", "rules_proto_toolchains") +rules_proto_dependencies() +rules_proto_toolchains() + +``` +在**BUILD**文件中引用raft-cpp +```shell +package(default_visibility = ["//visibility:public"]) + +cc_binary( + name = "example", + srcs = [ + "main.cc", + ], + deps = [ + "@raft-cpp//:raft-cpp", + ], +) +``` +开始使用raft-cpp +```c++ +#include "rawnode.h" + +int main(int argc, char* argv[]) { + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + craft::Raft::Config cfg { + .id = 1, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + auto node = craft::RawNode::Start(cfg, {craft::Peer{1}}); + + return 0; +} +``` + +## 用法 +使用RawNode::Start从头开始建立一个节点,使用RawNode::Restart从某个初始状态启动一个节点。 + +**启动三节点集群** + +```c++ + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + craft::Raft::Config cfg = { + .id = 1, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + // Set peer list to the nodes in the cluster. + // Note that they need to be started separately as well. + auto rn = craft::RawNode::Start(cfg, {craft::Peer{1}, craft::Peer{2}, craft::Peer{3}}); +``` +**添加新节点到集群中** + +首先通过向集群中某个节点调用`ProposeConfChange`添加新节点,然后再启动一个空的新节点,如下。 +```c++ + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + craft::Raft::Config cfg = { + .id = 4, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + // Restart raft without peer information. + // Peer information should be synchronized from the leader. + auto rn = craft::RawNode::ReStart(cfg); +``` +**使用旧状态启动节点** +```c++ + auto logger = std::make_shared(); + auto storage = std::make_shared(logger); + + // Recover the in-memory storage from persistent snapshot, state and entries. + storage->ApplySnapshot(snapshot); + storage->SetHardState(state); + storage->Append(entries); + + craft::Raft::Config cfg = { + .id = 1, + .election_tick = 10, + .heartbeat_tick = 3, + .storage = storage, + .max_size_per_msg = 1024 * 1024 * 1024, + .max_inflight_msgs = 256, + .logger = logger, + }; + // Restart raft without peer information. + // Peer information is already included in the storage. + auto rn = craft::RawNode::ReStart(cfg); +``` +**创建节点后,还有一些工作要做** + +首先通过RawNode::GetReady()读取并处理最新的更新。 + +1. 将Entries、HardState、Snapshot持久化,先写入Entries,如果HardState和Snapshot不为空再将它们写入。 +2. 将messages发送到指定的节点,如果有MsgSnap类型,在发送完snapshot后要调用RawNode::ReportSnapshot接口。 +3. 应用snapshot和committed_entries到状态机中,如果committed_entries中有EntryConfChange类型的entry,那么需要调用RawNode::ApplyConfChange应用。 +4. 最后调用RawNode::Advance()表示处理完成,可以接受下一批更新。 + +**在收到消息时需要调用RawNode::Step处理** +```c++ +void RawNode::Step(MsgPtr m); +``` +*最后,需要定时调用RawNode::Tick* +```c++ +void RawNode::Tick(); +``` +RawNode::Tick会驱动Raft的心跳机制以及选举机制。 + +**发送请求需要使用RawNode::Propose处理** + +将请求序列化成字符串再发送。 +```c++ +Status RawNode::Propose(const std::string& data); +``` + +**处理消息需要调用RawNode::Step处理** + +```c++ +Status RawNode::Step(MsgPtr m); +``` + +整个流程类似于下面。 +```c++ + while (1) { + auto items = queue.wait_dequeue(timeout); + for (auto item : items) { + if (request) { + n->Propose(item); + } else if (message) { + n->Step(item); + } + } + + if (heartbeat_timeout) { + n->Tick(); + } + + auto ready = n->GetReady(); + saveToStorage(ready->hard_state, ready->entries, ready->snapshot); + send(ready->messages); + if (ready->snapshot) { + processSnapshot(ready->snapshot); + } + + for (auto entry : ready->committed_entries) { + process(entry); + if (entry->type() == raftpb::EntryType::EntryConfChange) { + raftpb::ConfChange cc; + cc.ParseFromString(ent->data()); + node->rn->ApplyConfChange(craft::ConfChangeI(std::move(cc))); + } + } + + n->Advance(); + } +``` +## 感谢 +该项目参考了etcd raft的实现,感谢etcd提供了如此优雅的实现。 + +## 参考 +- [Etcd Raft](https://github.com/etcd-io/raft) +- [Raft Paper](https://raft.github.io/raft.pdf) +- [The Raft Site](https://raft.github.io/) +- [The Secret Lives of Data - Raft](https://thesecretlivesofdata.com/raft/) + diff --git a/example/multi_mem_node.cc b/example/multi_mem_node.cc index 2e82fe9..e48b784 100644 --- a/example/multi_mem_node.cc +++ b/example/multi_mem_node.cc @@ -1,3 +1,16 @@ +// Copyright 2023 JT +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. #include #include #include @@ -102,20 +115,20 @@ static void onReady(std::shared_ptr node, Transport& transport) { // Persistent hard state and entries. // rd.hard_state / rd.entries - if (!craft::IsEmptySnap(rd.snapshot)) { + if (!craft::IsEmptySnap(rd->snapshot)) { // Persistent snapshot - node->storage->ApplySnapshot(rd.snapshot); + node->storage->ApplySnapshot(rd->snapshot); // Apply snapshot } - node->storage->Append(rd.entries); + node->storage->Append(rd->entries); auto handle_messages = [&transport](const craft::MsgPtrs& msgs) { for (auto msg : msgs) { transport.Send(msg); } }; - handle_messages(rd.messages); + handle_messages(rd->messages); auto handle_committed_entries = [node](const craft::EntryPtrs& ents) { for (auto& ent : ents) { @@ -140,9 +153,9 @@ static void onReady(std::shared_ptr node, Transport& transport) { } } }; - handle_committed_entries(rd.committed_entries); + handle_committed_entries(rd->committed_entries); - node->rn->Advance(rd); + node->rn->Advance(); } static int64_t genId() { diff --git a/example/single_mem_node.cc b/example/single_mem_node.cc index f183392..5ba0790 100644 --- a/example/single_mem_node.cc +++ b/example/single_mem_node.cc @@ -1,3 +1,16 @@ +// Copyright 2023 JT +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. #include #include #include @@ -26,17 +39,17 @@ static void onReady(Peer& peer, std::map>& wa // Persistent hard state and entries. // rd.hard_state / rd.entries - if (!craft::IsEmptySnap(rd.snapshot)) { + if (!craft::IsEmptySnap(rd->snapshot)) { // Persistent snapshot - peer.storage->ApplySnapshot(rd.snapshot); + peer.storage->ApplySnapshot(rd->snapshot); // Apply snapshot } - peer.storage->Append(rd.entries); + peer.storage->Append(rd->entries); auto handle_messages = [](const craft::MsgPtrs& msgs) { }; - handle_messages(rd.messages); + handle_messages(rd->messages); auto handle_committed_entries = [&waiters](const craft::EntryPtrs& ents) { for (auto& ent : ents) { @@ -53,9 +66,9 @@ static void onReady(Peer& peer, std::map>& wa } } }; - handle_committed_entries(rd.committed_entries); + handle_committed_entries(rd->committed_entries); - peer.rn->Advance(rd); + peer.rn->Advance(); } static void sendPropose(std::shared_ptr logger, moodycamel::BlockingConcurrentQueue& q) { diff --git a/src/confchange/confchange.cc b/src/confchange/confchange.cc index 48afe69..ac3f5a8 100644 --- a/src/confchange/confchange.cc +++ b/src/confchange/confchange.cc @@ -32,22 +32,22 @@ std::tuple Changer::EnterJoint( return err(Status::Error("config is already joint")); } - if (cfg.voters_.Incoming().Size() == 0) { + if (cfg.voters.Incoming().Size() == 0) { // We allow adding nodes to an empty config for convenience (testing and // bootstrap), but you can't enter a joint state. return err(Status::Error("can't make a zero-voter config joint")); } // Clear the outgoing config. - cfg.voters_.Outgoing().Reset(); + cfg.voters.Outgoing().Reset(); // Copy incoming to outgoing. - cfg.voters_.Outgoing() = cfg.voters_.Incoming(); + cfg.voters.Outgoing() = cfg.voters.Incoming(); status = Apply(cfg, prs, ccs); if (!status.IsOK()) { return err(std::move(status)); } - cfg.auto_leave_ = auto_leave; + cfg.auto_leave = auto_leave; return CheckAndReturn(std::move(cfg), std::move(prs)); } @@ -59,24 +59,24 @@ std::tuple Changer::LeaveJoint() c if (!cfg.Joint()) { return err(Status::Error("can't leave a non-joint config")); } - if (cfg.voters_.Outgoing().Size() == 0) { + if (cfg.voters.Outgoing().Size() == 0) { return err(Status::Error("configuration is not joint: %s", cfg.String().c_str())); } - for (auto id : cfg.learners_next_) { - cfg.learners_.insert(id); + for (auto id : cfg.learners_next) { + cfg.learners.insert(id); GetProgress(prs, id)->SetIsLearner(true); } - cfg.learners_next_.clear(); + cfg.learners_next.clear(); - for (auto id : cfg.voters_.Outgoing().IDs()) { - bool is_voter = cfg.voters_.Incoming().Exist(id); - bool is_learner = cfg.learners_.count(id) != 0; + for (auto id : cfg.voters.Outgoing().IDs()) { + bool is_voter = cfg.voters.Incoming().Exist(id); + bool is_learner = cfg.learners.count(id) != 0; if (!is_voter && !is_learner) { prs.erase(id); } } - cfg.voters_.Outgoing().Reset(); - cfg.auto_leave_ = false; + cfg.voters.Outgoing().Reset(); + cfg.auto_leave = false; return CheckAndReturn(std::move(cfg), std::move(prs)); } @@ -94,7 +94,7 @@ std::tuple Changer::Simple( if (!status.IsOK()) { return err(std::move(status)); } - auto n = Symdiff(tracker_.GetConfig().voters_.Incoming().IDs(), cfg.voters_.Incoming().IDs()); + auto n = Symdiff(tracker_.GetConfig().voters.Incoming().IDs(), cfg.voters.Incoming().IDs()); if (n > 1) { return err(Status::Error("more than one voter changed without entering joint config")); } @@ -126,7 +126,7 @@ Status Changer::Apply(ProgressTracker::Config& cfg, ProgressMap& prs, return Status::Error("unexpected conf type %d", cc.type()); } } - if (cfg.voters_.Incoming().Size() == 0) { + if (cfg.voters.Incoming().Size() == 0) { return Status::Error("removed all voters"); } return Status::OK(); @@ -140,9 +140,9 @@ void Changer::MakeVoter(ProgressTracker::Config& cfg, ProgressMap& prs, return; } pr->SetIsLearner(false); - cfg.learners_.erase(id); - cfg.learners_next_.erase(id); - cfg.voters_.Incoming().Add(id); + cfg.learners.erase(id); + cfg.learners_next.erase(id); + cfg.voters.Incoming().Add(id); } void Changer::MakeLearner(ProgressTracker::Config& cfg, ProgressMap& prs, @@ -164,11 +164,11 @@ void Changer::MakeLearner(ProgressTracker::Config& cfg, ProgressMap& prs, // be turned into a learner in LeaveJoint(). // // Otherwise, add a regular learner right away. - if (cfg.voters_.Outgoing().Exist(id)) { - cfg.learners_next_.insert(id); + if (cfg.voters.Outgoing().Exist(id)) { + cfg.learners_next.insert(id); } else { pr->SetIsLearner(true); - cfg.learners_.insert(id); + cfg.learners.insert(id); } } @@ -178,12 +178,12 @@ void Changer::Remove(ProgressTracker::Config& cfg, ProgressMap& prs, if (!pr) { return; } - cfg.voters_.Incoming().Remove(id); - cfg.learners_.erase(id); - cfg.learners_next_.erase(id); + cfg.voters.Incoming().Remove(id); + cfg.learners.erase(id); + cfg.learners_next.erase(id); // If the peer is still a voter in the outgoing config, keep the Progress. - if (!cfg.voters_.Outgoing().Exist(id)) { + if (!cfg.voters.Outgoing().Exist(id)) { prs.erase(id); } } @@ -191,9 +191,9 @@ void Changer::Remove(ProgressTracker::Config& cfg, ProgressMap& prs, void Changer::InitProgress(ProgressTracker::Config& cfg, ProgressMap& prs, uint64_t id, bool is_learner) const { if (!is_learner) { - cfg.voters_.Incoming().Add(id); + cfg.voters.Incoming().Add(id); } else { - cfg.learners_.insert(id); + cfg.learners.insert(id); } prs[id] = std::make_shared( @@ -233,23 +233,23 @@ Status Changer::CheckInvariants(const ProgressTracker::Config& cfg, } return Status::OK(); }; - auto status = check_exist(cfg.voters_.IDs()); + auto status = check_exist(cfg.voters.IDs()); if (!status.IsOK()) { return status; } - status = check_exist(cfg.learners_); + status = check_exist(cfg.learners); if (!status.IsOK()) { return status; } - status = check_exist(cfg.learners_next_); + status = check_exist(cfg.learners_next); if (!status.IsOK()) { return status; } // Any staged learner was staged because it could not be directly added due // to a conflicting voter in the outgoing config. - for (auto id : cfg.learners_next_) { - if (!cfg.voters_.Outgoing().Exist(id)) { + for (auto id : cfg.learners_next) { + if (!cfg.voters.Outgoing().Exist(id)) { return Status::Error("%d is in LearnersNext, but not Voters[1]", id); } if (GetProgress(prs, id)->IsLearner()) { @@ -258,11 +258,11 @@ Status Changer::CheckInvariants(const ProgressTracker::Config& cfg, } // Conversely Learners and Voters doesn't intersect at all. - for (auto id : cfg.learners_) { - if (cfg.voters_.Outgoing().Exist(id)) { + for (auto id : cfg.learners) { + if (cfg.voters.Outgoing().Exist(id)) { return Status::Error("%d is in Learners and Voters[1]", id); } - if (cfg.voters_.Incoming().Exist(id)) { + if (cfg.voters.Incoming().Exist(id)) { return Status::Error("%d is in Learners and Voters[0]", id); } if (!GetProgress(prs, id)->IsLearner()) { @@ -271,13 +271,13 @@ Status Changer::CheckInvariants(const ProgressTracker::Config& cfg, } if (!cfg.Joint()) { - if (cfg.voters_.Outgoing().Size() > 0) { + if (cfg.voters.Outgoing().Size() > 0) { return Status::Error("cfg.Voters[1] must be nil when not joint"); } - if (!cfg.learners_next_.empty()) { + if (!cfg.learners_next.empty()) { return Status::Error("cfg.LearnersNext must be nil when not joint"); } - if (cfg.auto_leave_) { + if (cfg.auto_leave) { return Status::Error("AutoLeave must be false when not joint"); } } diff --git a/src/confchange/cquick_test.cc b/src/confchange/cquick_test.cc index f1a0f4b..80e27b6 100644 --- a/src/confchange/cquick_test.cc +++ b/src/confchange/cquick_test.cc @@ -106,7 +106,7 @@ TEST(ConfChange, Quick) { auto [cfg2a, prs2a, s2] = c.EnterJoint(true, ccs); EXPECT_TRUE(s2.IsOK()) << s2.Str(); - cfg2a.auto_leave_ = false; + cfg2a.auto_leave = false; EXPECT_EQ(cfg, cfg2a); EXPECT_TRUE(isEqual(prs, prs2a)); diff --git a/src/raft.cc b/src/raft.cc index e53180d..42c789d 100644 --- a/src/raft.cc +++ b/src/raft.cc @@ -440,7 +440,7 @@ void Raft::Advance(const Ready& rd) { auto old_applied = raft_log_->Applied(); raft_log_->AppliedTo(new_applied); - if (trk_.GetConfig().auto_leave_ && old_applied <= pending_conf_index_ && + if (trk_.GetConfig().auto_leave && old_applied <= pending_conf_index_ && new_applied >= pending_conf_index_ && state_ == RaftStateType::kLeader) { // If the current (and most recent, at least for this leader's term) @@ -1008,7 +1008,7 @@ Status Raft::StepLeader(MsgPtr m) { } if (!cc.IsNull()) { auto already_pending = pending_conf_index_ > raft_log_->Applied(); - auto already_joint = trk_.GetConfig().voters_.At(1).Size() > 0; + auto already_joint = trk_.GetConfig().voters.At(1).Size() > 0; auto wants_leave_joint = cc.AsV2().changes().size() == 0; std::string refused; @@ -1267,7 +1267,7 @@ Status Raft::StepLeader(MsgPtr m) { return Status::OK(); } - if (trk_.GetConfig().voters_.VoteResult(read_only_->RecvAck( + if (trk_.GetConfig().voters.VoteResult(read_only_->RecvAck( m->from(), m->context())) != VoteState::kVoteWon) { return Status::OK(); } @@ -1761,7 +1761,7 @@ raftpb::ConfState Raft::SwitchToConfig(const ProgressTracker::Config& cfg, } // If the leadTransferee was removed or demoted, abort the leadership // transfer. - if (trk_.GetConfig().voters_.IDs().count(lead_transferee_) == 0 && + if (trk_.GetConfig().voters.IDs().count(lead_transferee_) == 0 && lead_transferee_ != 0) { AbortLeaderTransfer(); } diff --git a/src/raft_test.cc b/src/raft_test.cc index 2d4986c..1638250 100644 --- a/src/raft_test.cc +++ b/src/raft_test.cc @@ -2320,8 +2320,8 @@ TEST(Raft, LearnerReceiveSnapshot) { n1->Get()->Restore(s); auto ready = craft::NewReady(n1->Get(), craft::SoftState{}, raftpb::HardState{}); - storage->ApplySnapshot(ready.snapshot); - n1->Get()->Advance(ready); + storage->ApplySnapshot(ready->snapshot); + n1->Get()->Advance(*ready); // Force set n1 appplied index. n1->Get()->GetRaftLog()->AppliedTo(n1->Get()->GetRaftLog()->Committed()); @@ -2851,8 +2851,8 @@ TEST(Raft, LeaderTransferAfterSnapshot) { // Apply snapshot and resume progress auto follower = std::dynamic_pointer_cast(nt->Peers()[3]); auto ready = craft::NewReady(follower->Get(), craft::SoftState{}, raftpb::HardState{}); - nt->Storages()[3]->ApplySnapshot(ready.snapshot); - follower->Get()->Advance(ready); + nt->Storages()[3]->ApplySnapshot(ready->snapshot); + follower->Get()->Advance(*ready); nt->SetMsgHook(nullptr); nt->Send({filtered}); checkLeaderTransferState(lead->Get(), craft::RaftStateType::kFollower, 3); diff --git a/src/raft_test_util.cc b/src/raft_test_util.cc index c90315e..9a2dbcd 100644 --- a/src/raft_test_util.cc +++ b/src/raft_test_util.cc @@ -20,7 +20,7 @@ NetWork::NetWork(NetWork::ConfigFunc cfg_func, std::vector learners; auto sm = std::dynamic_pointer_cast(peer); auto raft = sm->Get(); - for (auto i : raft->GetTracker().GetConfig().learners_) { + for (auto i : raft->GetTracker().GetConfig().learners) { learners.insert(i); } raft->SetID(id); @@ -31,9 +31,9 @@ NetWork::NetWork(NetWork::ConfigFunc cfg_func, std::vector(std::make_shared()); if (learners.count(peer_addrs[i]) > 0) { pr->SetIsLearner(true); - raft->GetTracker().GetConfig().learners_.insert(peer_addrs[i]); + raft->GetTracker().GetConfig().learners.insert(peer_addrs[i]); } else { - raft->GetTracker().GetConfig().voters_.Incoming().Add(peer_addrs[i]); + raft->GetTracker().GetConfig().voters.Incoming().Add(peer_addrs[i]); } prs[peer_addrs[i]] = pr; } diff --git a/src/rawnode.cc b/src/rawnode.cc index 493804a..0beefeb 100644 --- a/src/rawnode.cc +++ b/src/rawnode.cc @@ -156,46 +156,21 @@ Status RawNode::Step(MsgPtr m) { return Status::Error(kErrStepPeerNotFound); } -Ready RawNode::GetReady() { - auto rd = ReadyWithoutAccept(); - AcceptReady(rd); - return rd; +std::shared_ptr RawNode::GetReady() { + ready_ = ReadyWithoutAccept(); + AcceptReady(ready_); + return ready_; } -Ready RawNode::ReadyWithoutAccept() { - // Ready rd; - // rd.entries = raft_->GetRaftLog()->UnstableEntries(); - // rd.committed_entries = raft_->GetRaftLog()->NextEnts(); - // rd.messages = raft_->Msgs(); - - // auto soft_st = raft_->GetSoftState(); - // if (soft_st != *prev_soft_st_) { - // rd.soft_state = soft_st; - // } - - // auto hard_st = raft_->GetHardState(); - // if (hard_st != *prev_hard_st_) { - // rd.hard_state = hard_st; - // } - - // if (raft_->GetRaftLog()->UnstableSnapshot()) { - // rd.snapshot = raft_->GetRaftLog()->UnstableSnapshot(); - // } - - // if (!raft_->GetReadStates().empty()) { - // rd.read_states = raft_->GetReadStates(); - // } - - // rd.must_sync = MustSync(raft_->GetHardState(), *prev_hard_st_, rd.entries.size()); - // return rd; +std::shared_ptr RawNode::ReadyWithoutAccept() { return NewReady(raft_.get(), prev_soft_st_, prev_hard_st_); } -void RawNode::AcceptReady(const Ready& rd) { - if (rd.soft_state.has_value()) { - prev_soft_st_ = *(rd.soft_state); +void RawNode::AcceptReady(std::shared_ptr rd) { + if (rd->soft_state.has_value()) { + prev_soft_st_ = *(rd->soft_state); } - if (!rd.read_states.empty()) { + if (!rd->read_states.empty()) { raft_->ClearReadStates(); } raft_->ClearMsgs(); @@ -223,11 +198,12 @@ bool RawNode::HasReady() const { return false; } -void RawNode::Advance(const Ready& rd) { - if (!IsEmptyHardState(rd.hard_state)) { - prev_hard_st_ = rd.hard_state; +void RawNode::Advance() { + if (!IsEmptyHardState(ready_->hard_state)) { + prev_hard_st_ = ready_->hard_state; } - raft_->Advance(rd); + raft_->Advance(*ready_); + ready_.reset(); } NodeStatus RawNode::GetStatus() const { @@ -294,33 +270,34 @@ void RawNode::ReadIndex(const std::string& rctx) { raft_->Step(m); } -Ready NewReady(Raft* raft, const SoftState& prev_soft_st, const raftpb::HardState& prev_hard_st) { - Ready rd; - rd.entries = raft->GetRaftLog()->UnstableEntries(); - rd.committed_entries = raft->GetRaftLog()->NextEnts(); - rd.messages = raft->Msgs(); +std::shared_ptr NewReady(Raft* raft, const SoftState& prev_soft_st, const raftpb::HardState& prev_hard_st) { + auto rd = std::make_shared(); + + rd->entries = raft->GetRaftLog()->UnstableEntries(); + rd->committed_entries = raft->GetRaftLog()->NextEnts(); + rd->messages = raft->Msgs(); auto soft_st = raft->GetSoftState(); if (!(soft_st == prev_soft_st)) { - rd.soft_state = soft_st; + rd->soft_state = soft_st; } auto hard_st = raft->GetHardState(); if (!(hard_st == prev_hard_st)) { - rd.hard_state = hard_st; + rd->hard_state = hard_st; } if (raft->GetRaftLog()->UnstableSnapshot()) { - rd.snapshot = raft->GetRaftLog()->UnstableSnapshot(); + rd->snapshot = raft->GetRaftLog()->UnstableSnapshot(); } else { - rd.snapshot = std::make_shared(); + rd->snapshot = std::make_shared(); } if (!raft->GetReadStates().empty()) { - rd.read_states = raft->GetReadStates(); + rd->read_states = raft->GetReadStates(); } - rd.must_sync = MustSync(raft->GetHardState(), prev_hard_st, rd.entries.size()); + rd->must_sync = MustSync(raft->GetHardState(), prev_hard_st, rd->entries.size()); return rd; } diff --git a/src/rawnode.h b/src/rawnode.h index 9d60390..8782679 100644 --- a/src/rawnode.h +++ b/src/rawnode.h @@ -131,16 +131,16 @@ class RawNode { // includes appending and applying entries or a snapshot, updating the HardState, // and sending messages. The returned Ready() *must* be handled and subsequently // passed back via Advance(). - Ready GetReady(); + std::shared_ptr GetReady(); // ReadyWithoutAccept returns a Ready. This is a read-only operation, i.e. there // is no obligation that the Ready must be handled. - Ready ReadyWithoutAccept(); + std::shared_ptr ReadyWithoutAccept(); // acceptReady is called when the consumer of the RawNode has decided to go // ahead and handle a Ready. Nothing must alter the state of the RawNode between // this call and the prior call to Ready(). - void AcceptReady(const Ready& rd); + void AcceptReady(std::shared_ptr rd); // HasReady called when RawNode user need to check if any Ready pending. // Checking logic in this method should be consistent with Ready.containsUpdates(). @@ -148,7 +148,7 @@ class RawNode { // Advance notifies the RawNode that the application has applied and saved progress in the // last Ready results. - void Advance(const Ready& rd); + void Advance(); // GetStatus returns the current status of the given group. This allocates, see // BasicStatus and WithProgress for allocation-friendlier choices. @@ -177,23 +177,16 @@ class RawNode { // processed safely. The read state will have the same rctx attached. void ReadIndex(const std::string& rctx); - // void SetPreSoftState(const SoftState& state) { - // pre_soft_st_ = state; - // } - // void SetPreHardState(const raftpb::HardState& state) { - // pre_hard_st_ = state; - // } - Raft* GetRaft() { return raft_.get(); } private: std::unique_ptr raft_; - // std::optional prev_soft_st_; - // std::optional prev_hard_st_; SoftState prev_soft_st_; raftpb::HardState prev_hard_st_; + + std::shared_ptr ready_; }; -Ready NewReady(Raft* raft, const SoftState& prev_soft_st, const raftpb::HardState& prev_hard_st); +std::shared_ptr NewReady(Raft* raft, const SoftState& prev_soft_st, const raftpb::HardState& prev_hard_st); } // namespace craft \ No newline at end of file diff --git a/src/rawnode_test.cc b/src/rawnode_test.cc index aa9a836..e5f1699 100644 --- a/src/rawnode_test.cc +++ b/src/rawnode_test.cc @@ -162,8 +162,8 @@ TEST(RawNode, ProposeAndConfChange) { std::shared_ptr cs; while (!cs) { auto rd = raw_node->GetReady(); - s->Append(rd.entries); - for (auto& ent : rd.committed_entries) { + s->Append(rd->entries); + for (auto& ent : rd->committed_entries) { craft::ConfChangeI cc; if (ent->type() == raftpb::EntryType::EntryConfChange) { raftpb::ConfChange ccc; @@ -178,9 +178,9 @@ TEST(RawNode, ProposeAndConfChange) { cs = std::make_shared(raw_node->ApplyConfChange(cc)); } } - raw_node->Advance(rd); + raw_node->Advance(); // Once we are the leader, propose a command and a ConfChange. - if (!proposed && rd.soft_state->lead == raw_node->GetRaft()->ID()) { + if (!proposed && rd->soft_state->lead == raw_node->GetRaft()->ID()) { auto s1 = raw_node->Propose("somedata"); ASSERT_TRUE(s1.IsOK()); auto [ccv1, ok] = tt.cc.AsV1(); @@ -241,7 +241,7 @@ TEST(RawNode, ProposeAndConfChange) { auto rd = raw_node->GetReady(); std::string context; if (!tt.exp.auto_leave()) { - ASSERT_TRUE(rd.entries.empty()); + ASSERT_TRUE(rd->entries.empty()); if (tt.exp2 == nullptr) { continue; } @@ -254,11 +254,11 @@ TEST(RawNode, ProposeAndConfChange) { } // Check that the right ConfChange comes out. - ASSERT_EQ(rd.entries.size(), static_cast(1)); - ASSERT_EQ(rd.entries[0]->type(), raftpb::EntryType::EntryConfChangeV2); + ASSERT_EQ(rd->entries.size(), static_cast(1)); + ASSERT_EQ(rd->entries[0]->type(), raftpb::EntryType::EntryConfChangeV2); raftpb::ConfChangeV2 cc; - ASSERT_TRUE(cc.ParseFromString(rd.entries[0]->data())); + ASSERT_TRUE(cc.ParseFromString(rd->entries[0]->data())); ASSERT_EQ(cc.context(), context); // Lie and pretend the ConfChange applied. It won't do so because now // we require the joint quorum and we're only running one node. @@ -288,8 +288,8 @@ TEST(RawNode, JointAutoLeave) { std::shared_ptr cs; while (!cs) { auto rd = raw_node->GetReady(); - s->Append(rd.entries); - for (auto& ent : rd.committed_entries) { + s->Append(rd->entries); + for (auto& ent : rd->committed_entries) { craft::ConfChangeI cc; if (ent->type() == raftpb::EntryType::EntryConfChangeV2) { raftpb::ConfChangeV2 ccc; @@ -302,9 +302,9 @@ TEST(RawNode, JointAutoLeave) { cs = std::make_shared(raw_node->ApplyConfChange(cc)); } } - raw_node->Advance(rd); + raw_node->Advance(); // Once we are the leader, propose a command and a ConfChange. - if (!proposed && rd.soft_state->lead == raw_node->GetRaft()->ID()) { + if (!proposed && rd->soft_state->lead == raw_node->GetRaft()->ID()) { auto s1 = raw_node->Propose("somedata"); ASSERT_TRUE(s1.IsOK()); ccdata = test_cc.SerializeAsString(); @@ -334,22 +334,22 @@ TEST(RawNode, JointAutoLeave) { // Move the RawNode along. It should not leave joint because it's follower. auto rd = raw_node->ReadyWithoutAccept(); // Check that the right ConfChange comes out. - ASSERT_EQ(rd.entries.size(), static_cast(0)); + ASSERT_EQ(rd->entries.size(), static_cast(0)); // Make it leader again. It should leave joint automatically after moving apply index. raw_node->Campaign(); rd = raw_node->GetReady(); - s->Append(rd.entries); - raw_node->Advance(rd); + s->Append(rd->entries); + raw_node->Advance(); rd = raw_node->GetReady(); - s->Append(rd.entries); + s->Append(rd->entries); // Check that the right ConfChange comes out. - ASSERT_EQ(rd.entries.size(), static_cast(1)); - ASSERT_EQ(rd.entries[0]->type(), raftpb::EntryType::EntryConfChangeV2); + ASSERT_EQ(rd->entries.size(), static_cast(1)); + ASSERT_EQ(rd->entries[0]->type(), raftpb::EntryType::EntryConfChangeV2); raftpb::ConfChangeV2 cc; - ASSERT_TRUE(cc.ParseFromString(rd.entries[0]->data())); + ASSERT_TRUE(cc.ParseFromString(rd->entries[0]->data())); ASSERT_EQ(cc.context().empty(), true); // Lie and pretend the ConfChange applied. It won't do so because now // we require the joint quorum and we're only running one node. @@ -363,32 +363,32 @@ TEST(RawNode, RawNodeProposeAddDuplicateNode) { auto s = newTestMemoryStorage({withPeers({1})}); auto raw_node = craft::RawNode::New(newTestConfig(1, 10, 1, s)); auto rd = raw_node->GetReady(); - s->Append(rd.entries); - raw_node->Advance(rd); + s->Append(rd->entries); + raw_node->Advance(); raw_node->Campaign(); while (1) { rd = raw_node->GetReady(); - s->Append(rd.entries); - if (rd.soft_state->lead == raw_node->GetRaft()->ID()) { - raw_node->Advance(rd); + s->Append(rd->entries); + if (rd->soft_state->lead == raw_node->GetRaft()->ID()) { + raw_node->Advance(); break; } - raw_node->Advance(rd); + raw_node->Advance(); } auto propose_confchange_and_apply = [&raw_node, &rd, &s](raftpb::ConfChange cc) { raw_node->ProposeConfChange(craft::ConfChangeI(cc)); rd = raw_node->GetReady(); - s->Append(rd.entries); - for (auto entry : rd.committed_entries) { + s->Append(rd->entries); + for (auto entry : rd->committed_entries) { if (entry->type() == raftpb::EntryType::EntryConfChange) { raftpb::ConfChange cc; cc.ParseFromString(entry->data()); raw_node->ApplyConfChange(craft::ConfChangeI(cc)); } } - raw_node->Advance(rd); + raw_node->Advance(); }; raftpb::ConfChange cc1; @@ -438,9 +438,9 @@ TEST(RawNode, ReadIndex) { ASSERT_TRUE(has_ready); auto rd = rawnode->GetReady(); - ASSERT_EQ(rd.read_states, wrs); - s->Append(rd.entries); - rawnode->Advance(rd); + ASSERT_EQ(rd->read_states, wrs); + s->Append(rd->entries); + rawnode->Advance(); // ensure raft.readStates is reset after advance ASSERT_EQ(rawnode->GetRaft()->GetReadStates().size(), static_cast(0)); @@ -448,17 +448,17 @@ TEST(RawNode, ReadIndex) { rawnode->Campaign(); while (1) { rd = rawnode->GetReady(); - s->Append(rd.entries); + s->Append(rd->entries); - if (rd.soft_state->lead == rawnode->GetRaft()->ID()) { - rawnode->Advance(rd); + if (rd->soft_state->lead == rawnode->GetRaft()->ID()) { + rawnode->Advance(); // Once we are the leader, issue a ReadIndex request rawnode->GetRaft()->SetStep(append_step); rawnode->ReadIndex(wrequest_ctx); break; } - rawnode->Advance(rd); + rawnode->Advance(); } // ensure that MsgReadIndex message is sent to the underlying raft ASSERT_EQ(msgs.size(), static_cast(1)); @@ -580,13 +580,13 @@ TEST(RawNode, Start) { ASSERT_TRUE(rawnode->HasReady()); auto rd = rawnode->GetReady(); - storage->Append(rd.entries); - rawnode->Advance(rd); + storage->Append(rd->entries); + rawnode->Advance(); - rd.soft_state.reset(); + rd->soft_state.reset(); want.soft_state.reset(); - readyEqual(rd, want); + readyEqual(*rd, want); ASSERT_FALSE(rawnode->HasReady()); } @@ -610,8 +610,8 @@ TEST(RawNode, Restart) { storage->Append(entries); auto rawnode = craft::RawNode::New(newTestConfig(1, 10, 1, storage)); auto rd = rawnode->GetReady(); - readyEqual(rd, want); - rawnode->Advance(rd); + readyEqual(*rd, want); + rawnode->Advance(); ASSERT_FALSE(rawnode->HasReady()); } @@ -641,8 +641,8 @@ TEST(RawNode, RestartFromSnapshot) { s->Append(entries); auto rawnode = craft::RawNode::New(newTestConfig(1, 10, 1, s)); auto rd = rawnode->GetReady(); - readyEqual(rd, want); - rawnode->Advance(rd); + readyEqual(*rd, want); + rawnode->Advance(); ASSERT_FALSE(rawnode->HasReady()); } @@ -664,7 +664,7 @@ TEST(RawNode, NodeStatus) { ASSERT_EQ(exp->String(), act->String()); craft::ProgressTracker::Config exp_cfg; - exp_cfg.voters_.Incoming().Add(1); + exp_cfg.voters.Incoming().Add(1); ASSERT_EQ(status.config, exp_cfg); } @@ -713,12 +713,12 @@ TEST(RawNode, CommitPaginationAfterRestart) { auto rawnode = craft::RawNode::New(cfg); for (uint64_t highest_applied = 0; highest_applied != 11;) { auto rd = rawnode->GetReady(); - auto n = rd.committed_entries.size(); + auto n = rd->committed_entries.size(); ASSERT_NE(n, static_cast(0)); - auto next = rd.committed_entries[0]->index(); + auto next = rd->committed_entries[0]->index(); ASSERT_FALSE(highest_applied != 0 && highest_applied + 1 != next); - highest_applied = rd.committed_entries[n-1]->index(); - rawnode->Advance(rd); + highest_applied = rd->committed_entries[n-1]->index(); + rawnode->Advance(); rawnode->Step(NEW_MSG() .Type(raftpb::MessageType::MsgHeartbeat) .To(1) @@ -743,19 +743,19 @@ TEST(RawNode, BoundedLogGrowthWithPartition) { cfg.max_uncommitted_entries_size = max_entry_size; auto rawnode = craft::RawNode::New(cfg); auto rd = rawnode->GetReady(); - s->Append(rd.entries); - rawnode->Advance(rd); + s->Append(rd->entries); + rawnode->Advance(); // Become the leader. rawnode->Campaign(); while (1) { rd = rawnode->GetReady(); - s->Append(rd.entries); - if (rd.soft_state->lead == rawnode->GetRaft()->ID()) { - rawnode->Advance(rd); + s->Append(rd->entries); + if (rd->soft_state->lead == rawnode->GetRaft()->ID()) { + rawnode->Advance(); break; } - rawnode->Advance(rd); + rawnode->Advance(); } // Simulate a network partition while we make our proposals by never @@ -775,9 +775,9 @@ TEST(RawNode, BoundedLogGrowthWithPartition) { // Recover from the partition. The uncommitted tail of the Raft log should // disappear as entries are committed. rd = rawnode->GetReady(); - ASSERT_EQ(rd.committed_entries.size(), max_entries); - s->Append(rd.entries); - rawnode->Advance(rd); + ASSERT_EQ(rd->committed_entries.size(), max_entries); + s->Append(rd->entries); + rawnode->Advance(); check_uncommitted(0); } @@ -792,8 +792,8 @@ TEST(RawNode, ConsumeReady) { // Inject first message, make sure it's visible via readyWithoutAccept. rn->GetRaft()->GetMsgsForTest().push_back(m1); auto rd = rn->ReadyWithoutAccept(); - ASSERT_EQ(rd.messages.size(), static_cast(1)); - ASSERT_EQ(rd.messages[0]->context(), m1->context()); + ASSERT_EQ(rd->messages.size(), static_cast(1)); + ASSERT_EQ(rd->messages[0]->context(), m1->context()); ASSERT_EQ(rn->GetRaft()->Msgs().size(), static_cast(1)); ASSERT_EQ(rn->GetRaft()->Msgs()[0]->context(), m1->context()); @@ -802,11 +802,11 @@ TEST(RawNode, ConsumeReady) { // to leaving it in both places). rd = rn->GetReady(); ASSERT_EQ(rn->GetRaft()->Msgs().size(), static_cast(0)); - ASSERT_EQ(rd.messages.size(), static_cast(1)); - ASSERT_EQ(rd.messages[0]->context(), m1->context()); + ASSERT_EQ(rd->messages.size(), static_cast(1)); + ASSERT_EQ(rd->messages[0]->context(), m1->context()); // Add a message to raft to make sure that Advance() doesn't drop it. rn->GetRaft()->GetMsgsForTest().push_back(m2); - rn->Advance(rd); + rn->Advance(); ASSERT_EQ(rn->GetRaft()->Msgs().size(), static_cast(1)); ASSERT_EQ(rn->GetRaft()->Msgs()[0]->context(), m2->context()); } diff --git a/src/tracker/tracker.cc b/src/tracker/tracker.cc index d1da0ee..29d1a61 100644 --- a/src/tracker/tracker.cc +++ b/src/tracker/tracker.cc @@ -55,31 +55,31 @@ const ProgressPtr GetProgress(const ProgressMap& prs, uint64_t id) { raftpb::ConfState ProgressTracker::ConfState() { raftpb::ConfState conf_state; - std::vector voters = config_.voters_.Incoming().Slice(); + std::vector voters = config_.voters.Incoming().Slice(); for (uint64_t voter : voters) { conf_state.add_voters(voter); } - std::vector voters_outgoing = config_.voters_.Outgoing().Slice(); + std::vector voters_outgoing = config_.voters.Outgoing().Slice(); for (uint64_t voter : voters_outgoing) { conf_state.add_voters_outgoing(voter); } - for (uint64_t learner : config_.learners_) { + for (uint64_t learner : config_.learners) { conf_state.add_learners(learner); } - for (uint64_t learner : config_.learners_next_) { + for (uint64_t learner : config_.learners_next) { conf_state.add_learners_next(learner); } - conf_state.set_auto_leave(config_.auto_leave_); + conf_state.set_auto_leave(config_.auto_leave); return conf_state; } uint64_t ProgressTracker::Committed() const { - return config_.voters_.CommittedIndex(MatchAckIndexer(&progress_)); + return config_.voters.CommittedIndex(MatchAckIndexer(&progress_)); } void ProgressTracker::Visit(Closure&& func) { @@ -97,11 +97,11 @@ bool ProgressTracker::QuorumActive() { votes[id] = pr->RecentActive(); }); - return config_.voters_.VoteResult(votes) == kVoteWon; + return config_.voters.VoteResult(votes) == kVoteWon; } std::vector ProgressTracker::VoterNodes() { - std::set m = config_.voters_.IDs(); + std::set m = config_.voters.IDs(); std::vector nodes; for (uint64_t n : m) { nodes.push_back(n); @@ -110,12 +110,12 @@ std::vector ProgressTracker::VoterNodes() { } std::vector ProgressTracker::LearnerNodes() { - if (config_.learners_.empty()) { + if (config_.learners.empty()) { return {}; } std::vector nodes; - for (uint64_t n : config_.learners_) { + for (uint64_t n : config_.learners) { nodes.push_back(n); } return nodes; @@ -149,7 +149,7 @@ std::tuple ProgressTracker::TallyVotes() const { rejected++; } } - return std::make_tuple(granted, rejected, config_.voters_.VoteResult(votes_)); + return std::make_tuple(granted, rejected, config_.voters.VoteResult(votes_)); } std::shared_ptr ProgressTracker::GetProgress(uint64_t id) { diff --git a/src/tracker/tracker.h b/src/tracker/tracker.h index 76a361c..6d3e7b2 100644 --- a/src/tracker/tracker.h +++ b/src/tracker/tracker.h @@ -40,12 +40,12 @@ class ProgressTracker { public: // Config reflects the configuration tracked in a ProgressTracker. struct Config { - JointConfig voters_; + JointConfig voters; // AutoLeave is true if the configuration is joint and a transition to the // incoming configuration should be carried out automatically by Raft when // this is possible. If false, the configuration will be joint until the // application initiates the transition manually. - bool auto_leave_ = false; + bool auto_leave = false; // Learners is a set of IDs corresponding to the learners active in the // current configuration. // @@ -54,7 +54,7 @@ class ProgressTracker { // learner it can't be in either half of the joint config. This invariant // simplifies the implementation since it allows peers to have clarity about // its current role without taking into account joint consensus. - std::set learners_; + std::set learners; // When we turn a voter into a learner during a joint consensus transition, // we cannot add the learner directly when entering the joint state. This is // because this would violate the invariant that the intersection of @@ -89,28 +89,28 @@ class ProgressTracker { // also a voter in the joint config. In this case, the learner is added // right away when entering the joint configuration, so that it is caught up // as soon as possible. - std::set learners_next_; + std::set learners_next; bool operator==(const Config& other) const { - return voters_.Incoming().IDs() == other.voters_.Incoming().IDs() && - voters_.Outgoing().IDs() == other.voters_.Outgoing().IDs() && - auto_leave_ == other.auto_leave_ && - learners_ == other.learners_ && - learners_next_ == other.learners_next_; + return voters.Incoming().IDs() == other.voters.Incoming().IDs() && + voters.Outgoing().IDs() == other.voters.Outgoing().IDs() && + auto_leave == other.auto_leave && + learners == other.learners && + learners_next == other.learners_next; } - bool Joint() const { return voters_.Outgoing().Size() > 0; } + bool Joint() const { return voters.Outgoing().Size() > 0; } std::string String() const { std::stringstream ss; - ss << "voters=" << voters_.String(); - if (!learners_.empty()) { - ss << " learners=" << MajorityConfig(learners_).String(); + ss << "voters=" << voters.String(); + if (!learners.empty()) { + ss << " learners=" << MajorityConfig(learners).String(); } - if (!learners_next_.empty()) { - ss << " learners_next=" << MajorityConfig(learners_next_).String(); + if (!learners_next.empty()) { + ss << " learners_next=" << MajorityConfig(learners_next).String(); } - if (auto_leave_) { + if (auto_leave) { ss << " autoleave"; } return ss.str(); @@ -127,8 +127,8 @@ class ProgressTracker { // IsSingleton returns true if (and only if) there is only one voting member // (i.e. the leader) in the current configuration. bool IsSingleton() { - return config_.voters_.Incoming().Size() == 1 && - config_.voters_.Outgoing().Size() == 0; + return config_.voters.Incoming().Size() == 1 && + config_.voters.Outgoing().Size() == 0; } // Committed returns the largest log index known to be committed based on what