diff --git a/examples/rmqttraft-warp-memstore/src/main.rs b/examples/rmqttraft-warp-memstore/src/main.rs index 076d280..3aa45b0 100644 --- a/examples/rmqttraft-warp-memstore/src/main.rs +++ b/examples/rmqttraft-warp-memstore/src/main.rs @@ -184,6 +184,7 @@ async fn main() -> std::result::Result<(), Box> { let cfg = Config { reuseaddr: true, reuseport: true, + // grpc_message_size: 50 * 1024 * 1024, ..Default::default() }; let raft = Raft::new(options.raft_laddr, store.clone(), logger.clone(), cfg)?; diff --git a/src/lib.rs b/src/lib.rs index 3c3e3b7..6051d84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,7 @@ pub struct Config { pub reuseport: bool, pub grpc_timeout: Duration, pub grpc_concurrency_limit: usize, + pub grpc_message_size: usize, //GRPC failed to fuse threshold pub grpc_breaker_threshold: u64, pub grpc_breaker_retry_interval: Duration, @@ -48,6 +49,7 @@ impl Default for Config { reuseport: false, grpc_timeout: Duration::from_secs(6), grpc_concurrency_limit: 200, + grpc_message_size: 50 * 1024 * 1024, grpc_breaker_threshold: 4, grpc_breaker_retry_interval: Duration::from_millis(2500), proposal_batch_size: 50, diff --git a/src/raft.rs b/src/raft.rs index 65b72b1..9216fc2 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -58,6 +58,7 @@ pub struct Mailbox { sender: mpsc::Sender, grpc_timeout: Duration, grpc_concurrency_limit: usize, + grpc_message_size: usize, grpc_breaker_threshold: u64, grpc_breaker_retry_interval: i64, } @@ -83,6 +84,7 @@ impl Mailbox { leader_addr, self.grpc_timeout, self.grpc_concurrency_limit, + self.grpc_message_size, self.grpc_breaker_threshold, self.grpc_breaker_retry_interval, ) @@ -276,6 +278,7 @@ impl Raft { sender: self.tx.clone(), grpc_timeout: self.cfg.grpc_timeout, grpc_concurrency_limit: self.cfg.grpc_concurrency_limit, + grpc_message_size: self.cfg.grpc_message_size, grpc_breaker_threshold: self.cfg.grpc_breaker_threshold, grpc_breaker_retry_interval: self.cfg.grpc_breaker_retry_interval.as_millis() as i64, } @@ -310,7 +313,13 @@ impl Raft { async fn request_leader(&self, peer_addr: String) -> Result> { let (leader_id, leader_addr): (u64, String) = { - let mut client = connect(&peer_addr, 1, self.cfg.grpc_timeout).await?; + let mut client = connect( + &peer_addr, + 1, + self.cfg.grpc_message_size, + self.cfg.grpc_timeout, + ) + .await?; let response = client .request_id(Request::new(Empty::default())) .await? diff --git a/src/raft_node.rs b/src/raft_node.rs index d07a484..67b96ff 100644 --- a/src/raft_node.rs +++ b/src/raft_node.rs @@ -150,6 +150,7 @@ pub struct Peer { grpc_fail_time: Arc, crw_timeout: Duration, concurrency_limit: usize, + grpc_message_size: usize, grpc_breaker_threshold: u64, grpc_breaker_retry_interval: i64, active_tasks: Arc, @@ -160,6 +161,7 @@ impl Peer { addr: String, crw_timeout: Duration, concurrency_limit: usize, + grpc_message_size: usize, grpc_breaker_threshold: u64, grpc_breaker_retry_interval: i64, ) -> Peer { @@ -171,6 +173,7 @@ impl Peer { grpc_fail_time: Arc::new(AtomicI64::new(0)), crw_timeout, concurrency_limit, + grpc_message_size, grpc_breaker_threshold, grpc_breaker_retry_interval, active_tasks: Arc::new(AtomicI64::new(0)), @@ -198,7 +201,13 @@ impl Peer { return Ok(c.clone()); } - let c = connect(&self.addr, self.concurrency_limit, self.crw_timeout).await?; + let c = connect( + &self.addr, + self.concurrency_limit, + self.grpc_message_size, + self.crw_timeout, + ) + .await?; client.replace(c.clone()); Ok(c) } @@ -442,6 +451,7 @@ impl RaftNode { addr.to_string(), self.cfg.grpc_timeout, self.cfg.grpc_concurrency_limit, + self.cfg.grpc_message_size, self.cfg.grpc_breaker_threshold, self.cfg.grpc_breaker_retry_interval.as_millis() as i64, ); diff --git a/src/raft_server.rs b/src/raft_server.rs index ebdaad0..d30a316 100644 --- a/src/raft_server.rs +++ b/src/raft_server.rs @@ -40,7 +40,9 @@ impl RaftServer { let laddr = self.laddr; let _cfg = self.cfg.clone(); info!("listening gRPC requests on: {}", laddr); - let svc = RaftServiceServer::new(self); + let svc = RaftServiceServer::new(self) + .max_decoding_message_size(_cfg.grpc_message_size) + .max_encoding_message_size(_cfg.grpc_message_size); let server = Server::builder().add_service(svc); #[cfg(any(feature = "reuseport", feature = "reuseaddr"))] diff --git a/src/raft_service.rs b/src/raft_service.rs index f8a8e7e..1e96e3c 100644 --- a/src/raft_service.rs +++ b/src/raft_service.rs @@ -31,13 +31,16 @@ pub(crate) fn endpoint( pub(crate) async fn connect( saddr: &str, concurrency_limit: usize, + message_size: usize, timeout: Duration, ) -> Result { Ok(RaftServiceClientType::new( endpoint(saddr, concurrency_limit, timeout)? .connect() .await?, - )) + ) + .max_decoding_message_size(message_size) + .max_encoding_message_size(message_size)) } #[inline] @@ -63,4 +66,4 @@ pub fn bind( tokio::net::TcpListener::from_std(std::net::TcpListener::from(builder))?, ); Ok(listener) -} \ No newline at end of file +}