Skip to content

Commit

Permalink
chore: Add docs and tests for reader related types (#4513)
Browse files Browse the repository at this point in the history
* Refactor reader layout

Signed-off-by: Xuanwo <[email protected]>

* Polish docs for futures async reader

Signed-off-by: Xuanwo <[email protected]>

* Polish docs

Signed-off-by: Xuanwo <[email protected]>

* Add tests

Signed-off-by: Xuanwo <[email protected]>

* format code

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Apr 23, 2024
1 parent 2f91f60 commit 7bb1901
Show file tree
Hide file tree
Showing 20 changed files with 692 additions and 360 deletions.
3 changes: 2 additions & 1 deletion core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use std::vec::IntoIter;
use async_trait::async_trait;

use super::Adapter;
use crate::raw::oio::{HierarchyLister, QueueBuf};
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::QueueBuf;
use crate::raw::*;
use crate::*;

Expand Down
3 changes: 2 additions & 1 deletion core/src/raw/adapters/typed_kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use std::mem::size_of;
use async_trait::async_trait;
use chrono::Utc;

use crate::Buffer;
use crate::EntryMode;
use crate::Error;
use crate::ErrorKind;
use crate::Metadata;
use crate::Result;
use crate::Scheme;
use crate::{Buffer, EntryMode};

/// Adapter is the typed adapter to underlying kv services.
///
Expand Down
3 changes: 2 additions & 1 deletion core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use async_trait::async_trait;

use super::Adapter;
use super::Value;
use crate::raw::oio::{HierarchyLister, QueueBuf};
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::QueueBuf;
use crate::raw::*;
use crate::*;

Expand Down
4 changes: 3 additions & 1 deletion core/src/raw/oio/buf/flex_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use bytes::{Buf, BufMut, Bytes};

/// FlexBuf is a buffer that support frozen bytes and reuse existing allocated memory.
///
Expand Down
6 changes: 4 additions & 2 deletions core/src/raw/oio/buf/queue_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::*;
use bytes::Buf;
use std::collections::VecDeque;

use bytes::Buf;

use crate::*;

/// QueueBuf is a queue of [`Buffer`].
///
/// It's designed to allow storing multiple buffers without copying underlying bytes and consume them
Expand Down
3 changes: 2 additions & 1 deletion core/src/raw/oio/write/exact_buf_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {

#[cfg(test)]
mod tests {
use bytes::{Buf, Bytes};
use bytes::Buf;
use bytes::Bytes;
use log::debug;
use pretty_assertions::assert_eq;
use rand::thread_rng;
Expand Down
3 changes: 2 additions & 1 deletion core/src/services/d1/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use serde::Serialize;
use serde_json::Map;
use serde_json::Value;

use crate::{Buffer, Error};
use crate::Buffer;
use crate::Error;

/// response data from d1
#[derive(Deserialize, Debug)]
Expand Down
1 change: 0 additions & 1 deletion core/src/services/dbfs/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use bytes::Bytes;
use serde::Deserialize;

use super::core::DbfsCore;

use crate::raw::*;
use crate::*;

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Buf;
use std::io::Write;
use std::path::PathBuf;

use bytes::Buf;
use tokio::io::AsyncWriteExt;

use crate::raw::*;
Expand Down
7 changes: 4 additions & 3 deletions core/src/services/ftp/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

use bb8::PooledConnection;
use bytes::Buf;
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use futures::AsyncRead;
use futures::AsyncWrite;
use futures::AsyncWriteExt;

use super::backend::Manager;
use crate::raw::*;
use crate::services::ftp::err::parse_error;
use crate::*;

use super::backend::Manager;

trait DataStream: AsyncRead + AsyncWrite {}
impl<T> DataStream for T where T: AsyncRead + AsyncWrite {}

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/hdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Buf;
use std::io::Write;
use std::sync::Arc;

use bytes::Buf;
use futures::AsyncWriteExt;

use crate::raw::*;
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/sftp/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Buf;
use std::pin::Pin;

use bytes::Buf;
use openssh_sftp_client::file::File;
use openssh_sftp_client::file::TokioCompatFile;
use tokio::io::AsyncWriteExt;
Expand Down
3 changes: 2 additions & 1 deletion core/src/services/surrealdb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ use tokio::sync::OnceCell;
use crate::raw::adapters::kv;
use crate::raw::normalize_root;
use crate::raw::ConfigDeserializer;
use crate::Buffer;
use crate::Builder;
use crate::Capability;
use crate::Error;
use crate::ErrorKind;
use crate::Scheme;
use crate::{Buffer, Builder};

/// Config for Surrealdb services support.
#[derive(Default, Deserialize)]
Expand Down
20 changes: 12 additions & 8 deletions core/src/types/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@

use std::collections::VecDeque;
use std::convert::Infallible;
use std::fmt::{Debug, Formatter};
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::IoSlice;
use std::mem;
use std::ops::{Bound, RangeBounds};
use std::ops::Bound;
use std::ops::RangeBounds;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;

use bytes::{Buf, BytesMut};
use bytes::{BufMut, Bytes};
use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures::Stream;

/// Buffer is a wrapper of contiguous `Bytes` and non contiguous `[Bytes]`.
Expand All @@ -48,8 +53,8 @@ use futures::Stream;
///
/// ```rust
/// use bytes::Buf;
/// use serde_json;
/// use opendal::Buffer;
/// use serde_json;
///
/// fn test(mut buf: Buffer) -> Vec<String> {
/// serde_json::from_reader(buf.reader()).unwrap()
Expand All @@ -75,8 +80,8 @@ use futures::Stream;
///
/// ```rust
/// use bytes::Bytes;
/// use opendal::Buffer;
/// use futures::TryStreamExt;
/// use opendal::Buffer;
///
/// async fn test(mut buf: Buffer) -> Vec<Bytes> {
/// buf.into_iter().try_collect().await.unwrap()
Expand All @@ -102,7 +107,6 @@ use futures::Stream;
/// }
/// ```
///
///
#[derive(Clone)]
pub struct Buffer(Inner);

Expand Down
4 changes: 1 addition & 3 deletions core/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ pub use metadata::Metadata;
pub use metadata::Metakey;

mod reader;
pub use reader::into_futures_async_read::FuturesAsyncReader;
pub use reader::into_futures_stream::FuturesBytesStream;
pub use reader::Reader;
pub use reader::*;

mod blocking_reader;
pub use blocking_reader::into_std_iterator::StdBytesIterator;
Expand Down
Loading

0 comments on commit 7bb1901

Please sign in to comment.