Skip to content

Commit

Permalink
Failure if there are too many unfinished proposals
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed May 29, 2022
1 parent 6cb4932 commit 7dff274
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions src/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ impl<S: Store + 'static> RaftNode<S> {
.entry(msg.client_id)
.or_insert((Arc::new(AtomicBool::new(false)), VecDeque::new()));
q.push_back(msg);
if q.len() > 300{ //@TODO configurable
warn!("There is too much backlog of unsent messages, {}", q.len())
}
sends(&mut queues);
}
Ok(None) => {
Expand Down Expand Up @@ -469,12 +472,19 @@ impl<S: Store + 'static> RaftNode<S> {
leader_id,
leader_addr,
};
// TODO handle error here
if let Err(e) = chan.send(raft_response) {
error!("send_wrong_leader, RaftResponse send error: {:?}", e);
}
}

#[inline]
fn send_error(&self, chan: oneshot::Sender<RaftResponse>) {
let raft_response = RaftResponse::Error;
if let Err(e) = chan.send(raft_response) {
error!("send_error, RaftResponse send error: {:?}", e);
}
}

pub async fn run(mut self) -> Result<()> {
let mut heartbeat = Duration::from_millis(100);
let mut now = Instant::now();
Expand Down Expand Up @@ -544,15 +554,23 @@ impl<S: Store + 'static> RaftNode<S> {
debug!("Message::Propose, send_wrong_leader {:?}", proposal);
self.send_wrong_leader(chan);
} else {
let seq = self.seq.fetch_add(1, Ordering::Relaxed);
client_send.insert(seq, chan);
let seq = serialize(&seq).unwrap();
debug!(
"Message::Propose, seq: {:?}, client_send len: {}",
seq,
client_send.len()
);
self.propose(seq, proposal).unwrap();
if client_send.len() > 1000{ //@TODO configurable
warn!(
"Too many proposals, proposal in process: {}",
client_send.len()
);
self.send_error(chan);
}else {
let seq = self.seq.fetch_add(1, Ordering::Relaxed);
client_send.insert(seq, chan);
let seq = serialize(&seq).unwrap();
debug!(
"Message::Propose, seq: {:?}, client_send len: {}",
seq,
client_send.len()
);
self.propose(seq, proposal).unwrap();
}
}
}

Expand Down Expand Up @@ -781,8 +799,8 @@ impl<S: Store + 'static> RaftNode<S> {
);
if let Err(resp) = sender.send(RaftResponse::Response { data }) {
warn!(
"[handle_normal] send RaftResponse error, {:?}, seq:{}",
resp, seq
"[handle_normal] send RaftResponse error, {:?}, seq:{}, senders.len(): {}",
resp, seq, senders.len()
);
}
}
Expand Down

0 comments on commit 7dff274

Please sign in to comment.