From b50d33434e79dbc42a664706b112769736d8dccc Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:38:29 +0800 Subject: [PATCH] refactor: use separate configuration for exchange connection pool for batch and streaming (#17768) (#17808) Co-authored-by: Bugen Zhao --- src/common/src/config.rs | 53 +++++++++++++++++++++++++++- src/compute/src/server.rs | 8 +++-- src/config/docs.md | 2 +- src/config/example.toml | 1 + src/frontend/src/session.rs | 29 +++++++-------- src/rpc_client/src/compute_client.rs | 6 ++++ 6 files changed, 78 insertions(+), 21 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index a4d827db84dfd..3310a204de0a8 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -472,8 +472,14 @@ pub struct ServerConfig { #[serde(default = "default::server::heartbeat_interval_ms")] pub heartbeat_interval_ms: u32, + /// The default number of the connections when connecting to a gRPC server. + /// + /// For the connections used in streaming or batch exchange, please refer to the entries in + /// `[stream.developer]` and `[batch.developer]` sections. This value will be used if they + /// are not specified. #[serde(default = "default::server::connection_pool_size")] - pub connection_pool_size: u16, + // Intentionally made private to avoid abuse. Check the related methods on `RwConfig`. + connection_pool_size: u16, /// Used for control the metrics level, similar to log level. #[serde(default = "default::server::metrics_level")] @@ -991,6 +997,11 @@ pub struct StreamingDeveloperConfig { /// If number of hash join matches exceeds this threshold number, /// it will be logged. pub high_join_amplification_threshold: usize, + + /// The number of the connections for streaming remote exchange between two nodes. + /// If not specified, the value of `server.connection_pool_size` will be used. + #[serde(default = "default::developer::stream_exchange_connection_pool_size")] + pub exchange_connection_pool_size: Option, } /// The subsections `[batch.developer]`. @@ -1010,6 +1021,11 @@ pub struct BatchDeveloperConfig { /// The size of a chunk produced by `RowSeqScanExecutor` #[serde(default = "default::developer::batch_chunk_size")] pub chunk_size: usize, + + /// The number of the connections for batch remote exchange between two nodes. + /// If not specified, the value of `server.connection_pool_size` will be used. + #[serde(default = "default::developer::batch_exchange_connection_pool_size")] + exchange_connection_pool_size: Option, } macro_rules! define_system_config { @@ -1249,6 +1265,30 @@ impl SystemConfig { } } +impl RwConfig { + pub const fn default_connection_pool_size(&self) -> u16 { + self.server.connection_pool_size + } + + /// Returns [`StreamingDeveloperConfig::exchange_connection_pool_size`] if set, + /// otherwise [`ServerConfig::connection_pool_size`]. + pub fn streaming_exchange_connection_pool_size(&self) -> u16 { + self.streaming + .developer + .exchange_connection_pool_size + .unwrap_or_else(|| self.default_connection_pool_size()) + } + + /// Returns [`BatchDeveloperConfig::exchange_connection_pool_size`] if set, + /// otherwise [`ServerConfig::connection_pool_size`]. + pub fn batch_exchange_connection_pool_size(&self) -> u16 { + self.batch + .developer + .exchange_connection_pool_size + .unwrap_or_else(|| self.default_connection_pool_size()) + } +} + pub mod default { pub mod meta { use crate::config::{DefaultParallelism, MetaBackend}; @@ -1722,6 +1762,12 @@ pub mod default { 1024 } + /// Default to unset to be compatible with the behavior before this config is introduced, + /// that is, follow the value of `server.connection_pool_size`. + pub fn batch_exchange_connection_pool_size() -> Option { + None + } + pub fn stream_enable_executor_row_count() -> bool { false } @@ -1817,6 +1863,11 @@ pub mod default { pub fn stream_high_join_amplification_threshold() -> usize { 2048 } + + /// Default to 1 to be compatible with the behavior before this config is introduced. + pub fn stream_exchange_connection_pool_size() -> Option { + Some(1) + } } pub use crate::system_param::default as system; diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index b4b915087c140..ef62422be25dc 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -336,7 +336,9 @@ pub async fn compute_node_serve( )); // Initialize batch environment. - let batch_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); + let batch_client_pool = Arc::new(ComputeClientPool::new( + config.batch_exchange_connection_pool_size(), + )); let batch_env = BatchEnvironment::new( batch_mgr.clone(), advertise_addr.clone(), @@ -352,7 +354,9 @@ pub async fn compute_node_serve( ); // Initialize the streaming environment. - let stream_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); + let stream_client_pool = Arc::new(ComputeClientPool::new( + config.streaming_exchange_connection_pool_size(), + )); let stream_env = StreamEnvironment::new( advertise_addr.clone(), stream_config, diff --git a/src/config/docs.md b/src/config/docs.md index ab33559260162..df90c7dc6b6c5 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -89,7 +89,7 @@ This page is automatically generated by `./risedev generate-example-config` | Config | Description | Default | |--------|-------------|---------| -| connection_pool_size | | 16 | +| connection_pool_size | The default number of the connections when connecting to a gRPC server. For the connections used in streaming or batch exchange, please refer to the entries in `[stream.developer]` and `[batch.developer]` sections. This value will be used if they are not specified. | 16 | | grpc_max_reset_stream | | 200 | | heap_profiling | Enable heap profile dump when memory usage is high. | | | heartbeat_interval_ms | The interval for periodic heartbeat from worker to the meta service. | 1000 | diff --git a/src/config/example.toml b/src/config/example.toml index 67acc0acf5afe..c41d8e5c28a8a 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -122,6 +122,7 @@ stream_memory_controller_sequence_tls_step = 128 stream_memory_controller_sequence_tls_lag = 32 stream_enable_arrangement_backfill = true stream_high_join_amplification_threshold = 2048 +stream_exchange_connection_pool_size = 1 [storage] share_buffers_sync_parallelism = 1 diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 36f062cb1ba8f..615495c583274 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -242,10 +242,6 @@ impl FrontendEnv { ); info!("> version: {} ({})", RW_VERSION, GIT_SHA); - let batch_config = config.batch; - let meta_config = config.meta; - let streaming_config = config.streaming; - let frontend_address: HostAddr = opts .advertise_addr .as_ref() @@ -263,7 +259,7 @@ impl FrontendEnv { WorkerType::Frontend, &frontend_address, Default::default(), - &meta_config, + &config.meta, ) .await?; @@ -291,15 +287,16 @@ impl FrontendEnv { let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone())); let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone())); - let compute_client_pool = - Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); + let compute_client_pool = Arc::new(ComputeClientPool::new( + config.batch_exchange_connection_pool_size(), + )); let query_manager = QueryManager::new( worker_node_manager.clone(), compute_client_pool.clone(), catalog_reader.clone(), Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()), - batch_config.distributed_query_limit, - batch_config.max_batch_queries_per_frontend_node, + config.batch.distributed_query_limit, + config.batch.max_batch_queries_per_frontend_node, ); let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default())); @@ -324,7 +321,7 @@ impl FrontendEnv { hummock_snapshot_manager.clone(), system_params_manager.clone(), session_params.clone(), - compute_client_pool, + compute_client_pool.clone(), ); let observer_manager = ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node) @@ -334,8 +331,6 @@ impl FrontendEnv { meta_client.activate(&frontend_address).await?; - let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); - let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone()); let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone()); @@ -378,7 +373,7 @@ impl FrontendEnv { let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( Builder::new_multi_thread() - .worker_threads(batch_config.frontend_compute_runtime_worker_threads) + .worker_threads(config.batch.frontend_compute_runtime_worker_threads) .thread_name("rw-batch-local") .enable_all() .build() @@ -433,12 +428,12 @@ impl FrontendEnv { system_params_manager, session_params, server_addr: frontend_address, - client_pool, + client_pool: compute_client_pool, frontend_metrics, sessions_map, - batch_config, - meta_config, - streaming_config, + batch_config: config.batch, + meta_config: config.meta, + streaming_config: config.streaming, source_metrics, creating_streaming_job_tracker, compute_runtime, diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index 641f56324d47b..eec4c88ba7286 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -48,6 +48,12 @@ use tonic::Streaming; use crate::error::{Result, RpcError}; use crate::{RpcClient, RpcClientPool}; +// TODO: this client has too many roles, e.g. +// - batch MPP task query execution +// - batch exchange +// - streaming exchange +// - general services specific to compute node, like monitoring, profiling, debugging, etc. +// We should consider splitting them into different clients. #[derive(Clone)] pub struct ComputeClient { pub exchange_client: ExchangeServiceClient,