Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): allow minio to use aws sdk or opendal #15208

Merged
merged 18 commits into from
Apr 17, 2024
Prev Previous commit
Next Next commit
rollback
  • Loading branch information
wcy-fdu committed Apr 10, 2024
commit fa1c7dcf65c1cd4306192ce419de298670e884d9
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -38,7 +38,6 @@ thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["fs"] }
tokio-retry = "0.3"
tracing = "0.1"
url = "2"
# This crate is excluded from hakari (see hakari.toml) after hdfs is introduced...## [target.'cfg(not(madsim))'.dependencies]
# workspace-hack = { path = "../workspace-hack" }
#
24 changes: 11 additions & 13 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@ use opendal::raw::HttpClient;
use opendal::services::S3;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
use url::Url;

use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;
@@ -70,20 +69,19 @@ impl OpendalObjectStore {
pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult<Self> {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, rest) = rest.split_once('@').unwrap();
let mut parsed_url = Url::parse(rest).unwrap();
let endpoint_prefix = match parsed_url.scheme() {
"https" => {
parsed_url.set_scheme("http").unwrap();
"https://"
}
"http" => "http://",
_ => "http://",
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let modified_rest = parsed_url.as_str();

let (address, bucket) = modified_rest.split_once('/').unwrap();
let mut builder = S3::default();
builder
.bucket(bucket)
25 changes: 11 additions & 14 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
@@ -50,7 +50,6 @@ use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use url::Url;

use super::object_metrics::ObjectStoreMetrics;
use super::{
@@ -645,20 +644,18 @@ impl S3ObjectStore {
) -> Self {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, rest) = rest.split_once('@').unwrap();
let mut parsed_url = Url::parse(rest).unwrap();
let endpoint_prefix = match parsed_url.scheme() {
"https" => {
parsed_url.set_scheme("http").unwrap();
"https://"
}
"http" => "http://",
_ => "http://",
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};

let modified_rest = parsed_url.as_str();

let (address, bucket) = modified_rest.split_once('/').unwrap();
let (address, bucket) = rest.split_once('/').unwrap();

#[cfg(madsim)]
let builder = aws_sdk_s3::config::Builder::new().credentials_provider(