From a379cdb658a188a1db1dc1eab646084a02233647 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 3 Jan 2024 11:17:05 +0800 Subject: [PATCH 01/14] save work --- .../src/object/opendal_engine/mod.rs | 2 +- .../src/object/opendal_engine/opendal_s3.rs | 75 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 src/object_store/src/object/opendal_engine/opendal_s3.rs diff --git a/src/object_store/src/object/opendal_engine/mod.rs b/src/object_store/src/object/opendal_engine/mod.rs index f2dbc24d4bbab..c11f4fd2a25e7 100644 --- a/src/object_store/src/object/opendal_engine/mod.rs +++ b/src/object_store/src/object/opendal_engine/mod.rs @@ -27,7 +27,7 @@ pub mod gcs; pub mod obs; pub mod oss; - +pub mod opendal_s3; pub mod azblob; pub mod fs; diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs new file mode 100644 index 0000000000000..c6b493fc1213d --- /dev/null +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -0,0 +1,75 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use opendal::{layers::{LoggingLayer, RetryLayer}, services::S3}; +use opendal::services::Gcs; +use opendal::Operator; + +use super::{EngineType, OpendalObjectStore}; +use crate::object::ObjectResult; + +impl OpendalObjectStore { + /// create opendal gcs engine. 
+ pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { + // Create gcs backend builder. + let mut builder = Gcs::default(); + + builder.bucket(&bucket); + + builder.root(&root); + + // Create s3 builder. + let mut builder = S3::default(); + builder.bucket(&bucket); + builder.region(&s3_properties.region_name); + + if let Some(endpoint_url) = s3_properties.endpoint_url { + builder.endpoint(&endpoint_url); + } + + if let Some(access) = s3_properties.access { + builder.access_key_id(&access); + } else { + tracing::error!( + "access key id of aws s3 is not set, bucket {}", + s3_properties.bucket_name + ); + } + + if let Some(secret) = s3_properties.secret { + builder.secret_access_key(&secret); + } else { + tracing::error!( + "secret access key of aws s3 is not set, bucket {}", + s3_properties.bucket_name + ); + } + + builder.enable_virtual_host_style(); + + if let Some(assume_role) = assume_role { + builder.role_arn(&assume_role); + } + + builder.disable_config_load(); + let op: Operator = Operator::new(builder)? 
+ .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(Self { + op, + engine_type: EngineType::Gcs, + }) + } +} From 4e524193c814f90f27db0bb1174ef34844a8f395 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 8 Jan 2024 14:26:46 +0800 Subject: [PATCH 02/14] implement new s3 via OpenDAL --- risedev.yml | 13 +++++++ src/object_store/src/object/mod.rs | 9 +++++ .../src/object/opendal_engine/mod.rs | 4 +- .../opendal_engine/opendal_object_store.rs | 2 + .../src/object/opendal_engine/opendal_s3.rs | 37 ++++++------------- src/risedevtool/src/task/utils.rs | 4 ++ 6 files changed, 41 insertions(+), 28 deletions(-) diff --git a/risedev.yml b/risedev.yml index 17076e4f2da8e..97a8d0bd48b03 100644 --- a/risedev.yml +++ b/risedev.yml @@ -191,6 +191,19 @@ profile: - use: compactor # - use: prometheus # - use: grafana + opendal_s3: + steps: + # - use: etcd + - use: meta-node + - use: compute-node + - use: frontend + # If you want to use google cloud storage as storage backend, configure bucket name: + - use: opendal + engine: opendal_s3 + bucket: bucket-name + - use: compactor + # - use: prometheus + # - use: grafana obs: steps: # - use: etcd diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index d2010a676a481..8b7dac6d90371 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -795,6 +795,15 @@ pub async fn build_remote_object_store( .await .monitored(metrics), ), + opendal_s3 if opendal_s3.starts_with("opendal_s3://") => { + let opendal_s3 = opendal_s3.strip_prefix("opendal_s3://").unwrap(); + let (bucket, root) = opendal_s3.split_once('@').unwrap_or((opendal_s3, "")); + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string()) + .unwrap() + .monitored(metrics), + ) + } #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); diff --git 
a/src/object_store/src/object/opendal_engine/mod.rs b/src/object_store/src/object/opendal_engine/mod.rs index c11f4fd2a25e7..9744a578582ae 100644 --- a/src/object_store/src/object/opendal_engine/mod.rs +++ b/src/object_store/src/object/opendal_engine/mod.rs @@ -26,8 +26,8 @@ pub mod gcs; pub mod obs; -pub mod oss; -pub mod opendal_s3; pub mod azblob; +pub mod opendal_s3; +pub mod oss; pub mod fs; diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 9005f06981b28..3489603026a88 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -37,6 +37,7 @@ pub enum EngineType { Memory, Hdfs, Gcs, + OpendalS3, Obs, Oss, Webhdfs, @@ -189,6 +190,7 @@ impl ObjectStore for OpendalObjectStore { match self.engine_type { EngineType::Memory => "Memory", EngineType::Hdfs => "Hdfs", + EngineType::OpendalS3 => "OpendalS3", EngineType::Gcs => "Gcs", EngineType::Obs => "Obs", EngineType::Oss => "Oss", diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index c6b493fc1213d..93ff0ce2d0ebe 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use opendal::{layers::{LoggingLayer, RetryLayer}, services::S3}; -use opendal::services::Gcs; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::S3; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; @@ -22,44 +22,29 @@ use crate::object::ObjectResult; impl OpendalObjectStore { /// create opendal gcs engine. pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { - // Create gcs backend builder. 
- let mut builder = Gcs::default(); - - builder.bucket(&bucket); - - builder.root(&root); - // Create s3 builder. let mut builder = S3::default(); builder.bucket(&bucket); - builder.region(&s3_properties.region_name); + builder.root(&root); - if let Some(endpoint_url) = s3_properties.endpoint_url { + if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { builder.endpoint(&endpoint_url); } - if let Some(access) = s3_properties.access { + if let Ok(access) = std::env::var("AWS_ACCESS_KEY_ID") { builder.access_key_id(&access); } else { - tracing::error!( - "access key id of aws s3 is not set, bucket {}", - s3_properties.bucket_name - ); + tracing::error!("access key id of aws s3 is not set, bucket {}", bucket); } - if let Some(secret) = s3_properties.secret { + if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") { builder.secret_access_key(&secret); } else { - tracing::error!( - "secret access key of aws s3 is not set, bucket {}", - s3_properties.bucket_name - ); + tracing::error!("secret access key of aws s3 is not set, bucket {}", bucket); } - builder.enable_virtual_host_style(); - - if let Some(assume_role) = assume_role { - builder.role_arn(&assume_role); + if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() { + builder.enable_virtual_host_style(); } builder.disable_config_load(); @@ -69,7 +54,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Gcs, + engine_type: EngineType::OpendalS3, }) } } diff --git a/src/risedevtool/src/task/utils.rs b/src/risedevtool/src/task/utils.rs index e4a6322c28588..0e563774e577a 100644 --- a/src/risedevtool/src/task/utils.rs +++ b/src/risedevtool/src/task/utils.rs @@ -118,6 +118,10 @@ pub fn add_hummock_backend( cmd.arg("--state-store") .arg(format!("hummock+hdfs://{}", opendal.namenode)); } + else if opendal.engine == "opendal_s3"{ + cmd.arg("--state-store") + .arg(format!("hummock+opendal_s3://{}", opendal.bucket)); + } else if opendal.engine == "gcs"{ cmd.arg("--state-store") 
.arg(format!("hummock+gcs://{}", opendal.bucket)); From 95ae8460bba4c0d7fee8c28511724440d4efc39d Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 8 Jan 2024 15:15:06 +0800 Subject: [PATCH 03/14] update Copyright --- src/object_store/src/object/opendal_engine/opendal_s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 93ff0ce2d0ebe..af940bedd923e 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -1,4 +1,4 @@ -// Copyright 2023 RisingWave Labs +// Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From c2d8280388aeaec9b36599d37d675e270d125e41 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 8 Jan 2024 15:21:20 +0800 Subject: [PATCH 04/14] typo --- src/object_store/src/object/opendal_engine/opendal_s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index af940bedd923e..6b805bf9217df 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -20,7 +20,7 @@ use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; impl OpendalObjectStore { - /// create opendal gcs engine. + /// create opendal s3 engine. pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { // Create s3 builder. 
let mut builder = S3::default(); From c89867829644bd5531ed5b7da47bb40c2f013fb0 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 19 Jan 2024 13:49:27 +0800 Subject: [PATCH 05/14] add region --- src/object_store/src/object/opendal_engine/opendal_s3.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 6b805bf9217df..6257560c1c4c9 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -27,10 +27,17 @@ impl OpendalObjectStore { builder.bucket(&bucket); builder.root(&root); + // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field. if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { builder.endpoint(&endpoint_url); } + if let Ok(region) = std::env::var("AWS_REGION") { + builder.region(&region); + } else { + tracing::error!("aws s3 region is not set, bucket {}", bucket); + } + if let Ok(access) = std::env::var("AWS_ACCESS_KEY_ID") { builder.access_key_id(&access); } else { From b1a9adf3838ce44f3141af1ae7fcae8c4c1280fb Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 19 Jan 2024 14:29:42 +0800 Subject: [PATCH 06/14] switch minio to new s3 --- src/object_store/src/object/mod.rs | 6 +-- .../opendal_engine/opendal_object_store.rs | 2 + .../src/object/opendal_engine/opendal_s3.rs | 34 ++++++++++++ src/object_store/src/object/s3.rs | 54 ------------------- 4 files changed, 39 insertions(+), 57 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 8b7dac6d90371..00ff0e8c7eac9 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -875,9 +875,9 @@ pub async fn build_remote_object_store( set your endpoint to the environment variable RW_S3_ENDPOINT."); panic!("Passing 
s3-compatible is not supported, please modify the environment variable and pass in s3."); } - minio if minio.starts_with("minio://") => ObjectStoreImpl::S3( - S3ObjectStore::with_minio(minio, metrics.clone()) - .await + minio if minio.starts_with("minio://") => ObjectStoreImpl::Opendal( + OpendalObjectStore::with_minio(minio) + .unwrap() .monitored(metrics), ), "memory" => { diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 3489603026a88..79c1742349fb4 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -38,6 +38,7 @@ pub enum EngineType { Hdfs, Gcs, OpendalS3, + Minio, Obs, Oss, Webhdfs, @@ -191,6 +192,7 @@ impl ObjectStore for OpendalObjectStore { EngineType::Memory => "Memory", EngineType::Hdfs => "Hdfs", EngineType::OpendalS3 => "OpendalS3", + EngineType::Minio => "Minio", EngineType::Gcs => "Gcs", EngineType::Obs => "Obs", EngineType::Oss => "Oss", diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 6257560c1c4c9..115e723b50f19 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -64,4 +64,38 @@ impl OpendalObjectStore { engine_type: EngineType::OpendalS3, }) } + + /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`. 
+ pub fn with_minio(server: &str) -> ObjectResult { + let server = server.strip_prefix("minio://").unwrap(); + let (access_key_id, rest) = server.split_once(':').unwrap(); + let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); + let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { + rest = rest_stripped; + "https://" + } else if let Some(rest_stripped) = rest.strip_prefix("http://") { + rest = rest_stripped; + "http://" + } else { + "http://" + }; + let (address, bucket) = rest.split_once('/').unwrap(); + let mut builder = S3::default(); + builder + .bucket(bucket) + .region("custom") + .access_key_id(access_key_id) + .secret_access_key(secret_access_key) + .endpoint(&format!("{}{}", endpoint_prefix, address)); + + builder.disable_config_load(); + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(Self { + op, + engine_type: EngineType::Minio, + }) + } } diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index d150b4178f964..32b10235f353a 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use std::task::{ready, Context, Poll}; use std::time::Duration; -use aws_sdk_s3::config::{Credentials, Region}; use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder; use aws_sdk_s3::operation::get_object::GetObjectError; use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error; @@ -65,8 +64,6 @@ const MIN_PART_ID: PartId = 1; /// /// Reference: const S3_PART_SIZE: usize = 16 * 1024 * 1024; -// TODO: we should do some benchmark to determine the proper part size for MinIO -const MINIO_PART_SIZE: usize = 16 * 1024 * 1024; /// The number of S3/MinIO bucket prefixes const NUM_BUCKET_PREFIXES: u32 = 256; /// Stop multipart uploads that don't complete within a specified number of days after being @@ -627,57 +624,6 @@ impl S3ObjectStore { } } 
- /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`. - pub async fn with_minio(server: &str, metrics: Arc) -> Self { - let server = server.strip_prefix("minio://").unwrap(); - let (access_key_id, rest) = server.split_once(':').unwrap(); - let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); - let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { - rest = rest_stripped; - "https://" - } else if let Some(rest_stripped) = rest.strip_prefix("http://") { - rest = rest_stripped; - "http://" - } else { - "http://" - }; - let (address, bucket) = rest.split_once('/').unwrap(); - - let s3_object_store_config = ObjectStoreConfig::default(); - #[cfg(madsim)] - let builder = aws_sdk_s3::config::Builder::new().credentials_provider( - Credentials::from_keys(access_key_id, secret_access_key, None), - ); - #[cfg(not(madsim))] - let builder = aws_sdk_s3::config::Builder::from( - &aws_config::ConfigLoader::default() - // FIXME: https://github.com/awslabs/aws-sdk-rust/issues/973 - .credentials_provider(Credentials::from_keys( - access_key_id, - secret_access_key, - None, - )) - .load() - .await, - ) - .force_path_style(true) - .http_client(Self::new_http_client(&s3_object_store_config)) - .behavior_version_latest(); - let config = builder - .region(Region::new("custom")) - .endpoint_url(format!("{}{}", endpoint_prefix, address)) - .build(); - let client = Client::from_conf(config); - - Self { - client, - bucket: bucket.to_string(), - part_size: MINIO_PART_SIZE, - metrics, - config: s3_object_store_config, - } - } - fn get_object_prefix(obj_id: u64) -> String { let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES; let mut obj_prefix = prefix.to_string(); From f02a8b25ef5d98760ac5c4c3d6c92f959d931121 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 23 Feb 2024 13:51:33 +0800 Subject: [PATCH 07/14] clean risedev.yml --- risedev.yml | 13 ------------- 1 file 
changed, 13 deletions(-) diff --git a/risedev.yml b/risedev.yml index 55972fe67ab45..cb352daab6cf9 100644 --- a/risedev.yml +++ b/risedev.yml @@ -202,19 +202,6 @@ profile: - use: compactor # - use: prometheus # - use: grafana - opendal_s3: - steps: - # - use: etcd - - use: meta-node - - use: compute-node - - use: frontend - # If you want to use google cloud storage as storage backend, configure bucket name: - - use: opendal - engine: opendal_s3 - bucket: bucket-name - - use: compactor - # - use: prometheus - # - use: grafana obs: steps: # - use: etcd From 87699c46d399096a2479d18b31353b58f8118899 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 7 Mar 2024 22:38:46 +0800 Subject: [PATCH 08/14] minor --- risingwave-test/hummock_001/checkpoint/0 | Bin 0 -> 116 bytes src/risedevtool/src/task/utils.rs | 4 ---- 2 files changed, 4 deletions(-) create mode 100644 risingwave-test/hummock_001/checkpoint/0 diff --git a/risingwave-test/hummock_001/checkpoint/0 b/risingwave-test/hummock_001/checkpoint/0 new file mode 100644 index 0000000000000000000000000000000000000000..ebf52f8099a4183a891b732f5f341d668f191bf9 GIT binary patch literal 116 zcmd-o;#k|pD5S%|B&5N`!oeuO2qc-nBr}*~0h6p?l1+e7h(Ur0sG6B*)yxtN4GZ?X RyZo3-p`qcy|DFYpxc~(c4^IF9 literal 0 HcmV?d00001 diff --git a/src/risedevtool/src/task/utils.rs b/src/risedevtool/src/task/utils.rs index bc6ada4946e33..67b51ddc55da8 100644 --- a/src/risedevtool/src/task/utils.rs +++ b/src/risedevtool/src/task/utils.rs @@ -118,10 +118,6 @@ pub fn add_hummock_backend( cmd.arg("--state-store") .arg(format!("hummock+hdfs://{}", opendal.namenode)); } - else if opendal.engine == "opendal_s3"{ - cmd.arg("--state-store") - .arg(format!("hummock+opendal_s3://{}", opendal.bucket)); - } else if opendal.engine == "gcs"{ cmd.arg("--state-store") .arg(format!("hummock+gcs://{}", opendal.bucket)); From b5e6942be4928347e1cfc59b2468f0b60b64244c Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 7 Mar 2024 22:39:35 
+0800 Subject: [PATCH 09/14] minor --- risingwave-test/hummock_001/checkpoint/0 | Bin 116 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 risingwave-test/hummock_001/checkpoint/0 diff --git a/risingwave-test/hummock_001/checkpoint/0 b/risingwave-test/hummock_001/checkpoint/0 deleted file mode 100644 index ebf52f8099a4183a891b732f5f341d668f191bf9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 116 zcmd-o;#k|pD5S%|B&5N`!oeuO2qc-nBr}*~0h6p?l1+e7h(Ur0sG6B*)yxtN4GZ?X RyZo3-p`qcy|DFYpxc~(c4^IF9 From b90d2ad773a1bad12b2e632efb559e6737f62c79 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 20 Mar 2024 16:59:19 +0800 Subject: [PATCH 10/14] rebase main --- src/object_store/src/object/mod.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 312db7d3d4d6c..0ad341dc03330 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -892,16 +892,17 @@ pub async fn build_remote_object_store( panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3."); } minio if minio.starts_with("minio://") => { - if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { - ObjectStoreImpl::Opendal( - OpendalObjectStore::with_minio(minio, config) - .unwrap() + if env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) { + ObjectStoreImpl::S3( + S3ObjectStore::with_minio(minio, metrics.clone(), config.clone()) + .await .monitored(metrics, config), ) } else { - ObjectStoreImpl::S3( - S3ObjectStore::with_minio(minio, metrics.clone(), config) - .await + tracing::info!("Using OpenDAL to access minio."); + ObjectStoreImpl::Opendal( + OpendalObjectStore::with_minio(minio, config.clone()) + .unwrap() .monitored(metrics, config), ) } From 5c222f16d2e1f083ae21586e5320d8650f877756 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: 
Wed, 20 Mar 2024 17:16:51 +0800 Subject: [PATCH 11/14] update backon --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4584f80a54ad4..98dbf715f265e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1385,11 +1385,11 @@ dependencies = [ [[package]] name = "backon" -version = "0.4.1" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458" dependencies = [ - "fastrand 1.9.0", + "fastrand 2.0.1", "futures-core", "pin-project", "tokio", @@ -8243,7 +8243,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.1", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", From 5f272c95eb935a9f1899aaef0e51ca345209b270 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 10 Apr 2024 21:18:53 +0800 Subject: [PATCH 12/14] resolve comment --- Cargo.lock | 1 + src/object_store/Cargo.toml | 1 + .../src/object/opendal_engine/opendal_s3.rs | 24 +++++++++++-------- src/object_store/src/object/s3.rs | 24 +++++++++++-------- 4 files changed, 30 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f54b3a5141a99..46c43e8c4c430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10037,6 +10037,7 @@ dependencies = [ "thiserror-ext", "tokio-retry", "tracing", + "url", ] [[package]] diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 29144c2cab882..17cc5ffc99dbc 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -38,6 +38,7 @@ thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["fs"] } tokio-retry = "0.3" tracing = "0.1" +url = "2" # This crate is excluded from hakari (see hakari.toml) after hdfs is introduced...## [target.'cfg(not(madsim))'.dependencies] # workspace-hack = { path 
= "../workspace-hack" } # diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 6fe4f772d2995..5a293086a98c8 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -19,6 +19,7 @@ use opendal::raw::HttpClient; use opendal::services::S3; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; +use url::Url; use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; @@ -69,17 +70,20 @@ impl OpendalObjectStore { pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); - let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); - let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { - rest = rest_stripped; - "https://" - } else if let Some(rest_stripped) = rest.strip_prefix("http://") { - rest = rest_stripped; - "http://" - } else { - "http://" + let (secret_access_key, rest) = rest.split_once('@').unwrap(); + let mut parsed_url = Url::parse(rest).unwrap(); + let endpoint_prefix = match parsed_url.scheme() { + "https" => { + parsed_url.set_scheme("http").unwrap(); + "https://" + } + "http" => "http://", + _ => "http://", }; - let (address, bucket) = rest.split_once('/').unwrap(); + + let modified_rest = parsed_url.as_str(); + + let (address, bucket) = modified_rest.split_once('/').unwrap(); let mut builder = S3::default(); builder .bucket(bucket) diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 196638f1e3689..2652ca0cf5a58 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -50,6 +50,7 @@ use risingwave_common::range::RangeBoundsExt; use thiserror_ext::AsReport; use tokio::task::JoinHandle; use 
tokio_retry::strategy::{jitter, ExponentialBackoff}; +use url::Url; use super::object_metrics::ObjectStoreMetrics; use super::{ @@ -644,17 +645,20 @@ impl S3ObjectStore { ) -> Self { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); - let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); - let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { - rest = rest_stripped; - "https://" - } else if let Some(rest_stripped) = rest.strip_prefix("http://") { - rest = rest_stripped; - "http://" - } else { - "http://" + let (secret_access_key, rest) = rest.split_once('@').unwrap(); + let mut parsed_url = Url::parse(rest).unwrap(); + let endpoint_prefix = match parsed_url.scheme() { + "https" => { + parsed_url.set_scheme("http").unwrap(); + "https://" + } + "http" => "http://", + _ => "http://", }; - let (address, bucket) = rest.split_once('/').unwrap(); + + let modified_rest = parsed_url.as_str(); + + let (address, bucket) = modified_rest.split_once('/').unwrap(); #[cfg(madsim)] let builder = aws_sdk_s3::config::Builder::new().credentials_provider( From fa1c7dcf65c1cd4306192ce419de298670e884d9 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 10 Apr 2024 22:38:32 +0800 Subject: [PATCH 13/14] rollback --- Cargo.lock | 1 - src/object_store/Cargo.toml | 1 - .../src/object/opendal_engine/opendal_s3.rs | 24 ++++++++---------- src/object_store/src/object/s3.rs | 25 ++++++++----------- 4 files changed, 22 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46c43e8c4c430..f54b3a5141a99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10037,7 +10037,6 @@ dependencies = [ "thiserror-ext", "tokio-retry", "tracing", - "url", ] [[package]] diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 17cc5ffc99dbc..29144c2cab882 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -38,7 +38,6 @@ 
thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["fs"] } tokio-retry = "0.3" tracing = "0.1" -url = "2" # This crate is excluded from hakari (see hakari.toml) after hdfs is introduced...## [target.'cfg(not(madsim))'.dependencies] # workspace-hack = { path = "../workspace-hack" } # diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 5a293086a98c8..28f90a48e9ae0 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -19,7 +19,6 @@ use opendal::raw::HttpClient; use opendal::services::S3; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use url::Url; use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; @@ -70,20 +69,19 @@ impl OpendalObjectStore { pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); - let (secret_access_key, rest) = rest.split_once('@').unwrap(); - let mut parsed_url = Url::parse(rest).unwrap(); - let endpoint_prefix = match parsed_url.scheme() { - "https" => { - parsed_url.set_scheme("http").unwrap(); - "https://" - } - "http" => "http://", - _ => "http://", + let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); + + let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { + rest = rest_stripped; + "https://" + } else if let Some(rest_stripped) = rest.strip_prefix("http://") { + rest = rest_stripped; + "http://" + } else { + "http://" }; + let (address, bucket) = rest.split_once('/').unwrap(); - let modified_rest = parsed_url.as_str(); - - let (address, bucket) = modified_rest.split_once('/').unwrap(); let mut builder = S3::default(); builder .bucket(bucket) diff --git 
a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 2652ca0cf5a58..a9f6c9ae1db07 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -50,7 +50,6 @@ use risingwave_common::range::RangeBoundsExt; use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tokio_retry::strategy::{jitter, ExponentialBackoff}; -use url::Url; use super::object_metrics::ObjectStoreMetrics; use super::{ @@ -645,20 +644,18 @@ impl S3ObjectStore { ) -> Self { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); - let (secret_access_key, rest) = rest.split_once('@').unwrap(); - let mut parsed_url = Url::parse(rest).unwrap(); - let endpoint_prefix = match parsed_url.scheme() { - "https" => { - parsed_url.set_scheme("http").unwrap(); - "https://" - } - "http" => "http://", - _ => "http://", + let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); + + let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { + rest = rest_stripped; + "https://" + } else if let Some(rest_stripped) = rest.strip_prefix("http://") { + rest = rest_stripped; + "http://" + } else { + "http://" }; - - let modified_rest = parsed_url.as_str(); - - let (address, bucket) = modified_rest.split_once('/').unwrap(); + let (address, bucket) = rest.split_once('/').unwrap(); #[cfg(madsim)] let builder = aws_sdk_s3::config::Builder::new().credentials_provider( From a765473ef6214d10d5fd647634624ca93d4a9a68 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 11 Apr 2024 10:41:12 +0800 Subject: [PATCH 14/14] do not change cargo.lock --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f54b3a5141a99..218ca2bb9f0ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1410,11 +1410,11 @@ dependencies = [ [[package]] name = "backon" -version = "0.4.3" +version = "0.4.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458" +checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" dependencies = [ - "fastrand 2.0.1", + "fastrand 1.9.0", "futures-core", "pin-project", "tokio", @@ -8250,7 +8250,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi",