Skip to content

Commit

Permalink
cargo clippy & fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jul 30, 2023
1 parent 8536257 commit 66bf377
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::time::Duration;

pub use crate::error::{Error, Result};
pub use crate::raft::{Mailbox, Raft, Store};
pub use crate::message::Status;
pub use crate::raft::{Mailbox, Raft, Store};
pub use tikv_raft::ReadOnlyOption;

mod error;
Expand Down
23 changes: 18 additions & 5 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::error::{Error, Result};
use crate::message::{Message, RaftResponse, Status};
use crate::raft_node::{Peer, RaftNode};
use crate::raft_server::RaftServer;
use crate::raft_service::{ConfChange as RiteraftConfChange, Empty, ResultCode};
use crate::raft_service::connect;
use crate::raft_service::{ConfChange as RiteraftConfChange, Empty, ResultCode};
use crate::Config;

type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
Expand Down Expand Up @@ -254,8 +254,16 @@ pub struct Raft<S: Store + 'static> {

impl<S: Store + Send + Sync + 'static> Raft<S> {
/// creates a new node with the given address and store.
pub fn new<A: ToSocketAddrs>(addr: A, store: S, logger: slog::Logger, cfg: Config) -> Result<Self> {
let addr = addr.to_socket_addrs()?.next().ok_or(Error::from("None"))?;
pub fn new<A: ToSocketAddrs>(
addr: A,
store: S,
logger: slog::Logger,
cfg: Config,
) -> Result<Self> {
let addr = addr
.to_socket_addrs()?
.next()
.ok_or_else(|| Error::from("None"))?;
let (tx, rx) = mpsc::channel(100_000);
let cfg = Arc::new(cfg);
Ok(Self {
Expand Down Expand Up @@ -432,7 +440,9 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
let mut change = ConfChange::default();
change.set_node_id(node_id);
change.set_change_type(ConfChangeType::AddNode);
change.set_context(prost::bytes::Bytes::from(serialize(&self.addr.to_string())?));
change.set_context(prost::bytes::Bytes::from(serialize(
&self.addr.to_string(),
)?));
// change.set_context(serialize(&self.addr)?);

let change = RiteraftConfChange {
Expand All @@ -447,7 +457,10 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
peer_addrs,
} = deserialize(&raft_response.inner)?
{
info!("change_config response.assigned_id: {:?}, peer_addrs: {:?}", assigned_id, peer_addrs);
info!(
"change_config response.assigned_id: {:?}, peer_addrs: {:?}",
assigned_id, peer_addrs
);
for (id, addr) in peer_addrs {
if id != assigned_id {
node.add_peer(&addr, id);
Expand Down
38 changes: 16 additions & 22 deletions src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ impl RaftServer {
let addr = self.addr;
info!("listening gRPC requests on: {}", addr);
let svc = RaftServiceServer::new(self);
Server::builder()
.add_service(svc)
.serve(addr)
.await?;
Server::builder().add_service(svc).serve(addr).await?;
info!("server has quit");
Ok(())
}
Expand Down Expand Up @@ -143,26 +140,23 @@ impl RaftService for RaftServer {
let message = Message::Propose { proposal, chan: tx };

let reply = match sender.try_send(message) {
Ok(()) => {
let reply = match timeout(self.timeout, rx).await {
Ok(Ok(raft_response)) => match serialize(&raft_response) {
Ok(resp) => Ok(Response::new(raft_service::RaftResponse { inner: resp })),
Err(e) => {
warn!("serialize error, {}", e);
Err(Status::unavailable("serialize error"))
}
},
Ok(Err(e)) => {
warn!("recv error for reply, {}", e);
Err(Status::unavailable("recv error for reply"))
}
Ok(()) => match timeout(self.timeout, rx).await {
Ok(Ok(raft_response)) => match serialize(&raft_response) {
Ok(resp) => Ok(Response::new(raft_service::RaftResponse { inner: resp })),
Err(e) => {
warn!("timeout waiting for reply, {}", e);
Err(Status::unavailable("timeout waiting for reply"))
warn!("serialize error, {}", e);
Err(Status::unavailable("serialize error"))
}
};
reply
}
},
Ok(Err(e)) => {
warn!("recv error for reply, {}", e);
Err(Status::unavailable("recv error for reply"))
}
Err(e) => {
warn!("timeout waiting for reply, {}", e);
Err(Status::unavailable("timeout waiting for reply"))
}
},
Err(e) => {
warn!("error for try send message, {}", e);
Err(Status::unavailable("error for try send message"))
Expand Down
23 changes: 18 additions & 5 deletions src/raft_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,31 @@ tonic::include_proto!("raftservice");
pub(crate) type RaftServiceClientType = RaftServiceClient<Channel>;

#[inline]
pub(crate) fn endpoint(saddr: &str, concurrency_limit: usize, timeout: Duration) -> Result<Endpoint> {
pub(crate) fn endpoint(
saddr: &str,
concurrency_limit: usize,
timeout: Duration,
) -> Result<Endpoint> {
let endpoint = Channel::from_shared(format!("http://{}", saddr))
.map(|endpoint| {
endpoint
.concurrency_limit(concurrency_limit)
.connect_timeout(timeout)
.timeout(timeout)
}).map_err(anyhow::Error::new)?;
})
.map_err(anyhow::Error::new)?;
Ok(endpoint)
}

#[inline]
pub(crate) async fn connect(saddr: &str, concurrency_limit: usize, timeout: Duration) -> Result<RaftServiceClientType> {
Ok(RaftServiceClientType::new(endpoint(saddr, concurrency_limit, timeout)?.connect().await?))
}
pub(crate) async fn connect(
saddr: &str,
concurrency_limit: usize,
timeout: Duration,
) -> Result<RaftServiceClientType> {
Ok(RaftServiceClientType::new(
endpoint(saddr, concurrency_limit, timeout)?
.connect()
.await?,
))
}
2 changes: 1 addition & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Storage for MemStorage {
low: u64,
high: u64,
max_size: impl Into<Option<u64>>,
context: GetEntriesContext
context: GetEntriesContext,
) -> tikv_raft::Result<Vec<Entry>> {
let entries = self.core.entries(low, high, max_size, context)?;
Ok(entries)
Expand Down

0 comments on commit 66bf377

Please sign in to comment.