From 6cb4932c803cb0fe4735bc15c54333aa7f86b12c Mon Sep 17 00:00:00 2001 From: rmqtt-rs Date: Sun, 29 May 2022 21:24:45 +0800 Subject: [PATCH] Optimize: Peer --- src/raft_node.rs | 188 +++++++++++++---------------------------------- 1 file changed, 51 insertions(+), 137 deletions(-) diff --git a/src/raft_node.rs b/src/raft_node.rs index 1fc9596..7e1e85c 100644 --- a/src/raft_node.rs +++ b/src/raft_node.rs @@ -1,24 +1,26 @@ -use std::collections::HashMap; +use parking_lot::RwLock; use std::collections::vec_deque::VecDeque; +use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::time::{Duration, Instant}; use bincode::{deserialize, serialize}; use log::*; -use raft::{Config, prelude::*, raw_node::RawNode}; use raft::eraftpb::{ConfChange, ConfChangeType, Entry, EntryType, Message as RaftMessage}; +use raft::{prelude::*, raw_node::RawNode, Config}; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::time::timeout; -use tonic::Request; use tonic::transport::{Channel, Endpoint}; +use tonic::Request; use crate::error::{Error, Result}; use crate::message::{Message, RaftResponse, Status}; use crate::raft::Store; -use crate::raft_service::{Message as RiteraftMessage, Query}; use crate::raft_service::raft_service_client::RaftServiceClient; +use crate::raft_service::{Message as RiteraftMessage, Query}; use crate::storage::{LogStore, MemStorage}; pub type RaftGrpcClient = RaftServiceClient; @@ -79,56 +81,6 @@ impl MessageSender { } } -// struct ProposalSender { -// proposal: Vec, -// client: RaftGrpcClient, -// chan: oneshot::Sender, -// max_retries: usize, -// timeout: Duration, -// } -// -// impl ProposalSender { -// async fn send(mut self) { -// let mut current_retry = 0usize; -// loop { -// let message_request = Request::new(Proposal { -// inner: self.proposal.clone(), -// }); -// match self.client.forward(message_request).await { -// Ok(grpc_response) => { -// let raft_response = -// deserialize(&grpc_response.into_inner().inner).expect("deserialize error"); -// if let Err(e) = self.chan.send(raft_response) { -// error!( -// "error sending proposal after {} retries: {:?}", -// self.max_retries, e -// ); -// } -// return; -// } -// Err(e) => { -// if current_retry < self.max_retries { -// current_retry += 1; -// tokio::time::sleep(self.timeout).await; -// } else { -// error!( -// "error sending proposal after {} retries: {}", -// self.max_retries, e -// ); -// if let Err(e) = self.chan.send(RaftResponse::Error) { -// error!( -// "error sending proposal after {} retries: {:?}", -// self.max_retries, e -// ); -// } -// return; -// } -// } -// } -// } -// } -// } - struct QuerySender { query: Vec, client: RaftGrpcClient, @@ -179,30 +131,33 @@ impl QuerySender { } } +#[derive(Clone)] pub struct Peer { addr: String, - endpoint: Endpoint, - client: Option>, + client: Arc>>>, } impl Peer { - pub fn new(addr: String) -> Result { + pub fn new(addr: String) -> Peer { debug!("connecting to node at {}...", addr); + Peer { + addr, + client: Arc::new(RwLock::new(None)), + } + } + + #[inline] + fn _endpoint(&self) -> Result { let concurrency_limit = 10; let client_timeout = Duration::from_secs(8); - let endpoint = Channel::from_shared(format!("http://{}", addr)) + let endpoint = Channel::from_shared(format!("http://{}", self.addr)) .map(|endpoint| { endpoint .concurrency_limit(concurrency_limit) .timeout(client_timeout) }) .map_err(|e| Error::Other(Box::new(e)))?; - //let addr = addr.to_string(); - Ok(Peer { - addr, - endpoint, - client: None, - }) + Ok(endpoint) } #[inline] @@ -216,17 +171,18 @@ impl Peer { } #[inline] - async fn connect(&mut self) -> Result> { - if let Some(c) = self.client.as_ref() { + async fn connect(&self) -> Result> { + if let Some(c) = self.client.read().as_ref() { return Ok(c.clone()); } - let c = Self::_connect(&self.endpoint).await?; - self.client.replace(c.clone()); + let endpoint = self._endpoint()?; + let c = Self::_connect(&endpoint).await?; + self.client.write().replace(c.clone()); Ok(c) } #[inline] - pub async fn client(&mut self) -> Result> { + pub async fn client(&self) -> Result> { self.connect().await } @@ -329,14 +285,11 @@ impl RaftNode { tokio::spawn(async move { use std::sync::atomic::AtomicBool; - use std::sync::Arc; type Queues = HashMap, VecDeque)>; let mut queues: Queues = HashMap::new(); let sends = |queues: &mut Queues| { - //log::info!("queues.len: {}", queues.len()); for (to, (sending, q)) in queues.iter_mut() { - //log::info!("to: {}, sending: {}, q.len: {}", to, sending.load(Ordering::SeqCst), q.len()); if sending.load(Ordering::SeqCst) { continue; } @@ -394,11 +347,19 @@ impl RaftNode { } } + // #[inline] + // pub fn peer_mut(&mut self, id: u64) -> Option<&mut Peer> { + // match self.peers.get_mut(&id) { + // None => None, + // Some(v) => v.as_mut(), + // } + // } + #[inline] - pub fn peer_mut(&mut self, id: u64) -> Option<&mut Peer> { - match self.peers.get_mut(&id) { - None => None, - Some(v) => v.as_mut(), + pub fn peer(&self, id: u64) -> Option { + match self.peers.get(&id) { + Some(Some(p)) => Some(p.clone()), + _ => None, } } @@ -413,10 +374,10 @@ impl RaftNode { } #[inline] - pub async fn add_peer(&mut self, addr: &str, id: u64) -> Result<()> { - let peer = Peer::new(addr.to_string())?; - self.peers.insert(id, Some(peer)); - Ok(()) + pub fn add_peer(&mut self, addr: &str, id: u64) -> Peer { + let peer = Peer::new(addr.to_string()); + self.peers.insert(id, Some(peer.clone())); + peer } #[inline] @@ -450,56 +411,11 @@ impl RaftNode { } } - // reserve a slot to insert node on next node addition commit - // fn _reserve_next_peer_id(&mut self) -> u64 { - // let next_id = self.peers.keys().max().cloned().unwrap_or(1); - // // if assigned id is ourself, return next one - // let next_id = std::cmp::max(next_id + 1, self.id()); - // self.peers.insert(next_id, None); - // debug!("reserving id {}", next_id); - // next_id - // } - - // forward proposal to leader - // async fn forward_proposal(&mut self, proposal: Vec, chan: oneshot::Sender) { - // let id = self.leader(); - // let leader_client = match self.peer_mut(id) { - // Some(peer) => peer.client().await, - // None => { - // let _ = chan.send(RaftResponse::WrongLeader { - // leader_id: id, - // leader_addr: None, - // }); - // return; - // } - // }; - // - // let leader_client = match leader_client { - // Ok(leader_client) => leader_client, - // Err(_e) => { - // let _ = chan.send(RaftResponse::WrongLeader { - // leader_id: id, - // leader_addr: None, - // }); - // return; - // } - // }; - // - // let proposal_sender = ProposalSender { - // proposal, - // client: leader_client, - // chan, - // timeout: Duration::from_millis(300), - // max_retries: 5, - // }; - // tokio::spawn(proposal_sender.send()); - // } - // forward query request to leader #[inline] - async fn forward_query(&mut self, query: Vec, chan: oneshot::Sender) { + async fn forward_query(&self, query: Vec, chan: oneshot::Sender) { let id = self.leader(); - let leader_client = match self.peer_mut(id) { + let leader_client = match self.peer(id) { Some(peer) => peer.client().await, None => { if let Err(e) = chan.send(RaftResponse::WrongLeader { @@ -542,7 +458,7 @@ impl RaftNode { } #[inline] - fn send_wrong_leader(&self, channel: oneshot::Sender) { + fn send_wrong_leader(&self, chan: oneshot::Sender) { let leader_id = self.leader(); // leader can't be an empty node let leader_addr = self @@ -554,7 +470,7 @@ impl RaftNode { leader_addr, }; // TODO handle error here - if let Err(e) = channel.send(raft_response) { + if let Err(e) = chan.send(raft_response) { error!("send_wrong_leader, RaftResponse send error: {:?}", e); } } @@ -625,12 +541,10 @@ impl RaftNode { } Ok(Some(Message::Propose { proposal, chan })) => { if !self.is_leader() { - debug!("Message::Propose, forward_proposal {:?}", proposal); - //self.forward_proposal(proposal, chan).await; + debug!("Message::Propose, send_wrong_leader {:?}", proposal); self.send_wrong_leader(chan); } else { let seq = self.seq.fetch_add(1, Ordering::Relaxed); - debug!("Message::Propose, {:?}, seq: {}", proposal, seq); client_send.insert(seq, chan); let seq = serialize(&seq).unwrap(); debug!( @@ -644,7 +558,7 @@ impl RaftNode { Ok(Some(Message::Query { query, chan })) => { if !self.is_leader() { - info!("[forward_query] query.len: {:?}", query.len()); + debug!("[forward_query] query.len: {:?}", query.len()); self.forward_query(query, chan).await; } else { debug!("Message::Query, {:?}", query); @@ -721,7 +635,7 @@ impl RaftNode { for message in ready.messages.drain(..) { //for message in ready.take_messages() { - let client = match self.peer_mut(message.get_to()) { + let client = match self.peer(message.get_to()) { Some(peer) => peer.client().await, None => continue, }; @@ -747,14 +661,14 @@ impl RaftNode { } // tokio::spawn(message_sender.send()); } - // log::info!("5 on_ready"); + if !ready.snapshot().is_empty() { let snapshot = ready.snapshot(); self.store.restore(snapshot.get_data()).await?; let store = self.mut_store(); store.apply_snapshot(snapshot.clone())?; } - // log::info!("6 on_ready"); + if let Some(hs) = ready.hs() { // Raft HardState changed, and we need to persist it. let store = self.mut_store(); @@ -811,7 +725,7 @@ impl RaftNode { ConfChangeType::AddNode => { let addr: String = deserialize(change.get_context())?; info!("adding {} ({}) to peers", addr, id); - self.add_peer(&addr, id).await?; + self.add_peer(&addr, id); } ConfChangeType::RemoveNode => { if change.get_node_id() == self.id() {