From 7dff274533f03bac401a27fed2450f7e7b6454a9 Mon Sep 17 00:00:00 2001 From: rmqtt Date: Sun, 29 May 2022 19:22:33 +0800 Subject: [PATCH] Failure if there are too many unfinished proposals --- src/raft_node.rs | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/src/raft_node.rs b/src/raft_node.rs index 7e1e85c..74fe68d 100644 --- a/src/raft_node.rs +++ b/src/raft_node.rs @@ -319,6 +319,9 @@ impl RaftNode { .entry(msg.client_id) .or_insert((Arc::new(AtomicBool::new(false)), VecDeque::new())); q.push_back(msg); + if q.len() > 300{ //@TODO configurable + warn!("There is too much backlog of unsent messages, {}", q.len()) + } sends(&mut queues); } Ok(None) => { @@ -469,12 +472,19 @@ impl RaftNode { leader_id, leader_addr, }; - // TODO handle error here if let Err(e) = chan.send(raft_response) { error!("send_wrong_leader, RaftResponse send error: {:?}", e); } } + #[inline] + fn send_error(&self, chan: oneshot::Sender) { + let raft_response = RaftResponse::Error; + if let Err(e) = chan.send(raft_response) { + error!("send_error, RaftResponse send error: {:?}", e); + } + } + pub async fn run(mut self) -> Result<()> { let mut heartbeat = Duration::from_millis(100); let mut now = Instant::now(); @@ -544,15 +554,23 @@ impl RaftNode { debug!("Message::Propose, send_wrong_leader {:?}", proposal); self.send_wrong_leader(chan); } else { - let seq = self.seq.fetch_add(1, Ordering::Relaxed); - client_send.insert(seq, chan); - let seq = serialize(&seq).unwrap(); - debug!( - "Message::Propose, seq: {:?}, client_send len: {}", - seq, - client_send.len() - ); - self.propose(seq, proposal).unwrap(); + if client_send.len() > 1000{ //@TODO configurable + warn!( + "Too many proposals, proposal in process: {}", + client_send.len() + ); + self.send_error(chan); + }else { + let seq = self.seq.fetch_add(1, Ordering::Relaxed); + client_send.insert(seq, chan); + let seq = serialize(&seq).unwrap(); + debug!( + "Message::Propose, seq: {:?}, client_send len: {}", + seq, + client_send.len() + ); + self.propose(seq, proposal).unwrap(); + } } } @@ -781,8 +799,8 @@ impl RaftNode { ); if let Err(resp) = sender.send(RaftResponse::Response { data }) { warn!( - "[handle_normal] send RaftResponse error, {:?}, seq:{}", - resp, seq + "[handle_normal] send RaftResponse error, {:?}, seq:{}, senders.len(): {}", + resp, seq, senders.len() ); } }