Skip to content

Commit

Permalink
feat(unftp-sbe): impl OpendalStorage (#4765)
Browse files Browse the repository at this point in the history
* impl OpendalStorage

* make OpendalStorage `Clone`

* fix problem
  • Loading branch information
George-Miao authored Jun 19, 2024
1 parent c8d33a4 commit 248538f
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 0 deletions.
5 changes: 5 additions & 0 deletions integrations/unftp-sbe/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions integrations/unftp-sbe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,9 @@ rust-version = "1.75"
version = "0.0.0"

[dependencies]
async-trait = "0.1.80"
bytes = "1.6.0"
libunftp = "0.20.0"
opendal = { version = "0.47.0", path = "../../core" }
tokio = { version = "1.38.0", default-features = false, features = ["io-util"] }
tokio-util = { version = "0.7.11", features = ["compat"] }
215 changes: 215 additions & 0 deletions integrations/unftp-sbe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,218 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::task::Poll;

use bytes::{Buf, BufMut};
use libunftp::auth::UserDetail;
use libunftp::storage::{self, StorageBackend};
use opendal::{Buffer, Operator};

use tokio::io::AsyncRead;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

#[derive(Debug, Clone)]
pub struct OpendalStorage {
op: Operator,
}

impl OpendalStorage {
pub fn new(op: Operator) -> Self {
Self { op }
}
}

/// A wrapper around [`Buffer`] to implement [`tokio::io::AsyncRead`].
pub struct IoBuffer(Buffer);

impl AsyncRead for IoBuffer {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let len = std::io::copy(&mut self.as_mut().0.by_ref().reader(), &mut buf.writer())?;
self.0.advance(len as usize);
Poll::Ready(Ok(()))
}
}

/// A wrapper around [`opendal::Metadata`] to implement [`libunftp::storage::Metadata`].
pub struct OpendalMetadata(opendal::Metadata);

impl storage::Metadata for OpendalMetadata {
fn len(&self) -> u64 {
self.0.content_length()
}

fn is_dir(&self) -> bool {
self.0.is_dir()
}

fn is_file(&self) -> bool {
self.0.is_file()
}

fn is_symlink(&self) -> bool {
false
}

fn modified(&self) -> storage::Result<std::time::SystemTime> {
self.0.last_modified().map(Into::into).ok_or_else(|| {
storage::Error::new(storage::ErrorKind::LocalError, "no last modified time")
})
}

fn gid(&self) -> u32 {
0
}

fn uid(&self) -> u32 {
0
}
}

fn convert_err(err: opendal::Error) -> storage::Error {
let kind = match err.kind() {
opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
_ => storage::ErrorKind::LocalError,
};
storage::Error::new(kind, err)
}

fn convert_path(path: &Path) -> storage::Result<&str> {
path.to_str().ok_or_else(|| {
storage::Error::new(
storage::ErrorKind::LocalError,
"Path is not a valid UTF-8 string",
)
})
}

#[async_trait::async_trait]
impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
type Metadata = OpendalMetadata;

async fn metadata<P: AsRef<Path> + Send + Debug>(
&self,
_: &User,
path: P,
) -> storage::Result<Self::Metadata> {
let metadata = self
.op
.stat(convert_path(path.as_ref())?)
.await
.map_err(convert_err)?;
Ok(OpendalMetadata(metadata))
}

async fn list<P: AsRef<Path> + Send + Debug>(
&self,
_: &User,
path: P,
) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
where
Self::Metadata: storage::Metadata,
{
let ret = self
.op
.list(convert_path(path.as_ref())?)
.await
.map_err(convert_err)?
.into_iter()
.map(|x| {
let (path, metadata) = x.into_parts();
storage::Fileinfo {
path: path.into(),
metadata: OpendalMetadata(metadata),
}
})
.collect();
Ok(ret)
}

async fn get<P: AsRef<Path> + Send + Debug>(
&self,
_: &User,
path: P,
start_pos: u64,
) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
let buf = self
.op
.read_with(convert_path(path.as_ref())?)
.range(start_pos..)
.await
.map_err(convert_err)?;
Ok(Box::new(IoBuffer(buf)))
}

async fn put<
P: AsRef<Path> + Send + Debug,
R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
>(
&self,
_: &User,
mut input: R,
path: P,
_: u64,
) -> storage::Result<u64> {
let mut w = self
.op
.writer(convert_path(path.as_ref())?)
.await
.map_err(convert_err)?
.into_futures_async_write()
.compat_write();
let len = tokio::io::copy(&mut input, &mut w).await?;
Ok(len)
}

async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
self.op
.delete(convert_path(path.as_ref())?)
.await
.map_err(convert_err)
}

async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
self.op
.create_dir(convert_path(path.as_ref())?)
.await
.map_err(convert_err)
}

async fn rename<P: AsRef<Path> + Send + Debug>(
&self,
_: &User,
from: P,
to: P,
) -> storage::Result<()> {
let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
self.op.rename(from, to).await.map_err(convert_err)
}

async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
self.op
.remove_all(convert_path(path.as_ref())?)
.await
.map_err(convert_err)
}

async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
use opendal::ErrorKind::*;

match self.op.stat(convert_path(path.as_ref())?).await {
Ok(_) => Ok(()),
Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(storage::Error::new(
storage::ErrorKind::PermanentDirectoryNotAvailable,
e,
)),
Err(e) => Err(convert_err(e)),
}
}
}

0 comments on commit 248538f

Please sign in to comment.