Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use separate configuration for exchange connection pool for batch and streaming (#17768) #17808

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,14 @@ pub struct ServerConfig {
#[serde(default = "default::server::heartbeat_interval_ms")]
pub heartbeat_interval_ms: u32,

/// The default number of the connections when connecting to a gRPC server.
///
/// For the connections used in streaming or batch exchange, please refer to the entries in
/// `[stream.developer]` and `[batch.developer]` sections. This value will be used if they
/// are not specified.
#[serde(default = "default::server::connection_pool_size")]
pub connection_pool_size: u16,
// Intentionally made private to avoid abuse. Check the related methods on `RwConfig`.
connection_pool_size: u16,

/// Used to control the metrics level, similar to log level.
#[serde(default = "default::server::metrics_level")]
Expand Down Expand Up @@ -991,6 +997,11 @@ pub struct StreamingDeveloperConfig {
/// If number of hash join matches exceeds this threshold number,
/// it will be logged.
pub high_join_amplification_threshold: usize,

/// The number of the connections for streaming remote exchange between two nodes.
/// If not specified, the value of `server.connection_pool_size` will be used.
#[serde(default = "default::developer::stream_exchange_connection_pool_size")]
pub exchange_connection_pool_size: Option<u16>,
}

/// The subsections `[batch.developer]`.
Expand All @@ -1010,6 +1021,11 @@ pub struct BatchDeveloperConfig {
/// The size of a chunk produced by `RowSeqScanExecutor`
#[serde(default = "default::developer::batch_chunk_size")]
pub chunk_size: usize,

/// The number of the connections for batch remote exchange between two nodes.
/// If not specified, the value of `server.connection_pool_size` will be used.
#[serde(default = "default::developer::batch_exchange_connection_pool_size")]
exchange_connection_pool_size: Option<u16>,
}

macro_rules! define_system_config {
Expand Down Expand Up @@ -1249,6 +1265,30 @@ impl SystemConfig {
}
}

impl RwConfig {
    /// Returns the `connection_pool_size` configured in the `server` section.
    ///
    /// The exchange-specific accessors on this type fall back to this value when their own
    /// override is not set.
    pub const fn default_connection_pool_size(&self) -> u16 {
        let server = &self.server;
        server.connection_pool_size
    }

    /// Connection pool size for streaming exchange.
    ///
    /// Yields [`StreamingDeveloperConfig::exchange_connection_pool_size`] when it is set,
    /// and [`ServerConfig::connection_pool_size`] otherwise.
    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
        match self.streaming.developer.exchange_connection_pool_size {
            Some(pool_size) => pool_size,
            None => self.default_connection_pool_size(),
        }
    }

    /// Connection pool size for batch exchange.
    ///
    /// Yields [`BatchDeveloperConfig::exchange_connection_pool_size`] when it is set,
    /// and [`ServerConfig::connection_pool_size`] otherwise.
    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
        match self.batch.developer.exchange_connection_pool_size {
            Some(pool_size) => pool_size,
            None => self.default_connection_pool_size(),
        }
    }
}

pub mod default {
pub mod meta {
use crate::config::{DefaultParallelism, MetaBackend};
Expand Down Expand Up @@ -1722,6 +1762,12 @@ pub mod default {
1024
}

/// Default for the batch exchange connection pool size: unset.
///
/// Leaving it unset keeps the behavior from before this config existed, i.e. the value of
/// `server.connection_pool_size` is followed.
pub fn batch_exchange_connection_pool_size() -> Option<u16> {
    const UNSET: Option<u16> = None;
    UNSET
}

/// Default value for `stream_enable_executor_row_count`: `false`.
// NOTE(review): presumably referenced from a `#[serde(default = "...")]` attribute like its
// sibling defaults — confirm at the field definition.
pub fn stream_enable_executor_row_count() -> bool {
    const ENABLED_BY_DEFAULT: bool = false;
    ENABLED_BY_DEFAULT
}
Expand Down Expand Up @@ -1817,6 +1863,11 @@ pub mod default {
/// Default value for `stream_high_join_amplification_threshold`: 2048.
// NOTE(review): presumably the default for the `high_join_amplification_threshold` field
// (hash join match count above which a log entry is emitted) — confirm at the field definition.
pub fn stream_high_join_amplification_threshold() -> usize {
    const DEFAULT_THRESHOLD: usize = 2048;
    DEFAULT_THRESHOLD
}

/// Default for the streaming exchange connection pool size: a single connection.
///
/// The value 1 keeps the behavior from before this config was introduced.
pub fn stream_exchange_connection_pool_size() -> Option<u16> {
    const SINGLE_CONNECTION: u16 = 1;
    Some(SINGLE_CONNECTION)
}
}

pub use crate::system_param::default as system;
Expand Down
8 changes: 6 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@ pub async fn compute_node_serve(
));

// Initialize batch environment.
let batch_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let batch_client_pool = Arc::new(ComputeClientPool::new(
config.batch_exchange_connection_pool_size(),
));
let batch_env = BatchEnvironment::new(
batch_mgr.clone(),
advertise_addr.clone(),
Expand All @@ -352,7 +354,9 @@ pub async fn compute_node_serve(
);

// Initialize the streaming environment.
let stream_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let stream_client_pool = Arc::new(ComputeClientPool::new(
config.streaming_exchange_connection_pool_size(),
));
let stream_env = StreamEnvironment::new(
advertise_addr.clone(),
stream_config,
Expand Down
2 changes: 1 addition & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ This page is automatically generated by `./risedev generate-example-config`

| Config | Description | Default |
|--------|-------------|---------|
| connection_pool_size | | 16 |
| connection_pool_size | The default number of the connections when connecting to a gRPC server. For the connections used in streaming or batch exchange, please refer to the entries in `[stream.developer]` and `[batch.developer]` sections. This value will be used if they are not specified. | 16 |
| grpc_max_reset_stream | | 200 |
| heap_profiling | Enable heap profile dump when memory usage is high. | |
| heartbeat_interval_ms | The interval for periodic heartbeat from worker to the meta service. | 1000 |
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ stream_memory_controller_sequence_tls_step = 128
stream_memory_controller_sequence_tls_lag = 32
stream_enable_arrangement_backfill = true
stream_high_join_amplification_threshold = 2048
stream_exchange_connection_pool_size = 1

[storage]
share_buffers_sync_parallelism = 1
Expand Down
29 changes: 12 additions & 17 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,6 @@ impl FrontendEnv {
);
info!("> version: {} ({})", RW_VERSION, GIT_SHA);

let batch_config = config.batch;
let meta_config = config.meta;
let streaming_config = config.streaming;

let frontend_address: HostAddr = opts
.advertise_addr
.as_ref()
Expand All @@ -263,7 +259,7 @@ impl FrontendEnv {
WorkerType::Frontend,
&frontend_address,
Default::default(),
&meta_config,
&config.meta,
)
.await?;

Expand Down Expand Up @@ -291,15 +287,16 @@ impl FrontendEnv {
let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
let hummock_snapshot_manager =
Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
let compute_client_pool =
Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let compute_client_pool = Arc::new(ComputeClientPool::new(
config.batch_exchange_connection_pool_size(),
));
let query_manager = QueryManager::new(
worker_node_manager.clone(),
compute_client_pool.clone(),
catalog_reader.clone(),
Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
batch_config.distributed_query_limit,
batch_config.max_batch_queries_per_frontend_node,
config.batch.distributed_query_limit,
config.batch.max_batch_queries_per_frontend_node,
);

let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
Expand All @@ -324,7 +321,7 @@ impl FrontendEnv {
hummock_snapshot_manager.clone(),
system_params_manager.clone(),
session_params.clone(),
compute_client_pool,
compute_client_pool.clone(),
);
let observer_manager =
ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
Expand All @@ -334,8 +331,6 @@ impl FrontendEnv {

meta_client.activate(&frontend_address).await?;

let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));

let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());

Expand Down Expand Up @@ -378,7 +373,7 @@ impl FrontendEnv {

let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
.worker_threads(batch_config.frontend_compute_runtime_worker_threads)
.worker_threads(config.batch.frontend_compute_runtime_worker_threads)
.thread_name("rw-batch-local")
.enable_all()
.build()
Expand Down Expand Up @@ -433,12 +428,12 @@ impl FrontendEnv {
system_params_manager,
session_params,
server_addr: frontend_address,
client_pool,
client_pool: compute_client_pool,
frontend_metrics,
sessions_map,
batch_config,
meta_config,
streaming_config,
batch_config: config.batch,
meta_config: config.meta,
streaming_config: config.streaming,
source_metrics,
creating_streaming_job_tracker,
compute_runtime,
Expand Down
6 changes: 6 additions & 0 deletions src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ use tonic::Streaming;
use crate::error::{Result, RpcError};
use crate::{RpcClient, RpcClientPool};

// TODO: this client has too many roles, e.g.
// - batch MPP task query execution
// - batch exchange
// - streaming exchange
// - general services specific to compute node, like monitoring, profiling, debugging, etc.
// We should consider splitting them into different clients.
#[derive(Clone)]
pub struct ComputeClient {
pub exchange_client: ExchangeServiceClient<Channel>,
Expand Down
Loading