feat(storage): allow minio to use aws sdk or opendal #15208

Merged on Apr 17, 2024 (18 commits)
8 changes: 4 additions & 4 deletions Cargo.lock

21 changes: 16 additions & 5 deletions src/object_store/src/object/mod.rs
@@ -891,11 +891,22 @@ pub async fn build_remote_object_store(
set your endpoint to the environment variable RW_S3_ENDPOINT.");
panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
}
minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
),
minio if minio.starts_with("minio://") => {
if env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) {
ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
)
} else {
tracing::info!("Using OpenDAL to access minio.");
ObjectStoreImpl::Opendal(
OpendalObjectStore::with_minio(minio, config.clone())
.unwrap()
.monitored(metrics, config),
)
}
}
"memory" => {
if ident == "Meta Backup" {
tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
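
The new `minio://` arm picks a client based on an environment toggle. A minimal sketch of that decision follows, assuming `env_var_is_false_or` returns `true` only for an explicit false-like value and falls back to its default when the variable is unset (its definition is not shown in this diff). `MinioBackend`, `is_false_or`, `choose_minio_backend`, and `backend_from_env` are hypothetical names used only for illustration.

```rust
/// Which client serves a `minio://` URL. Hypothetical type for this sketch.
#[derive(Debug, PartialEq)]
enum MinioBackend {
    AwsSdk,
    Opendal,
}

/// Assumed semantics of `env_var_is_false_or`: true when the value is an
/// explicit false-like string, `default` when the variable is unset.
fn is_false_or(value: Option<&str>, default: bool) -> bool {
    match value {
        Some(v) => matches!(
            v.trim().to_ascii_lowercase().as_str(),
            "0" | "f" | "false" | "n" | "no" | "off"
        ),
        None => default,
    }
}

/// Mirrors the branch in the diff: an explicit "false" keeps the AWS SDK
/// client; anything else, including an unset variable, selects OpenDAL.
fn choose_minio_backend(use_opendal_var: Option<&str>) -> MinioBackend {
    if is_false_or(use_opendal_var, false) {
        MinioBackend::AwsSdk
    } else {
        MinioBackend::Opendal
    }
}

/// Reads the toggle from the process environment. The variable name is a
/// parameter because the string behind `RW_USE_OPENDAL_FOR_S3` is not shown.
fn backend_from_env(var_name: &str) -> MinioBackend {
    let value = std::env::var(var_name).ok();
    choose_minio_backend(value.as_deref())
}
```

Under these assumptions OpenDAL becomes the default for MinIO, and the AWS SDK path is used only when the variable is explicitly set to a false-like value.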
@@ -38,6 +38,7 @@ pub enum EngineType {
Memory,
Hdfs,
Gcs,
Minio,
S3,
Obs,
Oss,
@@ -191,6 +192,7 @@ impl ObjectStore for OpendalObjectStore {
match self.engine_type {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Gcs => "Gcs",
EngineType::Obs => "Obs",
46 changes: 45 additions & 1 deletion src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -39,7 +39,6 @@ impl OpendalObjectStore {
if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
builder.enable_virtual_host_style();
}

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
@@ -60,4 +59,49 @@ impl OpendalObjectStore {
engine_type: EngineType::S3,
})
}

/// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult<Self> {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();
Member

I guess it would be better to use the url crate to parse the URL here instead of doing it manually: less code and less chance of security problems.

Contributor Author

Umm, I just tried, but here it's not a normal URL (127.0.0.1:9301/hummock001 instead of http://127.0.0.1:9301/hummock001), so I'll keep it the same as before.

Member
stdrc, Apr 10, 2024

"minio://" is part of the URL, which will result in parsed_url.scheme() == "minio".

Contributor Author

Let's change how all state store URLs are handled in a later PR.

let mut builder = S3::default();
builder
.bucket(bucket)
.region("custom")
.access_key_id(access_key_id)
.secret_access_key(secret_access_key)
.endpoint(&format!("{}{}", endpoint_prefix, address));

builder.disable_config_load();
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.0)
.with_jitter(),
)
.finish();
Ok(Self {
op,
engine_type: EngineType::Minio,
})
}
}
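
The splitting steps in `with_minio` can be exercised in isolation. The sketch below follows the same order (prefix, access key, secret, optional scheme, address, bucket) but returns an `Err` for malformed input instead of calling `unwrap()`. `MinioTarget` and `parse_minio_url` are hypothetical names that are not part of the PR, and the `String` error type is a simplification rather than the crate's `ObjectResult`.

```rust
/// Pieces extracted from `minio://key:secret@address:port/bucket`.
/// Hypothetical type for this sketch.
#[derive(Debug, PartialEq)]
struct MinioTarget {
    access_key_id: String,
    secret_access_key: String,
    endpoint: String,
    bucket: String,
}

/// Same splitting steps as `with_minio`, reporting malformed input as `Err`.
fn parse_minio_url(server: &str) -> Result<MinioTarget, String> {
    let rest = server
        .strip_prefix("minio://")
        .ok_or("expected a `minio://` prefix")?;
    let (access_key_id, rest) = rest
        .split_once(':')
        .ok_or("expected `:` between key and secret")?;
    let (secret_access_key, rest) = rest
        .split_once('@')
        .ok_or("expected `@` before the address")?;
    // An explicit scheme is honored; a bare address defaults to plain HTTP.
    let (scheme, rest) = if let Some(stripped) = rest.strip_prefix("https://") {
        ("https://", stripped)
    } else if let Some(stripped) = rest.strip_prefix("http://") {
        ("http://", stripped)
    } else {
        ("http://", rest)
    };
    let (address, bucket) = rest
        .split_once('/')
        .ok_or("expected `/` before the bucket")?;
    Ok(MinioTarget {
        access_key_id: access_key_id.to_string(),
        secret_access_key: secret_access_key.to_string(),
        endpoint: format!("{}{}", scheme, address),
        bucket: bucket.to_string(),
    })
}
```

For example, `parse_minio_url("minio://key:secret@127.0.0.1:9301/hummock001")` yields the endpoint `http://127.0.0.1:9301` and the bucket `hummock001`. Like the original, this splits on the first `:` and the first `@`, so an access key containing `:` or a secret containing `@` would be split in the wrong place.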
4 changes: 2 additions & 2 deletions src/object_store/src/object/s3.rs
@@ -62,13 +62,13 @@ type PartId = i32;

/// MinIO and S3 share the same minimum part ID and part size.
const MIN_PART_ID: PartId = 1;
// TODO: we should do some benchmark to determine the proper part size for MinIO
const MINIO_PART_SIZE: usize = 16 * 1024 * 1024;
/// The minimum number of bytes that is buffered before they are uploaded as a part.
/// Its value must be greater than the minimum part size of 5MiB.
///
/// Reference: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const S3_PART_SIZE: usize = 16 * 1024 * 1024;
// TODO: we should do some benchmark to determine the proper part size for MinIO
const MINIO_PART_SIZE: usize = 16 * 1024 * 1024;
/// The number of S3/MinIO bucket prefixes
const NUM_BUCKET_PREFIXES: u32 = 256;
/// Stop multipart uploads that don't complete within a specified number of days after being