diff --git a/Cargo.lock b/Cargo.lock index f54b3a5141a99..46c43e8c4c430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10037,6 +10037,7 @@ dependencies = [ "thiserror-ext", "tokio-retry", "tracing", + "url", ] [[package]] diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 29144c2cab882..17cc5ffc99dbc 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -38,6 +38,7 @@ thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["fs"] } tokio-retry = "0.3" tracing = "0.1" +url = "2" # This crate is excluded from hakari (see hakari.toml) after hdfs is introduced...## [target.'cfg(not(madsim))'.dependencies] # workspace-hack = { path = "../workspace-hack" } # diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 6fe4f772d2995..5a293086a98c8 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -19,6 +19,7 @@ use opendal::raw::HttpClient; use opendal::services::S3; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; +use url::Url; use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; @@ -69,17 +70,20 @@ impl OpendalObjectStore { pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); - let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); - let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { - rest = rest_stripped; - "https://" - } else if let Some(rest_stripped) = rest.strip_prefix("http://") { - rest = rest_stripped; - "http://" - } else { - "http://" + let (secret_access_key, rest) = rest.split_once('@').unwrap(); + let mut parsed_url = Url::parse(rest).unwrap(); + let endpoint_prefix = match parsed_url.scheme() { + "https" => { + parsed_url.set_scheme("http").unwrap(); + "https://" + } + "http" => "http://", + _ => "http://", }; - let (address, bucket) = rest.split_once('/').unwrap(); + + let modified_rest = parsed_url.as_str(); + + let (address, bucket) = modified_rest.split_once('/').unwrap(); let mut builder = S3::default(); builder .bucket(bucket) diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 196638f1e3689..2652ca0cf5a58 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -50,6 +50,7 @@ use risingwave_common::range::RangeBoundsExt; use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use url::Url; use super::object_metrics::ObjectStoreMetrics; use super::{ @@ -644,17 +645,20 @@ impl S3ObjectStore { ) -> Self { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); - let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); - let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { - rest = rest_stripped; - "https://" - } else if let Some(rest_stripped) = rest.strip_prefix("http://") { - rest = rest_stripped; - "http://" - } else { - "http://" + let (secret_access_key, rest) = rest.split_once('@').unwrap(); + let mut parsed_url = Url::parse(rest).unwrap(); + let endpoint_prefix = match parsed_url.scheme() { + "https" => { + parsed_url.set_scheme("http").unwrap(); + "https://" + } + "http" => "http://", + _ => "http://", }; - let (address, bucket) = rest.split_once('/').unwrap(); + + let modified_rest = parsed_url.as_str(); + + let (address, bucket) = modified_rest.split_once('/').unwrap(); #[cfg(madsim)] let builder = aws_sdk_s3::config::Builder::new().credentials_provider(