Skip to content

Commit

Permalink
enhance opendal s3
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Feb 1, 2024
1 parent fb42759 commit 1295931
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 43 deletions.
20 changes: 2 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] }
hyper-tls = "0.5.0"
itertools = "0.12"
madsim = "0.2.22"
opendal = "0.44"
opendal = "0.44.2"
prometheus = { version = "0.13", features = ["process"] }
risingwave_common = { workspace = true }
rustls = "0.21.8"
Expand Down
30 changes: 9 additions & 21 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,27 +818,15 @@ pub async fn build_remote_object_store(
config: ObjectStoreConfig,
) -> ObjectStoreImpl {
match url {
s3 if s3.starts_with("s3://") => {
if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() {
let s3 = s3.strip_prefix("s3://").unwrap();
let (bucket, root) = s3.split_once('@').unwrap_or((s3, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config,
)
.await
.monitored(metrics),
)
}
}
s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config,
)
.await
.monitored(metrics),
),
#[cfg(feature = "hdfs-backend")]
hdfs if hdfs.starts_with("hdfs://") => {
let hdfs = hdfs.strip_prefix("hdfs://").unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ pub struct OpendalStreamingUploader {
}
impl OpendalStreamingUploader {
pub async fn new(op: Operator, path: String) -> ObjectResult<Self> {
let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?;
let writer = op
.writer_with(&path)
.concurrent(8)
.buffer(OPENDAL_BUFFER_SIZE)
.await?;
Ok(Self { writer })
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal s3 engine.
pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult<Self> {
pub fn new_s3_engine(
bucket: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
// Create s3 builder.
let mut builder = S3::default();
builder.bucket(&bucket);
Expand Down Expand Up @@ -57,7 +64,18 @@ impl OpendalObjectStore {
builder.disable_config_load();
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.0)
.with_jitter(),
)
.finish();
Ok(Self {
op,
Expand Down

0 comments on commit 1295931

Please sign in to comment.