diff --git a/Cargo.toml b/Cargo.toml index 990aa08..e666a27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmqtt-raft" -version = "0.4.0" +version = "0.4.1" authors = ["rmqtt "] edition = "2021" license = "MIT OR Apache-2.0" diff --git a/src/lib.rs b/src/lib.rs index 6051d84..b330ec4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,12 @@ use std::time::Duration; +// Re-exporting necessary types and modules for external use. pub use crate::error::{Error, Result}; pub use crate::message::Status; pub use crate::raft::{Mailbox, Raft, Store}; pub use tikv_raft::ReadOnlyOption; +// Importing modules for internal use. mod error; mod message; mod raft; @@ -13,43 +15,70 @@ mod raft_server; mod raft_service; mod storage; +/// Configuration options for the Raft-based system. #[derive(Clone)] pub struct Config { #[cfg(feature = "reuseaddr")] + /// Whether to reuse local addresses. This option is enabled only if the `reuseaddr` feature is active. pub reuseaddr: bool, + #[cfg(all( feature = "reuseport", not(any(target_os = "solaris", target_os = "illumos")) ))] + /// Whether to reuse local ports. This option is enabled only if the `reuseport` feature is active + /// and the target OS is not Solaris or Illumos. pub reuseport: bool, + + /// The timeout duration for gRPC calls. pub grpc_timeout: Duration, + + /// The maximum number of concurrent gRPC calls. pub grpc_concurrency_limit: usize, + + /// The maximum size of gRPC messages in bytes. pub grpc_message_size: usize, - //GRPC failed to fuse threshold + + /// The threshold for the gRPC circuit breaker. If the number of failed requests exceeds this threshold, + /// the circuit breaker will trip. pub grpc_breaker_threshold: u64, + + /// The interval at which the gRPC circuit breaker will retry after tripping. pub grpc_breaker_retry_interval: Duration, - //Proposal batchs + + /// The maximum number of proposals to batch together before processing. pub proposal_batch_size: usize, + + /// The timeout duration for collecting proposals into a batch. If this timeout is reached, + /// the collected proposals will be processed regardless of the batch size. pub proposal_batch_timeout: Duration, - //Snapshot generation interval + + /// The interval at which snapshots are generated. pub snapshot_interval: Duration, + + /// The interval at which heartbeat messages are sent to maintain leader election and cluster health. pub heartbeat: Duration, + + /// Configuration options for the Raft protocol. pub raft_cfg: tikv_raft::Config, } impl Default for Config { + /// Provides default values for the `Config` struct. fn default() -> Self { Self { #[cfg(feature = "reuseaddr")] reuseaddr: false, + #[cfg(all( feature = "reuseport", not(any(target_os = "solaris", target_os = "illumos")) ))] reuseport: false, + grpc_timeout: Duration::from_secs(6), grpc_concurrency_limit: 200, - grpc_message_size: 50 * 1024 * 1024, + grpc_message_size: 50 * 1024 * 1024, // 50 MB grpc_breaker_threshold: 4, grpc_breaker_retry_interval: Duration::from_millis(2500), proposal_batch_size: 50, diff --git a/src/message.rs b/src/message.rs index ee3bb05..ead5b44 100644 --- a/src/message.rs +++ b/src/message.rs @@ -5,53 +5,60 @@ use futures::channel::oneshot::Sender; use serde::{Deserialize, Serialize}; use tikv_raft::eraftpb::{ConfChange, Message as RaftMessage}; +/// Enumeration representing various types of responses that can be sent back to clients. #[derive(Serialize, Deserialize, Debug)] pub enum RaftResponse { + /// Indicates that the request was sent to the wrong leader. WrongLeader { leader_id: u64, leader_addr: Option, }, + /// Indicates that a join request was successful. JoinSuccess { assigned_id: u64, peer_addrs: HashMap, }, - RequestId { - leader_id: u64, - }, + /// Contains the leader ID in response to a request for ID. + RequestId { leader_id: u64 }, + /// Represents an error with a message. Error(String), - Response { - data: Vec, - }, + /// Contains arbitrary response data. + Response { data: Vec }, + /// Represents the status of the system. Status(Status), + /// Represents a successful operation. Ok, } +/// Enumeration representing different types of messages that can be sent within the system. #[allow(dead_code)] pub enum Message { + /// A proposal message to be processed. Propose { proposal: Vec, chan: Sender, }, + /// A query message to be processed. Query { query: Vec, chan: Sender, }, + /// A configuration change message to be processed. ConfigChange { change: ConfChange, chan: Sender, }, - RequestId { - chan: Sender, - }, - ReportUnreachable { - node_id: u64, - }, + /// A request for the leader's ID. + RequestId { chan: Sender }, + /// Report that a node is unreachable. + ReportUnreachable { node_id: u64 }, + /// A Raft message to be processed. Raft(Box), - Status { - chan: Sender, - }, + /// A request for the status of the system. + Status { chan: Sender }, } +/// Struct representing the status of the system. #[derive(Serialize, Deserialize, Debug)] pub struct Status { pub id: u64, @@ -63,28 +70,37 @@ pub struct Status { } impl Status { + /// Checks if the node has started. #[inline] pub fn is_started(&self) -> bool { self.leader_id > 0 } + /// Checks if this node is the leader. #[inline] pub fn is_leader(&self) -> bool { self.leader_id == self.id } } +/// Enumeration for reply channels which could be single or multiple. pub(crate) enum ReplyChan { + /// Single reply channel with its timestamp. One((Sender, Instant)), + /// Multiple reply channels with their timestamps. More(Vec<(Sender, Instant)>), } +/// Enumeration for proposals which could be a single proposal or multiple proposals. #[derive(Serialize, Deserialize)] pub(crate) enum Proposals { + /// A single proposal. One(Vec), + /// Multiple proposals. More(Vec>), } +/// A struct to manage proposal batching and sending. pub(crate) struct Merger { proposals: Vec>, chans: Vec<(Sender, Instant)>, @@ -94,6 +110,14 @@ pub(crate) struct Merger { } impl Merger { + /// Creates a new `Merger` instance with the specified batch size and timeout. + /// + /// # Parameters + /// - `proposal_batch_size`: The maximum number of proposals to include in a batch. + /// - `proposal_batch_timeout`: The timeout duration for collecting proposals. + /// + /// # Returns + /// A new `Merger` instance. pub fn new(proposal_batch_size: usize, proposal_batch_timeout: Duration) -> Self { Self { proposals: Vec::new(), @@ -104,17 +128,30 @@ impl Merger { } } + /// Adds a new proposal and its corresponding reply channel to the merger. + /// + /// # Parameters + /// - `proposal`: The proposal data to be added. + /// - `chan`: The reply channel for the proposal. #[inline] pub fn add(&mut self, proposal: Vec, chan: Sender) { self.proposals.push(proposal); self.chans.push((chan, Instant::now())); } + /// Returns the number of proposals currently held by the merger. + /// + /// # Returns + /// The number of proposals. #[inline] pub fn len(&self) -> usize { self.proposals.len() } + /// Retrieves a batch of proposals and their corresponding reply channels if the batch size or timeout criteria are met. + /// + /// # Returns + /// An `Option` containing the proposals and reply channels, or `None` if no batch is ready. #[inline] pub fn take(&mut self) -> Option<(Proposals, ReplyChan)> { let max = self.proposal_batch_size; diff --git a/src/raft.rs b/src/raft.rs index d27260e..968a7c9 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -64,6 +64,9 @@ pub struct Mailbox { } impl Mailbox { + /// Retrieves a list of peers with their IDs. + /// This method returns a vector containing tuples of peer IDs and their respective `Peer` objects. + /// It iterates over the internal `peers` map and collects the IDs and cloned `Peer` instances. #[inline] pub fn pears(&self) -> Vec<(u64, Peer)> { self.peers @@ -107,6 +110,10 @@ impl Mailbox { proposal_sender.send().await } + /// Sends a proposal to the leader node. + /// This method first attempts to send the proposal to the local node if it is the leader. + /// If the node is not the leader, it retrieves the leader's address and sends the proposal to the leader node. + /// If the proposal is successfully handled, the method returns a `RaftResponse::Response` with the resulting data. #[inline] pub async fn send_proposal(&self, message: Vec) -> Result> { match self.get_leader_info().await? { @@ -167,12 +174,16 @@ impl Mailbox { Err(Error::LeaderNotExist) } + /// Deprecated method to send a message, internally calls `send_proposal`. #[inline] #[deprecated] pub async fn send(&self, message: Vec) -> Result> { self.send_proposal(message).await } + /// Sends a query to the Raft node and returns the response data. + /// It sends a `Message::Query` containing the query bytes and waits for a response. + /// On success, it returns the data wrapped in `RaftResponse::Response`. #[inline] pub async fn query(&self, query: Vec) -> Result> { let (tx, rx) = oneshot::channel(); @@ -187,6 +198,8 @@ impl Mailbox { } } + /// Sends a request to leave the Raft cluster. + /// It initiates a `ConfigChange` to remove the node from the cluster and waits for a response. #[inline] pub async fn leave(&self) -> Result<()> { let mut change = ConfChange::default(); @@ -205,6 +218,8 @@ impl Mailbox { } } + /// Retrieves the current status of the Raft node. + /// Sends a `Message::Status` request and waits for a `RaftResponse::Status` reply, which contains the node's status. #[inline] pub async fn status(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -219,6 +234,8 @@ impl Mailbox { } } + /// Retrieves leader information, including whether the current node is the leader, the leader ID, and its address. + /// This method sends a `Message::RequestId` and waits for a response with the leader's ID and address. #[inline] async fn get_leader_info(&self) -> Result<(bool, u64, Option)> { let (tx, rx) = oneshot::channel(); @@ -248,7 +265,8 @@ pub struct Raft { } impl Raft { - /// creates a new node with the given address and store. + /// Creates a new Raft node with the provided address, store, logger, and configuration. + /// The node communicates with other peers using a mailbox. pub fn new( laddr: A, store: S, @@ -271,7 +289,7 @@ impl Raft { }) } - /// gets the node's `Mailbox`. + /// Returns a `Mailbox` for the Raft node, which facilitates communication with peers. pub fn mailbox(&self) -> Mailbox { Mailbox { peers: Arc::new(DashMap::default()), @@ -284,7 +302,8 @@ impl Raft { } } - /// find leader id and leader address + /// Finds leader information by querying a list of peer addresses. + /// Returns the leader ID and its address if found. pub async fn find_leader_info(&self, peer_addrs: Vec) -> Result> { let mut futs = Vec::new(); for addr in peer_addrs { @@ -311,6 +330,8 @@ impl Raft { } } + /// Requests the leader information from a specific peer. + /// Sends a `Message::RequestId` to the peer and waits for the response. async fn request_leader(&self, peer_addr: String) -> Result> { let (leader_id, leader_addr): (u64, String) = { let mut client = connect( @@ -340,8 +361,18 @@ impl Raft { Ok(Some((leader_id, leader_addr))) } - /// Create a new leader for the cluster, with id 1. There has to be exactly one node in the - /// cluster that is initialised that way + /// The `lead` function transitions the current node to the leader role in a Raft cluster. + /// It initializes the leader node and runs both the Raft server and the node concurrently. + /// The function will return once the server or node experiences an error, or when the leader + /// role is relinquished. + /// + /// # Arguments + /// + /// * `node_id` - The unique identifier for the node. + /// + /// # Returns + /// + /// A `Result<()>` indicating success or failure during the process. pub async fn lead(self, node_id: u64) -> Result<()> { let node = RaftNode::new_leader( self.rx, @@ -376,6 +407,20 @@ impl Raft { Ok(()) } + /// The `join` function is used to make the current node join an existing Raft cluster. + /// It tries to discover the current leader, communicates with the leader to join the cluster, + /// and configures the node as a follower. + /// + /// # Arguments + /// + /// * `node_id` - The unique identifier for the current node. + /// * `node_addr` - The address of the current node. + /// * `leader_id` - The optional leader node's identifier (if already known). + /// * `leader_addr` - The address of the leader node. + /// + /// # Returns + /// + /// A `Result<()>` indicating success or failure during the joining process. pub async fn join( self, node_id: u64, @@ -429,7 +474,7 @@ impl Raft { info!( "change_remove raft_response: {:?}", - deserialize(&raft_response.inner)? + deserialize::(&raft_response.inner)? ); // 3. Join the cluster diff --git a/src/raft_node.rs b/src/raft_node.rs index 67b96ff..310e029 100644 --- a/src/raft_node.rs +++ b/src/raft_node.rs @@ -157,6 +157,22 @@ pub struct Peer { } impl Peer { + /// Creates a new `Peer` instance with the specified parameters. + /// + /// # Parameters + /// - `addr`: The address of the peer to connect to. + /// - `crw_timeout`: The timeout duration for connection and read/write operations. + /// - `concurrency_limit`: The maximum number of concurrent gRPC requests allowed. + /// - `grpc_message_size`: The maximum size of a gRPC message. + /// - `grpc_breaker_threshold`: The threshold for the number of gRPC failures before breaking the circuit. + /// - `grpc_breaker_retry_interval`: The time interval for retrying after the circuit breaker is tripped. + /// + /// # Returns + /// - A new `Peer` instance with the provided configuration. + /// + /// # Behavior + /// - Initializes internal state, including counters and timeouts. + /// - Logs the connection attempt to the specified address. pub fn new( addr: String, crw_timeout: Duration, @@ -180,16 +196,39 @@ impl Peer { } } + /// Returns the number of currently active tasks associated with this peer. + /// + /// # Returns + /// - The number of active tasks as an `i64`. + /// + /// # Behavior + /// - Reads the value of the `active_tasks` counter. #[inline] pub fn active_tasks(&self) -> i64 { self.active_tasks.load(Ordering::SeqCst) } + /// Returns the number of gRPC failures encountered by this peer. + /// + /// # Returns + /// - The number of gRPC failures as a `u64`. + /// + /// # Behavior + /// - Reads the value of the `grpc_fails` counter. #[inline] pub fn grpc_fails(&self) -> u64 { self.grpc_fails.load(Ordering::SeqCst) } + /// Connects to the peer if not already connected, and returns the gRPC client. + /// + /// # Returns + /// - `Ok(RaftGrpcClient)`: On successful connection, returns the gRPC client. + /// - `Err(Error)`: On failure, returns an error. + /// + /// # Behavior + /// - Checks if the gRPC client is already connected and available. + /// - If not, attempts to establish a new connection and store the client. #[inline] async fn connect(&self) -> Result { if let Some(c) = self.client.read().await.as_ref() { @@ -212,12 +251,32 @@ impl Peer { Ok(c) } + /// Retrieves the gRPC client by establishing a connection if needed. + /// + /// # Returns + /// - `Ok(RaftGrpcClient)`: On successful connection, returns the gRPC client. + /// - `Err(Error)`: On failure, returns an error. + /// + /// # Behavior + /// - Calls `connect` to ensure the client is connected and available. #[inline] pub async fn client(&self) -> Result { self.connect().await } - ///Raft Message + /// Sends a Raft message to the peer and waits for a response. + /// + /// # Parameters + /// - `msg`: The Raft message to be sent. + /// + /// # Returns + /// - `Ok(Vec)`: On successful message send, returns the response data as a byte vector. + /// - `Err(Error)`: On failure, returns an error. + /// + /// # Behavior + /// - Checks if the peer is available for sending messages. + /// - Encodes the message and sends it using the `_send_message` method. + /// - Updates the active task count and records success or failure. #[inline] pub async fn send_message(&self, msg: &RaftMessage) -> Result> { if !self.available() { @@ -257,6 +316,19 @@ impl Peer { Ok(result) } + /// Sends a Raft proposal to the peer and waits for a response. + /// + /// # Parameters + /// - `msg`: The Raft proposal to be sent as a byte vector. + /// + /// # Returns + /// - `Ok(Vec)`: On successful proposal send, returns the response data as a byte vector. + /// - `Err(Error)`: On failure, returns an error. + /// + /// # Behavior + /// - Checks if the peer is available for sending proposals. + /// - Wraps the proposal in a `RraftProposal` and sends it using the `_send_proposal` method. + /// - Updates the active task count and records success or failure. #[inline] pub async fn send_proposal(&self, msg: Vec) -> Result> { if !self.available() { @@ -296,24 +368,19 @@ impl Peer { } #[inline] - pub fn _addr(&self) -> &str { - &self.addr - } - - #[inline] - pub fn record_failure(&self) { + fn record_failure(&self) { self.grpc_fails.fetch_add(1, Ordering::SeqCst); self.grpc_fail_time .store(chrono::Local::now().timestamp_millis(), Ordering::SeqCst); } #[inline] - pub fn record_success(&self) { + fn record_success(&self) { self.grpc_fails.store(0, Ordering::SeqCst); } #[inline] - pub fn available(&self) -> bool { + fn available(&self) -> bool { self.grpc_fails.load(Ordering::SeqCst) < self.grpc_breaker_threshold || (chrono::Local::now().timestamp_millis() - self.grpc_fail_time.load(Ordering::SeqCst)) @@ -336,6 +403,21 @@ pub struct RaftNode { } impl RaftNode { + /// Creates a new leader node for the Raft cluster. + /// + /// This function initializes a new `RaftNode` instance as a leader. It sets up the Raft configuration, + /// applies a default snapshot to initialize the state, and sets the node to be a leader. + /// + /// # Parameters + /// - `rcv`: A receiver for Raft messages. This will be used to receive incoming messages. + /// - `snd`: A sender for Raft messages. This will be used to send outgoing messages. + /// - `id`: The unique identifier for this Raft node. + /// - `store`: The store implementation used for persisting Raft state. + /// - `logger`: A logger instance for logging messages related to the Raft node. + /// - `cfg`: Configuration for the Raft node, including various timeouts and limits. + /// + /// # Returns + /// Returns a `Result` containing either the newly created `RaftNode` or an error if the creation failed. pub fn new_leader( rcv: mpsc::Receiver, snd: mpsc::Sender, @@ -385,6 +467,21 @@ impl RaftNode { Ok(node) } + /// Creates a new follower node for the Raft cluster. + /// + /// This function initializes a new `RaftNode` instance as a follower. It sets up the Raft configuration + /// and creates a new `RawNode` instance in follower mode. + /// + /// # Parameters + /// - `rcv`: A receiver for Raft messages. This will be used to receive incoming messages. + /// - `snd`: A sender for Raft messages. This will be used to send outgoing messages. + /// - `id`: The unique identifier for this Raft node. + /// - `store`: The store implementation used for persisting Raft state. + /// - `logger`: A logger instance for logging messages related to the Raft node. + /// - `cfg`: Configuration for the Raft node, including various timeouts and limits. + /// + /// # Returns + /// Returns a `Result` containing either the newly created `RaftNode` or an error if the creation failed. pub fn new_follower( rcv: mpsc::Receiver, snd: mpsc::Sender, @@ -420,6 +517,16 @@ impl RaftNode { }) } + /// Creates a new Raft configuration with the specified node ID. + /// + /// This function clones the provided configuration and sets the node ID. + /// + /// # Parameters + /// - `id`: The unique identifier for the Raft node. + /// - `cfg`: The base Raft configuration to clone and modify. + /// + /// # Returns + /// Returns a `RaftConfig` with the updated node ID. #[inline] fn new_config(id: u64, cfg: &RaftConfig) -> RaftConfig { let mut cfg = cfg.clone(); @@ -427,6 +534,15 @@ impl RaftNode { cfg } + /// Retrieves a peer by its ID. + /// + /// This function looks up a peer in the `peers` map by its ID. + /// + /// # Parameters + /// - `id`: The ID of the peer to retrieve. + /// + /// # Returns + /// Returns an `Option`. If the peer is found, it is returned; otherwise, `None` is returned. #[inline] pub fn peer(&self, id: u64) -> Option { match self.peers.get(&id) { @@ -435,16 +551,39 @@ impl RaftNode { } } + /// Checks if the current node is the leader. + /// + /// This function compares the leader ID of the Raft instance with the current node's ID. + /// + /// # Returns + /// Returns `true` if the current node is the leader, otherwise `false`. #[inline] pub fn is_leader(&self) -> bool { self.inner.raft.leader_id == self.inner.raft.id } + /// Retrieves the ID of the current node. + /// + /// This function returns the unique identifier of the current Raft node. + /// + /// # Returns + /// Returns the node's ID as a `u64`. #[inline] pub fn id(&self) -> u64 { self.raft.id } + /// Adds a new peer to the `peers` map. + /// + /// This function creates a new `Peer` instance with the specified address and configuration, + /// and adds it to the `peers` map. + /// + /// # Parameters + /// - `addr`: The address of the new peer. + /// - `id`: The unique identifier for the new peer. + /// + /// # Returns + /// Returns the newly created `Peer` instance. #[inline] pub fn add_peer(&mut self, addr: &str, id: u64) -> Peer { let peer = Peer::new( @@ -590,7 +729,7 @@ impl RaftNode { } } - pub async fn run(mut self) -> Result<()> { + pub(crate) async fn run(mut self) -> Result<()> { let mut heartbeat = self.cfg.heartbeat; let mut now = Instant::now(); let mut snapshot_received = self.is_leader(); diff --git a/src/raft_server.rs b/src/raft_server.rs index d30a316..c7212c3 100644 --- a/src/raft_server.rs +++ b/src/raft_server.rs @@ -19,6 +19,7 @@ use crate::raft_service::{ }; use crate::{error, Config}; +/// A gRPC server that handles Raft-related requests. pub struct RaftServer { snd: mpsc::Sender, laddr: SocketAddr, @@ -27,6 +28,17 @@ pub struct RaftServer { } impl RaftServer { + /// Creates a new instance of `RaftServer`. + /// + /// This function initializes a new `RaftServer` with the specified parameters. + /// + /// # Parameters + /// - `snd`: A sender for Raft messages. + /// - `laddr`: The local address where the server will listen for incoming requests. + /// - `cfg`: Configuration for the server, including gRPC timeouts and other settings. + /// + /// # Returns + /// Returns a new `RaftServer` instance. pub fn new(snd: mpsc::Sender, laddr: SocketAddr, cfg: Arc) -> Self { RaftServer { snd, @@ -36,6 +48,13 @@ impl RaftServer { } } + /// Starts the gRPC server to handle Raft requests. + /// + /// This function sets up the gRPC server and listens for incoming requests. It uses + /// the `RaftServiceServer` to handle requests and manage configuration options. + /// + /// # Returns + /// Returns a `Result` indicating whether the server started successfully or if an error occurred. pub async fn run(self) -> error::Result<()> { let laddr = self.laddr; let _cfg = self.cfg.clone(); @@ -66,6 +85,16 @@ impl RaftServer { #[tonic::async_trait] impl RaftService for RaftServer { + /// Handles requests for a new Raft node ID. + /// + /// This method sends a `RequestId` message to the Raft node and waits for a response. + /// It returns the node ID if successful or an error status if not. + /// + /// # Parameters + /// - `req`: The incoming request containing no additional data. + /// + /// # Returns + /// Returns a `Response` containing the node ID or an error status. async fn request_id( &self, _: Request, @@ -99,6 +128,16 @@ impl RaftService for RaftServer { } } + /// Handles configuration change requests. + /// + /// This method processes a configuration change request by sending it to the Raft node + /// and waits for a response. It returns the result of the configuration change operation. + /// + /// # Parameters + /// - `req`: The incoming request containing the configuration change data. + /// + /// # Returns + /// Returns a `Response` containing the result of the configuration change or an error status. async fn change_config( &self, req: Request, @@ -134,13 +173,23 @@ impl RaftService for RaftServer { Ok(Response::new(reply)) } + /// Handles sending Raft messages. + /// + /// This method processes a Raft message by sending it to the Raft node and returns + /// the result of the send operation. + /// + /// # Parameters + /// - `request`: The incoming request containing the Raft message data. + /// + /// # Returns + /// Returns a `Response` indicating success or an error status. async fn send_message( &self, request: Request, ) -> Result, Status> { let message = RaftMessage::decode(request.into_inner().inner.as_ref()) .map_err(|e| Status::invalid_argument(e.to_string()))?; - let reply = match self.snd.clone().try_send(Message::Raft(Box::new(message))) { + match self.snd.clone().try_send(Message::Raft(Box::new(message))) { Ok(()) => { let response = RaftResponse::Ok; Ok(Response::new(raft_service::RaftResponse { @@ -148,10 +197,19 @@ impl RaftService for RaftServer { })) } Err(_) => Err(Status::unavailable("error for try send message")), - }; - reply + } } + /// Handles sending proposals. + /// + /// This method sends a proposal to the Raft node and waits for a response. It returns + /// the result of the proposal send operation. + /// + /// # Parameters + /// - `req`: The incoming request containing the proposal data. + /// + /// # Returns + /// Returns a `Response` containing the result of the proposal send operation or an error status. async fn send_proposal( &self, req: Request, @@ -161,7 +219,7 @@ impl RaftService for RaftServer { let (tx, rx) = oneshot::channel(); let message = Message::Propose { proposal, chan: tx }; - let reply = match sender.try_send(message) { + match sender.try_send(message) { Ok(()) => match timeout(self.timeout, rx).await { Ok(Ok(raft_response)) => match serialize(&raft_response) { Ok(resp) => Ok(Response::new(raft_service::RaftResponse { inner: resp })), @@ -183,10 +241,19 @@ impl RaftService for RaftServer { warn!("error for try send message, {}", e); Err(Status::unavailable("error for try send message")) } - }; - reply + } } + /// Handles sending queries. + /// + /// This method sends a query to the Raft node and waits for a response. It returns + /// the result of the query send operation. + /// + /// # Parameters + /// - `req`: The incoming request containing the query data. + /// + /// # Returns + /// Returns a `Response` containing the result of the query send operation or an error status. async fn send_query( &self, req: Request, diff --git a/src/raft_service.rs b/src/raft_service.rs index 1e96e3c..ef07627 100644 --- a/src/raft_service.rs +++ b/src/raft_service.rs @@ -10,6 +10,20 @@ tonic::include_proto!("raftservice"); pub(crate) type RaftServiceClientType = RaftServiceClient; +/// Creates a gRPC `Endpoint` for connecting to a Raft service. +/// +/// This function constructs a gRPC `Endpoint` configured with the specified address, concurrency +/// limit, and timeout settings. The `Endpoint` is used to establish a connection to the Raft +/// service. +/// +/// # Parameters +/// - `saddr`: The server address in the form of a string (e.g., "127.0.0.1:50051"). +/// - `concurrency_limit`: The maximum number of concurrent requests allowed. +/// - `timeout`: The connection timeout duration. +/// +/// # Returns +/// Returns a `Result` containing the configured `Endpoint` on success, or an error if the endpoint +/// creation fails. #[inline] pub(crate) fn endpoint( saddr: &str, @@ -27,6 +41,21 @@ pub(crate) fn endpoint( Ok(endpoint) } +/// Establishes a connection to the Raft service and returns a client. +/// +/// This asynchronous function creates a new `RaftServiceClient` instance, using the provided +/// address, concurrency limit, message size, and timeout settings. The client is configured with +/// the specified message size for both encoding and decoding. +/// +/// # Parameters +/// - `saddr`: The server address in the form of a string (e.g., "127.0.0.1:50051"). +/// - `concurrency_limit`: The maximum number of concurrent requests allowed. +/// - `message_size`: The maximum size of messages for encoding and decoding. +/// - `timeout`: The connection timeout duration. +/// +/// # Returns +/// Returns a `Result` containing the `RaftServiceClient` instance on success, or an error if the +/// connection fails. #[inline] pub(crate) async fn connect( saddr: &str, @@ -43,6 +72,20 @@ pub(crate) async fn connect( .max_encoding_message_size(message_size)) } +/// Binds a TCP listener to the specified address and returns a `TcpListenerStream`. +/// +/// This function sets up a TCP listener with options for socket reuse and a backlog queue. It +/// returns a `TcpListenerStream` that can be used to accept incoming connections. This is +/// particularly useful for scenarios requiring high-performance and customizable socket options. +/// +/// # Parameters +/// - `laddr`: The local address to bind in the form of `std::net::SocketAddr`. +/// - `backlog`: The maximum number of pending connections in the backlog queue. +/// - `_reuseaddr`: Whether to enable the `SO_REUSEADDR` option on Unix-like systems. +/// - `_reuseport`: Whether to enable the `SO_REUSEPORT` option on Unix-like systems. +/// +/// # Returns +/// Returns a `Result` containing the `TcpListenerStream` on success, or an error if the binding fails. #[inline] #[cfg(all(feature = "socket2", feature = "tokio-stream"))] pub fn bind( diff --git a/src/storage.rs b/src/storage.rs index 26e6a4e..0192aeb 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -4,22 +4,63 @@ use tikv_raft::GetEntriesContext; use crate::error::Result; +/// A trait defining operations for a log store in a Raft implementation. +/// +/// The `LogStore` trait extends the `Storage` trait with additional methods to manage Raft log entries, +/// hard state, configuration state, and snapshots. Implementations of this trait should support appending +/// log entries, updating the hard state and configuration state, creating and applying snapshots, and +/// compacting the log. +/// +/// # Methods +/// - `append`: Append a list of log entries to the log store. +/// - `set_hard_state`: Set the hard state for the Raft state machine. +/// - `set_hard_state_comit`: Set the commit index in the hard state. +/// - `set_conf_state`: Set the configuration state for the Raft state machine. +/// - `create_snapshot`: Create a snapshot with the given data. +/// - `apply_snapshot`: Apply a snapshot to the log store. +/// - `compact`: Compact the log store up to the given index. pub trait LogStore: Storage { + /// Appends a list of log entries to the log store. fn append(&mut self, entries: &[Entry]) -> Result<()>; + /// Sets the hard state for the Raft state machine. fn set_hard_state(&mut self, hard_state: &HardState) -> Result<()>; + + /// Sets the commit index in the hard state. fn set_hard_state_comit(&mut self, comit: u64) -> Result<()>; + + /// Sets the configuration state for the Raft state machine. fn set_conf_state(&mut self, conf_state: &ConfState) -> Result<()>; + + /// Creates a snapshot with the given data. fn create_snapshot(&mut self, data: prost::bytes::Bytes) -> Result<()>; + + /// Applies a snapshot to the log store. fn apply_snapshot(&mut self, snapshot: Snapshot) -> Result<()>; + + /// Compacts the log store up to the given index. fn compact(&mut self, index: u64) -> Result<()>; } +/// An in-memory implementation of the `LogStore` trait using Tikv's `MemStorage`. +/// +/// The `MemStorage` struct provides an in-memory storage backend for Raft logs and state. It uses Tikv's +/// `CoreMemStorage` as the underlying storage engine and includes additional methods for managing snapshots. +/// +/// # Fields +/// - `core`: The underlying `CoreMemStorage` used for log and state management. +/// - `snapshot`: The currently held snapshot. pub struct MemStorage { core: CoreMemStorage, snapshot: Snapshot, } impl MemStorage { + /// Creates a new `MemStorage` instance with default settings. + /// + /// This function initializes `CoreMemStorage` and sets the `snapshot` to its default value. + /// + /// # Returns + /// Returns a new `MemStorage` instance. #[inline] pub fn create() -> Self { let core = CoreMemStorage::default(); @@ -29,6 +70,16 @@ impl MemStorage { } impl LogStore for MemStorage { + /// Appends a list of log entries to the in-memory log store. + /// + /// This method acquires a write lock on the underlying `CoreMemStorage` and appends the provided + /// entries. + /// + /// # Parameters + /// - `entries`: The entries to be appended. + /// + /// # Returns + /// Returns a `Result` indicating success or failure. #[inline] fn append(&mut self, entries: &[Entry]) -> Result<()> { let mut store = self.core.wl(); @@ -36,6 +87,15 @@ impl LogStore for MemStorage { Ok(()) } + /// Sets the hard state for the Raft state machine. + /// + /// This method acquires a write lock on the underlying `CoreMemStorage` and updates the hard state. + /// + /// # Parameters + /// - `hard_state`: The new hard state to set. + /// + /// # Returns + /// Returns a `Result` indicating success or failure. #[inline] fn set_hard_state(&mut self, hard_state: &HardState) -> Result<()> { let mut store = self.core.wl(); @@ -43,6 +103,16 @@ impl LogStore for MemStorage { Ok(()) } + /// Sets the commit index in the hard state. + /// + /// This method updates the commit index in the hard state by first acquiring a write lock on the + /// underlying `CoreMemStorage`, modifying the commit index, and then setting the updated hard state. + /// + /// # Parameters + /// - `comit`: The commit index to set. + /// + /// # Returns + /// Returns a `Result` indicating success or failure. #[inline] fn set_hard_state_comit(&mut self, comit: u64) -> Result<()> { let mut store = self.core.wl(); @@ -52,6 +122,15 @@ impl LogStore for MemStorage { Ok(()) } + /// Sets the configuration state for the Raft state machine. + /// + /// This method acquires a write lock on the underlying `CoreMemStorage` and updates the configuration state. + /// + /// # Parameters + /// - `conf_state`: The new configuration state to set. + /// + /// # Returns + /// Returns a `Result` indicating success or failure. #[inline] fn set_conf_state(&mut self, conf_state: &ConfState) -> Result<()> { let mut store = self.core.wl(); @@ -59,6 +138,15 @@ impl LogStore for MemStorage { Ok(()) } + /// Creates a snapshot with the given data. + /// + /// This method initializes a new snapshot with the provided data and stores it in the `snapshot` field. + /// + /// # Parameters + /// - `data`: The data to be included in the snapshot. + /// + /// # Returns + /// Returns a `Result` indicating success or failure. #[inline] fn create_snapshot(&mut self, data: prost::bytes::Bytes) -> Result<()> { let mut snapshot = self.core.snapshot(0, 0)?; @@ -67,6 +155,15 @@ impl LogStore for MemStorage { Ok(()) } + /// Applies a snapshot to the in-memory log store. + /// + /// This method acquires a write lock on the underlying `CoreMemStorage` and applies the provided snapshot. + /// + /// # Parameters + /// - `snapshot`: The snapshot to apply. + /// + /// # Returns + /// Returns a `Result` indicating success or failure. #[inline] fn apply_snapshot(&mut self, snapshot: Snapshot) -> Result<()> { let mut store = self.core.wl(); @@ -74,6 +171,15 @@ impl LogStore for MemStorage { Ok(()) } + /// Compacts the log store up to the given index. + /// + /// This method acquires a write lock on the underlying `CoreMemStorage` and compacts the log up to the specified index. + /// + /// # Parameters + /// - `index`: The index up to which to compact the log. + /// + /// # Returns + /// Returns a `Result` indicating success or failure. #[inline] fn compact(&mut self, index: u64) -> Result<()> { let mut store = self.core.wl(); @@ -83,12 +189,31 @@ impl LogStore for MemStorage { } impl Storage for MemStorage { + /// Retrieves the initial state of the Raft state machine. + /// + /// This method returns the initial state from the underlying `CoreMemStorage`. + /// + /// # Returns + /// Returns a `Result` containing the `RaftState` on success. #[inline] fn initial_state(&self) -> tikv_raft::Result { let raft_state = self.core.initial_state()?; Ok(raft_state) } + /// Retrieves a range of log entries. + /// + /// This method acquires a read lock on the underlying `CoreMemStorage` and returns log entries + /// in the specified range. + /// + /// # Parameters + /// - `low`: The start index of the range (inclusive). + /// - `high`: The end index of the range (exclusive). + /// - `max_size`: The maximum size of the entries to return (optional). + /// - `context`: Additional context for retrieving the entries. + /// + /// # Returns + /// Returns a `Result` containing a vector of `Entry` objects on success. #[inline] fn entries( &self, @@ -101,21 +226,52 @@ impl Storage for MemStorage { Ok(entries) } + /// Retrieves the term of the log entry at the specified index. + /// + /// This method returns the term of the log entry from the underlying `CoreMemStorage`. + /// + /// # Parameters + /// - `idx`: The index of the log entry. + /// + /// # Returns + /// Returns a `Result` containing the term of the entry on success. #[inline] fn term(&self, idx: u64) -> tikv_raft::Result { self.core.term(idx) } + /// Retrieves the first index of the log. + /// + /// This method returns the first index from the underlying `CoreMemStorage`. + /// + /// # Returns + /// Returns a `Result` containing the first index on success. #[inline] fn first_index(&self) -> tikv_raft::Result { self.core.first_index() } + /// Retrieves the last index of the log. + /// + /// This method returns the last index from the underlying `CoreMemStorage`. + /// + /// # Returns + /// Returns a `Result` containing the last index on success. #[inline] fn last_index(&self) -> tikv_raft::Result { self.core.last_index() } + /// Retrieves the current snapshot. + /// + /// This method returns a clone of the current snapshot held in the `snapshot` field. + /// + /// # Parameters + /// - `_request_index`: The index for the snapshot request (not used in this implementation). + /// - `_to`: The index up to which the snapshot is requested (not used in this implementation). + /// + /// # Returns + /// Returns a `Result` containing the current `Snapshot` on success. #[inline] fn snapshot(&self, _request_index: u64, _to: u64) -> tikv_raft::Result { Ok(self.snapshot.clone())