Skip to content

Commit

Permalink
Optimize: Peer
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt-rs committed May 29, 2022
1 parent af3e525 commit 6cb4932
Showing 1 changed file with 51 additions and 137 deletions.
188 changes: 51 additions & 137 deletions src/raft_node.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use std::collections::HashMap;
use parking_lot::RwLock;
use std::collections::vec_deque::VecDeque;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use bincode::{deserialize, serialize};
use log::*;
use raft::{Config, prelude::*, raw_node::RawNode};
use raft::eraftpb::{ConfChange, ConfChangeType, Entry, EntryType, Message as RaftMessage};
use raft::{prelude::*, raw_node::RawNode, Config};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tonic::Request;
use tonic::transport::{Channel, Endpoint};
use tonic::Request;

use crate::error::{Error, Result};
use crate::message::{Message, RaftResponse, Status};
use crate::raft::Store;
use crate::raft_service::{Message as RiteraftMessage, Query};
use crate::raft_service::raft_service_client::RaftServiceClient;
use crate::raft_service::{Message as RiteraftMessage, Query};
use crate::storage::{LogStore, MemStorage};

pub type RaftGrpcClient = RaftServiceClient<tonic::transport::channel::Channel>;
Expand Down Expand Up @@ -79,56 +81,6 @@ impl MessageSender {
}
}

// struct ProposalSender {
// proposal: Vec<u8>,
// client: RaftGrpcClient,
// chan: oneshot::Sender<RaftResponse>,
// max_retries: usize,
// timeout: Duration,
// }
//
// impl ProposalSender {
// async fn send(mut self) {
// let mut current_retry = 0usize;
// loop {
// let message_request = Request::new(Proposal {
// inner: self.proposal.clone(),
// });
// match self.client.forward(message_request).await {
// Ok(grpc_response) => {
// let raft_response =
// deserialize(&grpc_response.into_inner().inner).expect("deserialize error");
// if let Err(e) = self.chan.send(raft_response) {
// error!(
// "error sending proposal after {} retries: {:?}",
// self.max_retries, e
// );
// }
// return;
// }
// Err(e) => {
// if current_retry < self.max_retries {
// current_retry += 1;
// tokio::time::sleep(self.timeout).await;
// } else {
// error!(
// "error sending proposal after {} retries: {}",
// self.max_retries, e
// );
// if let Err(e) = self.chan.send(RaftResponse::Error) {
// error!(
// "error sending proposal after {} retries: {:?}",
// self.max_retries, e
// );
// }
// return;
// }
// }
// }
// }
// }
// }

struct QuerySender {
query: Vec<u8>,
client: RaftGrpcClient,
Expand Down Expand Up @@ -179,30 +131,33 @@ impl QuerySender {
}
}

#[derive(Clone)]
pub struct Peer {
addr: String,
endpoint: Endpoint,
client: Option<RaftServiceClient<Channel>>,
client: Arc<RwLock<Option<RaftServiceClient<Channel>>>>,
}

impl Peer {
pub fn new(addr: String) -> Result<Peer> {
pub fn new(addr: String) -> Peer {
debug!("connecting to node at {}...", addr);
Peer {
addr,
client: Arc::new(RwLock::new(None)),
}
}

#[inline]
fn _endpoint(&self) -> Result<Endpoint> {
let concurrency_limit = 10;
let client_timeout = Duration::from_secs(8);
let endpoint = Channel::from_shared(format!("http://{}", addr))
let endpoint = Channel::from_shared(format!("http://{}", self.addr))
.map(|endpoint| {
endpoint
.concurrency_limit(concurrency_limit)
.timeout(client_timeout)
})
.map_err(|e| Error::Other(Box::new(e)))?;
//let addr = addr.to_string();
Ok(Peer {
addr,
endpoint,
client: None,
})
Ok(endpoint)
}

#[inline]
Expand All @@ -216,17 +171,18 @@ impl Peer {
}

#[inline]
async fn connect(&mut self) -> Result<RaftServiceClient<Channel>> {
if let Some(c) = self.client.as_ref() {
async fn connect(&self) -> Result<RaftServiceClient<Channel>> {
if let Some(c) = self.client.read().as_ref() {
return Ok(c.clone());
}
let c = Self::_connect(&self.endpoint).await?;
self.client.replace(c.clone());
let endpoint = self._endpoint()?;
let c = Self::_connect(&endpoint).await?;
self.client.write().replace(c.clone());
Ok(c)
}

#[inline]
pub async fn client(&mut self) -> Result<RaftServiceClient<Channel>> {
pub async fn client(&self) -> Result<RaftServiceClient<Channel>> {
self.connect().await
}

Expand Down Expand Up @@ -329,14 +285,11 @@ impl<S: Store + 'static> RaftNode<S> {

tokio::spawn(async move {
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
type Queues = HashMap<u64, (Arc<AtomicBool>, VecDeque<MessageSender>)>;
let mut queues: Queues = HashMap::new();

let sends = |queues: &mut Queues| {
//log::info!("queues.len: {}", queues.len());
for (to, (sending, q)) in queues.iter_mut() {
//log::info!("to: {}, sending: {}, q.len: {}", to, sending.load(Ordering::SeqCst), q.len());
if sending.load(Ordering::SeqCst) {
continue;
}
Expand Down Expand Up @@ -394,11 +347,19 @@ impl<S: Store + 'static> RaftNode<S> {
}
}

// #[inline]
// pub fn peer_mut(&mut self, id: u64) -> Option<&mut Peer> {
// match self.peers.get_mut(&id) {
// None => None,
// Some(v) => v.as_mut(),
// }
// }

#[inline]
pub fn peer_mut(&mut self, id: u64) -> Option<&mut Peer> {
match self.peers.get_mut(&id) {
None => None,
Some(v) => v.as_mut(),
pub fn peer(&self, id: u64) -> Option<Peer> {
match self.peers.get(&id) {
Some(Some(p)) => Some(p.clone()),
_ => None,
}
}

Expand All @@ -413,10 +374,10 @@ impl<S: Store + 'static> RaftNode<S> {
}

#[inline]
pub async fn add_peer(&mut self, addr: &str, id: u64) -> Result<()> {
let peer = Peer::new(addr.to_string())?;
self.peers.insert(id, Some(peer));
Ok(())
pub fn add_peer(&mut self, addr: &str, id: u64) -> Peer {
let peer = Peer::new(addr.to_string());
self.peers.insert(id, Some(peer.clone()));
peer
}

#[inline]
Expand Down Expand Up @@ -450,56 +411,11 @@ impl<S: Store + 'static> RaftNode<S> {
}
}

// reserve a slot to insert node on next node addition commit
// fn _reserve_next_peer_id(&mut self) -> u64 {
// let next_id = self.peers.keys().max().cloned().unwrap_or(1);
// // if assigned id is ourself, return next one
// let next_id = std::cmp::max(next_id + 1, self.id());
// self.peers.insert(next_id, None);
// debug!("reserving id {}", next_id);
// next_id
// }

// forward proposal to leader
// async fn forward_proposal(&mut self, proposal: Vec<u8>, chan: oneshot::Sender<RaftResponse>) {
// let id = self.leader();
// let leader_client = match self.peer_mut(id) {
// Some(peer) => peer.client().await,
// None => {
// let _ = chan.send(RaftResponse::WrongLeader {
// leader_id: id,
// leader_addr: None,
// });
// return;
// }
// };
//
// let leader_client = match leader_client {
// Ok(leader_client) => leader_client,
// Err(_e) => {
// let _ = chan.send(RaftResponse::WrongLeader {
// leader_id: id,
// leader_addr: None,
// });
// return;
// }
// };
//
// let proposal_sender = ProposalSender {
// proposal,
// client: leader_client,
// chan,
// timeout: Duration::from_millis(300),
// max_retries: 5,
// };
// tokio::spawn(proposal_sender.send());
// }

// forward query request to leader
#[inline]
async fn forward_query(&mut self, query: Vec<u8>, chan: oneshot::Sender<RaftResponse>) {
async fn forward_query(&self, query: Vec<u8>, chan: oneshot::Sender<RaftResponse>) {
let id = self.leader();
let leader_client = match self.peer_mut(id) {
let leader_client = match self.peer(id) {
Some(peer) => peer.client().await,
None => {
if let Err(e) = chan.send(RaftResponse::WrongLeader {
Expand Down Expand Up @@ -542,7 +458,7 @@ impl<S: Store + 'static> RaftNode<S> {
}

#[inline]
fn send_wrong_leader(&self, channel: oneshot::Sender<RaftResponse>) {
fn send_wrong_leader(&self, chan: oneshot::Sender<RaftResponse>) {
let leader_id = self.leader();
// leader can't be an empty node
let leader_addr = self
Expand All @@ -554,7 +470,7 @@ impl<S: Store + 'static> RaftNode<S> {
leader_addr,
};
// TODO handle error here
if let Err(e) = channel.send(raft_response) {
if let Err(e) = chan.send(raft_response) {
error!("send_wrong_leader, RaftResponse send error: {:?}", e);
}
}
Expand Down Expand Up @@ -625,12 +541,10 @@ impl<S: Store + 'static> RaftNode<S> {
}
Ok(Some(Message::Propose { proposal, chan })) => {
if !self.is_leader() {
debug!("Message::Propose, forward_proposal {:?}", proposal);
//self.forward_proposal(proposal, chan).await;
debug!("Message::Propose, send_wrong_leader {:?}", proposal);
self.send_wrong_leader(chan);
} else {
let seq = self.seq.fetch_add(1, Ordering::Relaxed);
debug!("Message::Propose, {:?}, seq: {}", proposal, seq);
client_send.insert(seq, chan);
let seq = serialize(&seq).unwrap();
debug!(
Expand All @@ -644,7 +558,7 @@ impl<S: Store + 'static> RaftNode<S> {

Ok(Some(Message::Query { query, chan })) => {
if !self.is_leader() {
info!("[forward_query] query.len: {:?}", query.len());
debug!("[forward_query] query.len: {:?}", query.len());
self.forward_query(query, chan).await;
} else {
debug!("Message::Query, {:?}", query);
Expand Down Expand Up @@ -721,7 +635,7 @@ impl<S: Store + 'static> RaftNode<S> {

for message in ready.messages.drain(..) {
//for message in ready.take_messages() {
let client = match self.peer_mut(message.get_to()) {
let client = match self.peer(message.get_to()) {
Some(peer) => peer.client().await,
None => continue,
};
Expand All @@ -747,14 +661,14 @@ impl<S: Store + 'static> RaftNode<S> {
}
// tokio::spawn(message_sender.send());
}
// log::info!("5 on_ready");

if !ready.snapshot().is_empty() {
let snapshot = ready.snapshot();
self.store.restore(snapshot.get_data()).await?;
let store = self.mut_store();
store.apply_snapshot(snapshot.clone())?;
}
// log::info!("6 on_ready");

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
let store = self.mut_store();
Expand Down Expand Up @@ -811,7 +725,7 @@ impl<S: Store + 'static> RaftNode<S> {
ConfChangeType::AddNode => {
let addr: String = deserialize(change.get_context())?;
info!("adding {} ({}) to peers", addr, id);
self.add_peer(&addr, id).await?;
self.add_peer(&addr, id);
}
ConfChangeType::RemoveNode => {
if change.get_node_id() == self.id() {
Expand Down

0 comments on commit 6cb4932

Please sign in to comment.