Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): allow minio to use aws sdk or opendal #15208

Merged
merged 18 commits into from
Apr 17, 2024
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 16 additions & 5 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,22 @@ pub async fn build_remote_object_store(
set your endpoint to the environment variable RW_S3_ENDPOINT.");
panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
}
minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
),
minio if minio.starts_with("minio://") => {
if config.s3.developer.use_opendal {
tracing::info!("Using OpenDAL to access minio.");
ObjectStoreImpl::Opendal(
OpendalObjectStore::with_minio(minio, config.clone())
.unwrap()
.monitored(metrics, config),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
)
}
}
"memory" => {
if ident == "Meta Backup" {
tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub enum EngineType {
Memory,
Hdfs,
Gcs,
Minio,
S3,
Obs,
Oss,
Expand All @@ -64,6 +65,7 @@ impl ObjectStore for OpendalObjectStore {
fn get_object_prefix(&self, obj_id: u64) -> String {
match self.engine_type {
EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
EngineType::Memory => String::default(),
EngineType::Hdfs => String::default(),
EngineType::Gcs => String::default(),
Expand Down Expand Up @@ -201,6 +203,7 @@ impl ObjectStore for OpendalObjectStore {
match self.engine_type {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Gcs => "Gcs",
EngineType::Obs => "Obs",
Expand Down
50 changes: 50 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,56 @@ impl OpendalObjectStore {
})
}

/// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult<Self> {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let mut builder = S3::default();
builder
.bucket(bucket)
.region("custom")
.access_key_id(access_key_id)
.secret_access_key(secret_access_key)
.endpoint(&format!("{}{}", endpoint_prefix, address));

builder.disable_config_load();
let http_client = Self::new_http_client(&object_store_config)?;
builder.http_client(http_client);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.0)
.with_jitter(),
)
.finish();

Ok(Self {
op,
engine_type: EngineType::Minio,
})
}

pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

Expand Down
1 change: 1 addition & 0 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ impl S3ObjectStore {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
Expand Down
Loading