diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 8d6bd7f34b38..2a4466e4739b 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -483,8 +483,14 @@ pub struct ServerConfig { #[serde(default = "default::server::heartbeat_interval_ms")] pub heartbeat_interval_ms: u32, + /// The default number of the connections when connecting to a gRPC server. + /// + /// For the connections used in streaming or batch exchange, please refer to the entries in + /// `[stream.developer]` and `[batch.developer]` sections. This value will be used if they + /// are not specified. #[serde(default = "default::server::connection_pool_size")] - pub connection_pool_size: u16, + // Intentionally made private to avoid abuse. Check the related methods on `RwConfig`. + connection_pool_size: u16, /// Used for control the metrics level, similar to log level. #[serde(default = "default::server::metrics_level")] @@ -1012,6 +1018,11 @@ pub struct StreamingDeveloperConfig { /// Actor tokio metrics is enabled if `enable_actor_tokio_metrics` is set or metrics level >= Debug. #[serde(default = "default::developer::enable_actor_tokio_metrics")] pub enable_actor_tokio_metrics: bool, + + /// The number of the connections for streaming remote exchange between two nodes. + /// If not specified, the value of `server.connection_pool_size` will be used. + #[serde(default = "default::developer::stream_exchange_connection_pool_size")] + pub exchange_connection_pool_size: Option<u16>, } /// The subsections `[batch.developer]`. @@ -1031,6 +1042,11 @@ pub struct BatchDeveloperConfig { /// The size of a chunk produced by `RowSeqScanExecutor` #[serde(default = "default::developer::batch_chunk_size")] pub chunk_size: usize, + + /// The number of the connections for batch remote exchange between two nodes. + /// If not specified, the value of `server.connection_pool_size` will be used. 
+ #[serde(default = "default::developer::batch_exchange_connection_pool_size")] + exchange_connection_pool_size: Option<u16>, } macro_rules! define_system_config { @@ -1267,6 +1283,30 @@ impl SystemConfig { } } +impl RwConfig { + pub const fn default_connection_pool_size(&self) -> u16 { + self.server.connection_pool_size + } + + /// Returns [`StreamingDeveloperConfig::exchange_connection_pool_size`] if set, + /// otherwise [`ServerConfig::connection_pool_size`]. + pub fn streaming_exchange_connection_pool_size(&self) -> u16 { + self.streaming + .developer + .exchange_connection_pool_size + .unwrap_or_else(|| self.default_connection_pool_size()) + } + + /// Returns [`BatchDeveloperConfig::exchange_connection_pool_size`] if set, + /// otherwise [`ServerConfig::connection_pool_size`]. + pub fn batch_exchange_connection_pool_size(&self) -> u16 { + self.batch + .developer + .exchange_connection_pool_size + .unwrap_or_else(|| self.default_connection_pool_size()) + } +} + pub mod default { pub mod meta { use crate::config::{DefaultParallelism, MetaBackend}; @@ -1748,6 +1788,12 @@ pub mod default { 1024 } + /// Default to unset to be compatible with the behavior before this config is introduced, + /// that is, follow the value of `server.connection_pool_size`. + pub fn batch_exchange_connection_pool_size() -> Option<u16> { + None + } + pub fn stream_enable_executor_row_count() -> bool { false } @@ -1844,6 +1890,11 @@ pub mod default { 2048 } + /// Default to 1 to be compatible with the behavior before this config is introduced. + pub fn stream_exchange_connection_pool_size() -> Option<u16> { + Some(1) + } + pub fn enable_actor_tokio_metrics() -> bool { false } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 7ed2eb8abbe2..530810ef6a69 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -345,7 +345,9 @@ pub async fn compute_node_serve( )); // Initialize batch environment. 
- let batch_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); + let batch_client_pool = Arc::new(ComputeClientPool::new( + config.batch_exchange_connection_pool_size(), + )); let batch_env = BatchEnvironment::new( batch_mgr.clone(), advertise_addr.clone(), @@ -362,7 +364,9 @@ pub async fn compute_node_serve( ); // Initialize the streaming environment. - let stream_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); + let stream_client_pool = Arc::new(ComputeClientPool::new( + config.streaming_exchange_connection_pool_size(), + )); let stream_env = StreamEnvironment::new( advertise_addr.clone(), stream_config, diff --git a/src/config/docs.md b/src/config/docs.md index 563d9c5e1cbb..230d43648e3e 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -91,7 +91,7 @@ This page is automatically generated by `./risedev generate-example-config` | Config | Description | Default | |--------|-------------|---------| -| connection_pool_size | | 16 | +| connection_pool_size | The default number of the connections when connecting to a gRPC server. For the connections used in streaming or batch exchange, please refer to the entries in `[stream.developer]` and `[batch.developer]` sections. This value will be used if they are not specified. | 16 | | grpc_max_reset_stream | | 200 | | heap_profiling | Enable heap profile dump when memory usage is high. | | | heartbeat_interval_ms | The interval for periodic heartbeat from worker to the meta service. 
| 1000 | diff --git a/src/config/example.toml b/src/config/example.toml index 1623bc114ccd..8b689aa171f3 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -125,6 +125,7 @@ stream_memory_controller_sequence_tls_lag = 32 stream_enable_arrangement_backfill = true stream_high_join_amplification_threshold = 2048 stream_enable_actor_tokio_metrics = false +stream_exchange_connection_pool_size = 1 [storage] share_buffers_sync_parallelism = 1 diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 7dffb5d34bea..8266fd48fcbf 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -248,10 +248,6 @@ impl FrontendEnv { ); info!("> version: {} ({})", RW_VERSION, GIT_SHA); - let batch_config = config.batch; - let meta_config = config.meta; - let streaming_config = config.streaming; - let frontend_address: HostAddr = opts .advertise_addr .as_ref() @@ -269,7 +265,7 @@ impl FrontendEnv { WorkerType::Frontend, &frontend_address, Default::default(), - &meta_config, + &config.meta, ) .await?; @@ -297,15 +293,16 @@ impl FrontendEnv { let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone())); let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone())); - let compute_client_pool = - Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); + let compute_client_pool = Arc::new(ComputeClientPool::new( + config.batch_exchange_connection_pool_size(), + )); let query_manager = QueryManager::new( worker_node_manager.clone(), compute_client_pool.clone(), catalog_reader.clone(), Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()), - batch_config.distributed_query_limit, - batch_config.max_batch_queries_per_frontend_node, + config.batch.distributed_query_limit, + config.batch.max_batch_queries_per_frontend_node, ); let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default())); @@ -336,7 +333,7 @@ impl FrontendEnv { hummock_snapshot_manager.clone(), 
system_params_manager.clone(), session_params.clone(), - compute_client_pool, + compute_client_pool.clone(), ); let observer_manager = ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node) @@ -346,8 +343,6 @@ impl FrontendEnv { meta_client.activate(&frontend_address).await?; - let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); - let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone()); let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone()); let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone()); @@ -391,7 +386,7 @@ impl FrontendEnv { let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( Builder::new_multi_thread() - .worker_threads(batch_config.frontend_compute_runtime_worker_threads) + .worker_threads(config.batch.frontend_compute_runtime_worker_threads) .thread_name("rw-batch-local") .enable_all() .build() @@ -446,13 +441,13 @@ impl FrontendEnv { system_params_manager, session_params, server_addr: frontend_address, - client_pool, + client_pool: compute_client_pool, frontend_metrics, spill_metrics, sessions_map, - batch_config, - meta_config, - streaming_config, + batch_config: config.batch, + meta_config: config.meta, + streaming_config: config.streaming, source_metrics, creating_streaming_job_tracker, compute_runtime, diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index e721567cf96a..c065bb693595 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -48,6 +48,12 @@ use tonic::Streaming; use crate::error::{Result, RpcError}; use crate::{RpcClient, RpcClientPool}; +// TODO: this client has too many roles, e.g. +// - batch MPP task query execution +// - batch exchange +// - streaming exchange +// - general services specific to compute node, like monitoring, profiling, debugging, etc. +// We should consider splitting them into different clients. 
#[derive(Clone)] pub struct ComputeClient { pub exchange_client: ExchangeServiceClient<Channel>,