From d6d8d9514dcd56aefd4a814ec79daf78c3a535bf Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 27 Nov 2023 17:40:25 +0800 Subject: [PATCH 1/4] improve connection pool value --- src/rpc_client/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 52f299aa8813b..49e6b7dc0031f 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -37,7 +37,7 @@ use anyhow::anyhow; use async_trait::async_trait; use futures::future::try_join_all; use futures::stream::{BoxStream, Peekable}; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryFutureExt}; use moka::future::Cache; use rand::prelude::SliceRandom; use risingwave_common::util::addr::HostAddr; @@ -78,7 +78,7 @@ pub trait RpcClient: Send + Sync + 'static + Clone { pub struct RpcClientPool { connection_pool_size: u16, - clients: Cache>, + clients: Cache>>, } impl Default for RpcClientPool @@ -115,7 +115,8 @@ where .clients .try_get_with( addr.clone(), - S::new_clients(addr.clone(), self.connection_pool_size as usize), + S::new_clients(addr.clone(), self.connection_pool_size as usize) + .map_ok(|v| Arc::new(v)), ) .await .map_err(|e| -> RpcError { From eae8e6d035cdf5de7a4b953f909187b9e3b86fb4 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 27 Nov 2023 17:48:09 +0800 Subject: [PATCH 2/4] fmt --- src/rpc_client/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 49e6b7dc0031f..b524039817797 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -116,7 +116,7 @@ where .try_get_with( addr.clone(), S::new_clients(addr.clone(), self.connection_pool_size as usize) - .map_ok(|v| Arc::new(v)), + .map_ok(Arc::new), ) .await .map_err(|e| -> RpcError { From adc1721427ee6a0a2292428e37b543cb713ccb07 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 27 Nov 2023 17:51:42 +0800 Subject: [PATCH 3/4] fmt --- src/rpc_client/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index b524039817797..0e035cd99eeaf 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -115,8 +115,7 @@ where .clients .try_get_with( addr.clone(), - S::new_clients(addr.clone(), self.connection_pool_size as usize) - .map_ok(Arc::new), + S::new_clients(addr.clone(), self.connection_pool_size as usize).map_ok(Arc::new), ) .await .map_err(|e| -> RpcError { From eec1e00942323bd3cb83447ee4addecabd5525b4 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 27 Nov 2023 17:58:08 +0800 Subject: [PATCH 4/4] refine --- src/rpc_client/src/lib.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 0e035cd99eeaf..0a8c774298b3c 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -37,7 +37,7 @@ use anyhow::anyhow; use async_trait::async_trait; use futures::future::try_join_all; use futures::stream::{BoxStream, Peekable}; -use futures::{Stream, StreamExt, TryFutureExt}; +use futures::{Stream, StreamExt}; use moka::future::Cache; use rand::prelude::SliceRandom; use risingwave_common::util::addr::HostAddr; @@ -69,8 +69,10 @@ pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; pub trait RpcClient: Send + Sync + 'static + Clone { async fn new_client(host_addr: HostAddr) -> Result; - async fn new_clients(host_addr: HostAddr, size: usize) -> Result> { - try_join_all(repeat(host_addr).take(size).map(Self::new_client)).await + async fn new_clients(host_addr: HostAddr, size: usize) -> Result>> { + try_join_all(repeat(host_addr).take(size).map(Self::new_client)) + .await + .map(Arc::new) } } @@ -115,7 +117,7 @@ where .clients .try_get_with( addr.clone(), - S::new_clients(addr.clone(), self.connection_pool_size as usize).map_ok(Arc::new), + S::new_clients(addr.clone(), self.connection_pool_size as usize), ) .await .map_err(|e| -> RpcError {