feat(object store): introduce new s3 object store via OpenDAL (#14409) (#15215)

Co-authored-by: congyi wang <[email protected]>
Co-authored-by: William Wen <[email protected]>
3 people authored Feb 23, 2024
1 parent 1e8b88e commit 4b1b616
Showing 6 changed files with 115 additions and 31 deletions.
20 changes: 2 additions & 18 deletions Cargo.lock


2 changes: 1 addition & 1 deletion src/object_store/Cargo.toml
@@ -27,7 +27,7 @@ hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] }
hyper-tls = "0.5.0"
itertools = "0.12"
madsim = "0.2.22"
opendal = "0.44"
opendal = "0.44.2"
prometheus = { version = "0.13", features = ["process"] }
risingwave_common = { workspace = true }
rustls = "0.21.8"
30 changes: 21 additions & 9 deletions src/object_store/src/object/mod.rs
@@ -818,15 +818,27 @@ pub async fn build_remote_object_store(
config: ObjectStoreConfig,
) -> ObjectStoreImpl {
match url {
s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config,
)
.await
.monitored(metrics),
),
s3 if s3.starts_with("s3://") => {
if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() {
let bucket = s3.strip_prefix("s3://").unwrap();

ObjectStoreImpl::Opendal(
OpendalObjectStore::new_s3_engine(bucket.to_string(), config)
.unwrap()
.monitored(metrics),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config,
)
.await
.monitored(metrics),
)
}
}
#[cfg(feature = "hdfs-backend")]
hdfs if hdfs.starts_with("hdfs://") => {
let hdfs = hdfs.strip_prefix("hdfs://").unwrap();
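
The new match arm keeps the AWS SDK-backed `S3ObjectStore` as the default and selects the OpenDAL-backed engine only when the `RW_USE_OPENDAL_FOR_S3` environment variable is set. The standalone sketch below mirrors just that selection logic; `Backend` and `select_s3_backend` are hypothetical names used for illustration and are not part of the commit.

// Illustrative sketch of the backend toggle in `build_remote_object_store`,
// written without RisingWave's types. Not part of the commit.
#[derive(Debug, PartialEq)]
enum Backend {
    OpendalS3 { bucket: String },
    AwsSdkS3 { bucket: String },
}

fn select_s3_backend(url: &str) -> Option<Backend> {
    // Only `s3://` URLs are relevant here; other schemes are handled by other arms.
    let bucket = url.strip_prefix("s3://")?.to_string();
    // The OpenDAL engine is chosen only when the variable is present,
    // matching the `std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok()` check above.
    if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() {
        Some(Backend::OpendalS3 { bucket })
    } else {
        Some(Backend::AwsSdkS3 { bucket })
    }
}

fn main() {
    // Run with `RW_USE_OPENDAL_FOR_S3=1` set (or unset) to see the toggle take effect.
    println!("{:?}", select_s3_backend("s3://my-bucket"));
}
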
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/mod.rs
@@ -26,8 +26,8 @@ pub mod gcs;

pub mod obs;

pub mod oss;

pub mod azblob;
pub mod opendal_s3;
pub mod oss;

pub mod fs;
8 changes: 7 additions & 1 deletion src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -38,6 +38,7 @@ pub enum EngineType {
Memory,
Hdfs,
Gcs,
S3,
Obs,
Oss,
Webhdfs,
@@ -190,6 +191,7 @@ impl ObjectStore for OpendalObjectStore {
match self.engine_type {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::S3 => "S3",
EngineType::Gcs => "Gcs",
EngineType::Obs => "Obs",
EngineType::Oss => "Oss",
@@ -206,7 +208,11 @@ pub struct OpendalStreamingUploader {
}
impl OpendalStreamingUploader {
pub async fn new(op: Operator, path: String) -> ObjectResult<Self> {
let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?;
let writer = op
.writer_with(&path)
.concurrent(8)
.buffer(OPENDAL_BUFFER_SIZE)
.await?;
Ok(Self { writer })
}
}
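
The streaming uploader now builds its writer with `concurrent(8)`, allowing up to eight upload parts to be in flight, while `buffer(OPENDAL_BUFFER_SIZE)` accumulates data until a part is large enough to flush. The rough, standalone sketch below issues the same builder calls against OpenDAL's in-memory service so it needs no S3 credentials; it assumes opendal 0.44 with `tokio` and `bytes` as dependencies, and assumes the memory backend treats these writer hints as best-effort.

use bytes::Bytes;
use opendal::services::Memory;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // In-memory operator used purely for illustration; the commit targets S3.
    let op = Operator::new(Memory::default())?.finish();

    // Same writer options as the diff: up to 8 concurrent parts, with a
    // 16 MiB buffer chosen arbitrarily here in place of OPENDAL_BUFFER_SIZE.
    let mut writer = op
        .writer_with("demo/streaming-upload")
        .concurrent(8)
        .buffer(16 * 1024 * 1024)
        .await?;

    writer.write(Bytes::from_static(b"part 1 of the payload")).await?;
    writer.write(Bytes::from_static(b"part 2 of the payload")).await?;
    writer.close().await?;
    Ok(())
}
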
82 changes: 82 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -0,0 +1,82 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal s3 engine.
pub fn new_s3_engine(
bucket: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
// Create s3 builder.
let mut builder = S3::default();
builder.bucket(&bucket);

// For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field.
if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
builder.endpoint(&endpoint_url);
}

if let Ok(region) = std::env::var("AWS_REGION") {
builder.region(&region);
} else {
tracing::error!("aws s3 region is not set, bucket {}", bucket);
}

if let Ok(access) = std::env::var("AWS_ACCESS_KEY_ID") {
builder.access_key_id(&access);
} else {
tracing::error!("access key id of aws s3 is not set, bucket {}", bucket);
}

if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
builder.secret_access_key(&secret);
} else {
tracing::error!("secret access key of aws s3 is not set, bucket {}", bucket);
}

if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
builder.enable_virtual_host_style();
}

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.0)
.with_jitter(),
)
.finish();
Ok(Self {
op,
engine_type: EngineType::S3,
})
}
}
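
`new_s3_engine` wraps the raw S3 operator in a `LoggingLayer` and a `RetryLayer` whose backoff is driven by the `object_store_req_retry_*` fields of `ObjectStoreConfig`. The sketch below (not part of the commit) applies the same layering to OpenDAL's in-memory service, with made-up retry values standing in for those config fields; opendal 0.44 and `tokio` are assumed.

use std::time::Duration;

use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Memory;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Placeholder backoff values standing in for the object_store_req_retry_* config fields.
    let retry = RetryLayer::new()
        .with_min_delay(Duration::from_millis(20))
        .with_max_delay(Duration::from_millis(10_000))
        .with_max_times(8)
        .with_factor(1.0)
        .with_jitter();

    let op: Operator = Operator::new(Memory::default())?
        .layer(LoggingLayer::default())
        .layer(retry)
        .finish();

    // Smoke test: on the success path the layered operator behaves like a plain one.
    op.write("retry-demo/object", "hello").await?;
    let data = op.read("retry-demo/object").await?;
    assert_eq!(data, b"hello".to_vec());
    Ok(())
}
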
