Skip to content

Commit

Permalink
refactor: expose configs for http clients used in object store (#5041)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield authored Nov 25, 2024
1 parent 1cd6abb commit 57f31d1
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 18 deletions.
10 changes: 10 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
Expand Down Expand Up @@ -432,6 +437,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
Expand Down
17 changes: 17 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"

## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]

## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024

## The timeout for only the connect phase of a http client.
connect_timeout = "30s"

## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"

## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"

# Custom storage options
# [[storage.providers]]
# name = "S3"
Expand Down
17 changes: 17 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"

## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]

## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024

## The timeout for only the connect phase of a http client.
connect_timeout = "30s"

## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"

## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"

# Custom storage options
# [[storage.providers]]
# name = "S3"
Expand Down
46 changes: 46 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Datanode configurations
use core::time::Duration;

use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
Expand Down Expand Up @@ -112,6 +114,38 @@ pub struct ObjectStorageCacheConfig {
pub cache_capacity: Option<ReadableSize>,
}

/// The http client options to the storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
/// The maximum idle connection per host allowed in the pool.
pub(crate) pool_max_idle_per_host: u32,

/// The timeout for only the connect phase of a http client.
#[serde(with = "humantime_serde")]
pub(crate) connect_timeout: Duration,

/// The total request timeout, applied from when the request starts connecting until the response body has finished.
/// Also considered a total deadline.
#[serde(with = "humantime_serde")]
pub(crate) timeout: Duration,

/// The timeout for idle sockets being kept-alive.
#[serde(with = "humantime_serde")]
pub(crate) pool_idle_timeout: Duration,
}

impl Default for HttpClientConfig {
fn default() -> Self {
Self {
pool_max_idle_per_host: 1024,
connect_timeout: Duration::from_secs(30),
timeout: Duration::from_secs(30),
pool_idle_timeout: Duration::from_secs(90),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
Expand All @@ -126,6 +160,7 @@ pub struct S3Config {
pub region: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}

impl PartialEq for S3Config {
Expand All @@ -138,6 +173,7 @@ impl PartialEq for S3Config {
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}

Expand All @@ -154,6 +190,7 @@ pub struct OssConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}

impl PartialEq for OssConfig {
Expand All @@ -165,6 +202,7 @@ impl PartialEq for OssConfig {
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}

Expand All @@ -182,6 +220,7 @@ pub struct AzblobConfig {
pub sas_token: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}

impl PartialEq for AzblobConfig {
Expand All @@ -194,6 +233,7 @@ impl PartialEq for AzblobConfig {
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}

Expand All @@ -211,6 +251,7 @@ pub struct GcsConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}

impl PartialEq for GcsConfig {
Expand All @@ -223,6 +264,7 @@ impl PartialEq for GcsConfig {
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}

Expand All @@ -237,6 +279,7 @@ impl Default for S3Config {
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
Expand All @@ -251,6 +294,7 @@ impl Default for OssConfig {
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
Expand All @@ -266,6 +310,7 @@ impl Default for AzblobConfig {
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
Expand All @@ -281,6 +326,7 @@ impl Default for GcsConfig {
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
Expand Down
23 changes: 13 additions & 10 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

pub(crate) async fn new_raw_object_store(
Expand Down Expand Up @@ -177,7 +177,7 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
Ok(())
}

pub(crate) fn build_http_client() -> Result<HttpClient> {
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
let http_builder = {
let mut builder = reqwest::ClientBuilder::new();

Expand All @@ -186,25 +186,28 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {
let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(usize::MAX);
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_max_idle_per_host as usize);
builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);

// Connect timeout default to 30s.
let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_CONNECT_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.connect_timeout);
builder = builder.connect_timeout(connect_timeout);

// Pool connection idle timeout default to 90s.
let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(90);
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_idle_timeout);

builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));
builder = builder.pool_idle_timeout(idle_timeout);

builder
builder.timeout(config.timeout)
};

HttpClient::build(http_builder).context(error::InitBackendSnafu)
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/store/azblob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
azblob_config.container, &root
);

let client = build_http_client(&azblob_config.http_client)?;

let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);

if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/store/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
gcs_config.bucket, &root
);

let client = build_http_client(&gcs_config.http_client);

let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
.credential_path(gcs_config.credential_path.expose_secret())
.credential(gcs_config.credential.expose_secret())
.endpoint(&gcs_config.endpoint)
.http_client(build_http_client()?);
.http_client(client?);

Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/store/oss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
oss_config.bucket, &root
);

let client = build_http_client(&oss_config.http_client)?;

let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(build_http_client()?);
.http_client(client);

Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/store/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
s3_config.bucket, &root
);

let client = build_http_client(&s3_config.http_client)?;

let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);

if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
Expand Down
27 changes: 23 additions & 4 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,28 @@ pub async fn test_config_api(store_type: StorageType) {
let res_get = client.get("/config").send().await;
assert_eq!(res_get.status(), StatusCode::OK);

let storage = if store_type != StorageType::File {
format!(
r#"[storage]
type = "{}"
providers = []
[storage.http_client]
pool_max_idle_per_host = 1024
connect_timeout = "30s"
timeout = "30s"
pool_idle_timeout = "1m 30s""#,
store_type
)
} else {
format!(
r#"[storage]
type = "{}"
providers = []"#,
store_type
)
};

let expected_toml_str = format!(
r#"
mode = "standalone"
Expand Down Expand Up @@ -867,9 +889,7 @@ sync_write = false
enable_log_recycle = true
prefill_log_files = false
[storage]
type = "{}"
providers = []
{storage}
[metadata_store]
file_size = "256MiB"
Expand Down Expand Up @@ -933,7 +953,6 @@ enable = false
write_interval = "30s"
[tracing]"#,
store_type
)
.trim()
.to_string();
Expand Down

0 comments on commit 57f31d1

Please sign in to comment.