Skip to content

Commit

Permalink
feat: expose connection options for sql meta store (#19040)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored and yezizp2012 committed Oct 24, 2024
1 parent 219dffa commit e45e9d1
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 12 deletions.
51 changes: 51 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ pub struct MetaConfig {
/// Whether compactor should rewrite row to remove dropped column.
#[serde(default = "default::meta::enable_dropped_column_reclaim")]
pub enable_dropped_column_reclaim: bool,

#[serde(default)]
#[config_doc(nested)]
pub meta_store_config: MetaStoreConfig,
}

#[derive(Copy, Clone, Debug, Default)]
Expand Down Expand Up @@ -2271,6 +2275,34 @@ pub mod default {
}
}
}

pub mod meta_store_config {
const DEFAULT_MAX_CONNECTIONS: u32 = 10;
const DEFAULT_MIN_CONNECTIONS: u32 = 1;
const DEFAULT_CONNECTION_TIMEOUT_SEC: u64 = 10;
const DEFAULT_IDLE_TIMEOUT_SEC: u64 = 30;
const DEFAULT_ACQUIRE_TIMEOUT_SEC: u64 = 30;

pub fn max_connections() -> u32 {
DEFAULT_MAX_CONNECTIONS
}

pub fn min_connections() -> u32 {
DEFAULT_MIN_CONNECTIONS
}

pub fn connection_timeout_sec() -> u64 {
DEFAULT_CONNECTION_TIMEOUT_SEC
}

pub fn idle_timeout_sec() -> u64 {
DEFAULT_IDLE_TIMEOUT_SEC
}

pub fn acquire_timeout_sec() -> u64 {
DEFAULT_ACQUIRE_TIMEOUT_SEC
}
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -2485,6 +2517,25 @@ pub struct CompactionConfig {
pub max_level: u32,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct MetaStoreConfig {
/// Maximum number of connections for the meta store connection pool.
#[serde(default = "default::meta_store_config::max_connections")]
pub max_connections: u32,
/// Minimum number of connections for the meta store connection pool.
#[serde(default = "default::meta_store_config::min_connections")]
pub min_connections: u32,
/// Connection timeout in seconds for a meta store connection.
#[serde(default = "default::meta_store_config::connection_timeout_sec")]
pub connection_timeout_sec: u64,
/// Idle timeout in seconds for a meta store connection.
#[serde(default = "default::meta_store_config::idle_timeout_sec")]
pub idle_timeout_sec: u64,
/// Acquire timeout in seconds for a meta store connection.
#[serde(default = "default::meta_store_config::acquire_timeout_sec")]
pub acquire_timeout_sec: u64,
}

#[cfg(test)]
mod tests {
use risingwave_license::LicenseKey;
Expand Down
10 changes: 10 additions & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ This page is automatically generated by `./risedev generate-example-config`
| target_file_size_base | | 33554432 |
| tombstone_reclaim_ratio | | 40 |

## meta.meta_store_config

| Config | Description | Default |
|--------|-------------|---------|
| acquire_timeout_sec | Acquire timeout in seconds for a meta store connection. | 30 |
| connection_timeout_sec | Connection timeout in seconds for a meta store connection. | 10 |
| idle_timeout_sec | Idle timeout in seconds for a meta store connection. | 30 |
| max_connections | Maximum number of connections for the meta store connection pool. | 10 |
| min_connections | Minimum number of connections for the meta store connection pool. | 1 |

## server

| Config | Description | Default |
Expand Down
7 changes: 7 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400
meta_hummock_time_travel_sst_info_fetch_batch_size = 10000

[meta.meta_store_config]
max_connections = 10
min_connections = 1
connection_timeout_sec = 10
idle_timeout_sec = 30
acquire_timeout_sec = 30

[batch]
enable_barrier_read = false
statement_timeout_in_sec = 3600
Expand Down
5 changes: 5 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ pub fn start(
let listen_addr = opts.listen_addr.parse().unwrap();
let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
let meta_store_config = config.meta.meta_store_config.clone();
let backend = match config.meta.backend {
MetaBackend::Mem => MetaStoreBackend::Mem,
MetaBackend::Sql => MetaStoreBackend::Sql {
Expand All @@ -239,6 +240,7 @@ pub fn start(
.expect("sql endpoint is required")
.expose_secret()
.to_string(),
config: meta_store_config,
},
MetaBackend::Sqlite => MetaStoreBackend::Sql {
endpoint: format!(
Expand All @@ -247,6 +249,7 @@ pub fn start(
.expect("sql endpoint is required")
.expose_secret()
),
config: meta_store_config,
},
MetaBackend::Postgres => MetaStoreBackend::Sql {
endpoint: format!(
Expand All @@ -258,6 +261,7 @@ pub fn start(
.expose_secret(),
opts.sql_database
),
config: meta_store_config,
},
MetaBackend::Mysql => MetaStoreBackend::Sql {
endpoint: format!(
Expand All @@ -269,6 +273,7 @@ pub fn start(
.expose_secret(),
opts.sql_database
),
config: meta_store_config,
},
};
validate_config(&config);
Expand Down
11 changes: 6 additions & 5 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,15 @@ pub async fn rpc_serve(
)
.await
}
MetaStoreBackend::Sql { endpoint } => {
MetaStoreBackend::Sql { endpoint, config } => {
let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint);
options
.max_connections(10)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30))
.acquire_timeout(Duration::from_secs(30));
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));

if is_sqlite {
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
Expand Down
18 changes: 12 additions & 6 deletions src/meta/src/backup_restore/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Duration;

use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage};
use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use sea_orm::DbBackend;
Expand All @@ -32,41 +32,47 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
MetaBackend::Mem => MetaStoreBackend::Mem,
MetaBackend::Sql => MetaStoreBackend::Sql {
endpoint: opts.sql_endpoint,
config: MetaStoreConfig::default(),
},
MetaBackend::Sqlite => MetaStoreBackend::Sql {
endpoint: format!("sqlite://{}?mode=rwc", opts.sql_endpoint),
config: MetaStoreConfig::default(),
},
MetaBackend::Postgres => MetaStoreBackend::Sql {
endpoint: format!(
"postgres://{}:{}@{}/{}",
opts.sql_username, opts.sql_password, opts.sql_endpoint, opts.sql_database
),
config: MetaStoreConfig::default(),
},
MetaBackend::Mysql => MetaStoreBackend::Sql {
endpoint: format!(
"mysql://{}:{}@{}/{}",
opts.sql_username, opts.sql_password, opts.sql_endpoint, opts.sql_database
),
config: MetaStoreConfig::default(),
},
};
match meta_store_backend {
MetaStoreBackend::Mem => {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap();
Ok(SqlMetaStore::new(conn))
}
MetaStoreBackend::Sql { endpoint } => {
MetaStoreBackend::Sql { endpoint, config } => {
let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) {
// Due to the fact that Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
// here we forcibly specify the number of connections as 1.
1
} else {
10
config.max_connections
};
let mut options = sea_orm::ConnectOptions::new(endpoint);
options
.max_connections(max_connection)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30));
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connection_timeout_sec))
.idle_timeout(Duration::from_secs(config.idle_timeout_sec))
.acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
let conn = sea_orm::Database::connect(options)
.await
.map_err(|e| BackupError::MetaStorage(e.into()))?;
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ pub mod stream;
pub mod telemetry;

pub use error::{MetaError, MetaResult};
use risingwave_common::config::MetaStoreConfig;
pub use rpc::{ElectionClient, ElectionMember};

use crate::manager::MetaOpts;

#[derive(Debug)]
pub enum MetaStoreBackend {
Mem,
Sql { endpoint: String },
Sql {
endpoint: String,
config: MetaStoreConfig,
},
}

0 comments on commit e45e9d1

Please sign in to comment.