Skip to content

Commit

Permalink
feat(streaming): follow server.connection_pool_size config for exch…
Browse files Browse the repository at this point in the history
…ange service (#17755) (#17798)

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
github-actions[bot] and BugenZhao authored Jul 24, 2024
1 parent a2ef134 commit fd40d91
Show file tree
Hide file tree
Showing 16 changed files with 64 additions and 37 deletions.
4 changes: 2 additions & 2 deletions src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct BatchEnvironment {
/// Executor level metrics.
executor_metrics: Arc<BatchExecutorMetrics>,

/// Compute client pool for grpc exchange.
/// Compute client pool for batch gRPC exchange.
client_pool: ComputeClientPoolRef,

/// Manages dml information.
Expand Down Expand Up @@ -112,7 +112,7 @@ impl BatchEnvironment {
MonitoredStorageMetrics::unused(),
)),
task_metrics: Arc::new(BatchTaskMetrics::for_test()),
client_pool: Arc::new(ComputeClientPool::default()),
client_pool: Arc::new(ComputeClientPool::for_test()),
dml_manager: Arc::new(DmlManager::for_test()),
source_metrics: Arc::new(SourceMetrics::default()),
executor_metrics: Arc::new(BatchExecutorMetrics::for_test()),
Expand Down
6 changes: 4 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub async fn compute_node_serve(
));

// Initialize batch environment.
let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let batch_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let batch_env = BatchEnvironment::new(
batch_mgr.clone(),
advertise_addr.clone(),
Expand All @@ -345,13 +345,14 @@ pub async fn compute_node_serve(
state_store.clone(),
batch_task_metrics.clone(),
batch_executor_metrics.clone(),
client_pool,
batch_client_pool,
dml_mgr.clone(),
source_metrics.clone(),
config.server.metrics_level,
);

// Initialize the streaming environment.
let stream_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let stream_env = StreamEnvironment::new(
advertise_addr.clone(),
stream_config,
Expand All @@ -361,6 +362,7 @@ pub async fn compute_node_serve(
system_params_manager.clone(),
source_metrics,
meta_client.clone(),
stream_client_pool,
);

let stream_mgr = LocalStreamManager::new(
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> {
let compute_nodes = meta_client
.list_worker_nodes(Some(WorkerType::ComputeNode))
.await?;
let clients = ComputeClientPool::default();
let clients = ComputeClientPool::adhoc();

// FIXME: the compute node may not be accessible directly from risectl, we may let the meta
// service collect the reports from all compute nodes in the future.
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn cpu_profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<(
.into_iter()
.filter(|w| w.r#type() == WorkerType::ComputeNode);

let clients = ComputeClientPool::default();
let clients = ComputeClientPool::adhoc();

let profile_root_path = std::env::var("PREFIX_PROFILING").unwrap_or_else(|_| {
tracing::info!("PREFIX_PROFILING is not set, using current directory");
Expand Down Expand Up @@ -96,7 +96,7 @@ pub async fn heap_profile(context: &CtlContext, dir: Option<String>) -> anyhow::
.into_iter()
.filter(|w| w.r#type() == WorkerType::ComputeNode);

let clients = ComputeClientPool::default();
let clients = ComputeClientPool::adhoc();

let mut profile_futs = vec![];

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ pub(crate) mod tests {
async fn test_query_should_not_hang_with_empty_worker() {
let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false);
let compute_client_pool = Arc::new(ComputeClientPool::default());
let compute_client_pool = Arc::new(ComputeClientPool::for_test());
let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(Arc::new(
MockFrontendMetaClient {},
)));
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl FrontendEnv {
let meta_client = Arc::new(MockFrontendMetaClient {});
let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
let compute_client_pool = Arc::new(ComputeClientPool::default());
let compute_client_pool = Arc::new(ComputeClientPool::for_test());
let query_manager = QueryManager::new(
worker_node_manager.clone(),
compute_client_pool,
Expand All @@ -193,7 +193,7 @@ impl FrontendEnv {
None,
);
let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
let client_pool = Arc::new(ComputeClientPool::default());
let client_pool = Arc::new(ComputeClientPool::for_test());
let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ pub async fn start_service_as_election_leader(
prometheus_client,
prometheus_selector,
metadata_manager: metadata_manager.clone(),
compute_clients: ComputeClientPool::default(),
compute_clients: ComputeClientPool::new(1), // typically no need for plural clients
diagnose_command,
trace_state,
};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/diagnose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ impl DiagnoseCommand {

let mut all = StackTraceResponse::default();

let compute_clients = ComputeClientPool::default();
let compute_clients = ComputeClientPool::adhoc();
for worker_node in &worker_nodes {
if let Ok(client) = compute_clients.get(worker_node).await
&& let Ok(result) = client.stack_trace().await
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl MetaSrvEnv {
let notification_manager =
Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
let stream_client_pool = Arc::new(StreamClientPool::default());
let stream_client_pool = Arc::new(StreamClientPool::new(1)); // typically no need for plural clients
let event_log_manager = Arc::new(start_event_log_manager(
opts.event_log_enabled,
opts.event_log_channel_max_size,
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,4 @@ impl RpcClient for ComputeClient {
}

pub type ComputeClientPool = RpcClientPool<ComputeClient>;
pub type ComputeClientPoolRef = Arc<ComputeClientPool>;
pub type ComputeClientPoolRef = Arc<ComputeClientPool>; // TODO: no need for `Arc` since clone is cheap and shared
28 changes: 22 additions & 6 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,42 @@ pub struct RpcClientPool<S> {
clients: Cache<HostAddr, Arc<Vec<S>>>,
}

impl<S> Default for RpcClientPool<S>
where
S: RpcClient,
{
fn default() -> Self {
Self::new(1)
impl<S> std::fmt::Debug for RpcClientPool<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RpcClientPool")
.field("connection_pool_size", &self.connection_pool_size)
.field("type", &type_name::<S>())
.field("len", &self.clients.entry_count())
.finish()
}
}

/// Intentionally not implementing `Default` to let callers be explicit about the pool size.
impl<S> !Default for RpcClientPool<S> {}

impl<S> RpcClientPool<S>
where
S: RpcClient,
{
/// Create a new pool with the given `connection_pool_size`, which is the number of
/// connections to each node that will be reused.
pub fn new(connection_pool_size: u16) -> Self {
Self {
connection_pool_size,
clients: Cache::new(u64::MAX),
}
}

/// Create a pool for testing purposes. Same as [`Self::adhoc`].
pub fn for_test() -> Self {
Self::adhoc()
}

/// Create a pool for ad-hoc usage, where the number of connections to each node is 1.
pub fn adhoc() -> Self {
Self::new(1)
}

/// Gets the RPC client for the given node. If the connection is not established, a
/// new client will be created and returned.
pub async fn get(&self, node: &WorkerNode) -> Result<S> {
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/exchange/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub(crate) fn new_input(
} else {
RemoteInput::new(
context.local_barrier_manager.clone(),
context.compute_client_pool.clone(),
context.compute_client_pool.as_ref().to_owned(),
upstream_addr,
(upstream_actor_id, actor_id),
(upstream_fragment_id, fragment_id),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ mod tests {
assert!(server_run.load(Ordering::SeqCst));

let remote_input = {
let pool = ComputeClientPool::default();
let pool = ComputeClientPool::for_test();
RemoteInput::new(
LocalBarrierManager::for_test(),
pool,
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,7 @@ impl LocalBarrierWorker {
let (event_tx, event_rx) = unbounded_channel();
let (failure_tx, failure_rx) = unbounded_channel();
let shared_context = Arc::new(SharedContext::new(
actor_manager.env.server_address().clone(),
actor_manager.env.config(),
&actor_manager.env,
LocalBarrierManager {
barrier_event_sender: event_tx,
actor_failure_sender: failure_tx,
Expand Down
13 changes: 12 additions & 1 deletion src/stream/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common::util::addr::HostAddr;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
use risingwave_storage::StateStoreImpl;

pub(crate) type WorkerNodeId = u32;
Expand Down Expand Up @@ -55,6 +55,9 @@ pub struct StreamEnvironment {

/// Meta client. Use `None` for test only
meta_client: Option<MetaClient>,

/// Compute client pool for streaming gRPC exchange.
client_pool: ComputeClientPoolRef,
}

impl StreamEnvironment {
Expand All @@ -68,6 +71,7 @@ impl StreamEnvironment {
system_params_manager: LocalSystemParamsManagerRef,
source_metrics: Arc<SourceMetrics>,
meta_client: MetaClient,
client_pool: ComputeClientPoolRef,
) -> Self {
StreamEnvironment {
server_addr,
Expand All @@ -79,6 +83,7 @@ impl StreamEnvironment {
source_metrics,
total_mem_val: Arc::new(TrAdder::new()),
meta_client: Some(meta_client),
client_pool,
}
}

Expand All @@ -87,6 +92,7 @@ impl StreamEnvironment {
pub fn for_test() -> Self {
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_rpc_client::ComputeClientPool;
use risingwave_storage::monitor::MonitoredStorageMetrics;
StreamEnvironment {
server_addr: "127.0.0.1:5688".parse().unwrap(),
Expand All @@ -100,6 +106,7 @@ impl StreamEnvironment {
source_metrics: Arc::new(SourceMetrics::default()),
total_mem_val: Arc::new(TrAdder::new()),
meta_client: None,
client_pool: Arc::new(ComputeClientPool::for_test()),
}
}

Expand Down Expand Up @@ -138,4 +145,8 @@ impl StreamEnvironment {
pub fn meta_client(&self) -> Option<MetaClient> {
self.meta_client.clone()
}

pub fn client_pool(&self) -> ComputeClientPoolRef {
self.client_pool.clone()
}
}
23 changes: 11 additions & 12 deletions src/stream/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use parking_lot::{MappedMutexGuard, Mutex, MutexGuard, RwLock};
use risingwave_common::config::StreamingConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::ActorInfo;
use risingwave_rpc_client::ComputeClientPool;
use risingwave_rpc_client::ComputeClientPoolRef;

use crate::error::StreamResult;
use crate::executor::exchange::permit::{self, Receiver, Sender};
Expand Down Expand Up @@ -75,10 +75,10 @@ pub struct SharedContext {
/// between two actors/actors.
pub(crate) addr: HostAddr,

/// The pool of compute clients.
/// Compute client pool for streaming gRPC exchange.
// TODO: currently the client pool won't be cleared. Should remove compute clients when
// disconnected.
pub(crate) compute_client_pool: ComputeClientPool,
pub(crate) compute_client_pool: ComputeClientPoolRef,

pub(crate) config: StreamingConfig,

Expand All @@ -94,30 +94,28 @@ impl std::fmt::Debug for SharedContext {
}

impl SharedContext {
pub fn new(
addr: HostAddr,
config: &StreamingConfig,
local_barrier_manager: LocalBarrierManager,
) -> Self {
pub fn new(env: &StreamEnvironment, local_barrier_manager: LocalBarrierManager) -> Self {
Self {
channel_map: Default::default(),
actor_infos: Default::default(),
addr,
compute_client_pool: ComputeClientPool::default(),
config: config.clone(),
addr: env.server_address().clone(),
config: env.config().as_ref().to_owned(),
compute_client_pool: env.client_pool(),
local_barrier_manager,
}
}

#[cfg(test)]
pub fn for_test() -> Self {
use std::sync::Arc;

use risingwave_common::config::StreamingDeveloperConfig;
use risingwave_rpc_client::ComputeClientPool;

Self {
channel_map: Default::default(),
actor_infos: Default::default(),
addr: LOCAL_TEST_ADDR.clone(),
compute_client_pool: ComputeClientPool::default(),
config: StreamingConfig {
developer: StreamingDeveloperConfig {
exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
Expand All @@ -127,6 +125,7 @@ impl SharedContext {
},
..Default::default()
},
compute_client_pool: Arc::new(ComputeClientPool::for_test()),
local_barrier_manager: LocalBarrierManager::for_test(),
}
}
Expand Down

0 comments on commit fd40d91

Please sign in to comment.