Skip to content

Commit

Permalink
Optimize error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jun 19, 2023
1 parent 3a3a9e8 commit b5e244f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
6 changes: 3 additions & 3 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
self.store,
&self.logger,
self.cfg.clone(),
);
)?;

let server = RaftServer::new(self.tx, addr, self.cfg.grpc_timeout);
let server = RaftServer::new(self.tx, addr, self.cfg.grpc_timeout)?;
let _server_handle = tokio::spawn(server.run());
let node_handle = tokio::spawn(async {
if let Err(e) = node.run().await {
Expand Down Expand Up @@ -391,7 +391,7 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
)?;
let peer = node.add_peer(&leader_addr, leader_id);
let mut client = peer.client().await?;
let server = RaftServer::new(self.tx, addr, self.cfg.grpc_timeout);
let server = RaftServer::new(self.tx, addr, self.cfg.grpc_timeout)?;
let _server_handle = tokio::spawn(server.run());
// let node_handle = tokio::spawn(node.run());

Expand Down
15 changes: 8 additions & 7 deletions src/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,9 @@ impl<S: Store + 'static> RaftNode<S> {
store: S,
logger: &slog::Logger,
cfg: Arc<Config>,
) -> Self {
) -> Result<Self> {
let config = Self::new_config(id, &cfg.raft_cfg);
config.validate().unwrap();
config.validate()?;

let mut s = Snapshot::default();
// Because we don't use the same configuration to initialize every node, so we use
Expand All @@ -370,8 +370,8 @@ impl<S: Store + 'static> RaftNode<S> {
s.mut_metadata().mut_conf_state().voters = vec![id];

let mut storage: MemStorage = MemStorage::create();
storage.apply_snapshot(s).unwrap();
let mut inner = RawNode::new(&config, storage, logger).unwrap();
storage.apply_snapshot(s)?;
let mut inner = RawNode::new(&config, storage, logger)?;
let peers = HashMap::new();
let seq = AtomicU64::new(0);
let last_snap_time = Instant::now(); // + cfg.snapshot_interval;
Expand All @@ -381,7 +381,7 @@ impl<S: Store + 'static> RaftNode<S> {

// let msg_tx = Self::start_message_sender();
let uncommitteds = HashMap::new();
RaftNode {
let node = RaftNode {
inner,
rcv,
peers,
Expand All @@ -393,7 +393,8 @@ impl<S: Store + 'static> RaftNode<S> {
should_quit: false,
last_snap_time,
cfg,
}
};
Ok(node)
}

pub fn new_follower(
Expand All @@ -405,7 +406,7 @@ impl<S: Store + 'static> RaftNode<S> {
cfg: Arc<Config>,
) -> Result<Self> {
let config = Self::new_config(id, &cfg.raft_cfg);
config.validate().unwrap();
config.validate()?;

let storage = MemStorage::create();
let inner = RawNode::new(&config, storage, logger)?;
Expand Down
7 changes: 4 additions & 3 deletions src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::time::timeout;
use tonic::transport::Server;
use tonic::{Request, Response, Status};

use crate::error; //::{Error, Result};
use crate::message::{Message, RaftResponse};
use crate::raft_service::raft_service_server::{RaftService, RaftServiceServer};
use crate::raft_service::{
Expand All @@ -24,9 +25,9 @@ pub struct RaftServer {
}

impl RaftServer {
pub fn new<A: ToSocketAddrs>(snd: mpsc::Sender<Message>, addr: A, timeout: Duration) -> Self {
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
RaftServer { snd, addr, timeout }
pub fn new<A: ToSocketAddrs>(snd: mpsc::Sender<Message>, addr: A, timeout: Duration) -> error::Result<Self> {
let addr = addr.to_socket_addrs()?.next().ok_or(error::Error::from("None"))?;
Ok(RaftServer { snd, addr, timeout })
}

pub async fn run(self) {
Expand Down

0 comments on commit b5e244f

Please sign in to comment.