From 8b60b7c24aa71061002cd2cff1a27cac134ddca4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 12 Apr 2024 18:40:22 +0800 Subject: [PATCH] feat(core): Implement into_stream for Reader Signed-off-by: Xuanwo --- core/src/layers/complete.rs | 2 +- core/src/layers/type_eraser.rs | 3 +- core/src/raw/oio/read/api.rs | 33 ++++++- core/src/raw/oio/read/mod.rs | 1 + core/src/types/reader.rs | 170 +++++++++++++++++++-------------- 5 files changed, 134 insertions(+), 75 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 228d68e2ceb2..b51dfc22245e 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -734,7 +734,7 @@ mod tests { } async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { - Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) + Ok((RpRead::new(), Arc::new(bytes::Bytes::new()))) } async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs index 37a0444eca5e..51bb9ba258ee 100644 --- a/core/src/layers/type_eraser.rs +++ b/core/src/layers/type_eraser.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::sync::Arc; use async_trait::async_trait; @@ -71,7 +72,7 @@ impl LayeredAccessor for TypeEraseAccessor { self.inner .read(path, args) .await - .map(|(rp, r)| (rp, Box::new(r) as oio::Reader)) + .map(|(rp, r)| (rp, Arc::new(r) as oio::Reader)) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index 0cfe3bfd5959..4a050f33e4e5 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -18,6 +18,7 @@ use std::fmt::Display; use std::fmt::Formatter; use std::ops::Deref; +use std::sync::Arc; use bytes::Bytes; use futures::Future; @@ -60,7 +61,7 @@ impl From for &'static str { } /// Reader is a type erased [`Read`]. -pub type Reader = Box; +pub type Reader = Arc; /// Read is the internal trait used by OpenDAL to read data from storage. /// @@ -112,14 +113,38 @@ impl Read for Bytes { } } +/// ReadDyn is the dyn version of [`Read`] make it possible to use as +/// `Box`. pub trait ReadDyn: Unpin + Send + Sync { + /// The dyn version of [`Read::read_at`]. + /// + /// This function returns a boxed future to make it object safe. fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture>; + + /// The static version of [`Read::read_at`]. + /// + /// This function returns a `'static` future by moving `self` into the + /// future. Caller can call `Box::pin` to build a static boxed future. + fn read_at_static( + self, + offset: u64, + limit: usize, + ) -> impl Future> + Send + 'static + where + Self: Sized + 'static; } impl ReadDyn for T { fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture> { Box::pin(self.read_at(offset, limit)) } + + async fn read_at_static(self, offset: u64, limit: usize) -> Result + where + Self: Sized + 'static, + { + self.read_at(offset, limit).await + } } /// # NOTE @@ -132,6 +157,12 @@ impl Read for Box { } } +impl Read for Arc { + async fn read_at(&self, offset: u64, limit: usize) -> Result { + self.deref().read_at_dyn(offset, limit).await + } +} + /// BlockingReader is a boxed dyn `BlockingRead`. pub type BlockingReader = Box; diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index 3144ce874b3a..5ec4114c5121 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -19,5 +19,6 @@ mod api; pub use api::BlockingRead; pub use api::BlockingReader; pub use api::Read; +pub use api::ReadDyn; pub use api::ReadOperation; pub use api::Reader; diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 3ff7cb758860..7ba8122b0eb0 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. -use std::ops::Bound; use std::ops::Range; use std::ops::RangeBounds; -use bytes::Buf; use bytes::BufMut; +use futures::stream; +use futures::Stream; +use futures::StreamExt; +use futures::TryStreamExt; use crate::raw::*; use crate::*; @@ -36,6 +38,7 @@ use crate::*; /// ## Direct /// /// [`Reader`] provides public API including [`Reader::read`], [`Reader:read_range`], and [`Reader::read_to_end`]. You can use those APIs directly without extra copy. +#[derive(Clone)] pub struct Reader { inner: oio::Reader, } @@ -63,43 +66,8 @@ impl Reader { /// /// - Buffer length smaller than range means we have reached the end of file. pub async fn read(&self, range: impl RangeBounds) -> Result { - let start = match range.start_bound().cloned() { - Bound::Included(start) => start, - Bound::Excluded(start) => start + 1, - Bound::Unbounded => 0, - }; - - let end = match range.end_bound().cloned() { - Bound::Included(end) => Some(end + 1), - Bound::Excluded(end) => Some(end), - Bound::Unbounded => None, - }; - - // If range is empty, return Ok(0) directly. - if let Some(end) = end { - if end <= start { - return Ok(Buffer::new()); - } - } - - let mut bufs = Vec::new(); - let mut offset = start; - - loop { - // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at_dyn(offset, limit).await?; - let n = bs.remaining(); - bufs.push(bs); - if n < limit { - return Ok(bufs.into_iter().flatten().collect()); - } - - offset += n as u64; - if Some(offset) == end { - return Ok(bufs.into_iter().flatten().collect()); - } - } + let bufs: Vec<_> = self.into_stream(range).try_collect().await?; + Ok(bufs.into_iter().flatten().collect()) } /// Read all data from reader into given [`BufMut`]. @@ -115,46 +83,28 @@ impl Reader { buf: &mut impl BufMut, range: impl RangeBounds, ) -> Result { - let start = match range.start_bound().cloned() { - Bound::Included(start) => start, - Bound::Excluded(start) => start + 1, - Bound::Unbounded => 0, - }; - - let end = match range.end_bound().cloned() { - Bound::Included(end) => Some(end + 1), - Bound::Excluded(end) => Some(end), - Bound::Unbounded => None, - }; - - // If range is empty, return Ok(0) directly. - if let Some(end) = end { - if end <= start { - return Ok(0); - } - } + let mut stream = self.into_stream(range); - let mut offset = start; let mut read = 0; - loop { - // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at_dyn(offset, limit).await?; - let n = bs.remaining(); + let Some(bs) = stream.try_next().await? else { + return Ok(read); + }; + read += bs.len(); buf.put(bs); - read += n as u64; - if n < limit { - return Ok(read as _); - } - - offset += n as u64; - if Some(offset) == end { - return Ok(read as _); - } } } + /// Create a buffer stream to read specific range from given reader. + pub fn into_stream( + &self, + range: impl RangeBounds, + ) -> impl Stream> + Unpin + Send + 'static { + let futs = into_stream::ReadFutureIterator::new(self.inner.clone(), range); + + stream::iter(futs).then(|f| f) + } + /// Convert reader into [`FuturesIoAsyncReader`] which implements [`futures::AsyncRead`], /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. #[inline] @@ -170,6 +120,82 @@ impl Reader { } } +pub mod into_stream { + use std::sync::atomic::Ordering; + use std::{ + ops::{Bound, RangeBounds}, + sync::{atomic::AtomicBool, Arc}, + }; + + use crate::raw::oio::ReadDyn; + use crate::raw::*; + use crate::*; + + pub struct ReadFutureIterator { + r: oio::Reader, + + offset: u64, + end: Option, + finished: Arc, + } + + impl ReadFutureIterator { + pub fn new(r: oio::Reader, range: impl RangeBounds) -> Self { + let start = match range.start_bound().cloned() { + Bound::Included(start) => start, + Bound::Excluded(start) => start + 1, + Bound::Unbounded => 0, + }; + let end = match range.end_bound().cloned() { + Bound::Included(end) => Some(end + 1), + Bound::Excluded(end) => Some(end), + Bound::Unbounded => None, + }; + + ReadFutureIterator { + r, + offset: start, + end, + finished: Arc::default(), + } + } + } + + impl Iterator for ReadFutureIterator { + type Item = BoxedFuture<'static, Result>; + + fn next(&mut self) -> Option { + if self.offset >= self.end.unwrap_or(u64::MAX) { + return None; + } + if self.finished.load(Ordering::Relaxed) { + return None; + } + + let offset = self.offset; + let limit = self + .end + .map(|end| end - self.offset) + .unwrap_or(4 * 1024 * 1024) as usize; + let finished = self.finished.clone(); + let r = self.r.clone(); + + // Update self.offset before building future. + self.offset += limit as u64; + let fut = async move { + let buf = r.read_at_static(offset, limit).await?; + if buf.len() < limit || limit == 0 { + // Update finished marked if buf is less than limit. + finished.store(true, Ordering::Relaxed); + } + Ok(buf) + }; + + Some(Box::pin(fut)) + } + } +} + pub mod into_futures_async_read { use std::io; use std::io::SeekFrom;