diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 8ccf09c1a279f..a4d827db84dfd 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -1059,6 +1059,9 @@ pub struct ObjectStoreConfig {
     // TODO: the following field will be deprecated after opendal is stablized
     #[serde(default)]
     pub opendal_writer_abort_on_err: bool,
+
+    #[serde(default = "default::object_store_config::upload_part_size")]
+    pub upload_part_size: usize,
 }
 
 impl ObjectStoreConfig {
@@ -2039,6 +2042,11 @@ pub mod default {
             8
         }
 
+        pub fn upload_part_size() -> usize {
+            // 16m
+            16 * 1024 * 1024
+        }
+
         pub mod s3 {
             const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;
 
diff --git a/src/config/example.toml b/src/config/example.toml
index 0e33ba465c9ae..67acc0acf5afe 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -193,6 +193,7 @@ recent_filter_rotate_interval_ms = 10000
 set_atomic_write_dir = false
 opendal_upload_concurrency = 8
 opendal_writer_abort_on_err = false
+upload_part_size = 16777216
 
 [storage.object_store.retry]
 req_backoff_interval_ms = 1000
diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index b206fdb5dd17d..035df9562a38d 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -384,12 +384,11 @@ impl MonitoredStreamingUploader {
         let operation_type_str = operation_type.as_str();
         let data_len = data.len();
 
-        let res =
-            // TODO: we should avoid this special case after fully migrating to opeandal for s3.
-            self.inner
-                .write_bytes(data)
-                .verbose_instrument_await(operation_type_str)
-                .await;
+        let res = self
+            .inner
+            .write_bytes(data)
+            .verbose_instrument_await(operation_type_str)
+            .await;
 
         try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
 
diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs
index 077d4179e06c9..6f5cd4fc9417a 100644
--- a/src/object_store/src/object/s3.rs
+++ b/src/object_store/src/object/s3.rs
@@ -71,13 +71,6 @@ type PartId = i32;
 
 /// MinIO and S3 share the same minimum part ID and part size.
 const MIN_PART_ID: PartId = 1;
-/// The minimum number of bytes that is buffered before they are uploaded as a part.
-/// Its value must be greater than the minimum part size of 5MiB.
-///
-/// Reference:
-const S3_PART_SIZE: usize = 16 * 1024 * 1024;
-// TODO: we should do some benchmark to determine the proper part size for MinIO
-const MINIO_PART_SIZE: usize = 16 * 1024 * 1024;
 /// Stop multipart uploads that don't complete within a specified number of days after being
 /// initiated. (Day is the smallest granularity)
 const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;
@@ -113,11 +106,21 @@ impl S3StreamingUploader {
     pub fn new(
         client: Client,
         bucket: String,
-        part_size: usize,
         key: String,
         metrics: Arc<ObjectStoreMetrics>,
         config: Arc<ObjectStoreConfig>,
     ) -> S3StreamingUploader {
+        /// The minimum number of bytes that is buffered before they are uploaded as a part.
+        /// Its value must be greater than the minimum part size of 5MiB.
+        ///
+        /// Reference:
+        const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
+        const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
+        let part_size = config
+            .upload_part_size
+            .min(MAX_PART_SIZE)
+            .max(MIN_PART_SIZE);
+
         Self {
             client,
             bucket,
@@ -391,7 +394,6 @@ fn get_upload_body(data: Vec<Bytes>) -> ByteStream {
 pub struct S3ObjectStore {
     client: Client,
     bucket: String,
-    part_size: usize,
     /// For S3 specific metrics.
     metrics: Arc<ObjectStoreMetrics>,
 
@@ -435,7 +437,6 @@ impl ObjectStore for S3ObjectStore {
         Ok(S3StreamingUploader::new(
             self.client.clone(),
             self.bucket.clone(),
-            self.part_size,
             path.to_string(),
             self.metrics.clone(),
             self.config.clone(),
@@ -714,7 +715,6 @@ impl S3ObjectStore {
         Self {
             client,
             bucket,
-            part_size: S3_PART_SIZE,
             metrics,
             config,
         }
@@ -777,7 +777,6 @@ impl S3ObjectStore {
         Self {
             client,
             bucket: bucket.to_string(),
-            part_size: MINIO_PART_SIZE,
             metrics,
             config: object_store_config,
         }
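For reference, the net effect of the new knob: the configured upload_part_size (16777216 bytes, i.e. 16 MiB, in example.toml above) is clamped to S3's multipart-upload limits before it is used as the streaming uploader's part size. The sketch below is not part of the patch; effective_part_size is a hypothetical standalone helper that only illustrates the clamping expression config.upload_part_size.min(MAX_PART_SIZE).max(MIN_PART_SIZE) added in S3StreamingUploader::new.

// Standalone sketch (not from the patch) of the part-size clamping behavior.
fn effective_part_size(configured: usize) -> usize {
    // S3 multipart bounds: parts (other than the last) must be at least 5 MiB,
    // and no single part may exceed 5 GiB.
    const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
    const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
    configured.min(MAX_PART_SIZE).max(MIN_PART_SIZE)
}

fn main() {
    // The 16 MiB default is within bounds and is kept as-is.
    assert_eq!(effective_part_size(16 * 1024 * 1024), 16 * 1024 * 1024);
    // Values below the 5 MiB minimum are raised to the minimum.
    assert_eq!(effective_part_size(1024), 5 * 1024 * 1024);
    // Values above the 5 GiB maximum are lowered to the maximum.
    assert_eq!(effective_part_size(usize::MAX), 5 * 1024 * 1024 * 1024);
}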