Skip to content

Commit

Permalink
feat(storage): Supports configurable upload part size (#17793) (#17794)
Browse files Browse the repository at this point in the history
Co-authored-by: Li0k <[email protected]>
  • Loading branch information
github-actions[bot] and Li0k authored Jul 24, 2024
1 parent b94ace5 commit a2ef134
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,9 @@ pub struct ObjectStoreConfig {
// TODO: the following field will be deprecated after opendal is stabilized
#[serde(default)]
pub opendal_writer_abort_on_err: bool,

#[serde(default = "default::object_store_config::upload_part_size")]
pub upload_part_size: usize,
}

impl ObjectStoreConfig {
Expand Down Expand Up @@ -2039,6 +2042,11 @@ pub mod default {
8
}

/// Default size of a single multipart-upload part: 16 MiB.
///
/// Used when `ObjectStoreConfig::upload_part_size` is not set explicitly.
pub fn upload_part_size() -> usize {
    // 16 MiB expressed as a shift for clarity: 16 * 2^20 bytes.
    16 << 20
}

pub mod s3 {
const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;

Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ recent_filter_rotate_interval_ms = 10000
set_atomic_write_dir = false
opendal_upload_concurrency = 8
opendal_writer_abort_on_err = false
upload_part_size = 16777216

[storage.object_store.retry]
req_backoff_interval_ms = 1000
Expand Down
11 changes: 5 additions & 6 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,11 @@ impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
let operation_type_str = operation_type.as_str();
let data_len = data.len();

let res =
// TODO: we should avoid this special case after fully migrating to opeandal for s3.
self.inner
.write_bytes(data)
.verbose_instrument_await(operation_type_str)
.await;
let res = self
.inner
.write_bytes(data)
.verbose_instrument_await(operation_type_str)
.await;

try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

Expand Down
23 changes: 11 additions & 12 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ type PartId = i32;

/// MinIO and S3 share the same minimum part ID.
/// (The part size is no longer a shared constant here; it is configurable via
/// `ObjectStoreConfig::upload_part_size`, clamped to S3's 5 MiB–5 GiB limits.)
const MIN_PART_ID: PartId = 1;
/// The minimum number of bytes that is buffered before they are uploaded as a part.
/// Its value must be greater than the minimum part size of 5MiB.
///
/// Reference: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const S3_PART_SIZE: usize = 16 * 1024 * 1024;
// TODO: we should do some benchmark to determine the proper part size for MinIO
const MINIO_PART_SIZE: usize = 16 * 1024 * 1024;
/// Stop multipart uploads that don't complete within a specified number of days after being
/// initiated. (A day is the smallest granularity S3 lifecycle rules support.)
const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;
Expand Down Expand Up @@ -113,11 +106,21 @@ impl S3StreamingUploader {
pub fn new(
client: Client,
bucket: String,
part_size: usize,
key: String,
metrics: Arc<ObjectStoreMetrics>,
config: Arc<ObjectStoreConfig>,
) -> S3StreamingUploader {
/// The minimum number of bytes that is buffered before they are uploaded as a part.
/// Its value must be greater than the minimum part size of 5MiB.
///
/// Reference: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
let part_size = config
.upload_part_size
.min(MAX_PART_SIZE)
.max(MIN_PART_SIZE);

Self {
client,
bucket,
Expand Down Expand Up @@ -391,7 +394,6 @@ fn get_upload_body(data: Vec<Bytes>) -> ByteStream {
pub struct S3ObjectStore {
client: Client,
bucket: String,
part_size: usize,
/// For S3 specific metrics.
metrics: Arc<ObjectStoreMetrics>,

Expand Down Expand Up @@ -435,7 +437,6 @@ impl ObjectStore for S3ObjectStore {
Ok(S3StreamingUploader::new(
self.client.clone(),
self.bucket.clone(),
self.part_size,
path.to_string(),
self.metrics.clone(),
self.config.clone(),
Expand Down Expand Up @@ -714,7 +715,6 @@ impl S3ObjectStore {
Self {
client,
bucket,
part_size: S3_PART_SIZE,
metrics,
config,
}
Expand Down Expand Up @@ -777,7 +777,6 @@ impl S3ObjectStore {
Self {
client,
bucket: bucket.to_string(),
part_size: MINIO_PART_SIZE,
metrics,
config: object_store_config,
}
Expand Down

0 comments on commit a2ef134

Please sign in to comment.