Skip to content

Commit

Permalink
Add config 'grpc_message_size', default: 50M
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Dec 1, 2023
1 parent ee663db commit 84b7880
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 5 deletions.
1 change: 1 addition & 0 deletions examples/rmqttraft-warp-memstore/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let cfg = Config {
reuseaddr: true,
reuseport: true,
// grpc_message_size: 50 * 1024 * 1024,
..Default::default()
};
let raft = Raft::new(options.raft_laddr, store.clone(), logger.clone(), cfg)?;
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct Config {
pub reuseport: bool,
pub grpc_timeout: Duration,
pub grpc_concurrency_limit: usize,
pub grpc_message_size: usize,
//GRPC failed to fuse threshold
pub grpc_breaker_threshold: u64,
pub grpc_breaker_retry_interval: Duration,
Expand All @@ -48,6 +49,7 @@ impl Default for Config {
reuseport: false,
grpc_timeout: Duration::from_secs(6),
grpc_concurrency_limit: 200,
grpc_message_size: 50 * 1024 * 1024,
grpc_breaker_threshold: 4,
grpc_breaker_retry_interval: Duration::from_millis(2500),
proposal_batch_size: 50,
Expand Down
11 changes: 10 additions & 1 deletion src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct Mailbox {
sender: mpsc::Sender<Message>,
grpc_timeout: Duration,
grpc_concurrency_limit: usize,
grpc_message_size: usize,
grpc_breaker_threshold: u64,
grpc_breaker_retry_interval: i64,
}
Expand All @@ -83,6 +84,7 @@ impl Mailbox {
leader_addr,
self.grpc_timeout,
self.grpc_concurrency_limit,
self.grpc_message_size,
self.grpc_breaker_threshold,
self.grpc_breaker_retry_interval,
)
Expand Down Expand Up @@ -276,6 +278,7 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
sender: self.tx.clone(),
grpc_timeout: self.cfg.grpc_timeout,
grpc_concurrency_limit: self.cfg.grpc_concurrency_limit,
grpc_message_size: self.cfg.grpc_message_size,
grpc_breaker_threshold: self.cfg.grpc_breaker_threshold,
grpc_breaker_retry_interval: self.cfg.grpc_breaker_retry_interval.as_millis() as i64,
}
Expand Down Expand Up @@ -310,7 +313,13 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {

async fn request_leader(&self, peer_addr: String) -> Result<Option<(u64, String)>> {
let (leader_id, leader_addr): (u64, String) = {
let mut client = connect(&peer_addr, 1, self.cfg.grpc_timeout).await?;
let mut client = connect(
&peer_addr,
1,
self.cfg.grpc_message_size,
self.cfg.grpc_timeout,
)
.await?;
let response = client
.request_id(Request::new(Empty::default()))
.await?
Expand Down
12 changes: 11 additions & 1 deletion src/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub struct Peer {
grpc_fail_time: Arc<AtomicI64>,
crw_timeout: Duration,
concurrency_limit: usize,
grpc_message_size: usize,
grpc_breaker_threshold: u64,
grpc_breaker_retry_interval: i64,
active_tasks: Arc<AtomicI64>,
Expand All @@ -160,6 +161,7 @@ impl Peer {
addr: String,
crw_timeout: Duration,
concurrency_limit: usize,
grpc_message_size: usize,
grpc_breaker_threshold: u64,
grpc_breaker_retry_interval: i64,
) -> Peer {
Expand All @@ -171,6 +173,7 @@ impl Peer {
grpc_fail_time: Arc::new(AtomicI64::new(0)),
crw_timeout,
concurrency_limit,
grpc_message_size,
grpc_breaker_threshold,
grpc_breaker_retry_interval,
active_tasks: Arc::new(AtomicI64::new(0)),
Expand Down Expand Up @@ -198,7 +201,13 @@ impl Peer {
return Ok(c.clone());
}

let c = connect(&self.addr, self.concurrency_limit, self.crw_timeout).await?;
let c = connect(
&self.addr,
self.concurrency_limit,
self.grpc_message_size,
self.crw_timeout,
)
.await?;
client.replace(c.clone());
Ok(c)
}
Expand Down Expand Up @@ -442,6 +451,7 @@ impl<S: Store + 'static> RaftNode<S> {
addr.to_string(),
self.cfg.grpc_timeout,
self.cfg.grpc_concurrency_limit,
self.cfg.grpc_message_size,
self.cfg.grpc_breaker_threshold,
self.cfg.grpc_breaker_retry_interval.as_millis() as i64,
);
Expand Down
4 changes: 3 additions & 1 deletion src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ impl RaftServer {
let laddr = self.laddr;
let _cfg = self.cfg.clone();
info!("listening gRPC requests on: {}", laddr);
let svc = RaftServiceServer::new(self);
let svc = RaftServiceServer::new(self)
.max_decoding_message_size(_cfg.grpc_message_size)
.max_encoding_message_size(_cfg.grpc_message_size);
let server = Server::builder().add_service(svc);

#[cfg(any(feature = "reuseport", feature = "reuseaddr"))]
Expand Down
7 changes: 5 additions & 2 deletions src/raft_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ pub(crate) fn endpoint(
pub(crate) async fn connect(
saddr: &str,
concurrency_limit: usize,
message_size: usize,
timeout: Duration,
) -> Result<RaftServiceClientType> {
Ok(RaftServiceClientType::new(
endpoint(saddr, concurrency_limit, timeout)?
.connect()
.await?,
))
)
.max_decoding_message_size(message_size)
.max_encoding_message_size(message_size))
}

#[inline]
Expand All @@ -63,4 +66,4 @@ pub fn bind(
tokio::net::TcpListener::from_std(std::net::TcpListener::from(builder))?,
);
Ok(listener)
}
}

0 comments on commit 84b7880

Please sign in to comment.