From e5ecbb1c53cd94bc81d3b7a2ea9182f78da72ab1 Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Tue, 13 Feb 2024 23:40:30 +0800 Subject: [PATCH 1/9] refactor: project structure --- bin/ofs/src/bin/ofs.rs | 62 +--------- bin/ofs/src/config.rs | 33 +++++ bin/ofs/src/frontend/fuse.rs | 229 +++++++++++++++++++++++++++++++++++ bin/ofs/src/frontend/mod.rs | 56 +++++++++ bin/ofs/src/lib.rs | 225 +++++----------------------------- 5 files changed, 350 insertions(+), 255 deletions(-) create mode 100644 bin/ofs/src/config.rs create mode 100644 bin/ofs/src/frontend/fuse.rs create mode 100644 bin/ofs/src/frontend/mod.rs diff --git a/bin/ofs/src/bin/ofs.rs b/bin/ofs/src/bin/ofs.rs index 08399b56ef22..89a619928724 100644 --- a/bin/ofs/src/bin/ofs.rs +++ b/bin/ofs/src/bin/ofs.rs @@ -15,69 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; -use std::str::FromStr; - -use anyhow::anyhow; -use anyhow::Context; use anyhow::Result; use clap::Parser; -use fuse3::path::Session; -use fuse3::MountOptions; -use ofs::Ofs; -use opendal::Operator; -use opendal::Scheme; -use url::Url; #[tokio::main] async fn main() -> Result<()> { - env_logger::init(); - fuse().await -} - -#[derive(Parser, Debug)] -#[command(version, about)] -struct Config { - /// fuse mount path - #[arg(env = "OFS_MOUNT_PATH", index = 1)] - mount_path: String, - - /// location of opendal service - /// format: ://?=&= - /// example: fs://root=/tmp - #[arg(env = "OFS_BACKEND", index = 2)] - backend: String, -} - -async fn fuse() -> Result<()> { - let cfg = Config::try_parse().context("parse command line arguments")?; - - let location = Url::parse(&cfg.backend)?; - if location.has_host() { - Err(anyhow!("Host part in a location is not supported."))?; - } + let cfg = ofs::Config::parse(); - let scheme_str = location.scheme(); - - let op_args = location - .query_pairs() - .into_owned() - .collect::>(); - - let scheme = Scheme::from_str(scheme_str).context("unsupported scheme")?; - let op = Operator::via_map(scheme, op_args)?; - - let mut mount_option = MountOptions::default(); - mount_option.uid(nix::unistd::getuid().into()); - mount_option.gid(nix::unistd::getgid().into()); - - let ofs = Ofs { op }; - - let mounthandle = Session::new(mount_option) - .mount_with_unprivileged(ofs, cfg.mount_path) - .await?; - - mounthandle.await?; - - Ok(()) + env_logger::init(); + ofs::new_app(cfg).await } diff --git a/bin/ofs/src/config.rs b/bin/ofs/src/config.rs new file mode 100644 index 000000000000..6afa729a79ef --- /dev/null +++ b/bin/ofs/src/config.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::Parser; +use url::Url; + +#[derive(Parser, Debug)] +#[command(version, about)] +pub struct Config { + /// fuse mount path + #[arg(env = "OFS_MOUNT_PATH", index = 1)] + pub(crate) mount_path: String, + + /// location of opendal service + /// format: ://?=&= + /// example: fs://?root=/tmp + #[arg(env = "OFS_BACKEND", index = 2)] + pub(crate) backend: Url, +} diff --git a/bin/ofs/src/frontend/fuse.rs b/bin/ofs/src/frontend/fuse.rs new file mode 100644 index 000000000000..090f12a7cd89 --- /dev/null +++ b/bin/ofs/src/frontend/fuse.rs @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ffi::OsStr; +use std::vec::IntoIter; + +use async_trait::async_trait; +use fuse3::path::prelude::*; +use fuse3::Result; +use futures_util::stream::Empty; +use futures_util::stream::Iter; +use opendal::Operator; + +pub(super) struct Ofs { + op: Operator, +} + +impl Ofs { + pub fn new(op: Operator) -> Self { + Self { op } + } +} + +#[async_trait] +impl PathFilesystem for Ofs { + type DirEntryStream = Empty>; + type DirEntryPlusStream = Iter>>; + + // Init a fuse filesystem + async fn init(&self, _req: Request) -> Result<()> { + Ok(()) + } + + // Callback when fs is being destroyed + async fn destroy(&self, _req: Request) {} + + async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) -> Result { + // TODO + Err(libc::ENOSYS.into()) + } + + async fn getattr( + &self, + _req: Request, + path: Option<&OsStr>, + _fh: Option, + _flags: u32, + ) -> Result { + // TODO + log::debug!("getattr(path={:?})", path); + + Err(libc::ENOSYS.into()) + } + + async fn read( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + offset: u64, + size: u32, + ) -> Result { + // TODO + log::debug!( + "read(path={:?}, fh={}, offset={}, size={})", + path, + fh, + offset, + size + ); + + Err(libc::ENOSYS.into()) + } + + async fn mkdir( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + _umask: u32, + ) -> Result { + // TODO + log::debug!( + "mkdir(parent={:?}, name={:?}, mode=0o{:o})", + parent, + name, + mode + ); + + Err(libc::ENOSYS.into()) + } + + async fn readdir( + &self, + _req: Request, + path: &OsStr, + fh: u64, + offset: i64, + ) -> Result> { + // TODO + log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); + + Err(libc::ENOSYS.into()) + } + + async fn mknod( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + _rdev: u32, + ) -> Result { + // TODO + log::debug!( + "mknod(parent={:?}, name={:?}, mode=0o{:o})", + parent, + name, + mode + ); + + Err(libc::ENOSYS.into()) + } + + async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { + // TODO + log::debug!("open(path={:?}, flags=0x{:x})", path, flags); + + Err(libc::ENOSYS.into()) + } + + async fn setattr( + &self, + _req: Request, + path: Option<&OsStr>, + _fh: Option, + _set_attr: SetAttr, + ) -> Result { + // TODO + log::debug!("setattr(path={:?})", path); + + Err(libc::ENOSYS.into()) + } + + async fn write( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + offset: u64, + data: &[u8], + flags: u32, + ) -> Result { + // TODO + log::debug!( + "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})", + path, + fh, + offset, + data.len(), + flags + ); + + Err(libc::ENOSYS.into()) + } + + async fn release( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + flags: u32, + _lock_owner: u64, + flush: bool, + ) -> Result<()> { + // TODO + log::debug!( + "release(path={:?}, fh={}, flags={}, flush={})", + path, + fh, + flags, + flush + ); + + Err(libc::ENOSYS.into()) + } + + async fn rename( + &self, + _req: Request, + origin_parent: &OsStr, + origin_name: &OsStr, + parent: &OsStr, + name: &OsStr, + ) -> Result<()> { + // TODO + log::debug!( + "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", + origin_parent, + origin_name, + parent, + name + ); + + Err(libc::ENOSYS.into()) + } + + async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { + // TODO + log::debug!("unlink(parent={:?}, name={:?})", parent, name); + + Err(libc::ENOSYS.into()) + } +} diff --git a/bin/ofs/src/frontend/mod.rs b/bin/ofs/src/frontend/mod.rs new file mode 100644 index 000000000000..7ea87ec29875 --- /dev/null +++ b/bin/ofs/src/frontend/mod.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::Result; +use opendal::Operator; + +#[cfg(target_os = "linux")] +mod fuse; + +pub(crate) struct FrontendArgs { + pub mount_path: String, + pub backend: Operator, +} + +pub(crate) struct Frontend; + +impl Frontend { + #[cfg(any(not(target_os = "linux")))] + pub async fn execute(_: FrontendArgs) -> Result<()> { + Err(anyhow::anyhow!("platform not supported")) + } + + #[cfg(target_os = "linux")] + pub async fn execute(args: FrontendArgs) -> Result<()> { + use fuse3::path::Session; + use fuse3::MountOptions; + + let mut mount_option = MountOptions::default(); + mount_option.uid(nix::unistd::getuid().into()); + mount_option.gid(nix::unistd::getgid().into()); + + let ofs = fuse::Ofs::new(args.backend); + + let mount_handle = Session::new(mount_option) + .mount_with_unprivileged(ofs, args.mount_path) + .await?; + + mount_handle.await?; + + Ok(()) + } +} diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs index 3fc0cadae6b5..d7972eeaf6a0 100644 --- a/bin/ofs/src/lib.rs +++ b/bin/ofs/src/lib.rs @@ -15,209 +15,42 @@ // specific language governing permissions and limitations // under the License. -use std::ffi::OsStr; -use std::vec::IntoIter; +use std::collections::HashMap; +use std::str::FromStr; -use async_trait::async_trait; -use fuse3::path::prelude::*; -use fuse3::Result; -use futures_util::stream::Empty; -use futures_util::stream::Iter; +use anyhow::anyhow; +use anyhow::Result; +use frontend::Frontend; +use frontend::FrontendArgs; use opendal::Operator; +use opendal::Scheme; -pub struct Ofs { - pub op: Operator, -} - -#[async_trait] -impl PathFilesystem for Ofs { - type DirEntryStream = Empty>; - type DirEntryPlusStream = Iter>>; - - // Init a fuse filesystem - async fn init(&self, _req: Request) -> Result<()> { - Ok(()) - } - - // Callback when fs is being destroyed - async fn destroy(&self, _req: Request) {} - - async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) -> Result { - // TODO - Err(libc::ENOSYS.into()) - } - - async fn getattr( - &self, - _req: Request, - path: Option<&OsStr>, - _fh: Option, - _flags: u32, - ) -> Result { - // TODO - log::debug!("getattr(path={:?})", path); - - Err(libc::ENOSYS.into()) - } - - async fn read( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - offset: u64, - size: u32, - ) -> Result { - // TODO - log::debug!( - "read(path={:?}, fh={}, offset={}, size={})", - path, - fh, - offset, - size - ); - - Err(libc::ENOSYS.into()) - } - - async fn mkdir( - &self, - _req: Request, - parent: &OsStr, - name: &OsStr, - mode: u32, - _umask: u32, - ) -> Result { - // TODO - log::debug!( - "mkdir(parent={:?}, name={:?}, mode=0o{:o})", - parent, - name, - mode - ); - - Err(libc::ENOSYS.into()) - } - - async fn readdir( - &self, - _req: Request, - path: &OsStr, - fh: u64, - offset: i64, - ) -> Result> { - // TODO - log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); +pub mod config; +pub use config::Config; - Err(libc::ENOSYS.into()) - } - - async fn mknod( - &self, - _req: Request, - parent: &OsStr, - name: &OsStr, - mode: u32, - _rdev: u32, - ) -> Result { - // TODO - log::debug!( - "mknod(parent={:?}, name={:?}, mode=0o{:o})", - parent, - name, - mode - ); - - Err(libc::ENOSYS.into()) - } - - async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { - // TODO - log::debug!("open(path={:?}, flags=0x{:x})", path, flags); +mod frontend; - Err(libc::ENOSYS.into()) +pub async fn new_app(cfg: Config) -> Result<()> { + if cfg.backend.has_host() { + log::warn!("backend host will be ignored"); } - async fn setattr( - &self, - _req: Request, - path: Option<&OsStr>, - _fh: Option, - _set_attr: SetAttr, - ) -> Result { - // TODO - log::debug!("setattr(path={:?})", path); + let scheme_str = cfg.backend.scheme(); + let op_args = cfg + .backend + .query_pairs() + .into_owned() + .collect::>(); - Err(libc::ENOSYS.into()) - } - - async fn write( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - offset: u64, - data: &[u8], - flags: u32, - ) -> Result { - // TODO - log::debug!( - "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})", - path, - fh, - offset, - data.len(), - flags - ); - - Err(libc::ENOSYS.into()) - } + let scheme = match Scheme::from_str(scheme_str) { + Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)), + Ok(s) => Ok(s), + }?; + let backend = Operator::via_map(scheme, op_args)?; - async fn release( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - flags: u32, - _lock_owner: u64, - flush: bool, - ) -> Result<()> { - // TODO - log::debug!( - "release(path={:?}, fh={}, flags={}, flush={})", - path, - fh, - flags, - flush - ); - - Err(libc::ENOSYS.into()) - } - - async fn rename( - &self, - _req: Request, - origin_parent: &OsStr, - origin_name: &OsStr, - parent: &OsStr, - name: &OsStr, - ) -> Result<()> { - // TODO - log::debug!( - "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", - origin_parent, - origin_name, - parent, - name - ); - - Err(libc::ENOSYS.into()) - } - - async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { - // TODO - log::debug!("unlink(parent={:?}, name={:?})", parent, name); - - Err(libc::ENOSYS.into()) - } + let args = FrontendArgs { + mount_path: cfg.mount_path, + backend, + }; + Frontend::execute(args).await } From f62d218e0d32526a75fd1257996ca2bd0343983d Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Fri, 16 Feb 2024 21:40:14 +0800 Subject: [PATCH 2/9] feat: impl fuse for linux [wip] --- bin/ofs/Cargo.lock | 14 +- bin/ofs/Cargo.toml | 16 +- bin/ofs/src/frontend/fuse.rs | 412 ++++++++++++++++++++++++++++------- bin/ofs/src/frontend/mod.rs | 9 +- 4 files changed, 357 insertions(+), 94 deletions(-) diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock index 6a3952492a5e..68b92592853d 100644 --- a/bin/ofs/Cargo.lock +++ b/bin/ofs/Cargo.lock @@ -57,9 +57,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "anstyle-parse" @@ -216,9 +216,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.33" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", @@ -419,9 +419,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05e7cf40684ae96ade6232ed84582f40ce0a66efcd43a5117aef610534f8e0b8" +checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" dependencies = [ "anstream", "anstyle", @@ -1026,7 +1026,7 @@ name = "ofs" version = "0.0.1+core.0.45.0" dependencies = [ "anyhow", - "async-trait", + "chrono", "clap", "env_logger", "fuse3", diff --git a/bin/ofs/Cargo.toml b/bin/ofs/Cargo.toml index 187e81d4f8a7..17d72261691c 100644 --- a/bin/ofs/Cargo.toml +++ b/bin/ofs/Cargo.toml @@ -31,15 +31,11 @@ rust-version = "1.67" [dependencies] anyhow = "1" -async-trait = "0.1.75" -clap = { version = "4.5.1", features = ["derive", "env"] } -env_logger = "0.11" -fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] } +clap = { version = "4.4.18", features = ["derive", "env"] } +env_logger = "0.11.2" futures-util = "0.3.30" -libc = "0.2.151" log = "0.4.20" -nix = { version = "0.27.1", features = ["user"] } -opendal = { version = "0.45.0", path = "../../core" } +opendal = { path = "../../core" } tokio = { version = "1.34", features = [ "fs", "macros", @@ -47,3 +43,9 @@ tokio = { version = "1.34", features = [ "io-std", ] } url = "2.5.0" +chrono = "0.4.34" + +[target.'cfg(target_os = "linux")'.dependencies] +libc = "0.2.151" +fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] } +nix = { version = "0.27.1", features = ["user"] } diff --git a/bin/ofs/src/frontend/fuse.rs b/bin/ofs/src/frontend/fuse.rs index 090f12a7cd89..7434c38ce286 100644 --- a/bin/ofs/src/frontend/fuse.rs +++ b/bin/ofs/src/frontend/fuse.rs @@ -16,28 +16,40 @@ // under the License. use std::ffi::OsStr; +use std::path::PathBuf; +use std::time::Duration; +use std::time::SystemTime; use std::vec::IntoIter; -use async_trait::async_trait; +use fuse3::async_trait; use fuse3::path::prelude::*; +use fuse3::Errno; use fuse3::Result; -use futures_util::stream::Empty; +use futures_util::stream; use futures_util::stream::Iter; +use futures_util::StreamExt; +use opendal::EntryMode; +use opendal::ErrorKind; +use opendal::Metadata; use opendal::Operator; +const TTL: Duration = Duration::from_secs(1); // 1 second + pub(super) struct Ofs { op: Operator, + gid: u32, + uid: u32, } impl Ofs { - pub fn new(op: Operator) -> Self { - Self { op } + pub fn new(op: Operator, uid: u32, gid: u32) -> Self { + Self { op, uid, gid } } } #[async_trait] impl PathFilesystem for Ofs { - type DirEntryStream = Empty>; + type DirEntryStream = Iter>>; type DirEntryPlusStream = Iter>>; // Init a fuse filesystem @@ -48,9 +60,27 @@ impl PathFilesystem for Ofs { // Callback when fs is being destroyed async fn destroy(&self, _req: Request) {} - async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) -> Result { - // TODO - Err(libc::ENOSYS.into()) + async fn lookup(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result { + log::debug!( + "lookup(parent={}, name=\"{}\")", + parent.to_string_lossy(), + name.to_string_lossy() + ); + + let path = PathBuf::from(parent).join(name); + let metadata = self + .op + .stat(&path.to_string_lossy()) + .await + .map_err(|e| match e.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + _ => Errno::from(libc::ENOENT), + })?; + + let now = SystemTime::now(); + let attr = metadata2file_attr(&metadata, now); + + Ok(ReplyEntry { ttl: TTL, attr }) } async fn getattr( @@ -60,30 +90,68 @@ impl PathFilesystem for Ofs { _fh: Option, _flags: u32, ) -> Result { - // TODO log::debug!("getattr(path={:?})", path); - Err(libc::ENOSYS.into()) + let metadata = self + .op + .stat(&path.unwrap_or_default().to_string_lossy()) + .await + .map_err(|e| match e.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + _ => Errno::from(libc::ENOENT), + })?; + + let now = SystemTime::now(); + let mut attr = metadata2file_attr(&metadata, now); + attr.uid = self.uid; + attr.gid = self.gid; + + Ok(ReplyAttr { ttl: TTL, attr }) } - async fn read( + async fn setattr( &self, _req: Request, path: Option<&OsStr>, - fh: u64, - offset: u64, - size: u32, - ) -> Result { - // TODO + _fh: Option, + _set_attr: SetAttr, + ) -> Result { + log::debug!("setattr(path={:?})", path); + Err(libc::EOPNOTSUPP.into()) + } + + async fn symlink( + &self, + req: Request, + parent: &OsStr, + name: &OsStr, + link_path: &OsStr, + ) -> Result { log::debug!( - "read(path={:?}, fh={}, offset={}, size={})", - path, - fh, - offset, - size + "symlink(req={:?}, parent={:?}, name={:?}, link_path={:?})", + req, + parent, + name, + link_path ); + Err(libc::EOPNOTSUPP.into()) + } - Err(libc::ENOSYS.into()) + async fn mknod( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + _rdev: u32, + ) -> Result { + log::debug!( + "mknod(parent={:?}, name={:?}, mode=0o{:o})", + parent, + name, + mode + ); + Err(libc::EOPNOTSUPP.into()) } async fn mkdir( @@ -94,7 +162,6 @@ impl PathFilesystem for Ofs { mode: u32, _umask: u32, ) -> Result { - // TODO log::debug!( "mkdir(parent={:?}, name={:?}, mode=0o{:o})", parent, @@ -102,57 +169,119 @@ impl PathFilesystem for Ofs { mode ); - Err(libc::ENOSYS.into()) + let path = PathBuf::from(parent).join(name); + self.op + .create_dir(&path.to_string_lossy()) + .await + .map_err(|e| match e.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::AlreadyExists => Errno::from(libc::EEXIST), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + _ => Errno::from(libc::ENOENT), + })?; + + let metadata = Metadata::new(EntryMode::DIR); + let now = SystemTime::now(); + let mut attr = metadata2file_attr(&metadata, now); + attr.uid = self.uid; + attr.gid = self.gid; + + Ok(ReplyEntry { ttl: TTL, attr }) } - async fn readdir( - &self, - _req: Request, - path: &OsStr, - fh: u64, - offset: i64, - ) -> Result> { - // TODO - log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); + async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { + log::debug!("unlink(parent={:?}, name={:?})", parent, name); + Err(libc::EOPNOTSUPP.into()) + } - Err(libc::ENOSYS.into()) + async fn rmdir(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { + log::debug!("rmdir(parent={:?}, name={:?})", parent, name); + + let path = PathBuf::from(parent).join(name); + self.op + .delete(&path.to_string_lossy()) + .await + .map_err(|e| match e.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::NotFound => Errno::from(libc::ENOENT), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + _ => Errno::from(libc::ENOENT), + })?; + + Ok(()) } - async fn mknod( + async fn rename( &self, _req: Request, + origin_parent: &OsStr, + origin_name: &OsStr, parent: &OsStr, name: &OsStr, - mode: u32, - _rdev: u32, - ) -> Result { - // TODO + ) -> Result<()> { log::debug!( - "mknod(parent={:?}, name={:?}, mode=0o{:o})", + "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", + origin_parent, + origin_name, parent, - name, - mode + name ); - Err(libc::ENOSYS.into()) + let origin_path = PathBuf::from(origin_parent).join(origin_name); + let path = PathBuf::from(parent).join(name); + + self.op + .rename(&origin_path.to_string_lossy(), &path.to_string_lossy()) + .await + .map_err(|e| match e.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::NotFound => Errno::from(libc::ENOENT), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + ErrorKind::IsSameFile => Errno::from(libc::EINVAL), + _ => Errno::from(libc::ENOENT), + })?; + + Ok(()) + } + + async fn link( + &self, + _req: Request, + path: &OsStr, + new_parent: &OsStr, + new_name: &OsStr, + ) -> Result { + log::debug!( + "link(path={:?}, new_parent={:?}, new_name={:?})", + path, + new_parent, + new_name + ); + Err(libc::EOPNOTSUPP.into()) } async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { // TODO log::debug!("open(path={:?}, flags=0x{:x})", path, flags); - Err(libc::ENOSYS.into()) } - async fn setattr( + async fn read( &self, _req: Request, path: Option<&OsStr>, - _fh: Option, - _set_attr: SetAttr, - ) -> Result { + fh: u64, + offset: u64, + size: u32, + ) -> Result { // TODO - log::debug!("setattr(path={:?})", path); + log::debug!( + "read(path={:?}, fh={}, offset={}, size={})", + path, + fh, + offset, + size + ); Err(libc::ENOSYS.into()) } @@ -179,51 +308,180 @@ impl PathFilesystem for Ofs { Err(libc::ENOSYS.into()) } - async fn release( + async fn readdir( &self, - _req: Request, - path: Option<&OsStr>, + req: Request, + path: &OsStr, fh: u64, - flags: u32, - _lock_owner: u64, - flush: bool, - ) -> Result<()> { - // TODO + offset: i64, + ) -> Result> { log::debug!( - "release(path={:?}, fh={}, flags={}, flush={})", + "readdir(req={:?}, path={:?}, fh={}, offset={})", + req, path, fh, - flags, - flush + offset ); - Err(libc::ENOSYS.into()) + let current_dir = path.to_string_lossy(); + let entries = self + .op + .list(¤t_dir) + .await + .map_err(|e| match e.kind() { + ErrorKind::NotFound => Errno::new_not_exist(), + ErrorKind::NotADirectory => Errno::new_is_not_dir(), + _ => Errno::from(libc::ENOENT), + })?; + + let relative_paths = [ + Result::Ok(DirectoryEntry { + kind: FileType::Directory, + name: ".".into(), + offset: 1, + }), + Result::Ok(DirectoryEntry { + kind: FileType::Directory, + name: "..".into(), + offset: 2, + }), + ]; + + let res = relative_paths + .into_iter() + .chain(entries.iter().enumerate().map(|(i, entry)| { + Result::Ok(DirectoryEntry { + kind: entry_mode2file_type(entry.metadata().mode()), + name: entry.name().trim_matches('/').into(), + offset: (i + 3) as i64, + }) + })) + .skip(offset as usize) + .collect::>(); + log::debug!("readdir entries={:#?}", res); + + Ok(ReplyDirectory { + entries: stream::iter(res), + }) } - async fn rename( + async fn access(&self, _req: Request, path: &OsStr, mask: u32) -> Result<()> { + log::debug!("access(path={:?}, mask=0x{:x})", path, mask); + self.op + .stat(&path.to_string_lossy()) + .await + .map_err(|e| match e.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + _ => Errno::from(libc::ENOENT), + })?; + + Ok(()) + } + + async fn readdirplus( &self, - _req: Request, - origin_parent: &OsStr, - origin_name: &OsStr, + req: Request, parent: &OsStr, - name: &OsStr, - ) -> Result<()> { - // TODO + fh: u64, + offset: u64, + _lock_owner: u64, + ) -> Result> { log::debug!( - "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", - origin_parent, - origin_name, + "readdirplus(req={:?}, parent={:?}, fh={}, offset={})", + req, parent, - name + fh, + offset ); - Err(libc::ENOSYS.into()) + let current_dir = parent.to_string_lossy(); + let entries = self + .op + .list(¤t_dir) + .await + .map_err(|e| match e.kind() { + ErrorKind::NotFound => Errno::new_not_exist(), + ErrorKind::NotADirectory => Errno::new_is_not_dir(), + _ => Errno::from(libc::ENOENT), + })?; + + let now = SystemTime::now(); + let relative_path_metadata = Metadata::new(EntryMode::DIR); + let relative_path_attr = metadata2file_attr(&relative_path_metadata, now); + let relative_paths = stream::iter([ + Result::Ok(DirectoryEntryPlus { + kind: FileType::Directory, + name: ".".into(), + offset: 1, + attr: relative_path_attr, + entry_ttl: TTL, + attr_ttl: TTL, + }), + Result::Ok(DirectoryEntryPlus { + kind: FileType::Directory, + name: "..".into(), + offset: 2, + attr: relative_path_attr, + entry_ttl: TTL, + attr_ttl: TTL, + }), + ]); + + let children = stream::iter(entries) + .enumerate() + .then(|(i, entry)| async move { + let metadata = self + .op + .stat(&entry.name()) + .await + .unwrap_or_else(|_| entry.metadata().clone()); + let mut attr = metadata2file_attr(&metadata, now); + attr.uid = self.uid; + attr.gid = self.gid; + Result::Ok(DirectoryEntryPlus { + kind: entry_mode2file_type(entry.metadata().mode()), + name: entry.name().trim_matches('/').into(), + offset: (i + 3) as i64, + attr, + entry_ttl: TTL, + attr_ttl: TTL, + }) + }); + + let res = relative_paths + .chain(children) + .skip(offset as usize) + .collect::>() + .await; + + Ok(ReplyDirectoryPlus { + entries: stream::iter(res), + }) } +} - async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { - // TODO - log::debug!("unlink(parent={:?}, name={:?})", parent, name); +const fn entry_mode2file_type(mode: EntryMode) -> FileType { + match mode { + EntryMode::DIR => FileType::Directory, + _ => FileType::RegularFile, + } +} - Err(libc::ENOSYS.into()) +fn metadata2file_attr(metadata: &Metadata, atime: SystemTime) -> FileAttr { + let last_modified = metadata.last_modified().map(|t| t.into()).unwrap_or(atime); + let kind = entry_mode2file_type(metadata.mode()); + FileAttr { + size: metadata.content_length(), + blocks: 0, + atime, + mtime: last_modified, + ctime: last_modified, + kind, + perm: fuse3::perm_from_mode_and_kind(kind, 0o775), + nlink: 0, + uid: 1000, + gid: 1000, + rdev: 0, + blksize: 4096, } } diff --git a/bin/ofs/src/frontend/mod.rs b/bin/ofs/src/frontend/mod.rs index 7ea87ec29875..5eba238ce886 100644 --- a/bin/ofs/src/frontend/mod.rs +++ b/bin/ofs/src/frontend/mod.rs @@ -39,11 +39,14 @@ impl Frontend { use fuse3::path::Session; use fuse3::MountOptions; + let uid = nix::unistd::getuid(); + let gid = nix::unistd::getgid(); + let mut mount_option = MountOptions::default(); - mount_option.uid(nix::unistd::getuid().into()); - mount_option.gid(nix::unistd::getgid().into()); + mount_option.uid(uid.into()); + mount_option.gid(gid.into()); - let ofs = fuse::Ofs::new(args.backend); + let ofs = fuse::Ofs::new(args.backend, uid.into(), gid.into()); let mount_handle = Session::new(mount_option) .mount_with_unprivileged(ofs, args.mount_path) From 65d87250b2f7a917fa1cff015d2bb36af305ad54 Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Sun, 18 Feb 2024 15:28:53 +0800 Subject: [PATCH 3/9] feat: impl fuse for linux --- bin/ofs/Cargo.lock | 10 + bin/ofs/Cargo.toml | 2 + bin/ofs/src/frontend/fuse.rs | 352 +++++++++++++++++++++++++++-------- bin/ofs/src/frontend/mod.rs | 1 + 4 files changed, 292 insertions(+), 73 deletions(-) diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock index 68b92592853d..051c429ef79e 100644 --- a/bin/ofs/Cargo.lock +++ b/bin/ofs/Cargo.lock @@ -1035,6 +1035,7 @@ dependencies = [ "log", "nix 0.27.1", "opendal", + "sharded-slab", "tokio", "url", ] @@ -1559,6 +1560,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signature" version = "2.2.0" diff --git a/bin/ofs/Cargo.toml b/bin/ofs/Cargo.toml index 17d72261691c..669d0e30e766 100644 --- a/bin/ofs/Cargo.toml +++ b/bin/ofs/Cargo.toml @@ -44,6 +44,8 @@ tokio = { version = "1.34", features = [ ] } url = "2.5.0" chrono = "0.4.34" +sharded-slab = "0.1.7" +bytes = "1.5.0" [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2.151" diff --git a/bin/ofs/src/frontend/fuse.rs b/bin/ofs/src/frontend/fuse.rs index 7434c38ce286..57b8337a61c4 100644 --- a/bin/ofs/src/frontend/fuse.rs +++ b/bin/ofs/src/frontend/fuse.rs @@ -16,11 +16,14 @@ // under the License. use std::ffi::OsStr; +use std::ffi::OsString; +use std::ops::Deref; use std::path::PathBuf; use std::time::Duration; use std::time::SystemTime; use std::vec::IntoIter; +use bytes::Bytes; use fuse3::async_trait; use fuse3::path::prelude::*; use fuse3::Errno; @@ -28,22 +31,79 @@ use fuse3::Result; use futures_util::stream; use futures_util::stream::Iter; use futures_util::StreamExt; + use opendal::EntryMode; use opendal::ErrorKind; use opendal::Metadata; use opendal::Operator; +use sharded_slab::Slab; const TTL: Duration = Duration::from_secs(1); // 1 second +#[derive(Debug, Clone)] +struct OpenedFile { + path: OsString, + is_read: bool, + is_write: bool, + is_append: bool, +} + pub(super) struct Ofs { op: Operator, gid: u32, uid: u32, + opened_files: Slab, } impl Ofs { pub fn new(op: Operator, uid: u32, gid: u32) -> Self { - Self { op, uid, gid } + Self { + op, + uid, + gid, + opened_files: Slab::new(), + } + } + + fn check_flags(&self, flags: u32) -> Result<(bool, bool)> { + let mode = flags & libc::O_ACCMODE as u32; + let is_read = mode == libc::O_RDONLY as u32 || mode == libc::O_RDWR as u32; + let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32; + if !is_read && !is_write { + Err(Errno::from(libc::EINVAL))?; + } + + let capability = self.op.info().full_capability(); + if is_read && !capability.read { + Err(Errno::from(libc::EACCES))?; + } + if is_write && !capability.write { + Err(Errno::from(libc::EACCES))?; + } + + log::trace!("check_flags: is_read={}, is_write={}", is_read, is_write); + Ok((is_read, is_write)) + } + + // Get opened file and check given path + fn get_opened_file(&self, key: usize, path: Option<&OsStr>) -> Result { + let file = self + .opened_files + .get(key) + .as_ref() + .ok_or(Errno::from(libc::ENOENT))? + .deref() + .clone(); + if matches!(path, Some(path) if path != file.path) { + log::trace!( + "get_opened_file: path not match: path={:?}, file={:?}", + path, + file.path + ); + Err(Errno::from(libc::EBADF))?; + } + + Ok(file) } } @@ -61,24 +121,19 @@ impl PathFilesystem for Ofs { async fn destroy(&self, _req: Request) {} async fn lookup(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result { - log::debug!( - "lookup(parent={}, name=\"{}\")", - parent.to_string_lossy(), - name.to_string_lossy() - ); + log::debug!("lookup(parent={:?}, name={:?})", parent, name); let path = PathBuf::from(parent).join(name); let metadata = self .op .stat(&path.to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)?; let now = SystemTime::now(); - let attr = metadata2file_attr(&metadata, now); + let mut attr = metadata2file_attr(&metadata, now); + attr.uid = self.uid; + attr.gid = self.gid; Ok(ReplyEntry { ttl: TTL, attr }) } @@ -87,19 +142,34 @@ impl PathFilesystem for Ofs { &self, _req: Request, path: Option<&OsStr>, - _fh: Option, - _flags: u32, + fh: Option, + flags: u32, ) -> Result { - log::debug!("getattr(path={:?})", path); + log::debug!("getattr(path={:?}, fh={:?}, flags={:?})", path, fh, flags); + + let key = fh.unwrap_or_default() - 1; + let fh_path = self + .opened_files + .get(key as usize) + .as_ref() + .map(|f| &f.path) + .cloned(); + + let file_path = match (path.map(Into::into), fh_path) { + (Some(a), Some(b)) => { + if a != b { + Err(Errno::from(libc::EBADF))?; + } + Some(a) + } + (a, b) => a.or(b), + }; let metadata = self .op - .stat(&path.unwrap_or_default().to_string_lossy()) + .stat(&file_path.unwrap_or_default().to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)?; let now = SystemTime::now(); let mut attr = metadata2file_attr(&metadata, now); @@ -113,23 +183,27 @@ impl PathFilesystem for Ofs { &self, _req: Request, path: Option<&OsStr>, - _fh: Option, - _set_attr: SetAttr, + fh: Option, + set_attr: SetAttr, ) -> Result { - log::debug!("setattr(path={:?})", path); + log::debug!( + "setattr(path={:?}, fh={:?}, set_attr={:?})", + path, + fh, + set_attr + ); Err(libc::EOPNOTSUPP.into()) } async fn symlink( &self, - req: Request, + _req: Request, parent: &OsStr, name: &OsStr, link_path: &OsStr, ) -> Result { log::debug!( - "symlink(req={:?}, parent={:?}, name={:?}, link_path={:?})", - req, + "symlink(parent={:?}, name={:?}, link_path={:?})", parent, name, link_path @@ -169,16 +243,12 @@ impl PathFilesystem for Ofs { mode ); - let path = PathBuf::from(parent).join(name); + let mut path = PathBuf::from(parent).join(name); + path.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 self.op .create_dir(&path.to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), - ErrorKind::AlreadyExists => Errno::from(libc::EEXIST), - ErrorKind::PermissionDenied => Errno::from(libc::EACCES), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)?; let metadata = Metadata::new(EntryMode::DIR); let now = SystemTime::now(); @@ -191,7 +261,14 @@ impl PathFilesystem for Ofs { async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { log::debug!("unlink(parent={:?}, name={:?})", parent, name); - Err(libc::EOPNOTSUPP.into()) + + let path = PathBuf::from(parent).join(name); + self.op + .delete(&path.to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + Ok(()) } async fn rmdir(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { @@ -201,12 +278,7 @@ impl PathFilesystem for Ofs { self.op .delete(&path.to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), - ErrorKind::NotFound => Errno::from(libc::ENOENT), - ErrorKind::PermissionDenied => Errno::from(libc::EACCES), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)?; Ok(()) } @@ -233,13 +305,7 @@ impl PathFilesystem for Ofs { self.op .rename(&origin_path.to_string_lossy(), &path.to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), - ErrorKind::NotFound => Errno::from(libc::ENOENT), - ErrorKind::PermissionDenied => Errno::from(libc::EACCES), - ErrorKind::IsSameFile => Errno::from(libc::EINVAL), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)?; Ok(()) } @@ -260,10 +326,102 @@ impl PathFilesystem for Ofs { Err(libc::EOPNOTSUPP.into()) } + async fn create( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + flags: u32, + ) -> Result { + log::debug!( + "create(parent={:?}, name={:?}, mode=0o{:o}, flags=0x{:x})", + parent, + name, + mode, + flags + ); + + let (is_read, is_write) = self.check_flags(flags)?; + + let path = PathBuf::from(parent).join(name); + self.op + .write(&path.to_string_lossy(), Bytes::new()) + .await + .map_err(opendal_error2errno)?; + + let metadata = Metadata::new(EntryMode::FILE); + let mut attr = metadata2file_attr(&metadata, SystemTime::now()); + attr.uid = self.uid; + attr.gid = self.gid; + + let fh = self + .opened_files + .insert(OpenedFile { + path: path.into(), + is_read, + is_write, + is_append: flags & libc::O_APPEND as u32 != 0, + }) + .ok_or(Errno::from(libc::EBUSY))? as u64 + + 1; // ensure fh > 0 + + Ok(ReplyCreated { + ttl: TTL, + attr, + generation: 0, + fh, + flags, + }) + } + + async fn release( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + flags: u32, + lock_owner: u64, + flush: bool, + ) -> Result<()> { + log::debug!( + "release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})", + path, + fh, + flags, + lock_owner, + flush + ); + + let key = fh as usize - 1; + let file = self + .opened_files + .take(key) + .ok_or(Errno::from(libc::EBADF))?; + if matches!(path, Some(ref p) if p != &file.path) { + Err(Errno::from(libc::EBADF))?; + } + + Ok(()) + } + async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { - // TODO log::debug!("open(path={:?}, flags=0x{:x})", path, flags); - Err(libc::ENOSYS.into()) + + let (is_read, is_write) = self.check_flags(flags)?; + + let fh = self + .opened_files + .insert(OpenedFile { + path: path.into(), + is_read, + is_write, + is_append: flags & libc::O_APPEND as u32 != 0, + }) + .ok_or(Errno::from(libc::EBUSY))? as u64 + + 1; // ensure fh > 0 + + Ok(ReplyOpen { fh, flags }) } async fn read( @@ -274,7 +432,6 @@ impl PathFilesystem for Ofs { offset: u64, size: u32, ) -> Result { - // TODO log::debug!( "read(path={:?}, fh={}, offset={}, size={})", path, @@ -283,7 +440,24 @@ impl PathFilesystem for Ofs { size ); - Err(libc::ENOSYS.into()) + if fh == 0 { + Err(Errno::from(libc::EBADF))?; + } + let key = fh - 1; + let file = self.get_opened_file(key as _, path)?; + + if !file.is_read { + Err(Errno::from(libc::EACCES))?; + } + + let data = self + .op + .read_with(&file.path.to_string_lossy()) + .range(offset..offset + size as u64) + .await + .map_err(opendal_error2errno)?; + + Ok(ReplyData { data: data.into() }) } async fn write( @@ -295,9 +469,8 @@ impl PathFilesystem for Ofs { data: &[u8], flags: u32, ) -> Result { - // TODO log::debug!( - "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})", + "write(path={:?}, fh={}, offset={}, data_len={}, flags=0x{:x})", path, fh, offset, @@ -305,28 +478,48 @@ impl PathFilesystem for Ofs { flags ); - Err(libc::ENOSYS.into()) + if offset != 0 { + Err(Errno::from(libc::EINVAL))?; + } + + if fh == 0 { + Err(Errno::from(libc::EBADF))?; + } + let key = fh - 1; + + let file = self.get_opened_file(key as _, path)?; + if !file.is_write { + Err(Errno::from(libc::EACCES))?; + } + + self.op + .write_with( + &file.path.clone().to_string_lossy(), + Bytes::copy_from_slice(data), + ) + .append(file.is_append) + .await + .map_err(opendal_error2errno)?; + + Ok(ReplyWrite { + written: data.len() as _, + }) } async fn readdir( &self, - req: Request, + _req: Request, path: &OsStr, fh: u64, offset: i64, ) -> Result> { - log::debug!( - "readdir(req={:?}, path={:?}, fh={}, offset={})", - req, - path, - fh, - offset - ); + log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); - let current_dir = path.to_string_lossy(); + let mut current_dir = PathBuf::from(path); + current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 let entries = self .op - .list(¤t_dir) + .list(¤t_dir.to_string_lossy()) .await .map_err(|e| match e.kind() { ErrorKind::NotFound => Errno::new_not_exist(), @@ -358,7 +551,6 @@ impl PathFilesystem for Ofs { })) .skip(offset as usize) .collect::>(); - log::debug!("readdir entries={:#?}", res); Ok(ReplyDirectory { entries: stream::iter(res), @@ -367,37 +559,36 @@ impl PathFilesystem for Ofs { async fn access(&self, _req: Request, path: &OsStr, mask: u32) -> Result<()> { log::debug!("access(path={:?}, mask=0x{:x})", path, mask); + + self.check_flags(mask)?; self.op .stat(&path.to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)?; Ok(()) } async fn readdirplus( &self, - req: Request, + _req: Request, parent: &OsStr, fh: u64, offset: u64, _lock_owner: u64, ) -> Result> { log::debug!( - "readdirplus(req={:?}, parent={:?}, fh={}, offset={})", - req, + "readdirplus(parent={:?}, fh={}, offset={})", parent, fh, offset ); - let current_dir = parent.to_string_lossy(); + let mut current_dir = PathBuf::from(parent); + current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 let entries = self .op - .list(¤t_dir) + .list(¤t_dir.to_string_lossy()) .await .map_err(|e| match e.kind() { ErrorKind::NotFound => Errno::new_not_exist(), @@ -485,3 +676,18 @@ fn metadata2file_attr(metadata: &Metadata, atime: SystemTime) -> FileAttr { blksize: 4096, } } + +fn opendal_error2errno(err: opendal::Error) -> fuse3::Errno { + log::trace!("opendal_error2errno: {:?}", err); + match err.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::IsADirectory => Errno::from(libc::EISDIR), + ErrorKind::NotFound => Errno::from(libc::ENOENT), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + ErrorKind::AlreadyExists => Errno::from(libc::EEXIST), + ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR), + ErrorKind::ContentTruncated => Errno::from(libc::EAGAIN), + ErrorKind::ContentIncomplete => Errno::from(libc::EIO), + _ => Errno::from(libc::ENOENT), + } +} diff --git a/bin/ofs/src/frontend/mod.rs b/bin/ofs/src/frontend/mod.rs index 5eba238ce886..090f29ae3cc2 100644 --- a/bin/ofs/src/frontend/mod.rs +++ b/bin/ofs/src/frontend/mod.rs @@ -45,6 +45,7 @@ impl Frontend { let mut mount_option = MountOptions::default(); mount_option.uid(uid.into()); mount_option.gid(gid.into()); + mount_option.no_open_dir_support(true); let ofs = fuse::Ofs::new(args.backend, uid.into(), gid.into()); From a7803e38917d99d4203d8106a0036d583b063576 Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Sun, 18 Feb 2024 18:14:52 +0800 Subject: [PATCH 4/9] refactor: use Operator::lister in readdir --- bin/ofs/src/frontend/fuse.rs | 111 ++++++++++++++++------------------- 1 file changed, 49 insertions(+), 62 deletions(-) diff --git a/bin/ofs/src/frontend/fuse.rs b/bin/ofs/src/frontend/fuse.rs index 57b8337a61c4..f3c240923635 100644 --- a/bin/ofs/src/frontend/fuse.rs +++ b/bin/ofs/src/frontend/fuse.rs @@ -21,7 +21,6 @@ use std::ops::Deref; use std::path::PathBuf; use std::time::Duration; use std::time::SystemTime; -use std::vec::IntoIter; use bytes::Bytes; use fuse3::async_trait; @@ -29,9 +28,10 @@ use fuse3::path::prelude::*; use fuse3::Errno; use fuse3::Result; use futures_util::stream; -use futures_util::stream::Iter; +use futures_util::stream::BoxStream; use futures_util::StreamExt; +use opendal::Entry; use opendal::EntryMode; use opendal::ErrorKind; use opendal::Metadata; @@ -109,8 +109,8 @@ impl Ofs { #[async_trait] impl PathFilesystem for Ofs { - type DirEntryStream = Iter>>; - type DirEntryPlusStream = Iter>>; + type DirEntryStream = BoxStream<'static, Result>; + type DirEntryPlusStream = BoxStream<'static, Result>; // Init a fuse filesystem async fn init(&self, _req: Request) -> Result<()> { @@ -517,17 +517,23 @@ impl PathFilesystem for Ofs { let mut current_dir = PathBuf::from(path); current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 - let entries = self + let children = self .op - .list(¤t_dir.to_string_lossy()) + .lister(¤t_dir.to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::NotFound => Errno::new_not_exist(), - ErrorKind::NotADirectory => Errno::new_is_not_dir(), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)? + .enumerate() + .map(|(i, entry)| { + entry + .map(|e| DirectoryEntry { + kind: entry_mode2file_type(e.metadata().mode()), + name: e.name().trim_matches('/').into(), + offset: (i + 3) as i64, + }) + .map_err(opendal_error2errno) + }); - let relative_paths = [ + let relative_paths = stream::iter([ Result::Ok(DirectoryEntry { kind: FileType::Directory, name: ".".into(), @@ -538,22 +544,10 @@ impl PathFilesystem for Ofs { name: "..".into(), offset: 2, }), - ]; - - let res = relative_paths - .into_iter() - .chain(entries.iter().enumerate().map(|(i, entry)| { - Result::Ok(DirectoryEntry { - kind: entry_mode2file_type(entry.metadata().mode()), - name: entry.name().trim_matches('/').into(), - offset: (i + 3) as i64, - }) - })) - .skip(offset as usize) - .collect::>(); + ]); Ok(ReplyDirectory { - entries: stream::iter(res), + entries: relative_paths.chain(children).skip(offset as usize).boxed(), }) } @@ -584,19 +578,39 @@ impl PathFilesystem for Ofs { offset ); + let make_entry = |op: Operator, i: usize, entry: opendal::Result, uid, gid, now| async move { + let e = entry.map_err(opendal_error2errno)?; + let metadata = op + .stat(&e.name()) + .await + .unwrap_or_else(|_| e.metadata().clone()); + let mut attr = metadata2file_attr(&metadata, now); + attr.uid = uid; + attr.gid = gid; + Result::Ok(DirectoryEntryPlus { + kind: entry_mode2file_type(metadata.mode()), + name: e.name().trim_matches('/').into(), + offset: (i + 3) as i64, + attr, + entry_ttl: TTL, + attr_ttl: TTL, + }) + }; + + let now = SystemTime::now(); let mut current_dir = PathBuf::from(parent); current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 - let entries = self + let op = self.op.clone(); + let uid = self.uid; + let gid = self.gid; + let children = self .op - .list(¤t_dir.to_string_lossy()) + .lister(¤t_dir.to_string_lossy()) .await - .map_err(|e| match e.kind() { - ErrorKind::NotFound => Errno::new_not_exist(), - ErrorKind::NotADirectory => Errno::new_is_not_dir(), - _ => Errno::from(libc::ENOENT), - })?; + .map_err(opendal_error2errno)? + .enumerate() + .then(move |(i, entry)| make_entry(op.clone(), i, entry, uid, gid, now)); - let now = SystemTime::now(); let relative_path_metadata = Metadata::new(EntryMode::DIR); let relative_path_attr = metadata2file_attr(&relative_path_metadata, now); let relative_paths = stream::iter([ @@ -618,35 +632,8 @@ impl PathFilesystem for Ofs { }), ]); - let children = stream::iter(entries) - .enumerate() - .then(|(i, entry)| async move { - let metadata = self - .op - .stat(&entry.name()) - .await - .unwrap_or_else(|_| entry.metadata().clone()); - let mut attr = metadata2file_attr(&metadata, now); - attr.uid = self.uid; - attr.gid = self.gid; - Result::Ok(DirectoryEntryPlus { - kind: entry_mode2file_type(entry.metadata().mode()), - name: entry.name().trim_matches('/').into(), - offset: (i + 3) as i64, - attr, - entry_ttl: TTL, - attr_ttl: TTL, - }) - }); - - let res = relative_paths - .chain(children) - .skip(offset as usize) - .collect::>() - .await; - Ok(ReplyDirectoryPlus { - entries: stream::iter(res), + entries: relative_paths.chain(children).skip(offset as usize).boxed(), }) } } From 2a8477b1f68b11054b0eac0a5ec4cc1fc88d77c8 Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Tue, 20 Feb 2024 18:07:15 +0800 Subject: [PATCH 5/9] refactor: remove frontend folder --- bin/ofs/src/frontend/mod.rs | 60 ------------------------------ bin/ofs/src/{frontend => }/fuse.rs | 0 bin/ofs/src/lib.rs | 43 ++++++++++++++++++--- 3 files changed, 38 insertions(+), 65 deletions(-) delete mode 100644 bin/ofs/src/frontend/mod.rs rename bin/ofs/src/{frontend => }/fuse.rs (100%) diff --git a/bin/ofs/src/frontend/mod.rs b/bin/ofs/src/frontend/mod.rs deleted file mode 100644 index 090f29ae3cc2..000000000000 --- a/bin/ofs/src/frontend/mod.rs +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use anyhow::Result; -use opendal::Operator; - -#[cfg(target_os = "linux")] -mod fuse; - -pub(crate) struct FrontendArgs { - pub mount_path: String, - pub backend: Operator, -} - -pub(crate) struct Frontend; - -impl Frontend { - #[cfg(any(not(target_os = "linux")))] - pub async fn execute(_: FrontendArgs) -> Result<()> { - Err(anyhow::anyhow!("platform not supported")) - } - - #[cfg(target_os = "linux")] - pub async fn execute(args: FrontendArgs) -> Result<()> { - use fuse3::path::Session; - use fuse3::MountOptions; - - let uid = nix::unistd::getuid(); - let gid = nix::unistd::getgid(); - - let mut mount_option = MountOptions::default(); - mount_option.uid(uid.into()); - mount_option.gid(gid.into()); - mount_option.no_open_dir_support(true); - - let ofs = fuse::Ofs::new(args.backend, uid.into(), gid.into()); - - let mount_handle = Session::new(mount_option) - .mount_with_unprivileged(ofs, args.mount_path) - .await?; - - mount_handle.await?; - - Ok(()) - } -} diff --git a/bin/ofs/src/frontend/fuse.rs b/bin/ofs/src/fuse.rs similarity index 100% rename from bin/ofs/src/frontend/fuse.rs rename to bin/ofs/src/fuse.rs diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs index d7972eeaf6a0..a489b1e449b5 100644 --- a/bin/ofs/src/lib.rs +++ b/bin/ofs/src/lib.rs @@ -20,15 +20,14 @@ use std::str::FromStr; use anyhow::anyhow; use anyhow::Result; -use frontend::Frontend; -use frontend::FrontendArgs; use opendal::Operator; use opendal::Scheme; pub mod config; pub use config::Config; -mod frontend; +#[cfg(target_os = "linux")] +mod fuse; pub async fn new_app(cfg: Config) -> Result<()> { if cfg.backend.has_host() { @@ -48,9 +47,43 @@ pub async fn new_app(cfg: Config) -> Result<()> { }?; let backend = Operator::via_map(scheme, op_args)?; - let args = FrontendArgs { + let args = Args { mount_path: cfg.mount_path, backend, }; - Frontend::execute(args).await + execute(args).await +} + +struct Args { + mount_path: String, + backend: Operator, +} + +#[cfg(any(not(target_os = "linux")))] +async fn execute(_: FrontendArgs) -> Result<()> { + Err(anyhow::anyhow!("platform not supported")) +} + +#[cfg(target_os = "linux")] +async fn execute(args: Args) -> Result<()> { + use fuse3::path::Session; + use fuse3::MountOptions; + + let uid = nix::unistd::getuid(); + let gid = nix::unistd::getgid(); + + let mut mount_option = MountOptions::default(); + mount_option.uid(uid.into()); + mount_option.gid(gid.into()); + mount_option.no_open_dir_support(true); + + let ofs = fuse::Ofs::new(args.backend, uid.into(), gid.into()); + + let mount_handle = Session::new(mount_option) + .mount_with_unprivileged(ofs, args.mount_path) + .await?; + + mount_handle.await?; + + Ok(()) } From 038b9c863b3a47266cd4cb2a92616b937d7f589d Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Tue, 20 Feb 2024 18:14:28 +0800 Subject: [PATCH 6/9] chore: cleanup code --- bin/ofs/src/fuse.rs | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/bin/ofs/src/fuse.rs b/bin/ofs/src/fuse.rs index f3c240923635..4d939e7c7c2a 100644 --- a/bin/ofs/src/fuse.rs +++ b/bin/ofs/src/fuse.rs @@ -131,9 +131,7 @@ impl PathFilesystem for Ofs { .map_err(opendal_error2errno)?; let now = SystemTime::now(); - let mut attr = metadata2file_attr(&metadata, now); - attr.uid = self.uid; - attr.gid = self.gid; + let attr = metadata2file_attr(&metadata, now, self.uid, self.gid); Ok(ReplyEntry { ttl: TTL, attr }) } @@ -172,9 +170,7 @@ impl PathFilesystem for Ofs { .map_err(opendal_error2errno)?; let now = SystemTime::now(); - let mut attr = metadata2file_attr(&metadata, now); - attr.uid = self.uid; - attr.gid = self.gid; + let attr = metadata2file_attr(&metadata, now, self.uid, self.gid); Ok(ReplyAttr { ttl: TTL, attr }) } @@ -252,9 +248,7 @@ impl PathFilesystem for Ofs { let metadata = Metadata::new(EntryMode::DIR); let now = SystemTime::now(); - let mut attr = metadata2file_attr(&metadata, now); - attr.uid = self.uid; - attr.gid = self.gid; + let attr = metadata2file_attr(&metadata, now, self.uid, self.gid); Ok(ReplyEntry { ttl: TTL, attr }) } @@ -351,9 +345,7 @@ impl PathFilesystem for Ofs { .map_err(opendal_error2errno)?; let metadata = Metadata::new(EntryMode::FILE); - let mut attr = metadata2file_attr(&metadata, SystemTime::now()); - attr.uid = self.uid; - attr.gid = self.gid; + let attr = metadata2file_attr(&metadata, SystemTime::now(), self.uid, self.gid); let fh = self .opened_files @@ -584,9 +576,7 @@ impl PathFilesystem for Ofs { .stat(&e.name()) .await .unwrap_or_else(|_| e.metadata().clone()); - let mut attr = metadata2file_attr(&metadata, now); - attr.uid = uid; - attr.gid = gid; + let attr = metadata2file_attr(&metadata, now, uid, gid); Result::Ok(DirectoryEntryPlus { kind: entry_mode2file_type(metadata.mode()), name: e.name().trim_matches('/').into(), @@ -612,7 +602,7 @@ impl PathFilesystem for Ofs { .then(move |(i, entry)| make_entry(op.clone(), i, entry, uid, gid, now)); let relative_path_metadata = Metadata::new(EntryMode::DIR); - let relative_path_attr = metadata2file_attr(&relative_path_metadata, now); + let relative_path_attr = metadata2file_attr(&relative_path_metadata, now, uid, gid); let relative_paths = stream::iter([ Result::Ok(DirectoryEntryPlus { kind: FileType::Directory, @@ -645,7 +635,7 @@ const fn entry_mode2file_type(mode: EntryMode) -> FileType { } } -fn metadata2file_attr(metadata: &Metadata, atime: SystemTime) -> FileAttr { +fn metadata2file_attr(metadata: &Metadata, atime: SystemTime, uid: u32, gid: u32) -> FileAttr { let last_modified = metadata.last_modified().map(|t| t.into()).unwrap_or(atime); let kind = entry_mode2file_type(metadata.mode()); FileAttr { @@ -657,8 +647,8 @@ fn metadata2file_attr(metadata: &Metadata, atime: SystemTime) -> FileAttr { kind, perm: fuse3::perm_from_mode_and_kind(kind, 0o775), nlink: 0, - uid: 1000, - gid: 1000, + uid, + gid, rdev: 0, blksize: 4096, } From 6381003f3a8627470fe721398d2f62d5cf9e1ae8 Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Tue, 20 Feb 2024 18:16:13 +0800 Subject: [PATCH 7/9] chore: make clippy happy --- bin/ofs/src/fuse.rs | 2 +- bin/ofs/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/ofs/src/fuse.rs b/bin/ofs/src/fuse.rs index 4d939e7c7c2a..214187e861e9 100644 --- a/bin/ofs/src/fuse.rs +++ b/bin/ofs/src/fuse.rs @@ -573,7 +573,7 @@ impl PathFilesystem for Ofs { let make_entry = |op: Operator, i: usize, entry: opendal::Result, uid, gid, now| async move { let e = entry.map_err(opendal_error2errno)?; let metadata = op - .stat(&e.name()) + .stat(e.name()) .await .unwrap_or_else(|_| e.metadata().clone()); let attr = metadata2file_attr(&metadata, now, uid, gid); diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs index a489b1e449b5..c82d5e5edc84 100644 --- a/bin/ofs/src/lib.rs +++ b/bin/ofs/src/lib.rs @@ -59,7 +59,7 @@ struct Args { backend: Operator, } -#[cfg(any(not(target_os = "linux")))] +#[cfg(not(target_os = "linux"))] async fn execute(_: FrontendArgs) -> Result<()> { Err(anyhow::anyhow!("platform not supported")) } From 289e1b3ef885ce5026bc670c36fc7a44db22576e Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Tue, 20 Feb 2024 18:58:16 +0800 Subject: [PATCH 8/9] Update Cargo.lock --- bin/ofs/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock index 051c429ef79e..8ca5394fa01d 100644 --- a/bin/ofs/Cargo.lock +++ b/bin/ofs/Cargo.lock @@ -1026,6 +1026,7 @@ name = "ofs" version = "0.0.1+core.0.45.0" dependencies = [ "anyhow", + "bytes", "chrono", "clap", "env_logger", From c1302f3f45ff3f4f560eff6ef47ee4020a42e0c7 Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Thu, 22 Feb 2024 20:04:18 +0800 Subject: [PATCH 9/9] fix: dependencies --- bin/ofs/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/ofs/Cargo.toml b/bin/ofs/Cargo.toml index 669d0e30e766..f6a64bc5e0b4 100644 --- a/bin/ofs/Cargo.toml +++ b/bin/ofs/Cargo.toml @@ -31,11 +31,11 @@ rust-version = "1.67" [dependencies] anyhow = "1" -clap = { version = "4.4.18", features = ["derive", "env"] } +clap = { version = "4.5.1", features = ["derive", "env"] } env_logger = "0.11.2" futures-util = "0.3.30" log = "0.4.20" -opendal = { path = "../../core" } +opendal = { version = "0.45.0", path = "../../core" } tokio = { version = "1.34", features = [ "fs", "macros",