From 248538f1495711f1e4b59e83758184ab763cfb7f Mon Sep 17 00:00:00 2001 From: Pop Date: Wed, 19 Jun 2024 13:41:38 +0900 Subject: [PATCH] feat(unftp-sbe): impl `OpendalStorage` (#4765) * impl OpendalStorage * make OpendalStorage `Clone` * fix problem --- integrations/unftp-sbe/Cargo.lock | 5 + integrations/unftp-sbe/Cargo.toml | 4 + integrations/unftp-sbe/src/lib.rs | 215 ++++++++++++++++++++++++++++++ 3 files changed, 224 insertions(+) diff --git a/integrations/unftp-sbe/Cargo.lock b/integrations/unftp-sbe/Cargo.lock index adb683a893d2..0d4613c253d7 100644 --- a/integrations/unftp-sbe/Cargo.lock +++ b/integrations/unftp-sbe/Cargo.lock @@ -1840,6 +1840,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -1925,8 +1926,12 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" name = "unftp-sbe-opendal" version = "0.0.0" dependencies = [ + "async-trait", + "bytes", "libunftp", "opendal", + "tokio", + "tokio-util", ] [[package]] diff --git a/integrations/unftp-sbe/Cargo.toml b/integrations/unftp-sbe/Cargo.toml index 58a4d5cbb4f3..2c696861f115 100644 --- a/integrations/unftp-sbe/Cargo.toml +++ b/integrations/unftp-sbe/Cargo.toml @@ -27,5 +27,9 @@ rust-version = "1.75" version = "0.0.0" [dependencies] +async-trait = "0.1.80" +bytes = "1.6.0" libunftp = "0.20.0" opendal = { version = "0.47.0", path = "../../core" } +tokio = { version = "1.38.0", default-features = false, features = ["io-util"] } +tokio-util = { version = "0.7.11", features = ["compat"] } diff --git a/integrations/unftp-sbe/src/lib.rs b/integrations/unftp-sbe/src/lib.rs index b248758bc120..aabaad8fe891 100644 --- a/integrations/unftp-sbe/src/lib.rs +++ b/integrations/unftp-sbe/src/lib.rs @@ -14,3 +14,218 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +use std::fmt::Debug; +use std::path::{Path, PathBuf}; +use std::task::Poll; + +use bytes::{Buf, BufMut}; +use libunftp::auth::UserDetail; +use libunftp::storage::{self, StorageBackend}; +use opendal::{Buffer, Operator}; + +use tokio::io::AsyncRead; +use tokio_util::compat::FuturesAsyncWriteCompatExt; + +#[derive(Debug, Clone)] +pub struct OpendalStorage { + op: Operator, +} + +impl OpendalStorage { + pub fn new(op: Operator) -> Self { + Self { op } + } +} + +/// A wrapper around [`Buffer`] to implement [`tokio::io::AsyncRead`]. +pub struct IoBuffer(Buffer); + +impl AsyncRead for IoBuffer { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let len = std::io::copy(&mut self.as_mut().0.by_ref().reader(), &mut buf.writer())?; + self.0.advance(len as usize); + Poll::Ready(Ok(())) + } +} + +/// A wrapper around [`opendal::Metadata`] to implement [`libunftp::storage::Metadata`]. +pub struct OpendalMetadata(opendal::Metadata); + +impl storage::Metadata for OpendalMetadata { + fn len(&self) -> u64 { + self.0.content_length() + } + + fn is_dir(&self) -> bool { + self.0.is_dir() + } + + fn is_file(&self) -> bool { + self.0.is_file() + } + + fn is_symlink(&self) -> bool { + false + } + + fn modified(&self) -> storage::Result { + self.0.last_modified().map(Into::into).ok_or_else(|| { + storage::Error::new(storage::ErrorKind::LocalError, "no last modified time") + }) + } + + fn gid(&self) -> u32 { + 0 + } + + fn uid(&self) -> u32 { + 0 + } +} + +fn convert_err(err: opendal::Error) -> storage::Error { + let kind = match err.kind() { + opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable, + opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable, + opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied, + _ => storage::ErrorKind::LocalError, + }; + storage::Error::new(kind, err) +} + +fn convert_path(path: &Path) -> storage::Result<&str> { + path.to_str().ok_or_else(|| { + storage::Error::new( + storage::ErrorKind::LocalError, + "Path is not a valid UTF-8 string", + ) + }) +} + +#[async_trait::async_trait] +impl StorageBackend for OpendalStorage { + type Metadata = OpendalMetadata; + + async fn metadata + Send + Debug>( + &self, + _: &User, + path: P, + ) -> storage::Result { + let metadata = self + .op + .stat(convert_path(path.as_ref())?) + .await + .map_err(convert_err)?; + Ok(OpendalMetadata(metadata)) + } + + async fn list + Send + Debug>( + &self, + _: &User, + path: P, + ) -> storage::Result>> + where + Self::Metadata: storage::Metadata, + { + let ret = self + .op + .list(convert_path(path.as_ref())?) + .await + .map_err(convert_err)? + .into_iter() + .map(|x| { + let (path, metadata) = x.into_parts(); + storage::Fileinfo { + path: path.into(), + metadata: OpendalMetadata(metadata), + } + }) + .collect(); + Ok(ret) + } + + async fn get + Send + Debug>( + &self, + _: &User, + path: P, + start_pos: u64, + ) -> storage::Result> { + let buf = self + .op + .read_with(convert_path(path.as_ref())?) + .range(start_pos..) + .await + .map_err(convert_err)?; + Ok(Box::new(IoBuffer(buf))) + } + + async fn put< + P: AsRef + Send + Debug, + R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static, + >( + &self, + _: &User, + mut input: R, + path: P, + _: u64, + ) -> storage::Result { + let mut w = self + .op + .writer(convert_path(path.as_ref())?) + .await + .map_err(convert_err)? + .into_futures_async_write() + .compat_write(); + let len = tokio::io::copy(&mut input, &mut w).await?; + Ok(len) + } + + async fn del + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> { + self.op + .delete(convert_path(path.as_ref())?) + .await + .map_err(convert_err) + } + + async fn mkd + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> { + self.op + .create_dir(convert_path(path.as_ref())?) + .await + .map_err(convert_err) + } + + async fn rename + Send + Debug>( + &self, + _: &User, + from: P, + to: P, + ) -> storage::Result<()> { + let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?); + self.op.rename(from, to).await.map_err(convert_err) + } + + async fn rmd + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> { + self.op + .remove_all(convert_path(path.as_ref())?) + .await + .map_err(convert_err) + } + + async fn cwd + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> { + use opendal::ErrorKind::*; + + match self.op.stat(convert_path(path.as_ref())?).await { + Ok(_) => Ok(()), + Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(storage::Error::new( + storage::ErrorKind::PermanentDirectoryNotAvailable, + e, + )), + Err(e) => Err(convert_err(e)), + } + } +}