diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 84a0c12292c7..529ce6bf66a6 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -311,7 +311,7 @@ impl oio::BlockingWrite for BlockingWrapper { impl oio::BlockingList for BlockingWrapper { fn next(&mut self) -> Result> { - self.handle.block_on(poll_fn(|cx| self.inner.poll_next(cx))) + self.handle.block_on(self.inner.next()) } } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 60dd2d26738e..405210d3202f 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -212,7 +212,7 @@ impl CompleteAccessor { ) .await?; - return if oio::ListExt::next(&mut l).await?.is_some() { + return if oio::List::next(&mut l).await?.is_some() { Ok(RpStat::new(Metadata::new(EntryMode::DIR))) } else { Err(Error::new( diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 8a663cfc4757..f3b1f95e83c9 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -16,6 +16,7 @@ // under the License. use std::fmt::Debug; + use std::io::SeekFrom; use std::sync::Arc; use std::task::Context; @@ -301,8 +302,8 @@ impl oio::BlockingWrite for ConcurrentLimitWrapper { } impl oio::List for ConcurrentLimitWrapper { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.inner.poll_next(cx) + async fn next(&mut self) -> Result> { + self.inner.next().await } } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index f8112951a1c4..52db9359e952 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::fmt::Formatter; + use std::io::SeekFrom; use std::task::Context; use std::task::Poll; @@ -433,10 +434,9 @@ impl oio::BlockingWrite for ErrorContextWrapper { } } -#[async_trait::async_trait] impl oio::List for ErrorContextWrapper { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.inner.poll_next(cx).map_err(|err| { + async fn next(&mut self) -> Result> { + self.inner.next().await.map_err(|err| { err.with_operation(ListOperation::Next) .with_context("service", self.scheme) .with_context("path", &self.path) diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index eea8a343c13f..a6eb054063ae 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -17,8 +17,7 @@ use std::collections::HashSet; use std::fmt::Debug; -use std::task::Context; -use std::task::Poll; + use std::vec::IntoIter; use async_trait::async_trait; @@ -233,8 +232,8 @@ impl ImmutableDir { } impl oio::List for ImmutableDir { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - Poll::Ready(Ok(self.inner_next())) + async fn next(&mut self) -> Result> { + Ok(self.inner_next()) } } diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 02abb3f3dff0..432469fcc316 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -16,6 +16,7 @@ // under the License. use std::fmt::Debug; + use std::io; use std::task::ready; use std::task::Context; @@ -1353,11 +1354,9 @@ impl

Drop for LoggingLister

{ } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::List for LoggingLister

{ - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let res = ready!(self.inner.poll_next(cx)); + async fn next(&mut self) -> Result> { + let res = self.inner.next().await; match &res { Ok(Some(de)) => { @@ -1395,7 +1394,7 @@ impl oio::List for LoggingLister

{ } }; - Poll::Ready(res) + res } } diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index d4135251a3a1..c2efde340bb2 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -25,6 +25,7 @@ use std::any::Any; use std::cmp::min; use std::collections::HashMap; use std::fmt::Debug; +use std::future::Future; use std::io::Result; use std::io::SeekFrom; use std::net::SocketAddr; @@ -325,11 +326,11 @@ impl oio::Write for MadsimWriter { pub struct MadsimLister {} impl oio::List for MadsimLister { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - Poll::Ready(Err(Error::new( + async fn next(&mut self) -> crate::Result> { + Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", - ))) + )) } } diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index fdc6a1dc929c..2f3962b48cf5 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -16,6 +16,7 @@ // under the License. use std::fmt::Debug; + use std::io; use std::task::Context; use std::task::Poll; @@ -357,10 +358,9 @@ impl oio::BlockingWrite for MinitraceWrapper { } impl oio::List for MinitraceWrapper { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let _g = self.span.set_local_parent(); - let _span = LocalSpan::enter_with_local_parent(ListOperation::Next.into_static()); - self.inner.poll_next(cx) + #[trace(enter_on_poll = true)] + async fn next(&mut self) -> Result> { + self.inner.next().await } } diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 90e8d65392ab..d582691f1117 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -321,11 +321,9 @@ impl oio::BlockingWrite for OtelTraceWrapper { } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::List for OtelTraceWrapper { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.inner.poll_next(cx) + async fn next(&mut self) -> Result> { + self.inner.next().await } } diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index e6582114e841..8abab3b2ff29 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::fmt::Formatter; + use std::io; use std::pin::Pin; use std::sync::Arc; @@ -952,53 +953,37 @@ impl oio::BlockingWrite for RetryWra } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::List for RetryWrapper { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - if let Some(sleep) = self.sleep.as_mut() { - ready!(sleep.poll_unpin(cx)); - self.sleep = None; - } + async fn next(&mut self) -> Result> { + use backon::RetryableWithContext; - match ready!(self.inner.as_mut().unwrap().poll_next(cx)) { - Ok(v) => { - self.current_backoff = None; - Poll::Ready(Ok(v)) - } - Err(err) if !err.is_temporary() => { - self.current_backoff = None; - Poll::Ready(Err(err)) - } - Err(err) => { - let backoff = match self.current_backoff.as_mut() { - Some(backoff) => backoff, - None => { - self.current_backoff = Some(self.builder.build()); - self.current_backoff.as_mut().unwrap() - } - }; + let inner = self.inner.take().expect("inner must be valid"); - match backoff.next() { - None => { - self.current_backoff = None; - Poll::Ready(Err(err)) - } - Some(dur) => { - self.notify.intercept( - &err, - dur, - &[ - ("operation", ListOperation::Next.into_static()), - ("path", &self.path), - ], - ); - self.sleep = Some(Box::pin(tokio::time::sleep(dur))); - self.poll_next(cx) - } - } + let (inner, res) = { + |mut p: P| async move { + let res = p.next().await; + + (p, res) } } + .retry(&self.builder) + .when(|e| e.is_temporary()) + .context(inner) + .notify(|err, dur| { + self.notify.intercept( + err, + dur, + &[ + ("operation", ListOperation::Next.into_static()), + ("path", &self.path), + ], + ) + }) + .map(|(r, res)| (r, res.map_err(|err| err.set_persistent()))) + .await; + + self.inner = Some(inner); + res } } @@ -1028,8 +1013,6 @@ mod tests { use std::io; use std::sync::Arc; use std::sync::Mutex; - use std::task::Context; - use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; @@ -1208,9 +1191,9 @@ mod tests { } impl oio::List for MockLister { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { self.attempt += 1; - let result = match self.attempt { + match self.attempt { 1 => Err(Error::new( ErrorKind::RateLimited, "retryable rate limited error from lister", @@ -1240,9 +1223,7 @@ mod tests { _ => { unreachable!() } - }; - - Poll::Ready(result) + } } } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index f80d29cf1b88..d522571364ce 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -360,12 +360,9 @@ impl oio::Write for TimeoutWrapper { } impl oio::List for TimeoutWrapper { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.poll_timeout(cx, ListOperation::Next.into_static())?; - - let v = ready!(self.inner.poll_next(cx)); - self.sleep = None; - Poll::Ready(v) + async fn next(&mut self) -> Result> { + let fut = self.inner.next(); + Self::io_timeout(self.timeout, ListOperation::Next.into_static(), fut).await } } @@ -374,8 +371,7 @@ mod tests { use std::future::{pending, Future}; use std::io::SeekFrom; use std::sync::Arc; - use std::task::Context; - use std::task::Poll; + use std::time::Duration; use async_trait::async_trait; @@ -447,8 +443,8 @@ mod tests { struct MockLister; impl oio::List for MockLister { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - Poll::Pending + fn next(&mut self) -> impl Future>> { + pending() } } @@ -506,6 +502,8 @@ mod tests { #[tokio::test] async fn test_list_timeout_raw() { + use oio::List; + let acc = MockService; let timeout_layer = TimeoutLayer::new() .with_timeout(Duration::from_secs(1)) @@ -516,7 +514,6 @@ mod tests { .await .unwrap(); - use oio::ListExt; let res = lister.next().await; assert!(res.is_err()); let err = res.unwrap_err(); diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 017264cd3297..e172e78db7d4 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -16,6 +16,7 @@ // under the License. use std::fmt::Debug; + use std::io; use std::task::Context; use std::task::Poll; @@ -349,8 +350,8 @@ impl oio::BlockingWrite for TracingWrapper { impl oio::List for TracingWrapper { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.inner.poll_next(cx) + async fn next(&mut self) -> Result> { + self.inner.next().await } } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 2f1c82da8a24..c3d840fa51bb 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -249,8 +249,8 @@ impl KvLister { } impl oio::List for KvLister { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - Poll::Ready(Ok(self.inner_next())) + async fn next(&mut self) -> Result> { + Ok(self.inner_next()) } } diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 8f56a061168b..41bf51155cff 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -253,8 +253,8 @@ impl KvLister { } impl oio::List for KvLister { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - Poll::Ready(Ok(self.inner_next())) + async fn next(&mut self) -> Result> { + Ok(self.inner_next()) } } diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index 430e51377417..13890a5bb9b9 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -263,12 +263,12 @@ where THREE: oio::List, FOUR: oio::List, { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { match self { - Self::One(v) => v.poll_next(cx), - Self::Two(v) => v.poll_next(cx), - Self::Three(v) => v.poll_next(cx), - Self::Four(v) => v.poll_next(cx), + Self::One(v) => v.next().await, + Self::Two(v) => v.next().await, + Self::Three(v) => v.next().await, + Self::Four(v) => v.next().await, } } } diff --git a/core/src/raw/oio/list/api.rs b/core/src/raw/oio/list/api.rs index 07dbdf774bb4..e5c62054cd00 100644 --- a/core/src/raw/oio/list/api.rs +++ b/core/src/raw/oio/list/api.rs @@ -18,18 +18,18 @@ use std::fmt::Display; use std::fmt::Formatter; use std::future::Future; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; + +use std::ops::DerefMut; use crate::raw::oio::Entry; +use crate::raw::BoxedFuture; use crate::*; /// PageOperation is the name for APIs of lister. #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] #[non_exhaustive] pub enum ListOperation { - /// Operation for [`List::poll_next`] + /// Operation for [`List::next`] Next, /// Operation for [`BlockingList::next`] BlockingNext, @@ -59,67 +59,54 @@ impl From for &'static str { } } +/// The boxed version of [`List`] +pub type Lister = Box; + /// Page trait is used by [`raw::Accessor`] to implement `list` operation. -pub trait List: Unpin + Send + Sync + 'static { +pub trait List: Unpin + Send + Sync { /// Fetch a new page of [`Entry`] /// /// `Ok(None)` means all pages have been returned. Any following call /// to `next` will always get the same result. - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>>; -} - -/// The boxed version of [`List`] -pub type Lister = Box; - -impl List for Box

{ - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - (**self).poll_next(cx) - } + #[cfg(not(target_arch = "wasm32"))] + fn next(&mut self) -> impl Future>> + Send; + #[cfg(target_arch = "wasm32")] + fn next(&mut self) -> impl Future>>; } impl List for () { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - Poll::Ready(Ok(None)) + async fn next(&mut self) -> Result> { + Ok(None) } } impl List for Option

{ - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { match self { - Some(p) => p.poll_next(cx), - None => Poll::Ready(Ok(None)), + Some(p) => p.next().await, + None => Ok(None), } } } -/// Impl ListExt for all T: List -impl ListExt for T {} - -/// Extension of [`List`] to make it easier for use. -pub trait ListExt: List { - /// Build a future for `poll_next`. - fn next(&mut self) -> NextFuture { - NextFuture { lister: self } - } +pub trait ListDyn: Unpin + Send + Sync { + fn next_dyn(&mut self) -> BoxedFuture>>; } -pub struct NextFuture<'a, L: List + Unpin + ?Sized> { - lister: &'a mut L, +impl ListDyn for T { + fn next_dyn(&mut self) -> BoxedFuture>> { + Box::pin(self.next()) + } } -impl Future for NextFuture<'_, L> -where - L: List + Unpin + ?Sized, -{ - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - self.lister.poll_next(cx) +impl List for Box { + async fn next(&mut self) -> Result> { + self.deref_mut().next_dyn().await } } /// BlockingList is the blocking version of [`List`]. -pub trait BlockingList: Send + 'static { +pub trait BlockingList: Send { /// Fetch a new page of [`Entry`] /// /// `Ok(None)` means all pages have been returned. Any following call diff --git a/core/src/raw/oio/list/flat_list.rs b/core/src/raw/oio/list/flat_list.rs index 2b948408c1de..11ff40380f8d 100644 --- a/core/src/raw/oio/list/flat_list.rs +++ b/core/src/raw/oio/list/flat_list.rs @@ -15,18 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::task::ready; -use std::task::Context; -use std::task::Poll; - -use futures::FutureExt; - use crate::raw::*; use crate::*; -/// ListFuture is the future returned while calling async list. -type ListFuture = BoxedStaticFuture<(A, oio::Entry, Result<(RpList, L)>)>; - /// FlatLister will walk dir in bottom up way: /// /// - List nested dir first @@ -64,12 +55,11 @@ type ListFuture = BoxedStaticFuture<(A, oio::Entry, Result<(RpList, L)>)>; /// may output parent dirs' files before nested dirs, this is expected because files /// always output directly while listing. pub struct FlatLister { - acc: Option, + acc: A, root: String, next_dir: Option, active_lister: Vec<(Option, L)>, - list_future: Option>, } /// # Safety @@ -88,11 +78,10 @@ where /// Create a new flat lister pub fn new(acc: A, path: &str) -> FlatLister { FlatLister { - acc: Some(acc), + acc, root: path.to_string(), next_dir: Some(oio::Entry::new(path, Metadata::new(EntryMode::DIR))), active_lister: vec![], - list_future: None, } } } @@ -102,44 +91,30 @@ where A: Accessor, L: oio::List, { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { loop { - if let Some(fut) = self.list_future.as_mut() { - let (acc, de, res) = ready!(fut.poll_unpin(cx)); - self.acc = Some(acc); - self.list_future = None; - - let (_, l) = res?; - self.active_lister.push((Some(de), l)) - } - if let Some(de) = self.next_dir.take() { - let acc = self.acc.take().expect("Accessor must be valid"); - let fut = async move { - let res = acc.list(de.path(), OpList::new()).await; - (acc, de, res) - }; - self.list_future = Some(Box::pin(fut)); - continue; + let (_, l) = self.acc.list(de.path(), OpList::new()).await?; + self.active_lister.push((Some(de), l)); } let (de, lister) = match self.active_lister.last_mut() { Some((de, lister)) => (de, lister), - None => return Poll::Ready(Ok(None)), + None => return Ok(None), }; - match ready!(lister.poll_next(cx))? { + match lister.next().await? { Some(v) if v.mode().is_dir() => { self.next_dir = Some(v); continue; } - Some(v) => return Poll::Ready(Ok(Some(v))), + Some(v) => return Ok(Some(v)), None => { match de.take() { Some(de) => { // Only push entry if it's not root dir if de.path() != self.root { - return Poll::Ready(Ok(Some(de))); + return Ok(Some(de)); } continue; } @@ -162,10 +137,8 @@ where fn next(&mut self) -> Result> { loop { if let Some(de) = self.next_dir.take() { - let acc = self.acc.take().expect("Accessor must be valid"); - let (_, l) = acc.blocking_list(de.path(), OpList::new())?; + let (_, l) = self.acc.blocking_list(de.path(), OpList::new())?; - self.acc = Some(acc); self.active_lister.push((Some(de), l)) } diff --git a/core/src/raw/oio/list/hierarchy_list.rs b/core/src/raw/oio/list/hierarchy_list.rs index 259e45532210..bed486ce74e6 100644 --- a/core/src/raw/oio/list/hierarchy_list.rs +++ b/core/src/raw/oio/list/hierarchy_list.rs @@ -16,9 +16,6 @@ // under the License. use std::collections::HashSet; -use std::task::ready; -use std::task::Context; -use std::task::Poll; use crate::raw::*; use crate::*; @@ -125,18 +122,18 @@ impl

HierarchyLister

{ } impl oio::List for HierarchyLister

{ - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { loop { - let mut entry = match ready!(self.lister.poll_next(cx))? { + let mut entry = match self.lister.next().await? { Some(entry) => entry, - None => return Poll::Ready(Ok(None)), + None => return Ok(None), }; if self.recursive { - return Poll::Ready(Ok(Some(entry))); + return Ok(Some(entry)); } if self.keep_entry(&mut entry) { - return Poll::Ready(Ok(Some(entry))); + return Ok(Some(entry)); } } } diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs index 31f70d6f2328..8460afb1a25b 100644 --- a/core/src/raw/oio/list/mod.rs +++ b/core/src/raw/oio/list/mod.rs @@ -19,7 +19,6 @@ mod api; pub use api::BlockingList; pub use api::BlockingLister; pub use api::List; -pub use api::ListExt; pub use api::ListOperation; pub use api::Lister; diff --git a/core/src/raw/oio/list/page_list.rs b/core/src/raw/oio/list/page_list.rs index b793a6b63f0f..2ed43bdd4241 100644 --- a/core/src/raw/oio/list/page_list.rs +++ b/core/src/raw/oio/list/page_list.rs @@ -16,11 +16,7 @@ // under the License. use std::collections::VecDeque; -use std::task::ready; -use std::task::Context; -use std::task::Poll; - -use async_trait::async_trait; +use std::future::Future; use crate::raw::*; use crate::*; @@ -35,11 +31,12 @@ use crate::*; /// - Services impl `PageList` /// - `PageLister` impl `List` /// - Expose `PageLister` as `Accessor::Lister` -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait PageList: Send + Sync + Unpin + 'static { /// next_page is used to fetch next page of entries from underlying storage. - async fn next_page(&self, ctx: &mut PageContext) -> Result<()>; + #[cfg(not(target_arch = "wasm32"))] + fn next_page(&self, ctx: &mut PageContext) -> impl Future> + Send; + #[cfg(target_arch = "wasm32")] + fn next_page(&self, ctx: &mut PageContext) -> impl Future>; } /// PageContext is the context passing between `PageList`. @@ -67,23 +64,10 @@ pub struct PageContext { /// PageLister implements [`List`] based on [`PageList`]. pub struct PageLister { - state: State, -} - -enum State { - Idle(Option<(L, PageContext)>), - Fetch(BoxedStaticFuture<((L, PageContext), Result<()>)>), + inner: L, + ctx: PageContext, } -/// # Safety -/// -/// wasm32 is a special target that we only have one event-loop for this state. -unsafe impl Send for State {} -/// # Safety -/// -/// We will only take `&mut Self` reference for State. -unsafe impl Sync for State {} - impl PageLister where L: PageList, @@ -91,14 +75,12 @@ where /// Create a new PageLister. pub fn new(l: L) -> Self { Self { - state: State::Idle(Some(( - l, - PageContext { - done: false, - token: "".to_string(), - entries: VecDeque::new(), - }, - ))), + inner: l, + ctx: PageContext { + done: false, + token: "".to_string(), + entries: VecDeque::new(), + }, } } } @@ -107,33 +89,16 @@ impl oio::List for PageLister where L: PageList, { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { loop { - match &mut self.state { - State::Idle(st) => { - if let Some((_, ctx)) = st.as_mut() { - if let Some(entry) = ctx.entries.pop_front() { - return Poll::Ready(Ok(Some(entry))); - } - if ctx.done { - return Poll::Ready(Ok(None)); - } - } - - let (l, mut ctx) = st.take().expect("lister must be valid"); - let fut = async move { - let res = l.next_page(&mut ctx).await; - ((l, ctx), res) - }; - self.state = State::Fetch(Box::pin(fut)); - } - State::Fetch(fut) => { - let ((l, ctx), res) = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some((l, ctx))); - - res?; - } + if let Some(entry) = self.ctx.entries.pop_front() { + return Ok(Some(entry)); } + if self.ctx.done { + return Ok(None); + } + + self.inner.next_page(&mut self.ctx).await?; } } } diff --git a/core/src/raw/oio/list/prefix_list.rs b/core/src/raw/oio/list/prefix_list.rs index 166dbcaecebf..e82eee29b17d 100644 --- a/core/src/raw/oio/list/prefix_list.rs +++ b/core/src/raw/oio/list/prefix_list.rs @@ -15,10 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::task::ready; -use std::task::Context; -use std::task::Poll; - use crate::raw::*; use crate::*; @@ -66,11 +62,11 @@ impl oio::List for PrefixLister where L: oio::List, { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { loop { - match ready!(self.lister.poll_next(cx)) { + match self.lister.next().await { Ok(Some(e)) if !starts_with_not_eq(&e, &self.prefix) => continue, - v => return Poll::Ready(v), + v => return v, } } } diff --git a/core/src/services/alluxio/lister.rs b/core/src/services/alluxio/lister.rs index 5e56dbff892d..56463ad4ff16 100644 --- a/core/src/services/alluxio/lister.rs +++ b/core/src/services/alluxio/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::AlluxioCore; use crate::raw::oio::Entry; use crate::raw::*; @@ -40,7 +38,6 @@ impl AlluxioLister { } } -#[async_trait] impl oio::PageList for AlluxioLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let result = self.core.list_status(&self.path).await; diff --git a/core/src/services/azblob/lister.rs b/core/src/services/azblob/lister.rs index eeded04999c5..6bd1a61243ba 100644 --- a/core/src/services/azblob/lister.rs +++ b/core/src/services/azblob/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use bytes::Buf; use quick_xml::de; @@ -48,8 +47,6 @@ impl AzblobLister { } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::PageList for AzblobLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/azdls/lister.rs b/core/src/services/azdls/lister.rs index db7904f946e4..3a74b27a5835 100644 --- a/core/src/services/azdls/lister.rs +++ b/core/src/services/azdls/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use serde::Deserialize; use serde_json::de; @@ -39,7 +38,6 @@ impl AzdlsLister { } } -#[async_trait] impl oio::PageList for AzdlsLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/azfile/lister.rs b/core/src/services/azfile/lister.rs index bb9f8f73b492..9c3e6819c5da 100644 --- a/core/src/services/azfile/lister.rs +++ b/core/src/services/azfile/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::StatusCode; use quick_xml::de::from_str; use serde::Deserialize; @@ -39,7 +38,6 @@ impl AzfileLister { } } -#[async_trait] impl oio::PageList for AzfileLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/b2/lister.rs b/core/src/services/b2/lister.rs index 0dee20a9d14a..3d12ec6cf3e4 100644 --- a/core/src/services/b2/lister.rs +++ b/core/src/services/b2/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use bytes::Buf; use super::core::parse_file_info; @@ -58,7 +57,6 @@ impl B2Lister { } } -#[async_trait] impl oio::PageList for B2Lister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/chainsafe/lister.rs b/core/src/services/chainsafe/lister.rs index 69cdc5520082..faf00b6df70b 100644 --- a/core/src/services/chainsafe/lister.rs +++ b/core/src/services/chainsafe/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::StatusCode; use super::core::parse_info; @@ -43,7 +42,6 @@ impl ChainsafeLister { } } -#[async_trait] impl oio::PageList for ChainsafeLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.core.list_objects(&self.path).await?; diff --git a/core/src/services/cos/lister.rs b/core/src/services/cos/lister.rs index f289b3383db5..098e243d8ba7 100644 --- a/core/src/services/cos/lister.rs +++ b/core/src/services/cos/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use bytes::Buf; use quick_xml::de; @@ -47,7 +46,6 @@ impl CosLister { } } -#[async_trait] impl oio::PageList for CosLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/dbfs/lister.rs b/core/src/services/dbfs/lister.rs index e86dd3381045..11841bdae842 100644 --- a/core/src/services/dbfs/lister.rs +++ b/core/src/services/dbfs/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::StatusCode; use serde::Deserialize; @@ -37,7 +36,6 @@ impl DbfsLister { } } -#[async_trait] impl oio::PageList for DbfsLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let response = self.core.dbfs_list(&self.path).await?; diff --git a/core/src/services/fs/lister.rs b/core/src/services/fs/lister.rs index 29ec2c9f36f5..eec0a4b8aa14 100644 --- a/core/src/services/fs/lister.rs +++ b/core/src/services/fs/lister.rs @@ -15,15 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::fs::FileType; use std::path::Path; use std::path::PathBuf; -use std::task::ready; -use std::task::Context; -use std::task::Poll; - -use futures::future::BoxFuture; -use futures::FutureExt; use crate::raw::*; use crate::EntryMode; @@ -34,8 +27,6 @@ pub struct FsLister

{ root: PathBuf, rd: P, - - fut: Option)>>, } impl

FsLister

{ @@ -43,8 +34,6 @@ impl

FsLister

{ Self { root: root.to_owned(), rd, - - fut: None, } } } @@ -55,57 +44,32 @@ impl

FsLister

{ unsafe impl

Sync for FsLister

{} impl oio::List for FsLister { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - if let Some(fut) = self.fut.as_mut() { - let (de, ft) = futures::ready!(fut.poll_unpin(cx)); - let ft = match ft { - Ok(ft) => { - self.fut = None; - ft - } - Err(e) => { - let fut = async move { - let ft = de.file_type().await.map_err(new_std_io_error); - (de, ft) - }; - self.fut = Some(Box::pin(fut)); - return Poll::Ready(Err(e)); - } - }; - - let entry_path = de.path(); - let rel_path = normalize_path( - &entry_path - .strip_prefix(&self.root) - .expect("cannot fail because the prefix is iterated") - .to_string_lossy() - .replace('\\', "/"), - ); - - let d = if ft.is_file() { - oio::Entry::new(&rel_path, Metadata::new(EntryMode::FILE)) - } else if ft.is_dir() { - // Make sure we are returning the correct path. - oio::Entry::new(&format!("{rel_path}/"), Metadata::new(EntryMode::DIR)) - } else { - oio::Entry::new(&rel_path, Metadata::new(EntryMode::Unknown)) - }; - - return Poll::Ready(Ok(Some(d))); - } + async fn next(&mut self) -> Result> { + let Some(de) = self.rd.next_entry().await.map_err(new_std_io_error)? else { + return Ok(None); + }; - let de = ready!(self.rd.poll_next_entry(cx)).map_err(new_std_io_error)?; - match de { - Some(de) => { - let fut = async move { - let ft = de.file_type().await.map_err(new_std_io_error); - (de, ft) - }; - self.fut = Some(Box::pin(fut)); - self.poll_next(cx) - } - None => Poll::Ready(Ok(None)), - } + let ft = de.file_type().await.map_err(new_std_io_error)?; + + let entry_path = de.path(); + let rel_path = normalize_path( + &entry_path + .strip_prefix(&self.root) + .expect("cannot fail because the prefix is iterated") + .to_string_lossy() + .replace('\\', "/"), + ); + + let d = if ft.is_file() { + oio::Entry::new(&rel_path, Metadata::new(EntryMode::FILE)) + } else if ft.is_dir() { + // Make sure we are returning the correct path. + oio::Entry::new(&format!("{rel_path}/"), Metadata::new(EntryMode::DIR)) + } else { + oio::Entry::new(&rel_path, Metadata::new(EntryMode::Unknown)) + }; + + Ok(Some(d)) } } diff --git a/core/src/services/ftp/lister.rs b/core/src/services/ftp/lister.rs index 4d77bf1b87f5..3d531815a201 100644 --- a/core/src/services/ftp/lister.rs +++ b/core/src/services/ftp/lister.rs @@ -17,8 +17,7 @@ use std::str; use std::str::FromStr; -use std::task::Context; -use std::task::Poll; + use std::vec::IntoIter; use suppaftp::list::File; @@ -41,12 +40,12 @@ impl FtpLister { } impl oio::List for FtpLister { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { let de = match self.file_iter.next() { Some(file_str) => File::from_str(file_str.as_str()).map_err(|e| { Error::new(ErrorKind::Unexpected, "parse file from response").set_source(e) })?, - None => return Poll::Ready(Ok(None)), + None => return Ok(None), }; let path = self.path.to_string() + de.name(); @@ -64,6 +63,6 @@ impl oio::List for FtpLister { oio::Entry::new(&path, Metadata::new(EntryMode::Unknown)) }; - Poll::Ready(Ok(Some(entry))) + Ok(Some(entry)) } } diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs index 0a0180198838..f18412c74bc4 100644 --- a/core/src/services/gcs/lister.rs +++ b/core/src/services/gcs/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use serde_json; use super::core::*; @@ -60,8 +59,6 @@ impl GcsLister { } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::PageList for GcsLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs index db0ad063a65d..34df9073f388 100644 --- a/core/src/services/gdrive/lister.rs +++ b/core/src/services/gdrive/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::StatusCode; use super::core::GdriveCore; @@ -37,8 +36,6 @@ impl GdriveLister { } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::PageList for GdriveLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let file_id = self.core.path_cache.get(&self.path).await?; diff --git a/core/src/services/github/lister.rs b/core/src/services/github/lister.rs index 320f664a02bb..654f3cbb7831 100644 --- a/core/src/services/github/lister.rs +++ b/core/src/services/github/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::GithubCore; use crate::raw::oio::Entry; use crate::raw::*; @@ -39,7 +37,6 @@ impl GithubLister { } } -#[async_trait] impl oio::PageList for GithubLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let entries = self.core.list(&self.path).await?; diff --git a/core/src/services/hdfs/lister.rs b/core/src/services/hdfs/lister.rs index 404d159f5c5a..8daebbb04f95 100644 --- a/core/src/services/hdfs/lister.rs +++ b/core/src/services/hdfs/lister.rs @@ -15,11 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::task::Context; -use std::task::Poll; - -use async_trait::async_trait; - use crate::raw::*; use crate::EntryMode; use crate::Metadata; @@ -41,12 +36,11 @@ impl HdfsLister { } } -#[async_trait] impl oio::List for HdfsLister { - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { let de = match self.rd.next() { Some(de) => de, - None => return Poll::Ready(Ok(None)), + None => return Ok(None), }; let path = build_rel_path(&self.root, de.path()); @@ -63,7 +57,7 @@ impl oio::List for HdfsLister { oio::Entry::new(&path, Metadata::new(EntryMode::Unknown)) }; - Poll::Ready(Ok(Some(entry))) + Ok(Some(entry)) } } diff --git a/core/src/services/hdfs_native/lister.rs b/core/src/services/hdfs_native/lister.rs index 65c916c28076..bd2863783f31 100644 --- a/core/src/services/hdfs_native/lister.rs +++ b/core/src/services/hdfs_native/lister.rs @@ -16,8 +16,6 @@ // under the License. use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use crate::raw::oio; use crate::raw::oio::Entry; @@ -38,7 +36,7 @@ impl HdfsNativeLister { } impl oio::List for HdfsNativeLister { - fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll>> { + async fn next(&mut self) -> Result> { todo!() } } diff --git a/core/src/services/huggingface/lister.rs b/core/src/services/huggingface/lister.rs index e291dd3cdde1..836b62481a41 100644 --- a/core/src/services/huggingface/lister.rs +++ b/core/src/services/huggingface/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::HuggingfaceCore; use super::core::HuggingfaceStatus; use super::error::parse_error; @@ -41,7 +39,6 @@ impl HuggingfaceLister { } } -#[async_trait] impl oio::PageList for HuggingfaceLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let response = self.core.hf_list(&self.path, self.recursive).await?; diff --git a/core/src/services/ipfs/backend.rs b/core/src/services/ipfs/backend.rs index 8e990e31381a..964a1cef1923 100644 --- a/core/src/services/ipfs/backend.rs +++ b/core/src/services/ipfs/backend.rs @@ -423,7 +423,6 @@ impl DirStream { } } -#[async_trait] impl oio::PageList for DirStream { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.backend.ipfs_list(&self.path).await?; diff --git a/core/src/services/ipmfs/lister.rs b/core/src/services/ipmfs/lister.rs index 9198b8e31a86..81bff72dca52 100644 --- a/core/src/services/ipmfs/lister.rs +++ b/core/src/services/ipmfs/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::StatusCode; use serde::Deserialize; @@ -44,7 +43,6 @@ impl IpmfsLister { } } -#[async_trait] impl oio::PageList for IpmfsLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.backend.ipmfs_ls(&self.path).await?; diff --git a/core/src/services/koofr/lister.rs b/core/src/services/koofr/lister.rs index 8ad58a58d2cd..8d56e65f5be8 100644 --- a/core/src/services/koofr/lister.rs +++ b/core/src/services/koofr/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::KoofrCore; use super::core::ListResponse; use super::error::parse_error; @@ -43,7 +41,6 @@ impl KoofrLister { } } -#[async_trait] impl oio::PageList for KoofrLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.core.list(&self.path).await?; diff --git a/core/src/services/obs/lister.rs b/core/src/services/obs/lister.rs index ead6bbbeab18..e38ebe75197f 100644 --- a/core/src/services/obs/lister.rs +++ b/core/src/services/obs/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use bytes::Buf; use quick_xml::de; @@ -48,7 +47,6 @@ impl ObsLister { } } -#[async_trait] impl oio::PageList for ObsLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/onedrive/lister.rs b/core/src/services/onedrive/lister.rs index 12c557867ab1..32fdc426f7b9 100644 --- a/core/src/services/onedrive/lister.rs +++ b/core/src/services/onedrive/lister.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use async_trait::async_trait; - use super::backend::OnedriveBackend; use super::error::parse_error; use super::graph_model::GraphApiOnedriveListResponse; @@ -43,7 +41,6 @@ impl OnedriveLister { } } -#[async_trait] impl oio::PageList for OnedriveLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let request_url = if ctx.token.is_empty() { diff --git a/core/src/services/oss/lister.rs b/core/src/services/oss/lister.rs index 926946a6165d..28919cfa80e9 100644 --- a/core/src/services/oss/lister.rs +++ b/core/src/services/oss/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use bytes::Buf; use quick_xml::de; @@ -56,7 +55,6 @@ impl OssLister { } } -#[async_trait] impl oio::PageList for OssLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/pcloud/lister.rs b/core/src/services/pcloud/lister.rs index e4dd65c38127..eb4a0c670bf7 100644 --- a/core/src/services/pcloud/lister.rs +++ b/core/src/services/pcloud/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::StatusCode; use super::core::*; @@ -41,7 +40,6 @@ impl PcloudLister { } } -#[async_trait] impl oio::PageList for PcloudLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.core.list_folder(&self.path).await?; diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs index 569d83ebeeb6..1e1c25b1d702 100644 --- a/core/src/services/s3/lister.rs +++ b/core/src/services/s3/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use bytes::Buf; use quick_xml::de; @@ -60,8 +59,6 @@ impl S3Lister { } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::PageList for S3Lister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/seafile/lister.rs b/core/src/services/seafile/lister.rs index 792d6280324a..ff687981ffe9 100644 --- a/core/src/services/seafile/lister.rs +++ b/core/src/services/seafile/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::header; use http::Request; use http::StatusCode; @@ -44,7 +43,6 @@ impl SeafileLister { } } -#[async_trait] impl oio::PageList for SeafileLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let path = build_rooted_abs_path(&self.core.root, &self.path); @@ -103,9 +101,7 @@ impl oio::PageList for SeafileLister { ctx.done = true; Ok(()) } - _ => { - return Err(parse_error(resp).await?); - } + _ => Err(parse_error(resp).await?), } } } diff --git a/core/src/services/sftp/lister.rs b/core/src/services/sftp/lister.rs index f7fe5aac9fc2..077e123ee396 100644 --- a/core/src/services/sftp/lister.rs +++ b/core/src/services/sftp/lister.rs @@ -16,17 +16,14 @@ // under the License. use std::pin::Pin; -use std::task::ready; -use std::task::Context; -use std::task::Poll; -use async_trait::async_trait; use futures::StreamExt; use openssh_sftp_client::fs::DirEntry; use openssh_sftp_client::fs::ReadDir; use super::error::parse_sftp_error; use crate::raw::oio; +use crate::raw::oio::Entry; use crate::Result; pub struct SftpLister { @@ -45,22 +42,26 @@ impl SftpLister { } } -#[async_trait] impl oio::List for SftpLister { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let item = ready!(self.dir.poll_next_unpin(cx)) - .transpose() - .map_err(parse_sftp_error)?; + async fn next(&mut self) -> Result> { + loop { + let item = self + .dir + .next() + .await + .transpose() + .map_err(parse_sftp_error)?; - match item { - Some(e) => { - if e.filename().to_str() == Some(".") || e.filename().to_str() == Some("..") { - self.poll_next(cx) - } else { - Poll::Ready(Ok(Some(map_entry(self.prefix.as_str(), e)))) + match item { + Some(e) => { + if e.filename().to_str() == Some(".") || e.filename().to_str() == Some("..") { + continue; + } else { + return Ok(Some(map_entry(self.prefix.as_str(), e))); + } } + None => return Ok(None), } - None => Poll::Ready(Ok(None)), } } } diff --git a/core/src/services/swift/lister.rs b/core/src/services/swift/lister.rs index ee90a1753777..79b8fdfa6f92 100644 --- a/core/src/services/swift/lister.rs +++ b/core/src/services/swift/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::*; use super::error::parse_error; use crate::raw::*; @@ -43,7 +41,6 @@ impl SwiftLister { } } -#[async_trait] impl oio::PageList for SwiftLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let response = self diff --git a/core/src/services/upyun/lister.rs b/core/src/services/upyun/lister.rs index 4fdd068cefb9..547facbf7508 100644 --- a/core/src/services/upyun/lister.rs +++ b/core/src/services/upyun/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::ListObjectsResponse; use super::core::UpyunCore; use super::error::parse_error; @@ -45,7 +43,6 @@ impl UpyunLister { } } -#[async_trait] impl oio::PageList for UpyunLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/vercel_blob/lister.rs b/core/src/services/vercel_blob/lister.rs index e792d8f3791d..41c7a201f739 100644 --- a/core/src/services/vercel_blob/lister.rs +++ b/core/src/services/vercel_blob/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::parse_blob; use super::core::VercelBlobCore; use crate::raw::oio::Entry; @@ -42,7 +40,6 @@ impl VercelBlobLister { } } -#[async_trait] impl oio::PageList for VercelBlobLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let p = build_abs_path(&self.core.root, &self.path); diff --git a/core/src/services/webdav/lister.rs b/core/src/services/webdav/lister.rs index ab9d7c608567..f1749d522dd2 100644 --- a/core/src/services/webdav/lister.rs +++ b/core/src/services/webdav/lister.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; use http::StatusCode; use super::core::*; @@ -42,7 +41,6 @@ impl WebdavLister { } } -#[async_trait] impl oio::PageList for WebdavLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.core.webdav_list(&self.path, &self.args).await?; diff --git a/core/src/services/webhdfs/lister.rs b/core/src/services/webhdfs/lister.rs index 4a984bb4c675..cc7db189f257 100644 --- a/core/src/services/webhdfs/lister.rs +++ b/core/src/services/webhdfs/lister.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use async_trait::async_trait; use http::StatusCode; use super::backend::WebhdfsBackend; @@ -38,7 +37,6 @@ impl WebhdfsLister { } } -#[async_trait] impl oio::PageList for WebhdfsLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let file_status = if self.backend.disable_list_batch { diff --git a/core/src/services/yandex_disk/lister.rs b/core/src/services/yandex_disk/lister.rs index a116ce53a081..9eaec4535778 100644 --- a/core/src/services/yandex_disk/lister.rs +++ b/core/src/services/yandex_disk/lister.rs @@ -17,8 +17,6 @@ use std::sync::Arc; -use async_trait::async_trait; - use super::core::parse_info; use super::core::MetainformationResponse; use super::core::YandexDiskCore; @@ -44,7 +42,6 @@ impl YandexDiskLister { } } -#[async_trait] impl oio::PageList for YandexDiskLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let offset = if ctx.token.is_empty() { diff --git a/core/src/types/list.rs b/core/src/types/list.rs index fd0f2485ed9b..141a13bd70fd 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -26,7 +26,6 @@ use flagset::FlagSet; use futures::Stream; use futures::StreamExt; -use crate::raw::oio::List; use crate::raw::*; use crate::*; @@ -45,7 +44,11 @@ pub struct Lister { /// required_metakey is the metakey required by users. required_metakey: FlagSet, + fut: Option>)>>, + /// tasks is used to store tasks that are run in concurrent. + /// + /// TODO: maybe we should move logic inside? tasks: ConcurrentFutures, errored: bool, } @@ -134,6 +137,7 @@ impl Lister { lister: Some(lister), required_metakey, + fut: None, tasks: ConcurrentFutures::new(concurrent), errored: false, }) @@ -151,31 +155,44 @@ impl Stream for Lister { // Trying to pull more tasks if there are more space. if self.tasks.has_remaining() { - if let Some(lister) = self.lister.as_mut() { - match lister.poll_next(cx) { - Poll::Pending => {} - Poll::Ready(Ok(Some(oe))) => { - let (path, metadata) = oe.into_entry().into_parts(); - if metadata.contains_metakey(self.required_metakey) { - self.tasks - .push_back(StatTask::Known(Some((path, metadata)))); - } else { - let acc = self.acc.clone(); - let fut = async move { - let res = acc.stat(&path, OpStat::default()).await; - (path, res.map(|rp| rp.into_metadata())) - }; - self.tasks.push_back(StatTask::Stating(Box::pin(fut))); + // Building future is we have a lister available. + if let Some(mut lister) = self.lister.take() { + let fut = async move { + let res = lister.next_dyn().await; + (lister, res) + }; + self.fut = Some(Box::pin(fut)); + } + + if let Some(fut) = self.fut.as_mut() { + if let Poll::Ready((lister, entry)) = fut.as_mut().poll(cx) { + self.lister = Some(lister); + self.fut = None; + + match entry { + Ok(Some(oe)) => { + let (path, metadata) = oe.into_entry().into_parts(); + if metadata.contains_metakey(self.required_metakey) { + self.tasks + .push_back(StatTask::Known(Some((path, metadata)))); + } else { + let acc = self.acc.clone(); + let fut = async move { + let res = acc.stat(&path, OpStat::default()).await; + (path, res.map(|rp| rp.into_metadata())) + }; + self.tasks.push_back(StatTask::Stating(Box::pin(fut))); + } + } + Ok(None) => { + self.lister = None; + } + Err(err) => { + self.errored = true; + return Poll::Ready(Some(Err(err))); } } - Poll::Ready(Ok(None)) => { - self.lister = None; - } - Poll::Ready(Err(err)) => { - self.errored = true; - return Poll::Ready(Some(Err(err))); - } - }; + } } } @@ -185,7 +202,7 @@ impl Stream for Lister { return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); } - if self.lister.is_some() { + if self.lister.is_some() || self.fut.is_some() { Poll::Pending } else { Poll::Ready(None)