Skip to content

Commit

Permalink
support None streaming read chunk size
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Jun 4, 2024
1 parent fb1c80c commit 86cd6e8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
8 changes: 2 additions & 6 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,8 @@ pub struct S3ObjectStoreDeveloperConfig {
#[serde(default = "default::object_store_config::s3::developer::upload_concurrency")]
pub upload_concurrency: usize,

#[serde(default = "default::object_store_config::s3::developer::streaming_read_buffer_size")]
pub streaming_read_buffer_size: usize,
#[serde(default)]
pub streaming_read_buffer_size: Some(usize),
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
Expand Down Expand Up @@ -1890,10 +1890,6 @@ pub mod default {
pub fn upload_concurrency() -> usize {
8
}

pub fn streaming_read_buffer_size() -> usize {
512 * (1 << 10)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl ObjectStore for OpendalObjectStore {
ObjectError::internal("opendal streaming read error")
));
let range: Range<u64> = (range.start as u64)..(range.end as u64);
let reader = self
let reader_fut = self
.op
.clone()
.layer(
Expand All @@ -158,9 +158,12 @@ impl ObjectStore for OpendalObjectStore {
.layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
self.config.retry.streaming_read_attempt_timeout_ms,
)))
.reader_with(path)
.chunk(self.config.s3.developer.streaming_read_buffer_size)
.await?;
.reader_with(path);
let reader = if let Some(streaming_read_buffer_size) = self.config.s3.developer.streaming_read_buffer_size {
reader_fut.chunk(streaming_read_buffer_size).await?
} else {
reader_fut.await?
};
let stream = reader.into_bytes_stream(range).await?.map(|item| {
item.map(|b| Bytes::copy_from_slice(b.as_ref())).map_err(|e| {
ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
Expand Down

0 comments on commit 86cd6e8

Please sign in to comment.