feat(storage): allow minio to use aws sdk or opendal #15208

Merged on Apr 17, 2024 (18 commits)
8 changes: 4 additions & 4 deletions Cargo.lock

21 changes: 16 additions & 5 deletions src/object_store/src/object/mod.rs
@@ -889,11 +889,22 @@ pub async fn build_remote_object_store(
             set your endpoint to the environment variable RW_S3_ENDPOINT.");
             panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
         }
-        minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
-            S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
-                .await
-                .monitored(metrics, config),
-        ),
+        minio if minio.starts_with("minio://") => {
+            if config.s3.developer.use_opendal {
+                tracing::info!("Using OpenDAL to access minio.");
+                ObjectStoreImpl::Opendal(
+                    OpendalObjectStore::with_minio(minio, config.clone())
+                        .unwrap()
+                        .monitored(metrics, config),
+                )
+            } else {
+                ObjectStoreImpl::S3(
+                    S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
+                        .await
+                        .monitored(metrics, config),
+                )
+            }
+        }
         "memory" => {
             if ident == "Meta Backup" {
                 tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
@@ -38,6 +38,7 @@ pub enum EngineType {
     Memory,
     Hdfs,
     Gcs,
+    Minio,
     S3,
     Obs,
     Oss,
@@ -64,6 +65,7 @@ impl ObjectStore for OpendalObjectStore {
     fn get_object_prefix(&self, obj_id: u64) -> String {
         match self.engine_type {
             EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
+            EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
             EngineType::Memory => String::default(),
             EngineType::Hdfs => String::default(),
             EngineType::Gcs => String::default(),
@@ -201,6 +203,7 @@ impl ObjectStore for OpendalObjectStore {
         match self.engine_type {
             EngineType::Memory => "Memory",
             EngineType::Hdfs => "Hdfs",
+            EngineType::Minio => "Minio",
             EngineType::S3 => "S3",
             EngineType::Gcs => "Gcs",
             EngineType::Obs => "Obs",
48 changes: 48 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -65,6 +65,54 @@ impl OpendalObjectStore {
         })
     }

+    /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
+    pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult<Self> {
+        let server = server.strip_prefix("minio://").unwrap();
+        let (access_key_id, rest) = server.split_once(':').unwrap();
+        let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
+        let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
+            rest = rest_stripped;
+            "https://"
+        } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
+            rest = rest_stripped;
+            "http://"
+        } else {
+            "http://"
+        };
+        let (address, bucket) = rest.split_once('/').unwrap();
Member

I think it would be better to parse the URL with the url crate here instead of splitting it by hand: less code, and less room for security problems.

Contributor Author

I just tried, but what gets parsed here is not a normal URL (127.0.0.1:9301/hummock001 instead of http://127.0.0.1:9301/hummock001), so I kept it the same as before.

Member (@stdrc, Apr 10, 2024)

"minio://" is part of the URL, which will result in parsed_url.scheme() == "minio".

Contributor Author

Let's change how all state store URLs are handled in a later PR.
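
As a possible middle ground for the thread above, the sketch below keeps the same splitting order as `with_minio` but returns an error instead of panicking when a separator is missing. It uses only the standard library. `MinioTarget` and `parse_minio_url` are hypothetical names that do not appear in this PR, and the error messages are illustrative.

```rust
/// Pieces of a `minio://key:secret@address:port/bucket` string.
/// Hypothetical type, used only for this illustration.
#[derive(Debug, PartialEq)]
struct MinioTarget {
    access_key_id: String,
    secret_access_key: String,
    endpoint: String,
    bucket: String,
}

/// Hypothetical helper (not part of the PR): splits the string in the same
/// order as `with_minio`, but reports a missing separator as `Err`.
/// Error messages are static so that credentials never end up in them.
fn parse_minio_url(server: &str) -> Result<MinioTarget, String> {
    let rest = server
        .strip_prefix("minio://")
        .ok_or("missing minio:// prefix")?;
    let (access_key_id, rest) = rest
        .split_once(':')
        .ok_or("missing ':' between key and secret")?;
    let (secret_access_key, rest) = rest
        .split_once('@')
        .ok_or("missing '@' after secret")?;
    // Fall back to plain HTTP when no scheme is embedded, as the PR does.
    let (scheme, rest) = if let Some(r) = rest.strip_prefix("https://") {
        ("https://", r)
    } else if let Some(r) = rest.strip_prefix("http://") {
        ("http://", r)
    } else {
        ("http://", rest)
    };
    let (address, bucket) = rest
        .split_once('/')
        .ok_or("missing '/' before bucket")?;
    if address.is_empty() || bucket.is_empty() {
        return Err("empty address or bucket".to_string());
    }
    Ok(MinioTarget {
        access_key_id: access_key_id.to_string(),
        secret_access_key: secret_access_key.to_string(),
        endpoint: format!("{scheme}{address}"),
        bucket: bucket.to_string(),
    })
}

fn main() {
    let target = parse_minio_url("minio://key:secret@127.0.0.1:9301/hummock001").unwrap();
    println!("{target:?}");
    // A malformed string yields an error rather than a panic.
    assert!(parse_minio_url("minio://key-only").is_err());
}
```

A caller could then map the `Err` into its own error type instead of unwrapping at each split.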

+        let mut builder = S3::default();
+        builder
+            .bucket(bucket)
+            .region("custom")
+            .access_key_id(access_key_id)
+            .secret_access_key(secret_access_key)
+            .endpoint(&format!("{}{}", endpoint_prefix, address));
+
+        builder.disable_config_load();
+        let http_client = Self::new_http_client(&object_store_config)?;
+        builder.http_client(http_client);
+        let op: Operator = Operator::new(builder)?
+            .layer(LoggingLayer::default())
+            .layer(
+                RetryLayer::new()
+                    .with_min_delay(Duration::from_millis(
+                        object_store_config.s3.object_store_req_retry_interval_ms,
+                    ))
+                    .with_max_delay(Duration::from_millis(
+                        object_store_config.s3.object_store_req_retry_max_delay_ms,
+                    ))
+                    .with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
+                    .with_factor(1.0)
+                    .with_jitter(),
+            )
+            .finish();
+
+        Ok(Self {
+            op,
+            engine_type: EngineType::Minio,
+        })
+    }
+
     pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
         let mut client_builder = reqwest::ClientBuilder::new();
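
For reference, a rough sketch of the delay schedule that the retry settings in `with_minio` imply, assuming the usual exponential backoff rule: start at the minimum delay, multiply by the factor after each attempt, cap at the maximum. `backoff_delay_ms` is a hypothetical helper, not an OpenDAL API. The numbers in `main` are made up rather than RisingWave defaults, and jitter is left out.

```rust
/// Hypothetical helper: delay in milliseconds before retry number `attempt`
/// (0-based), ignoring jitter. Assumes delay = min * factor^attempt, capped at max.
fn backoff_delay_ms(min_delay_ms: u64, max_delay_ms: u64, factor: f64, attempt: u32) -> u64 {
    let scaled = (min_delay_ms as f64 * factor.powi(attempt as i32)).round() as u64;
    scaled.min(max_delay_ms)
}

fn main() {
    // Illustrative values only; not taken from any RisingWave config.
    let (min_ms, max_ms) = (20, 10_000);
    for attempt in 0..4 {
        println!(
            "attempt {attempt}: factor 1.0 -> {} ms, factor 2.0 -> {} ms",
            backoff_delay_ms(min_ms, max_ms, 1.0, attempt),
            backoff_delay_ms(min_ms, max_ms, 2.0, attempt),
        );
    }
}
```

With a factor of 1.0 the multiplication is a no-op, so under this assumption the retry interval stays at the minimum delay and only jitter varies it.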