diff --git a/Cargo.lock b/Cargo.lock index 429d62308abd4..acfb7b14e9025 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10054,6 +10054,7 @@ dependencies = [ "madsim-tokio", "opendal", "prometheus", + "reqwest", "risingwave_common", "rustls 0.23.4", "spin 0.9.8", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index accdc1ab09652..29144c2cab882 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -29,6 +29,7 @@ itertools = "0.12" madsim = "0.2.22" opendal = "0.45.1" prometheus = { version = "0.13", features = ["process"] } +reqwest = "0.11" risingwave_common = { workspace = true } rustls = "0.23.4" spin = "0.9" diff --git a/src/object_store/src/object/opendal_engine/azblob.rs b/src/object_store/src/object/opendal_engine/azblob.rs index ae09a0fe2d333..8ac8caf09fda3 100644 --- a/src/object_store/src/object/opendal_engine/azblob.rs +++ b/src/object_store/src/object/opendal_engine/azblob.rs @@ -32,6 +32,7 @@ impl OpendalObjectStore { .unwrap_or_else(|_| panic!("AZBLOB_ENDPOINT not found from environment variables")); builder.endpoint(&endpoint); + let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/gcs.rs b/src/object_store/src/object/opendal_engine/gcs.rs index 238db7ff51a00..0577288005fca 100644 --- a/src/object_store/src/object/opendal_engine/gcs.rs +++ b/src/object_store/src/object/opendal_engine/gcs.rs @@ -34,6 +34,7 @@ impl OpendalObjectStore { if let Ok(cred) = cred { builder.credential(&cred); } + let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/obs.rs b/src/object_store/src/object/opendal_engine/obs.rs index 6acd79ddbed4e..4ddf9579685f1 100644 --- a/src/object_store/src/object/opendal_engine/obs.rs +++ b/src/object_store/src/object/opendal_engine/obs.rs @@ -40,6 +40,7 @@ impl OpendalObjectStore { builder.endpoint(&endpoint); builder.access_key_id(&access_key_id); builder.secret_access_key(&secret_access_key); + let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index c10aff55d342b..db2c7732d8fbf 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -15,6 +15,7 @@ use std::time::Duration; use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::raw::HttpClient; use opendal::services::S3; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; @@ -40,6 +41,9 @@ impl OpendalObjectStore { builder.enable_virtual_host_style(); } + let http_client = Self::new_http_client(&object_store_config)?; + builder.http_client(http_client); + let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer( @@ -60,4 +64,18 @@ impl OpendalObjectStore { engine_type: EngineType::S3, }) } + + pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult { + let mut client_builder = reqwest::ClientBuilder::new(); + + if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() { + client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms)); + } + + if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() { + client_builder = client_builder.tcp_nodelay(*nodelay); + } + + Ok(HttpClient::build(client_builder)?) + } } diff --git a/src/object_store/src/object/opendal_engine/oss.rs b/src/object_store/src/object/opendal_engine/oss.rs index 0436a4e9b548e..e215b6f93d31e 100644 --- a/src/object_store/src/object/opendal_engine/oss.rs +++ b/src/object_store/src/object/opendal_engine/oss.rs @@ -40,6 +40,7 @@ impl OpendalObjectStore { builder.endpoint(&endpoint); builder.access_key_id(&access_key_id); builder.access_key_secret(&access_key_secret); + let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default())