diff --git a/Cargo.lock b/Cargo.lock index d18c726756bef..06ef3dbbbbb66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -683,19 +683,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-compat" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d" -dependencies = [ - "futures-core", - "futures-io", - "once_cell", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-compression" version = "0.4.5" @@ -6656,12 +6643,11 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.44.0" +version = "0.44.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66" +checksum = "4af824652d4d2ffabf606d337a071677ae621b05622adf35df9562f69d9b4498" dependencies = [ "anyhow", - "async-compat", "async-trait", "backon", "base64 0.21.4", @@ -6674,9 +6660,7 @@ dependencies = [ "log", "md-5", "once_cell", - "parking_lot 0.12.1", "percent-encoding", - "pin-project", "prometheus", "quick-xml 0.30.0", "reqsign", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 61f526511c3ab..d1db63c5fdb30 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -27,7 +27,7 @@ hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = "0.12" madsim = "0.2.22" -opendal = "0.44" +opendal = "0.44.2" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } rustls = "0.21.8" diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 5399b6d253b2f..d9ae0bc37b868 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -818,15 +818,27 @@ pub async fn build_remote_object_store( config: ObjectStoreConfig, ) -> ObjectStoreImpl { match url { - s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3( - S3ObjectStore::new_with_config( - s3.strip_prefix("s3://").unwrap().to_string(), - metrics.clone(), - config, - ) - .await - .monitored(metrics), - ), + s3 if s3.starts_with("s3://") => { + if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { + let bucket = s3.strip_prefix("s3://").unwrap(); + + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), config) + .unwrap() + .monitored(metrics), + ) + } else { + ObjectStoreImpl::S3( + S3ObjectStore::new_with_config( + s3.strip_prefix("s3://").unwrap().to_string(), + metrics.clone(), + config, + ) + .await + .monitored(metrics), + ) + } + } #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); diff --git a/src/object_store/src/object/opendal_engine/mod.rs b/src/object_store/src/object/opendal_engine/mod.rs index 1620ee30da7d7..ccaba375a1302 100644 --- a/src/object_store/src/object/opendal_engine/mod.rs +++ b/src/object_store/src/object/opendal_engine/mod.rs @@ -26,8 +26,8 @@ pub mod gcs; pub mod obs; -pub mod oss; - pub mod azblob; +pub mod opendal_s3; +pub mod oss; pub mod fs; diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index a6c68c45f6472..122506d37cdfa 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -38,6 +38,7 @@ pub enum EngineType { Memory, Hdfs, Gcs, + S3, Obs, Oss, Webhdfs, @@ -190,6 +191,7 @@ impl ObjectStore for OpendalObjectStore { match self.engine_type { EngineType::Memory => "Memory", EngineType::Hdfs => "Hdfs", + EngineType::S3 => "S3", EngineType::Gcs => "Gcs", EngineType::Obs => "Obs", EngineType::Oss => "Oss", @@ -206,7 +208,11 @@ pub struct OpendalStreamingUploader { } impl OpendalStreamingUploader { pub async fn new(op: Operator, path: String) -> ObjectResult { - let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?; + let writer = op + .writer_with(&path) + .concurrent(8) + .buffer(OPENDAL_BUFFER_SIZE) + .await?; Ok(Self { writer }) } } diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs new file mode 100644 index 0000000000000..425d0a7576691 --- /dev/null +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -0,0 +1,82 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::S3; +use opendal::Operator; +use risingwave_common::config::ObjectStoreConfig; + +use super::{EngineType, OpendalObjectStore}; +use crate::object::ObjectResult; + +impl OpendalObjectStore { + /// create opendal s3 engine. + pub fn new_s3_engine( + bucket: String, + object_store_config: ObjectStoreConfig, + ) -> ObjectResult { + // Create s3 builder. + let mut builder = S3::default(); + builder.bucket(&bucket); + + // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field. + if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { + builder.endpoint(&endpoint_url); + } + + if let Ok(region) = std::env::var("AWS_REGION") { + builder.region(®ion); + } else { + tracing::error!("aws s3 region is not set, bucket {}", bucket); + } + + if let Ok(access) = std::env::var("AWS_ACCESS_KEY_ID") { + builder.access_key_id(&access); + } else { + tracing::error!("access key id of aws s3 is not set, bucket {}", bucket); + } + + if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") { + builder.secret_access_key(&secret); + } else { + tracing::error!("secret access key of aws s3 is not set, bucket {}", bucket); + } + + if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() { + builder.enable_virtual_host_style(); + } + + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer( + RetryLayer::new() + .with_min_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_interval_ms, + )) + .with_max_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_max_delay_ms, + )) + .with_max_times(object_store_config.s3.object_store_req_retry_max_attempts) + .with_factor(1.0) + .with_jitter(), + ) + .finish(); + Ok(Self { + op, + engine_type: EngineType::S3, + }) + } +}