Skip to content

Commit

Permalink
feat(storage): allow minio to use aws sdk or opendal (#15208)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Apr 17, 2024
1 parent 9ab904f commit d2c2885
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 5 deletions.
21 changes: 16 additions & 5 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,11 +888,22 @@ pub async fn build_remote_object_store(
set your endpoint to the environment variable RW_S3_ENDPOINT.");
panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
}
minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
),
minio if minio.starts_with("minio://") => {
if config.s3.developer.use_opendal {
tracing::info!("Using OpenDAL to access minio.");
ObjectStoreImpl::Opendal(
OpendalObjectStore::with_minio(minio, config.clone())
.unwrap()
.monitored(metrics, config),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
)
}
}
"memory" => {
if ident == "Meta Backup" {
tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub enum EngineType {
Memory,
Hdfs,
Gcs,
Minio,
S3,
Obs,
Oss,
Expand All @@ -64,6 +65,7 @@ impl ObjectStore for OpendalObjectStore {
fn get_object_prefix(&self, obj_id: u64) -> String {
match self.engine_type {
EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
EngineType::Memory => String::default(),
EngineType::Hdfs => String::default(),
EngineType::Gcs => String::default(),
Expand Down Expand Up @@ -201,6 +203,7 @@ impl ObjectStore for OpendalObjectStore {
match self.engine_type {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Gcs => "Gcs",
EngineType::Obs => "Obs",
Expand Down
50 changes: 50 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,56 @@ impl OpendalObjectStore {
})
}

/// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult<Self> {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let mut builder = S3::default();
builder
.bucket(bucket)
.region("custom")
.access_key_id(access_key_id)
.secret_access_key(secret_access_key)
.endpoint(&format!("{}{}", endpoint_prefix, address));

builder.disable_config_load();
let http_client = Self::new_http_client(&object_store_config)?;
builder.http_client(http_client);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.0)
.with_jitter(),
)
.finish();

Ok(Self {
op,
engine_type: EngineType::Minio,
})
}

pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

Expand Down
1 change: 1 addition & 0 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ impl S3ObjectStore {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
Expand Down

0 comments on commit d2c2885

Please sign in to comment.