diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 541d4896a6def..9766f5b601a3f 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -385,6 +385,10 @@ pub struct MetaConfig { /// Whether compactor should rewrite row to remove dropped column. #[serde(default = "default::meta::enable_dropped_column_reclaim")] pub enable_dropped_column_reclaim: bool, + + #[serde(default)] + #[config_doc(nested)] + pub meta_store_config: MetaStoreConfig, } #[derive(Copy, Clone, Debug, Default)] @@ -2271,6 +2275,34 @@ pub mod default { } } } + + pub mod meta_store_config { + const DEFAULT_MAX_CONNECTIONS: u32 = 10; + const DEFAULT_MIN_CONNECTIONS: u32 = 1; + const DEFAULT_CONNECTION_TIMEOUT_SEC: u64 = 10; + const DEFAULT_IDLE_TIMEOUT_SEC: u64 = 30; + const DEFAULT_ACQUIRE_TIMEOUT_SEC: u64 = 30; + + pub fn max_connections() -> u32 { + DEFAULT_MAX_CONNECTIONS + } + + pub fn min_connections() -> u32 { + DEFAULT_MIN_CONNECTIONS + } + + pub fn connection_timeout_sec() -> u64 { + DEFAULT_CONNECTION_TIMEOUT_SEC + } + + pub fn idle_timeout_sec() -> u64 { + DEFAULT_IDLE_TIMEOUT_SEC + } + + pub fn acquire_timeout_sec() -> u64 { + DEFAULT_ACQUIRE_TIMEOUT_SEC + } + } } #[derive(Debug, Clone)] @@ -2485,6 +2517,25 @@ pub struct CompactionConfig { pub max_level: u32, } +#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)] +pub struct MetaStoreConfig { + /// Maximum number of connections for the meta store connection pool. + #[serde(default = "default::meta_store_config::max_connections")] + pub max_connections: u32, + /// Minimum number of connections for the meta store connection pool. + #[serde(default = "default::meta_store_config::min_connections")] + pub min_connections: u32, + /// Connection timeout in seconds for a meta store connection. + #[serde(default = "default::meta_store_config::connection_timeout_sec")] + pub connection_timeout_sec: u64, + /// Idle timeout in seconds for a meta store connection. + #[serde(default = "default::meta_store_config::idle_timeout_sec")] + pub idle_timeout_sec: u64, + /// Acquire timeout in seconds for a meta store connection. + #[serde(default = "default::meta_store_config::acquire_timeout_sec")] + pub acquire_timeout_sec: u64, +} + #[cfg(test)] mod tests { use risingwave_license::LicenseKey; diff --git a/src/config/docs.md b/src/config/docs.md index 1c4d6a351ae52..99cd5f2b4dd2e 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -87,6 +87,16 @@ This page is automatically generated by `./risedev generate-example-config` | target_file_size_base | | 33554432 | | tombstone_reclaim_ratio | | 40 | +## meta.meta_store_config + +| Config | Description | Default | +|--------|-------------|---------| +| acquire_timeout_sec | Acquire timeout in seconds for a meta store connection. | 30 | +| connection_timeout_sec | Connection timeout in seconds for a meta store connection. | 10 | +| idle_timeout_sec | Idle timeout in seconds for a meta store connection. | 30 | +| max_connections | Maximum number of connections for the meta store connection pool. | 10 | +| min_connections | Minimum number of connections for the meta store connection pool. | 1 | + ## server | Config | Description | Default | diff --git a/src/config/example.toml b/src/config/example.toml index f285720dc6b21..f097df23fc528 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -87,6 +87,13 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100 meta_actor_cnt_per_worker_parallelism_hard_limit = 400 meta_hummock_time_travel_sst_info_fetch_batch_size = 10000 +[meta.meta_store_config] +max_connections = 10 +min_connections = 1 +connection_timeout_sec = 10 +idle_timeout_sec = 30 +acquire_timeout_sec = 30 + [batch] enable_barrier_read = false statement_timeout_in_sec = 3600 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index eacd1a65517ee..1b28f0eb63875 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -231,6 +231,7 @@ pub fn start( let listen_addr = opts.listen_addr.parse().unwrap(); let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap()); let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap()); + let meta_store_config = config.meta.meta_store_config.clone(); let backend = match config.meta.backend { MetaBackend::Mem => MetaStoreBackend::Mem, MetaBackend::Sql => MetaStoreBackend::Sql { @@ -239,6 +240,7 @@ pub fn start( .expect("sql endpoint is required") .expose_secret() .to_string(), + config: meta_store_config, }, MetaBackend::Sqlite => MetaStoreBackend::Sql { endpoint: format!( @@ -247,6 +249,7 @@ pub fn start( .expect("sql endpoint is required") .expose_secret() ), + config: meta_store_config, }, MetaBackend::Postgres => MetaStoreBackend::Sql { endpoint: format!( @@ -258,6 +261,7 @@ pub fn start( .expose_secret(), opts.sql_database ), + config: meta_store_config, }, MetaBackend::Mysql => MetaStoreBackend::Sql { endpoint: format!( @@ -269,6 +273,7 @@ pub fn start( .expose_secret(), opts.sql_database ), + config: meta_store_config, }, }; validate_config(&config); diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index b556c4ca34524..c03914498d81a 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -147,14 +147,15 @@ pub async fn rpc_serve( ) .await } - MetaStoreBackend::Sql { endpoint } => { + MetaStoreBackend::Sql { endpoint, config } => { let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint); let mut options = sea_orm::ConnectOptions::new(endpoint); options - .max_connections(10) - .connect_timeout(Duration::from_secs(10)) - .idle_timeout(Duration::from_secs(30)) - .acquire_timeout(Duration::from_secs(30)); + .max_connections(config.max_connections) + .min_connections(config.min_connections) + .connect_timeout(Duration::from_secs(config.connection_timeout_sec)) + .idle_timeout(Duration::from_secs(config.idle_timeout_sec)) + .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec)); if is_sqlite { // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access, diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index fd7039af95fca..0946ce74e5fbf 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -17,7 +17,7 @@ use std::time::Duration; use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; -use risingwave_common::config::{MetaBackend, ObjectStoreConfig}; +use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig}; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use sea_orm::DbBackend; @@ -32,21 +32,25 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult { MetaBackend::Mem => MetaStoreBackend::Mem, MetaBackend::Sql => MetaStoreBackend::Sql { endpoint: opts.sql_endpoint, + config: MetaStoreConfig::default(), }, MetaBackend::Sqlite => MetaStoreBackend::Sql { endpoint: format!("sqlite://{}?mode=rwc", opts.sql_endpoint), + config: MetaStoreConfig::default(), }, MetaBackend::Postgres => MetaStoreBackend::Sql { endpoint: format!( "postgres://{}:{}@{}/{}", opts.sql_username, opts.sql_password, opts.sql_endpoint, opts.sql_database ), + config: MetaStoreConfig::default(), }, MetaBackend::Mysql => MetaStoreBackend::Sql { endpoint: format!( "mysql://{}:{}@{}/{}", opts.sql_username, opts.sql_password, opts.sql_endpoint, opts.sql_database ), + config: MetaStoreConfig::default(), }, }; match meta_store_backend { @@ -54,19 +58,21 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult { let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap(); Ok(SqlMetaStore::new(conn)) } - MetaStoreBackend::Sql { endpoint } => { + MetaStoreBackend::Sql { endpoint, config } => { let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) { - // Due to the fact that Sqlite is prone to the error "(code: 5) database is locked" under concurrent access, + // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access, // here we forcibly specify the number of connections as 1. 1 } else { - 10 + config.max_connections }; let mut options = sea_orm::ConnectOptions::new(endpoint); options .max_connections(max_connection) - .connect_timeout(Duration::from_secs(10)) - .idle_timeout(Duration::from_secs(30)); + .min_connections(config.min_connections) + .connect_timeout(Duration::from_secs(config.connection_timeout_sec)) + .idle_timeout(Duration::from_secs(config.idle_timeout_sec)) + .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec)); let conn = sea_orm::Database::connect(options) .await .map_err(|e| BackupError::MetaStorage(e.into()))?; diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 69d9dc21a075a..8d65cf00f1a4d 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -49,6 +49,7 @@ pub mod stream; pub mod telemetry; pub use error::{MetaError, MetaResult}; +use risingwave_common::config::MetaStoreConfig; pub use rpc::{ElectionClient, ElectionMember}; use crate::manager::MetaOpts; @@ -56,5 +57,8 @@ use crate::manager::MetaOpts; #[derive(Debug)] pub enum MetaStoreBackend { Mem, - Sql { endpoint: String }, + Sql { + endpoint: String, + config: MetaStoreConfig, + }, }