Skip to content

Commit

Permalink
feat(core): Implement into_stream for Reader
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Apr 12, 2024
1 parent 6697a2c commit 8b60b7c
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 75 deletions.
2 changes: 1 addition & 1 deletion core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ mod tests {
}

async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
Ok((RpRead::new(), Arc::new(bytes::Bytes::new())))
}

async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down
3 changes: 2 additions & 1 deletion core/src/layers/type_eraser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use async_trait::async_trait;

Expand Down Expand Up @@ -71,7 +72,7 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
self.inner
.read(path, args)
.await
.map(|(rp, r)| (rp, Box::new(r) as oio::Reader))
.map(|(rp, r)| (rp, Arc::new(r) as oio::Reader))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down
33 changes: 32 additions & 1 deletion core/src/raw/oio/read/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::fmt::Display;
use std::fmt::Formatter;
use std::ops::Deref;
use std::sync::Arc;

use bytes::Bytes;
use futures::Future;
Expand Down Expand Up @@ -60,7 +61,7 @@ impl From<ReadOperation> for &'static str {
}

/// Reader is a type erased [`Read`].
pub type Reader = Box<dyn ReadDyn>;
pub type Reader = Arc<dyn ReadDyn>;

/// Read is the internal trait used by OpenDAL to read data from storage.
///
Expand Down Expand Up @@ -112,14 +113,38 @@ impl Read for Bytes {
}
}

/// ReadDyn is the dyn version of [`Read`] make it possible to use as
/// `Box<dyn ReadDyn>`.
pub trait ReadDyn: Unpin + Send + Sync {
/// The dyn version of [`Read::read_at`].
///
/// This function returns a boxed future to make it object safe.
fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture<Result<Buffer>>;

/// The static version of [`Read::read_at`].
///
/// This function returns a `'static` future by moving `self` into the
/// future. Caller can call `Box::pin` to build a static boxed future.
fn read_at_static(
self,
offset: u64,
limit: usize,
) -> impl Future<Output = Result<Buffer>> + Send + 'static
where
Self: Sized + 'static;
}

impl<T: Read + ?Sized> ReadDyn for T {
fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture<Result<Buffer>> {
Box::pin(self.read_at(offset, limit))
}

async fn read_at_static(self, offset: u64, limit: usize) -> Result<Buffer>
where
Self: Sized + 'static,
{
self.read_at(offset, limit).await
}
}

/// # NOTE
Expand All @@ -132,6 +157,12 @@ impl<T: ReadDyn + ?Sized> Read for Box<T> {
}
}

impl<T: ReadDyn + ?Sized> Read for Arc<T> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.deref().read_at_dyn(offset, limit).await
}
}

/// BlockingReader is a boxed dyn `BlockingRead`.
pub type BlockingReader = Box<dyn BlockingRead>;

Expand Down
1 change: 1 addition & 0 deletions core/src/raw/oio/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ mod api;
pub use api::BlockingRead;
pub use api::BlockingReader;
pub use api::Read;
pub use api::ReadDyn;
pub use api::ReadOperation;
pub use api::Reader;
170 changes: 98 additions & 72 deletions core/src/types/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Bound;
use std::ops::Range;
use std::ops::RangeBounds;

use bytes::Buf;
use bytes::BufMut;
use futures::stream;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;

use crate::raw::*;
use crate::*;
Expand All @@ -36,6 +38,7 @@ use crate::*;
/// ## Direct
///
/// [`Reader`] provides public API including [`Reader::read`], [`Reader:read_range`], and [`Reader::read_to_end`]. You can use those APIs directly without extra copy.
#[derive(Clone)]
pub struct Reader {
inner: oio::Reader,
}
Expand Down Expand Up @@ -63,43 +66,8 @@ impl Reader {
///
/// - Buffer length smaller than range means we have reached the end of file.
pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start + 1,
Bound::Unbounded => 0,
};

let end = match range.end_bound().cloned() {
Bound::Included(end) => Some(end + 1),
Bound::Excluded(end) => Some(end),
Bound::Unbounded => None,
};

// If range is empty, return Ok(0) directly.
if let Some(end) = end {
if end <= start {
return Ok(Buffer::new());
}
}

let mut bufs = Vec::new();
let mut offset = start;

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at_dyn(offset, limit).await?;
let n = bs.remaining();
bufs.push(bs);
if n < limit {
return Ok(bufs.into_iter().flatten().collect());
}

offset += n as u64;
if Some(offset) == end {
return Ok(bufs.into_iter().flatten().collect());
}
}
let bufs: Vec<_> = self.into_stream(range).try_collect().await?;
Ok(bufs.into_iter().flatten().collect())
}

/// Read all data from reader into given [`BufMut`].
Expand All @@ -115,46 +83,28 @@ impl Reader {
buf: &mut impl BufMut,
range: impl RangeBounds<u64>,
) -> Result<usize> {
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start + 1,
Bound::Unbounded => 0,
};

let end = match range.end_bound().cloned() {
Bound::Included(end) => Some(end + 1),
Bound::Excluded(end) => Some(end),
Bound::Unbounded => None,
};

// If range is empty, return Ok(0) directly.
if let Some(end) = end {
if end <= start {
return Ok(0);
}
}
let mut stream = self.into_stream(range);

let mut offset = start;
let mut read = 0;

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at_dyn(offset, limit).await?;
let n = bs.remaining();
let Some(bs) = stream.try_next().await? else {
return Ok(read);
};
read += bs.len();
buf.put(bs);
read += n as u64;
if n < limit {
return Ok(read as _);
}

offset += n as u64;
if Some(offset) == end {
return Ok(read as _);
}
}
}

/// Create a buffer stream to read specific range from given reader.
pub fn into_stream(
&self,
range: impl RangeBounds<u64>,
) -> impl Stream<Item = Result<Buffer>> + Unpin + Send + 'static {
let futs = into_stream::ReadFutureIterator::new(self.inner.clone(), range);

stream::iter(futs).then(|f| f)
}

/// Convert reader into [`FuturesIoAsyncReader`] which implements [`futures::AsyncRead`],
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
#[inline]
Expand All @@ -170,6 +120,82 @@ impl Reader {
}
}

pub mod into_stream {
use std::sync::atomic::Ordering;
use std::{
ops::{Bound, RangeBounds},
sync::{atomic::AtomicBool, Arc},
};

use crate::raw::oio::ReadDyn;
use crate::raw::*;
use crate::*;

pub struct ReadFutureIterator {
r: oio::Reader,

offset: u64,
end: Option<u64>,
finished: Arc<AtomicBool>,
}

impl ReadFutureIterator {
pub fn new(r: oio::Reader, range: impl RangeBounds<u64>) -> Self {
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start + 1,
Bound::Unbounded => 0,
};
let end = match range.end_bound().cloned() {
Bound::Included(end) => Some(end + 1),
Bound::Excluded(end) => Some(end),
Bound::Unbounded => None,
};

ReadFutureIterator {
r,
offset: start,
end,
finished: Arc::default(),
}
}
}

impl Iterator for ReadFutureIterator {
type Item = BoxedFuture<'static, Result<Buffer>>;

fn next(&mut self) -> Option<Self::Item> {
if self.offset >= self.end.unwrap_or(u64::MAX) {
return None;
}
if self.finished.load(Ordering::Relaxed) {
return None;
}

let offset = self.offset;
let limit = self
.end
.map(|end| end - self.offset)
.unwrap_or(4 * 1024 * 1024) as usize;
let finished = self.finished.clone();
let r = self.r.clone();

// Update self.offset before building future.
self.offset += limit as u64;
let fut = async move {
let buf = r.read_at_static(offset, limit).await?;
if buf.len() < limit || limit == 0 {
// Update finished marked if buf is less than limit.
finished.store(true, Ordering::Relaxed);
}
Ok(buf)
};

Some(Box::pin(fut))
}
}
}

pub mod into_futures_async_read {
use std::io;
use std::io::SeekFrom;
Expand Down

0 comments on commit 8b60b7c

Please sign in to comment.