Skip to content

Commit

Permalink
refactor: use separate configuration for exchange connection pool for…
Browse files Browse the repository at this point in the history
… batch and streaming (#17768) (#17808)

Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
kwannoel and BugenZhao authored Jul 25, 2024
1 parent c87ac64 commit b50d334
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 21 deletions.
53 changes: 52 additions & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,14 @@ pub struct ServerConfig {
#[serde(default = "default::server::heartbeat_interval_ms")]
pub heartbeat_interval_ms: u32,

/// The default number of the connections when connecting to a gRPC server.
///
/// For the connections used in streaming or batch exchange, please refer to the entries in
/// `[stream.developer]` and `[batch.developer]` sections. This value will be used if they
/// are not specified.
#[serde(default = "default::server::connection_pool_size")]
pub connection_pool_size: u16,
// Intentionally made private to avoid abuse. Check the related methods on `RwConfig`.
connection_pool_size: u16,

/// Used for control the metrics level, similar to log level.
#[serde(default = "default::server::metrics_level")]
Expand Down Expand Up @@ -991,6 +997,11 @@ pub struct StreamingDeveloperConfig {
/// If number of hash join matches exceeds this threshold number,
/// it will be logged.
pub high_join_amplification_threshold: usize,

/// The number of the connections for streaming remote exchange between two nodes.
/// If not specified, the value of `server.connection_pool_size` will be used.
#[serde(default = "default::developer::stream_exchange_connection_pool_size")]
pub exchange_connection_pool_size: Option<u16>,
}

/// The subsections `[batch.developer]`.
Expand All @@ -1010,6 +1021,11 @@ pub struct BatchDeveloperConfig {
/// The size of a chunk produced by `RowSeqScanExecutor`
#[serde(default = "default::developer::batch_chunk_size")]
pub chunk_size: usize,

/// The number of the connections for batch remote exchange between two nodes.
/// If not specified, the value of `server.connection_pool_size` will be used.
#[serde(default = "default::developer::batch_exchange_connection_pool_size")]
exchange_connection_pool_size: Option<u16>,
}

macro_rules! define_system_config {
Expand Down Expand Up @@ -1249,6 +1265,30 @@ impl SystemConfig {
}
}

impl RwConfig {
pub const fn default_connection_pool_size(&self) -> u16 {
self.server.connection_pool_size
}

/// Returns [`StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
/// otherwise [`ServerConfig::connection_pool_size`].
pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
self.streaming
.developer
.exchange_connection_pool_size
.unwrap_or_else(|| self.default_connection_pool_size())
}

/// Returns [`BatchDeveloperConfig::exchange_connection_pool_size`] if set,
/// otherwise [`ServerConfig::connection_pool_size`].
pub fn batch_exchange_connection_pool_size(&self) -> u16 {
self.batch
.developer
.exchange_connection_pool_size
.unwrap_or_else(|| self.default_connection_pool_size())
}
}

pub mod default {
pub mod meta {
use crate::config::{DefaultParallelism, MetaBackend};
Expand Down Expand Up @@ -1722,6 +1762,12 @@ pub mod default {
1024
}

/// Default to unset to be compatible with the behavior before this config is introduced,
/// that is, follow the value of `server.connection_pool_size`.
pub fn batch_exchange_connection_pool_size() -> Option<u16> {
None
}

pub fn stream_enable_executor_row_count() -> bool {
false
}
Expand Down Expand Up @@ -1817,6 +1863,11 @@ pub mod default {
pub fn stream_high_join_amplification_threshold() -> usize {
2048
}

/// Default to 1 to be compatible with the behavior before this config is introduced.
pub fn stream_exchange_connection_pool_size() -> Option<u16> {
Some(1)
}
}

pub use crate::system_param::default as system;
Expand Down
8 changes: 6 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@ pub async fn compute_node_serve(
));

// Initialize batch environment.
let batch_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let batch_client_pool = Arc::new(ComputeClientPool::new(
config.batch_exchange_connection_pool_size(),
));
let batch_env = BatchEnvironment::new(
batch_mgr.clone(),
advertise_addr.clone(),
Expand All @@ -352,7 +354,9 @@ pub async fn compute_node_serve(
);

// Initialize the streaming environment.
let stream_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let stream_client_pool = Arc::new(ComputeClientPool::new(
config.streaming_exchange_connection_pool_size(),
));
let stream_env = StreamEnvironment::new(
advertise_addr.clone(),
stream_config,
Expand Down
2 changes: 1 addition & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ This page is automatically generated by `./risedev generate-example-config`

| Config | Description | Default |
|--------|-------------|---------|
| connection_pool_size | | 16 |
| connection_pool_size | The default number of the connections when connecting to a gRPC server. For the connections used in streaming or batch exchange, please refer to the entries in `[stream.developer]` and `[batch.developer]` sections. This value will be used if they are not specified. | 16 |
| grpc_max_reset_stream | | 200 |
| heap_profiling | Enable heap profile dump when memory usage is high. | |
| heartbeat_interval_ms | The interval for periodic heartbeat from worker to the meta service. | 1000 |
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ stream_memory_controller_sequence_tls_step = 128
stream_memory_controller_sequence_tls_lag = 32
stream_enable_arrangement_backfill = true
stream_high_join_amplification_threshold = 2048
stream_exchange_connection_pool_size = 1

[storage]
share_buffers_sync_parallelism = 1
Expand Down
29 changes: 12 additions & 17 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,6 @@ impl FrontendEnv {
);
info!("> version: {} ({})", RW_VERSION, GIT_SHA);

let batch_config = config.batch;
let meta_config = config.meta;
let streaming_config = config.streaming;

let frontend_address: HostAddr = opts
.advertise_addr
.as_ref()
Expand All @@ -263,7 +259,7 @@ impl FrontendEnv {
WorkerType::Frontend,
&frontend_address,
Default::default(),
&meta_config,
&config.meta,
)
.await?;

Expand Down Expand Up @@ -291,15 +287,16 @@ impl FrontendEnv {
let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
let hummock_snapshot_manager =
Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
let compute_client_pool =
Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let compute_client_pool = Arc::new(ComputeClientPool::new(
config.batch_exchange_connection_pool_size(),
));
let query_manager = QueryManager::new(
worker_node_manager.clone(),
compute_client_pool.clone(),
catalog_reader.clone(),
Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
batch_config.distributed_query_limit,
batch_config.max_batch_queries_per_frontend_node,
config.batch.distributed_query_limit,
config.batch.max_batch_queries_per_frontend_node,
);

let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
Expand All @@ -324,7 +321,7 @@ impl FrontendEnv {
hummock_snapshot_manager.clone(),
system_params_manager.clone(),
session_params.clone(),
compute_client_pool,
compute_client_pool.clone(),
);
let observer_manager =
ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
Expand All @@ -334,8 +331,6 @@ impl FrontendEnv {

meta_client.activate(&frontend_address).await?;

let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));

let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());

Expand Down Expand Up @@ -378,7 +373,7 @@ impl FrontendEnv {

let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
.worker_threads(batch_config.frontend_compute_runtime_worker_threads)
.worker_threads(config.batch.frontend_compute_runtime_worker_threads)
.thread_name("rw-batch-local")
.enable_all()
.build()
Expand Down Expand Up @@ -433,12 +428,12 @@ impl FrontendEnv {
system_params_manager,
session_params,
server_addr: frontend_address,
client_pool,
client_pool: compute_client_pool,
frontend_metrics,
sessions_map,
batch_config,
meta_config,
streaming_config,
batch_config: config.batch,
meta_config: config.meta,
streaming_config: config.streaming,
source_metrics,
creating_streaming_job_tracker,
compute_runtime,
Expand Down
6 changes: 6 additions & 0 deletions src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ use tonic::Streaming;
use crate::error::{Result, RpcError};
use crate::{RpcClient, RpcClientPool};

// TODO: this client has too many roles, e.g.
// - batch MPP task query execution
// - batch exchange
// - streaming exchange
// - general services specific to compute node, like monitoring, profiling, debugging, etc.
// We should consider splitting them into different clients.
#[derive(Clone)]
pub struct ComputeClient {
pub exchange_client: ExchangeServiceClient<Channel>,
Expand Down

0 comments on commit b50d334

Please sign in to comment.