From 86cd6e864168c78ff48bf7296a7fe6fc35f7f37f Mon Sep 17 00:00:00 2001
From: Patrick Huang
Date: Tue, 4 Jun 2024 21:53:19 +0800
Subject: [PATCH] support None streaming read chunk size

---
 src/common/src/config.rs                                       |  8 ++------
 .../src/object/opendal_engine/opendal_object_store.rs          | 11 +++++++----
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index e0b10623d1fbf..87d236500bdf6 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -1038,8 +1038,8 @@ pub struct S3ObjectStoreDeveloperConfig {
     #[serde(default = "default::object_store_config::s3::developer::upload_concurrency")]
     pub upload_concurrency: usize,
 
-    #[serde(default = "default::object_store_config::s3::developer::streaming_read_buffer_size")]
-    pub streaming_read_buffer_size: usize,
+    #[serde(default)]
+    pub streaming_read_buffer_size: Option<usize>,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
@@ -1890,10 +1890,6 @@ pub mod default {
             pub fn upload_concurrency() -> usize {
                 8
             }
-
-            pub fn streaming_read_buffer_size() -> usize {
-                512 * (1 << 10)
-            }
         }
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 929d218de5e81..ddc0ad89a8987 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -140,7 +140,7 @@ impl ObjectStore for OpendalObjectStore {
             ObjectError::internal("opendal streaming read error")
         ));
         let range: Range<u64> = (range.start as u64)..(range.end as u64);
-        let reader = self
+        let reader_fut = self
             .op
             .clone()
             .layer(
@@ -158,9 +158,12 @@ impl ObjectStore for OpendalObjectStore {
             .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                 self.config.retry.streaming_read_attempt_timeout_ms,
             )))
-            .reader_with(path)
-            .chunk(self.config.s3.developer.streaming_read_buffer_size)
-            .await?;
+            .reader_with(path);
+        let reader = if let Some(streaming_read_buffer_size) = self.config.s3.developer.streaming_read_buffer_size {
+            reader_fut.chunk(streaming_read_buffer_size).await?
+        } else {
+            reader_fut.await?
+        };
         let stream = reader.into_bytes_stream(range).await?.map(|item| {
             item.map(|b| Bytes::copy_from_slice(b.as_ref())).map_err(|e| {
                 ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))