Skip to content

Commit

Permalink
chore(core): Use reader's chunk size instead
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Apr 15, 2024
1 parent 0625bce commit a3c7d76
Showing 1 changed file with 10 additions and 24 deletions.
34 changes: 10 additions & 24 deletions core/src/types/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ impl Reader {
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
#[inline]
pub fn into_futures_io_async_read(self, range: Range<u64>) -> FuturesIoAsyncReader {
FuturesIoAsyncReader::new(self.inner, range)
FuturesIoAsyncReader::new(self.inner, self.options.chunk(), range)
}

/// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`],
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
#[inline]
pub fn into_futures_bytes_stream(self, range: Range<u64>) -> FuturesBytesStream {
FuturesBytesStream::new(self.inner, range)
FuturesBytesStream::new(self.inner, self.options.chunk(), range)
}
}

Expand Down Expand Up @@ -292,7 +292,7 @@ pub mod into_futures_async_read {
state: State,
offset: u64,
size: u64,
cap: usize,
chunk: usize,

cur: u64,
buf: Buffer,
Expand All @@ -311,24 +311,17 @@ pub mod into_futures_async_read {
impl FuturesIoAsyncReader {
/// NOTE: don't allow users to create FuturesAsyncReader directly.
#[inline]
pub(super) fn new(r: oio::Reader, range: Range<u64>) -> Self {
pub(super) fn new(r: oio::Reader, chunk: Option<usize>, range: Range<u64>) -> Self {
FuturesIoAsyncReader {
state: State::Idle(Some(r)),
offset: range.start,
size: range.end - range.start,
// TODO: should use services preferred io size.
cap: 8 * 1024 * 1024,
chunk: chunk.unwrap_or(8 * 1024 * 1024),

cur: 0,
buf: Buffer::new(),
}
}

/// Set the capacity of this reader to control the IO size.
pub fn with_capacity(mut self, cap: usize) -> Self {
self.cap = cap;
self
}
}

impl AsyncBufRead for FuturesIoAsyncReader {
Expand All @@ -348,7 +341,7 @@ pub mod into_futures_async_read {

let r = r.take().expect("reader must be present");
let next_offset = this.offset + this.cur;
let next_size = (this.size - this.cur).min(this.cap as u64) as usize;
let next_size = (this.size - this.cur).min(this.chunk as u64) as usize;
let fut = async move {
let res = r.read_at_dyn(next_offset, next_size).await;
(r, res)
Expand Down Expand Up @@ -448,7 +441,7 @@ pub mod into_futures_stream {
state: State,
offset: u64,
size: u64,
cap: usize,
chunk: usize,

cur: u64,
}
Expand All @@ -466,24 +459,17 @@ pub mod into_futures_stream {
impl FuturesBytesStream {
/// NOTE: don't allow users to create FuturesStream directly.
#[inline]
pub(crate) fn new(r: oio::Reader, range: Range<u64>) -> Self {
pub(crate) fn new(r: oio::Reader, chunk: Option<usize>, range: Range<u64>) -> Self {
FuturesBytesStream {
r,
state: State::Idle(Buffer::new()),
offset: range.start,
size: range.end - range.start,
// TODO: should use services preferred io size.
cap: 4 * 1024 * 1024,
chunk: chunk.unwrap_or(8 * 1024 * 1024),

cur: 0,
}
}

/// Set the capacity of this reader to control the IO size.
pub fn with_capacity(mut self, cap: usize) -> Self {
self.cap = cap;
self
}
}

impl Stream for FuturesBytesStream {
Expand All @@ -507,7 +493,7 @@ pub mod into_futures_stream {

let r = this.r.clone();
let next_offset = this.offset + this.cur;
let next_size = (this.size - this.cur).min(this.cap as u64) as usize;
let next_size = (this.size - this.cur).min(this.chunk as u64) as usize;
let fut = async move { r.read_at_dyn(next_offset, next_size).await };
this.state = State::Next(Box::pin(fut));
}
Expand Down

0 comments on commit a3c7d76

Please sign in to comment.