From 57f31d14c8e9eeeb49363fa7d8df39bb92e88626 Mon Sep 17 00:00:00 2001
From: LFC <990479+MichaelScofield@users.noreply.github.com>
Date: Mon, 25 Nov 2024 11:49:54 +0800
Subject: [PATCH] refactor: expose configs for http clients used in object
store (#5041)
---
config/config.md | 10 +++++++
config/datanode.example.toml | 17 ++++++++++++
config/standalone.example.toml | 17 ++++++++++++
src/datanode/src/config.rs | 46 ++++++++++++++++++++++++++++++++
src/datanode/src/store.rs | 23 +++++++++-------
src/datanode/src/store/azblob.rs | 4 ++-
src/datanode/src/store/gcs.rs | 4 ++-
src/datanode/src/store/oss.rs | 4 ++-
src/datanode/src/store/s3.rs | 4 ++-
tests-integration/tests/http.rs | 27 ++++++++++++++++---
10 files changed, 138 insertions(+), 18 deletions(-)
diff --git a/config/config.md b/config/config.md
index d8164c11d493..15025b871125 100644
--- a/config/config.md
+++ b/config/config.md
@@ -109,6 +109,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.
**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.
**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.
**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
+| `storage.http_client` | -- | -- | The http client options to the storage.
**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
+| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
+| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
+| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.
Also considered a total deadline. |
+| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -432,6 +437,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.
**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.
**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.
**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
+| `storage.http_client` | -- | -- | The http client options to the storage.
**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
+| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
+| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
+| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.
Also considered a total deadline. |
+| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 1bc084ad40e4..c5fdd24ebe14 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -375,6 +375,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
+## The http client options to the storage.
+## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
+[storage.http_client]
+
+## The maximum idle connection per host allowed in the pool.
+pool_max_idle_per_host = 1024
+
+## The timeout for only the connect phase of a http client.
+connect_timeout = "30s"
+
+## The total request timeout, applied from when the request starts connecting until the response body has finished.
+## Also considered a total deadline.
+timeout = "30s"
+
+## The timeout for idle sockets being kept-alive.
+pool_idle_timeout = "90s"
+
# Custom storage options
# [[storage.providers]]
# name = "S3"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 6d7755c7e6cd..deaf8900f213 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -413,6 +413,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
+## The http client options to the storage.
+## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
+[storage.http_client]
+
+## The maximum idle connection per host allowed in the pool.
+pool_max_idle_per_host = 1024
+
+## The timeout for only the connect phase of a http client.
+connect_timeout = "30s"
+
+## The total request timeout, applied from when the request starts connecting until the response body has finished.
+## Also considered a total deadline.
+timeout = "30s"
+
+## The timeout for idle sockets being kept-alive.
+pool_idle_timeout = "90s"
+
# Custom storage options
# [[storage.providers]]
# name = "S3"
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index dae9a65581ba..4fedb9ea2cc0 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -14,6 +14,8 @@
//! Datanode configurations
+use core::time::Duration;
+
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
@@ -112,6 +114,38 @@ pub struct ObjectStorageCacheConfig {
pub cache_capacity: Option,
}
+/// The http client options to the storage.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[serde(default)]
+pub struct HttpClientConfig {
+ /// The maximum idle connection per host allowed in the pool.
+ pub(crate) pool_max_idle_per_host: u32,
+
+ /// The timeout for only the connect phase of a http client.
+ #[serde(with = "humantime_serde")]
+ pub(crate) connect_timeout: Duration,
+
+ /// The total request timeout, applied from when the request starts connecting until the response body has finished.
+ /// Also considered a total deadline.
+ #[serde(with = "humantime_serde")]
+ pub(crate) timeout: Duration,
+
+ /// The timeout for idle sockets being kept-alive.
+ #[serde(with = "humantime_serde")]
+ pub(crate) pool_idle_timeout: Duration,
+}
+
+impl Default for HttpClientConfig {
+ fn default() -> Self {
+ Self {
+ pool_max_idle_per_host: 1024,
+ connect_timeout: Duration::from_secs(30),
+ timeout: Duration::from_secs(30),
+ pool_idle_timeout: Duration::from_secs(90),
+ }
+ }
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
@@ -126,6 +160,7 @@ pub struct S3Config {
pub region: Option,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
+ pub http_client: HttpClientConfig,
}
impl PartialEq for S3Config {
@@ -138,6 +173,7 @@ impl PartialEq for S3Config {
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.cache == other.cache
+ && self.http_client == other.http_client
}
}
@@ -154,6 +190,7 @@ pub struct OssConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
+ pub http_client: HttpClientConfig,
}
impl PartialEq for OssConfig {
@@ -165,6 +202,7 @@ impl PartialEq for OssConfig {
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
+ && self.http_client == other.http_client
}
}
@@ -182,6 +220,7 @@ pub struct AzblobConfig {
pub sas_token: Option,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
+ pub http_client: HttpClientConfig,
}
impl PartialEq for AzblobConfig {
@@ -194,6 +233,7 @@ impl PartialEq for AzblobConfig {
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
+ && self.http_client == other.http_client
}
}
@@ -211,6 +251,7 @@ pub struct GcsConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
+ pub http_client: HttpClientConfig,
}
impl PartialEq for GcsConfig {
@@ -223,6 +264,7 @@ impl PartialEq for GcsConfig {
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
+ && self.http_client == other.http_client
}
}
@@ -237,6 +279,7 @@ impl Default for S3Config {
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
+ http_client: HttpClientConfig::default(),
}
}
}
@@ -251,6 +294,7 @@ impl Default for OssConfig {
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
+ http_client: HttpClientConfig::default(),
}
}
}
@@ -266,6 +310,7 @@ impl Default for AzblobConfig {
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
+ http_client: HttpClientConfig::default(),
}
}
}
@@ -281,6 +326,7 @@ impl Default for GcsConfig {
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
+ http_client: HttpClientConfig::default(),
}
}
}
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs
index 16b6e0bc8be5..d55f9fa57b30 100644
--- a/src/datanode/src/store.rs
+++ b/src/datanode/src/store.rs
@@ -32,7 +32,7 @@ use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
-use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
+use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_raw_object_store(
@@ -177,7 +177,7 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
Ok(())
}
-pub(crate) fn build_http_client() -> Result {
+pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result {
let http_builder = {
let mut builder = reqwest::ClientBuilder::new();
@@ -186,25 +186,28 @@ pub(crate) fn build_http_client() -> Result {
let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
.ok()
.and_then(|v| v.parse::().ok())
- .unwrap_or(usize::MAX);
+ .inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST' might be deprecated in the future. Please set it in the config file instead."))
+ .unwrap_or(config.pool_max_idle_per_host as usize);
builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
// Connect timeout default to 30s.
let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
.ok()
- .and_then(|v| v.parse::().ok())
- .unwrap_or(30);
- builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
+ .and_then(|v| v.parse::().ok().map(Duration::from_secs))
+ .inspect(|_| warn!("'_GREPTIMEDB_HTTP_CONNECT_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
+ .unwrap_or(config.connect_timeout);
+ builder = builder.connect_timeout(connect_timeout);
// Pool connection idle timeout default to 90s.
let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
.ok()
- .and_then(|v| v.parse::().ok())
- .unwrap_or(90);
+ .and_then(|v| v.parse::().ok().map(Duration::from_secs))
+ .inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
+ .unwrap_or(config.pool_idle_timeout);
- builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));
+ builder = builder.pool_idle_timeout(idle_timeout);
- builder
+ builder.timeout(config.timeout)
};
HttpClient::build(http_builder).context(error::InitBackendSnafu)
diff --git a/src/datanode/src/store/azblob.rs b/src/datanode/src/store/azblob.rs
index ca7a5023a90e..bf7b885de9ff 100644
--- a/src/datanode/src/store/azblob.rs
+++ b/src/datanode/src/store/azblob.rs
@@ -30,13 +30,15 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
azblob_config.container, &root
);
+ let client = build_http_client(&azblob_config.http_client)?;
+
let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
- .http_client(build_http_client()?);
+ .http_client(client);
if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);
diff --git a/src/datanode/src/store/gcs.rs b/src/datanode/src/store/gcs.rs
index f982ec5f1578..60d25d7c7eed 100644
--- a/src/datanode/src/store/gcs.rs
+++ b/src/datanode/src/store/gcs.rs
@@ -29,6 +29,8 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result Result Result Result