From a3c7d76a1b837cb784adb3ade43f2206e27d6307 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 15 Apr 2024 18:49:17 +0800 Subject: [PATCH] chore(core): Use reader's chunk size instead Signed-off-by: Xuanwo --- core/src/types/reader.rs | 34 ++++++++++------------------------ 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index cd932e91df1d..52f820afa954 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -175,14 +175,14 @@ impl Reader { /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. #[inline] pub fn into_futures_io_async_read(self, range: Range) -> FuturesIoAsyncReader { - FuturesIoAsyncReader::new(self.inner, range) + FuturesIoAsyncReader::new(self.inner, self.options.chunk(), range) } /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`], /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. #[inline] pub fn into_futures_bytes_stream(self, range: Range) -> FuturesBytesStream { - FuturesBytesStream::new(self.inner, range) + FuturesBytesStream::new(self.inner, self.options.chunk(), range) } } @@ -292,7 +292,7 @@ pub mod into_futures_async_read { state: State, offset: u64, size: u64, - cap: usize, + chunk: usize, cur: u64, buf: Buffer, @@ -311,24 +311,17 @@ pub mod into_futures_async_read { impl FuturesIoAsyncReader { /// NOTE: don't allow users to create FuturesAsyncReader directly. #[inline] - pub(super) fn new(r: oio::Reader, range: Range) -> Self { + pub(super) fn new(r: oio::Reader, chunk: Option, range: Range) -> Self { FuturesIoAsyncReader { state: State::Idle(Some(r)), offset: range.start, size: range.end - range.start, - // TODO: should use services preferred io size. - cap: 8 * 1024 * 1024, + chunk: chunk.unwrap_or(8 * 1024 * 1024), cur: 0, buf: Buffer::new(), } } - - /// Set the capacity of this reader to control the IO size. - pub fn with_capacity(mut self, cap: usize) -> Self { - self.cap = cap; - self - } } impl AsyncBufRead for FuturesIoAsyncReader { @@ -348,7 +341,7 @@ pub mod into_futures_async_read { let r = r.take().expect("reader must be present"); let next_offset = this.offset + this.cur; - let next_size = (this.size - this.cur).min(this.cap as u64) as usize; + let next_size = (this.size - this.cur).min(this.chunk as u64) as usize; let fut = async move { let res = r.read_at_dyn(next_offset, next_size).await; (r, res) @@ -448,7 +441,7 @@ pub mod into_futures_stream { state: State, offset: u64, size: u64, - cap: usize, + chunk: usize, cur: u64, } @@ -466,24 +459,17 @@ pub mod into_futures_stream { impl FuturesBytesStream { /// NOTE: don't allow users to create FuturesStream directly. #[inline] - pub(crate) fn new(r: oio::Reader, range: Range) -> Self { + pub(crate) fn new(r: oio::Reader, chunk: Option, range: Range) -> Self { FuturesBytesStream { r, state: State::Idle(Buffer::new()), offset: range.start, size: range.end - range.start, - // TODO: should use services preferred io size. - cap: 4 * 1024 * 1024, + chunk: chunk.unwrap_or(8 * 1024 * 1024), cur: 0, } } - - /// Set the capacity of this reader to control the IO size. - pub fn with_capacity(mut self, cap: usize) -> Self { - self.cap = cap; - self - } } impl Stream for FuturesBytesStream { @@ -507,7 +493,7 @@ pub mod into_futures_stream { let r = this.r.clone(); let next_offset = this.offset + this.cur; - let next_size = (this.size - this.cur).min(this.cap as u64) as usize; + let next_size = (this.size - this.cur).min(this.chunk as u64) as usize; let fut = async move { r.read_at_dyn(next_offset, next_size).await }; this.state = State::Next(Box::pin(fut)); }