diff --git a/Cargo.lock b/Cargo.lock index 5b73909cc9797..5f82db1e78aee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -683,19 +683,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-compat" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d" -dependencies = [ - "futures-core", - "futures-io", - "once_cell", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-compression" version = "0.4.5" @@ -6646,12 +6633,11 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.44.0" +version = "0.44.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66" +checksum = "4af824652d4d2ffabf606d337a071677ae621b05622adf35df9562f69d9b4498" dependencies = [ "anyhow", - "async-compat", "async-trait", "backon", "base64 0.21.4", @@ -6664,9 +6650,7 @@ dependencies = [ "log", "md-5", "once_cell", - "parking_lot 0.12.1", "percent-encoding", - "pin-project", "prometheus", "quick-xml 0.30.0", "reqsign", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 61f526511c3ab..d1db63c5fdb30 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -27,7 +27,7 @@ hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = "0.12" madsim = "0.2.22" -opendal = "0.44" +opendal = "0.44.2" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } rustls = "0.21.8" diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 7ccee1cb50682..5399b6d253b2f 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -818,27 +818,15 @@ pub async fn build_remote_object_store( config: ObjectStoreConfig, ) -> ObjectStoreImpl { match url { - s3 if s3.starts_with("s3://") => { - if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { - let s3 = s3.strip_prefix("s3://").unwrap(); - let (bucket, root) = s3.split_once('@').unwrap_or((s3, "")); - ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string()) - .unwrap() - .monitored(metrics), - ) - } else { - ObjectStoreImpl::S3( - S3ObjectStore::new_with_config( - s3.strip_prefix("s3://").unwrap().to_string(), - metrics.clone(), - config, - ) - .await - .monitored(metrics), - ) - } - } + s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3( + S3ObjectStore::new_with_config( + s3.strip_prefix("s3://").unwrap().to_string(), + metrics.clone(), + config, + ) + .await + .monitored(metrics), + ), #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 32c63ca3d214e..d70bbb6c7bb9f 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -208,7 +208,11 @@ pub struct OpendalStreamingUploader { } impl OpendalStreamingUploader { pub async fn new(op: Operator, path: String) -> ObjectResult { - let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?; + let writer = op + .writer_with(&path) + .concurrent(8) + .buffer(OPENDAL_BUFFER_SIZE) + .await?; Ok(Self { writer }) } } diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 6257560c1c4c9..acbbf2eb64209 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -12,16 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::services::S3; use opendal::Operator; +use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; impl OpendalObjectStore { /// create opendal s3 engine. - pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { + pub fn new_s3_engine( + bucket: String, + root: String, + object_store_config: ObjectStoreConfig, + ) -> ObjectResult { // Create s3 builder. let mut builder = S3::default(); builder.bucket(&bucket); @@ -57,7 +64,18 @@ impl OpendalObjectStore { builder.disable_config_load(); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) - .layer(RetryLayer::default()) + .layer( + RetryLayer::new() + .with_min_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_interval_ms, + )) + .with_max_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_max_delay_ms, + )) + .with_max_times(object_store_config.s3.object_store_req_retry_max_attempts) + .with_factor(1.0) + .with_jitter(), + ) .finish(); Ok(Self { op,