diff --git a/src/raft.rs b/src/raft.rs index 7a4bb45..f2cb55c 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -136,7 +136,7 @@ impl Mailbox { RaftResponse::WrongLeader { leader_id, leader_addr } => { (leader_id, leader_addr) } - RaftResponse::Error => return Err(Error::Unknown), + RaftResponse::Error(e) => return Err(Error::from(e)), _ => { warn!("Recv other raft response: {:?}", reply); return Err(Error::Unknown); @@ -157,6 +157,7 @@ impl Mailbox { warn!("The target node is not the Leader, leader_id: {}, leader_addr: {:?}", leader_id, leader_addr); Err(Error::NotLeader) }, + RaftResponse::Error(e) => Err(Error::from(e)), _ => { warn!("Recv other raft response, leader_id: {}, leader_addr: {:?}", leader_id, leader_addr); Err(Error::Unknown) @@ -181,11 +182,12 @@ impl Mailbox { let (tx, rx) = oneshot::channel(); let mut sender = self.sender.clone(); match sender.try_send(Message::Query { query, chan: tx }) { - Ok(_) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable + Ok(()) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable Ok(Ok(RaftResponse::Response { data })) => Ok(data), + Ok(Ok(RaftResponse::Error(e))) => Err(Error::from(e)), _ => Err(Error::Unknown), }, - _ => Err(Error::Unknown), + Err(e) => Err(Error::SendError(e.to_string())), } } @@ -198,11 +200,12 @@ impl Mailbox { let mut sender = self.sender.clone(); let (chan, rx) = oneshot::channel(); match sender.send(Message::ConfigChange { change, chan }).await { - Ok(_) => match rx.await { + Ok(()) => match rx.await { Ok(RaftResponse::Ok) => Ok(()), + Ok(RaftResponse::Error(e)) => Err(Error::from(e)), _ => Err(Error::Unknown), }, - _ => Err(Error::Unknown), + Err(e) => Err(Error::SendError(e.to_string())), } } @@ -213,9 +216,10 @@ impl Mailbox { match sender.send(Message::Status { chan: tx }).await { Ok(_) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable Ok(Ok(RaftResponse::Status(status))) => Ok(status), + Ok(Ok(RaftResponse::Error(e))) => Err(Error::from(e)), _ => Err(Error::Unknown), }, - _ => Err(Error::Unknown), + Err(e) => Err(Error::SendError(e.to_string())), } } } diff --git a/src/raft_node.rs b/src/raft_node.rs index 723fec4..ae5d70d 100644 --- a/src/raft_node.rs +++ b/src/raft_node.rs @@ -94,7 +94,7 @@ impl QuerySender { warn!( "error sending query after, {:?}", e ); - if let Err(e) = self.chan.send(RaftResponse::Error) { + if let Err(e) = self.chan.send(RaftResponse::Error(e.to_string())) { warn!( "send_query, Message::Query, RaftResponse send error: {:?}", e @@ -129,7 +129,7 @@ impl QuerySender { "error sending query after {} retries: {}", self.max_retries, e ); - if let Err(e) = self.chan.send(RaftResponse::Error) { + if let Err(e) = self.chan.send(RaftResponse::Error(e.to_string())) { warn!( "send_query, Message::Query, RaftResponse send error: {:?}", e @@ -625,8 +625,8 @@ impl RaftNode { } #[inline] - fn _send_error(&self, chan: oneshot::Sender) { - let raft_response = RaftResponse::Error; + fn _send_error(&self, chan: oneshot::Sender, e: String) { + let raft_response = RaftResponse::Error(e); if let Err(e) = chan.send(raft_response) { warn!("send_error, RaftResponse send error: {:?}", e); } @@ -775,7 +775,7 @@ impl RaftNode { now = Instant::now(); if elapsed > heartbeat { heartbeat = Duration::from_millis(100); - if elapsed > Duration::from_millis(300) { + if elapsed > Duration::from_millis(500) { warn!("raft tick elapsed: {:?}", elapsed); } self.tick(); @@ -788,7 +788,7 @@ impl RaftNode { error!("raft on_ready(..) error: {:?}, elapsed: {:?}", e, on_ready_now.elapsed()); return Err(e); } - if on_ready_now.elapsed() > Duration::from_millis(90) { + if on_ready_now.elapsed() > Duration::from_millis(200) { warn!("raft on_ready(..) elapsed: {:?}", on_ready_now.elapsed()); } } @@ -807,13 +807,13 @@ impl RaftNode { if !ready.entries().is_empty() { let entries = ready.entries(); let store = self.mut_store(); - store.append(entries).unwrap(); + store.append(entries)?; } if let Some(hs) = ready.hs() { // Raft HardState changed, and we need to persist it. let store = self.mut_store(); - store.set_hard_state(hs).unwrap(); + store.set_hard_state(hs)?; } //for message in ready.take_messages() { @@ -955,9 +955,13 @@ impl RaftNode { match (deserialize::(entry.get_data())?, self.uncommitteds.remove(&seq)) { (Proposals::One(data), chan) => { - let reply = self.store.apply(&data).await?; + let reply = self.store.apply(&data).await; if let Some(ReplyChan::One((chan, inst))) = chan { - if let Err(_resp) = chan.send(RaftResponse::Response { data: reply }) { + let res = match reply{ + Ok(data) => RaftResponse::Response { data }, + Err(e) => RaftResponse::Error(e.to_string()), + }; + if let Err(_resp) = chan.send(res) { warn!( "[handle_normal] send RaftResponse error, seq:{}, cost time: {:?}", seq, inst.elapsed() ); @@ -971,7 +975,7 @@ impl RaftNode { None }; while let Some(data) = datas.pop() { - let reply = self.store.apply(&data).await?; + let reply = self.store.apply(&data).await; if let Some((chan, inst)) = chans.as_mut().and_then(|cs| cs.pop()) { if inst.elapsed().as_secs() > 3{ warn!( @@ -979,7 +983,11 @@ impl RaftNode { inst.elapsed(), chan.is_canceled() ); } - if let Err(_resp) = chan.send(RaftResponse::Response { data: reply }) { + let res = match reply{ + Ok(data) => RaftResponse::Response { data }, + Err(e) => RaftResponse::Error(e.to_string()), + }; + if let Err(_resp) = chan.send(res) { warn!( "[handle_normal] send RaftResponse error, seq:{}, cost time: {:?}", seq, inst.elapsed() ); diff --git a/src/raft_server.rs b/src/raft_server.rs index 873e19f..d5d198c 100644 --- a/src/raft_server.rs +++ b/src/raft_server.rs @@ -103,7 +103,7 @@ impl RaftService for RaftServer { } Ok(_) => (), Err(e) => { - reply.inner = serialize(&RaftResponse::Error).unwrap(); + reply.inner = serialize(&RaftResponse::Error("timeout".into())).expect("serialize error"); warn!("timeout waiting for reply, {:?}", e); } } @@ -197,17 +197,17 @@ impl RaftService for RaftServer { reply.inner = serialize(&raft_response).expect("serialize error"); } Ok(Err(e)) => { - reply.inner = serialize(&RaftResponse::Error).expect("serialize error"); + reply.inner = serialize(&RaftResponse::Error(e.to_string())).expect("serialize error"); warn!("send query error, {}", e); } Err(_e) => { - reply.inner = serialize(&RaftResponse::Error).unwrap(); + reply.inner = serialize(&RaftResponse::Error("timeout".into())).expect("serialize error"); warn!("timeout waiting for send query reply"); } } } Err(e) => { - reply.inner = serialize(&RaftResponse::Error).unwrap(); + reply.inner = serialize(&RaftResponse::Error(e.to_string())).expect("serialize error"); warn!("send query error, {}", e) } }