Skip to content

Commit

Permalink
Fix the response result of store.apply(...)
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jul 16, 2022
1 parent b692339 commit 1c9cdd4
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
16 changes: 10 additions & 6 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Mailbox {
RaftResponse::WrongLeader { leader_id, leader_addr } => {
(leader_id, leader_addr)
}
RaftResponse::Error => return Err(Error::Unknown),
RaftResponse::Error(e) => return Err(Error::from(e)),
_ => {
warn!("Recv other raft response: {:?}", reply);
return Err(Error::Unknown);
Expand All @@ -157,6 +157,7 @@ impl Mailbox {
warn!("The target node is not the Leader, leader_id: {}, leader_addr: {:?}", leader_id, leader_addr);
Err(Error::NotLeader)
},
RaftResponse::Error(e) => Err(Error::from(e)),
_ => {
warn!("Recv other raft response, leader_id: {}, leader_addr: {:?}", leader_id, leader_addr);
Err(Error::Unknown)
Expand All @@ -181,11 +182,12 @@ impl Mailbox {
let (tx, rx) = oneshot::channel();
let mut sender = self.sender.clone();
match sender.try_send(Message::Query { query, chan: tx }) {
Ok(_) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable
Ok(()) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable
Ok(Ok(RaftResponse::Response { data })) => Ok(data),
Ok(Ok(RaftResponse::Error(e))) => Err(Error::from(e)),
_ => Err(Error::Unknown),
},
_ => Err(Error::Unknown),
Err(e) => Err(Error::SendError(e.to_string())),
}
}

Expand All @@ -198,11 +200,12 @@ impl Mailbox {
let mut sender = self.sender.clone();
let (chan, rx) = oneshot::channel();
match sender.send(Message::ConfigChange { change, chan }).await {
Ok(_) => match rx.await {
Ok(()) => match rx.await {
Ok(RaftResponse::Ok) => Ok(()),
Ok(RaftResponse::Error(e)) => Err(Error::from(e)),
_ => Err(Error::Unknown),
},
_ => Err(Error::Unknown),
Err(e) => Err(Error::SendError(e.to_string())),
}
}

Expand All @@ -213,9 +216,10 @@ impl Mailbox {
match sender.send(Message::Status { chan: tx }).await {
Ok(_) => match timeout(Duration::from_secs(5), rx).await { //@TODO configurable
Ok(Ok(RaftResponse::Status(status))) => Ok(status),
Ok(Ok(RaftResponse::Error(e))) => Err(Error::from(e)),
_ => Err(Error::Unknown),
},
_ => Err(Error::Unknown),
Err(e) => Err(Error::SendError(e.to_string())),
}
}
}
Expand Down
32 changes: 20 additions & 12 deletions src/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl QuerySender {
warn!(
"error sending query after, {:?}", e
);
if let Err(e) = self.chan.send(RaftResponse::Error) {
if let Err(e) = self.chan.send(RaftResponse::Error(e.to_string())) {
warn!(
"send_query, Message::Query, RaftResponse send error: {:?}",
e
Expand Down Expand Up @@ -129,7 +129,7 @@ impl QuerySender {
"error sending query after {} retries: {}",
self.max_retries, e
);
if let Err(e) = self.chan.send(RaftResponse::Error) {
if let Err(e) = self.chan.send(RaftResponse::Error(e.to_string())) {
warn!(
"send_query, Message::Query, RaftResponse send error: {:?}",
e
Expand Down Expand Up @@ -625,8 +625,8 @@ impl<S: Store + 'static> RaftNode<S> {
}

#[inline]
fn _send_error(&self, chan: oneshot::Sender<RaftResponse>) {
let raft_response = RaftResponse::Error;
fn _send_error(&self, chan: oneshot::Sender<RaftResponse>, e: String) {
let raft_response = RaftResponse::Error(e);
if let Err(e) = chan.send(raft_response) {
warn!("send_error, RaftResponse send error: {:?}", e);
}
Expand Down Expand Up @@ -775,7 +775,7 @@ impl<S: Store + 'static> RaftNode<S> {
now = Instant::now();
if elapsed > heartbeat {
heartbeat = Duration::from_millis(100);
if elapsed > Duration::from_millis(300) {
if elapsed > Duration::from_millis(500) {
warn!("raft tick elapsed: {:?}", elapsed);
}
self.tick();
Expand All @@ -788,7 +788,7 @@ impl<S: Store + 'static> RaftNode<S> {
error!("raft on_ready(..) error: {:?}, elapsed: {:?}", e, on_ready_now.elapsed());
return Err(e);
}
if on_ready_now.elapsed() > Duration::from_millis(90) {
if on_ready_now.elapsed() > Duration::from_millis(200) {
warn!("raft on_ready(..) elapsed: {:?}", on_ready_now.elapsed());
}
}
Expand All @@ -807,13 +807,13 @@ impl<S: Store + 'static> RaftNode<S> {
if !ready.entries().is_empty() {
let entries = ready.entries();
let store = self.mut_store();
store.append(entries).unwrap();
store.append(entries)?;
}

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
let store = self.mut_store();
store.set_hard_state(hs).unwrap();
store.set_hard_state(hs)?;
}

//for message in ready.take_messages() {
Expand Down Expand Up @@ -955,9 +955,13 @@ impl<S: Store + 'static> RaftNode<S> {

match (deserialize::<Proposals>(entry.get_data())?, self.uncommitteds.remove(&seq)) {
(Proposals::One(data), chan) => {
let reply = self.store.apply(&data).await?;
let reply = self.store.apply(&data).await;
if let Some(ReplyChan::One((chan, inst))) = chan {
if let Err(_resp) = chan.send(RaftResponse::Response { data: reply }) {
let res = match reply{
Ok(data) => RaftResponse::Response { data },
Err(e) => RaftResponse::Error(e.to_string()),
};
if let Err(_resp) = chan.send(res) {
warn!(
"[handle_normal] send RaftResponse error, seq:{}, cost time: {:?}", seq, inst.elapsed()
);
Expand All @@ -971,15 +975,19 @@ impl<S: Store + 'static> RaftNode<S> {
None
};
while let Some(data) = datas.pop() {
let reply = self.store.apply(&data).await?;
let reply = self.store.apply(&data).await;
if let Some((chan, inst)) = chans.as_mut().and_then(|cs| cs.pop()) {
if inst.elapsed().as_secs() > 3{
warn!(
"[handle_normal] cost time, {:?}, chan is canceled: {}",
inst.elapsed(), chan.is_canceled()
);
}
if let Err(_resp) = chan.send(RaftResponse::Response { data: reply }) {
let res = match reply{
Ok(data) => RaftResponse::Response { data },
Err(e) => RaftResponse::Error(e.to_string()),
};
if let Err(_resp) = chan.send(res) {
warn!(
"[handle_normal] send RaftResponse error, seq:{}, cost time: {:?}", seq, inst.elapsed()
);
Expand Down
8 changes: 4 additions & 4 deletions src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl RaftService for RaftServer {
}
Ok(_) => (),
Err(e) => {
reply.inner = serialize(&RaftResponse::Error).unwrap();
reply.inner = serialize(&RaftResponse::Error("timeout".into())).expect("serialize error");
warn!("timeout waiting for reply, {:?}", e);
}
}
Expand Down Expand Up @@ -197,17 +197,17 @@ impl RaftService for RaftServer {
reply.inner = serialize(&raft_response).expect("serialize error");
}
Ok(Err(e)) => {
reply.inner = serialize(&RaftResponse::Error).expect("serialize error");
reply.inner = serialize(&RaftResponse::Error(e.to_string())).expect("serialize error");
warn!("send query error, {}", e);
}
Err(_e) => {
reply.inner = serialize(&RaftResponse::Error).unwrap();
reply.inner = serialize(&RaftResponse::Error("timeout".into())).expect("serialize error");
warn!("timeout waiting for send query reply");
}
}
}
Err(e) => {
reply.inner = serialize(&RaftResponse::Error).unwrap();
reply.inner = serialize(&RaftResponse::Error(e.to_string())).expect("serialize error");
warn!("send query error, {}", e)
}
}
Expand Down

0 comments on commit 1c9cdd4

Please sign in to comment.