Skip to content

Commit

Permalink
Modify timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 30, 2022
1 parent 7a54ee4 commit 91009fa
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Mailbox {
let mut sender = self.sender.clone();
sender.try_send(proposal)
.map_err(|e| Error::SendError(e.to_string()))?;
let reply = timeout(Duration::from_secs(3), rx).await; //@TODO configurable
let reply = timeout(Duration::from_secs(6), rx).await; //@TODO configurable
let reply = reply.map_err(|e| Error::RecvError(e.to_string()))?
.map_err(|e| Error::RecvError(e.to_string()))?;
match reply {
Expand Down Expand Up @@ -183,7 +183,7 @@ impl Mailbox {
let (tx, rx) = oneshot::channel();
let mut sender = self.sender.clone();
match sender.try_send(Message::Query { query, chan: tx }) {
Ok(()) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable
Ok(()) => match timeout(Duration::from_secs(6), rx).await { //@TODO configurable
Ok(Ok(RaftResponse::Response { data })) => Ok(data),
Ok(Ok(RaftResponse::Error(e))) => Err(Error::from(e)),
_ => Err(Error::Unknown),
Expand Down Expand Up @@ -215,7 +215,7 @@ impl Mailbox {
let (tx, rx) = oneshot::channel();
let mut sender = self.sender.clone();
match sender.send(Message::Status { chan: tx }).await {
Ok(_) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable
Ok(_) => match timeout(Duration::from_secs(6), rx).await { //@TODO configurable
Ok(Ok(RaftResponse::Status(status))) => Ok(status),
Ok(Ok(RaftResponse::Error(e))) => Err(Error::from(e)),
_ => Err(Error::Unknown),
Expand Down
20 changes: 18 additions & 2 deletions src/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pub struct Peer {
impl Peer {
pub fn new(addr: String) -> Peer {
debug!("connecting to node at {}...", addr);
let crw_timeout = Duration::from_secs(5); //@TODO configurable
let crw_timeout = Duration::from_secs(6); //@TODO configurable
let max_concurrency = 200;
Peer {
addr,
Expand Down Expand Up @@ -726,23 +726,31 @@ impl<S: Store + 'static> RaftNode<S> {
}
}
Ok(Some(Message::Propose { proposal, chan })) => {
let now = Instant::now();
if !self.is_leader() {
debug!("Message::Propose, send_wrong_leader {:?}", proposal);
self.send_wrong_leader(chan);
} else {
merger.add(proposal, chan);
self.take_and_propose(&mut merger);
}
if now.elapsed() > Duration::from_millis(100) {
info!("Message::Propose elapsed: {:?}", now.elapsed());
}
}

Ok(Some(Message::Query { query, chan })) => {
let now = Instant::now();
if !self.is_leader() {
debug!("[forward_query] query.len: {:?}", query.len());
self.forward_query(query, chan).await;
} else {
debug!("Message::Query, {:?}", query);
self.send_query(&query, chan).await;
}
if now.elapsed() > Duration::from_millis(100) {
info!("Message::Query elapsed: {:?}", now.elapsed());
}
}

Ok(Some(Message::RequestId { chan })) => {
Expand Down Expand Up @@ -773,7 +781,7 @@ impl<S: Store + 'static> RaftNode<S> {

let elapsed = now.elapsed();
now = Instant::now();
if elapsed > heartbeat {
if elapsed >= heartbeat {
heartbeat = Duration::from_millis(100);
if elapsed > Duration::from_millis(500) {
warn!("raft tick elapsed: {:?}", elapsed);
Expand Down Expand Up @@ -955,7 +963,11 @@ impl<S: Store + 'static> RaftNode<S> {

match (deserialize::<Proposals>(entry.get_data())?, self.uncommitteds.remove(&seq)) {
(Proposals::One(data), chan) => {
let apply_start = std::time::Instant::now();
let reply = self.store.apply(&data).await;
if apply_start.elapsed().as_secs() > 3 {
log::warn!("apply, cost time: {:?}", apply_start.elapsed());
}
if let Some(ReplyChan::One((chan, inst))) = chan {
let res = match reply{
Ok(data) => RaftResponse::Response { data },
Expand All @@ -975,7 +987,11 @@ impl<S: Store + 'static> RaftNode<S> {
None
};
while let Some(data) = datas.pop() {
let apply_start = std::time::Instant::now();
let reply = self.store.apply(&data).await;
if apply_start.elapsed().as_secs() > 3 {
log::warn!("apply, cost time: {:?}", apply_start.elapsed());
}
if let Some((chan, inst)) = chans.as_mut().and_then(|cs| cs.pop()) {
if inst.elapsed().as_secs() > 3{
warn!(
Expand Down
8 changes: 4 additions & 4 deletions src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl RaftService for RaftServer {
let (tx, rx) = oneshot::channel();
let _ = sender.send(Message::RequestId { chan: tx }).await;
//let response = rx.await;
let reply = timeout(Duration::from_secs(3), rx).await //@TODO configurable
let reply = timeout(Duration::from_secs(6), rx).await //@TODO configurable
.map_err(|_e| Status::unavailable("recv timeout for reply"))?
.map_err(|_e| Status::unavailable("recv canceled for reply"))?;
match reply {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl RaftService for RaftServer {
let mut reply = raft_service::RaftResponse::default();

// if we don't receive a response after 3secs, we timeout
match timeout(Duration::from_secs(3), rx).await {
match timeout(Duration::from_secs(6), rx).await {
Ok(Ok(raft_response)) => {
reply.inner = serialize(&raft_response).expect("serialize error");
}
Expand Down Expand Up @@ -145,7 +145,7 @@ impl RaftService for RaftServer {

let reply = match sender.try_send(message) {
Ok(()) => {
let reply = match timeout(Duration::from_secs(3), rx).await { //@TODO configurable
let reply = match timeout(Duration::from_secs(6), rx).await { //@TODO configurable
Ok(Ok(raft_response)) => {
match serialize(&raft_response) {
Ok(resp) => {
Expand Down Expand Up @@ -192,7 +192,7 @@ impl RaftService for RaftServer {
match sender.try_send(message) {
Ok(()) => {
// if we don't receive a response after 2secs, we timeout
match timeout(Duration::from_secs(3), rx).await {
match timeout(Duration::from_secs(6), rx).await {
Ok(Ok(raft_response)) => {
reply.inner = serialize(&raft_response).expect("serialize error");
}
Expand Down

0 comments on commit 91009fa

Please sign in to comment.