diff --git a/README.md b/README.md index 7e45c69cf639..f6c662d94be4 100644 --- a/README.md +++ b/README.md @@ -82,23 +82,30 @@ OpenDAL offers a unified data access layer, empowering users to seamlessly and e ## For *ANY* integrations -| Name | Description | Release | -|------------------------|------------------------------------------------------|-------------------------------------------------------------| -| [dav-server-opendalfs] | Access data via integrations to [dav-server-rs] | [![dav-server-opendalfs image]][dav-server-opendalfs crate] | -| [object_store_opendal] | Access data via integrations to [object_store] | [![object_store_opendal image]][object_store_opendal crate] | -| [fuse3_opendal] | Access data via integrations to [fuse3] | - | -| [virtiofs_opendal] | Access data via integrations to [vhost-user-backend] | - | +| Name | Description | Release | Docs | +|------------------------|------------------------------------------------------|---------------------------------------------|-----------------------------------------------------------------------------------| +| [dav-server-opendalfs] | a [dav-server-rs] implementation using opendal. | [![dav-server image]][dav-server crate] | [![Docs Release]][dav-server release docs] [![Docs Dev]][dav-server dev docs] | +| [object_store_opendal] | an [object_store] implementation using opendal. | [![object_store image]][object_store crate] | [![Docs Release]][object_store release docs] [![Docs Dev]][object_store dev docs] | +| [fuse3_opendal] | Access data via integrations to [fuse3] | - | - | +| [virtiofs_opendal] | Access data via integrations to [vhost-user-backend] | - | - | [dav-server-opendalfs]: integrations/dav-server/README.md [dav-server-rs]: https://github.com/messense/dav-server-rs -[dav-server-opendalfs image]: https://img.shields.io/crates/v/dav-server-opendalfs.svg -[dav-server-opendalfs crate]: https://crates.io/crates/dav-server-opendalfs +[dav-server image]: https://img.shields.io/crates/v/dav-server-opendalfs.svg +[dav-server crate]: https://crates.io/crates/dav-server-opendalfs +[dav-server release docs]: https://docs.rs/dav-server-opendalfs/ +[dav-server dev docs]: https://opendal.apache.org/docs/dav-server-opendalfs/dav_server_opendalfs/ + [object_store_opendal]: integrations/object_store/README.md [object_store]: https://docs.rs/object_store -[object_store_opendal image]: https://img.shields.io/crates/v/object_store_opendal.svg -[object_store_opendal crate]: https://crates.io/crates/object_store_opendal +[object_store image]: https://img.shields.io/crates/v/object_store_opendal.svg +[object_store crate]: https://crates.io/crates/object_store_opendal +[object_store release docs]: https://docs.rs/object_store_opendal/ +[object_store dev docs]: https://opendal.apache.org/docs/object-store-opendal/object_store_opendal/ + [fuse3_opendal]: integrations/fuse3/README.md [fuse3]: https://docs.rs/fuse3 + [virtiofs_opendal]: integrations/virtiofs/README.md [vhost-user-backend]: https://docs.rs/vhost-user-backend diff --git a/bin/oay/Cargo.lock b/bin/oay/Cargo.lock index 9b5129651228..e83a68135428 100644 --- a/bin/oay/Cargo.lock +++ b/bin/oay/Cargo.lock @@ -133,18 +133,19 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "axum" -version = "0.6.20" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", - "bitflags 1.3.2", "bytes", "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.29", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", "itoa", "matchit", "memchr", @@ -156,28 +157,33 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-core" -version = "0.3.4" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" dependencies = [ "async-trait", "bytes", "futures-util", - "http 0.2.12", - "http-body 0.4.6", + "http", + "http-body", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 0.1.2", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -219,12 +225,6 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.5.0" @@ -338,17 +338,18 @@ dependencies = [ [[package]] name = "dav-server" -version = "0.5.8" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0b91592a7b705ba0a9487b7475fca78bce97473e220f8cc3948381b1c81b793" +checksum = "4fd4e9ee075a88de34679ff951a4fc6c8d6f64fb2e96cfaa2121016f3262a70f" dependencies = [ "bytes", "futures-channel", "futures-util", "headers", "htmlescape", - "http 0.2.12", - "http-body 0.4.6", + "http", + "http-body", + "http-body-util", "lazy_static", "libc", "log", @@ -373,15 +374,9 @@ version = "0.0.2" dependencies = [ "anyhow", "bytes", - "chrono", "dav-server", - "dirs", "futures", - "futures-util", "opendal", - "quick-xml", - "serde", - "tokio", ] [[package]] @@ -598,14 +593,14 @@ dependencies = [ [[package]] name = "headers" -version = "0.3.9" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http 0.2.12", + "http", "httpdate", "mime", "sha1", @@ -613,11 +608,11 @@ dependencies = [ [[package]] name = "headers-core" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http 0.2.12", + "http", ] [[package]] @@ -632,17 +627,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9025058dae765dee5070ec375f591e2ba14638c63feff74f13805a72e523163" -[[package]] -name = "http" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.1.0" @@ -654,17 +638,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" -dependencies = [ - "bytes", - "http 0.2.12", - "pin-project-lite", -] - [[package]] name = "http-body" version = "1.0.0" @@ -672,7 +645,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.1.0", + "http", ] [[package]] @@ -683,17 +656,11 @@ checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ "bytes", "futures-core", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" - [[package]] name = "httparse" version = "1.8.0" @@ -706,29 +673,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "hyper" -version = "0.14.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "hyper" version = "1.3.1" @@ -738,9 +682,10 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -755,8 +700,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http 1.1.0", - "hyper 1.3.1", + "http", + "hyper", "hyper-util", "rustls", "rustls-pki-types", @@ -774,9 +719,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.1.0", - "http-body 1.0.0", - "hyper 1.3.1", + "http", + "http-body", + "hyper", "pin-project-lite", "socket2", "tokio", @@ -993,7 +938,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.5.0", + "bitflags", "libc", ] @@ -1145,7 +1090,7 @@ dependencies = [ "futures", "futures-util", "opendal", - "quick-xml", + "quick-xml 0.32.0", "serde", "tokio", "toml", @@ -1185,12 +1130,12 @@ dependencies = [ "flagset", "futures", "getrandom", - "http 1.1.0", + "http", "log", "md-5", "once_cell", "percent-encoding", - "quick-xml", + "quick-xml 0.31.0", "reqwest", "serde", "serde_json", @@ -1302,6 +1247,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.36" @@ -1347,7 +1302,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" dependencies = [ - "bitflags 2.5.0", + "bitflags", ] [[package]] @@ -1415,10 +1370,10 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "http-body-util", - "hyper 1.3.1", + "hyper", "hyper-rustls", "hyper-util", "ipnet", @@ -1434,7 +1389,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "tokio", "tokio-rustls", "tokio-util", @@ -1676,6 +1631,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "synstructure" version = "0.13.1" @@ -1862,17 +1823,15 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.4.4" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ - "bitflags 2.5.0", + "bitflags", "bytes", - "futures-core", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "http-range-header", + "http", + "http-body", + "http-body-util", "pin-project-lite", "tower-layer", "tower-service", diff --git a/bin/oay/Cargo.toml b/bin/oay/Cargo.toml index 1804a181315e..bbc59a1de850 100644 --- a/bin/oay/Cargo.toml +++ b/bin/oay/Cargo.toml @@ -34,35 +34,37 @@ default = ["frontends-webdav", "frontends-s3"] frontends-s3 = [] frontends-webdav = [ - "dep:dav-server", - "dep:dav-server-opendalfs", - "dep:bytes", - "dep:futures-util", + "dep:dav-server", + "dep:dav-server-opendalfs", + "dep:bytes", + "dep:futures-util", ] [dependencies] anyhow = "1" -axum = "0.6" +axum = "0.7" bytes = { version = "1.5.0", optional = true } chrono = "0.4.31" clap = { version = "4", features = ["cargo", "string"] } -dav-server = { version = "0.5.8", optional = true } +dav-server = { version = "0.6", optional = true } dav-server-opendalfs = { version = "0.0.2", path = "../../integrations/dav-server", optional = true } dirs = "5.0.1" futures = "0.3" futures-util = { version = "0.3.29", optional = true } -opendal = { version = "0.47.0", path = "../../core" } -quick-xml = { version = "0.31", features = ["serialize", "overlapped-lists"] } +opendal = { version = "0.47.0", path = "../../core", features = [ + "services-fs", +] } +quick-xml = { version = "0.32", features = ["serialize", "overlapped-lists"] } serde = { version = "1", features = ["derive"] } tokio = { version = "1.34", features = [ - "fs", - "macros", - "rt-multi-thread", - "io-std", + "fs", + "macros", + "rt-multi-thread", + "io-std", ] } toml = "0.8.12" tower = "0.4" -tower-http = { version = "0.4", features = ["trace"] } +tower-http = { version = "0.5", features = ["trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.5.1" diff --git a/bin/oay/src/services/s3/service.rs b/bin/oay/src/services/s3/service.rs index 56a4a4d6ad6d..b30bae284b6d 100644 --- a/bin/oay/src/services/s3/service.rs +++ b/bin/oay/src/services/s3/service.rs @@ -30,8 +30,6 @@ use opendal::Metakey; use opendal::Operator; use serde::Deserialize; use serde::Serialize; -use tower::ServiceBuilder; -use tower_http::trace::TraceLayer; use tracing::debug; use crate::Config; @@ -51,14 +49,12 @@ impl S3Service { let app = Router::new() .route("/", get(handle_list_objects)) - .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())) .with_state(S3State { op: self.op.clone(), }); - axum::Server::bind(&s3_cfg.addr.parse().unwrap()) - .serve(app.into_make_service()) - .await?; + let listener = tokio::net::TcpListener::bind(&s3_cfg.addr).await.unwrap(); + axum::serve(listener, app.into_make_service()).await?; Ok(()) } diff --git a/bin/oay/src/services/webdav/service.rs b/bin/oay/src/services/webdav/service.rs index 9b5673eb9e81..987a83107ab9 100644 --- a/bin/oay/src/services/webdav/service.rs +++ b/bin/oay/src/services/webdav/service.rs @@ -55,9 +55,10 @@ impl WebdavService { let app = Router::new().route("/*path", any_service(webdav_service)); - axum::Server::bind(&webdav_cfg.addr.parse().unwrap()) - .serve(app.into_make_service()) - .await?; + let listener = tokio::net::TcpListener::bind(&webdav_cfg.addr) + .await + .unwrap(); + axum::serve(listener, app.into_make_service()).await?; Ok(()) } diff --git a/core/README.md b/core/README.md index 6b577d6064a2..94b3cc2ae35c 100644 --- a/core/README.md +++ b/core/README.md @@ -16,7 +16,7 @@ OpenDAL offers a unified data access layer, empowering users to seamlessly and e ## Useful Links -- Documentation: [stable](https://docs.rs/opendal/) | [main](https://opendal.apache.org/docs/rust/opendal/) +- Documentation: [release](https://docs.rs/opendal/) | [dev](https://opendal.apache.org/docs/rust/opendal/) - [Release Notes](https://docs.rs/opendal/latest/opendal/docs/changelog/index.html) - [Upgrade Guide](https://docs.rs/opendal/latest/opendal/docs/upgrade/index.html) - [RFC List](https://docs.rs/opendal/latest/opendal/docs/rfcs/index.html) diff --git a/integrations/dav-server/Cargo.toml b/integrations/dav-server/Cargo.toml index 2d569b273a13..0d31af23eed5 100644 --- a/integrations/dav-server/Cargo.toml +++ b/integrations/dav-server/Cargo.toml @@ -30,19 +30,16 @@ rust-version = "1.75" [dependencies] anyhow = "1" bytes = { version = "1.4.0" } -chrono = "0.4.28" -dav-server = { version = "0.5.8" } -dirs = "5.0.0" +dav-server = { version = "0.6.0" } futures = "0.3" -futures-util = { version = "0.3.16" } +opendal = { version = "0.47.0", path = "../../core" } + +[dev-dependencies] opendal = { version = "0.47.0", path = "../../core", features = [ "services-fs", ] } -quick-xml = { version = "0.32", features = ["serialize", "overlapped-lists"] } -serde = { version = "1", features = ["derive"] } tokio = { version = "1.27", features = [ - "fs", "macros", "rt-multi-thread", "io-std", -] } +] } \ No newline at end of file diff --git a/integrations/dav-server/README.md b/integrations/dav-server/README.md index f38831905533..e16c6b5fe7c5 100644 --- a/integrations/dav-server/README.md +++ b/integrations/dav-server/README.md @@ -1,3 +1,57 @@ -# dav-server-opendalfs +# Apache OpenDALâ„¢ dav-server integration -[`dav-server-opendalfs`](https://crates.io/crates/dav-server-opendalfs) is an integration which uses OpenDAL as a backend to access data in various service with WebDAV protocol. +[![Build Status]][actions] [![Latest Version]][crates.io] [![Crate Downloads]][crates.io] [![chat]][discord] + +[build status]: https://img.shields.io/github/actions/workflow/status/apache/opendal/ci_integration_dav_server.yml?branch=main +[actions]: https://github.com/apache/opendal/actions?query=branch%3Amain +[latest version]: https://img.shields.io/crates/v/dav-server-opendalfs.svg +[crates.io]: https://crates.io/crates/dav-server-opendalfs +[crate downloads]: https://img.shields.io/crates/d/dav-server-opendalfs.svg +[chat]: https://img.shields.io/discord/1081052318650339399 +[discord]: https://opendal.apache.org/discord + +`dav-server-opendalfs` is an [`dav-server`](https://github.com/messense/dav-server-rs) implementation using opendal. + +This crate can help you to access ANY storage services with the same webdav API. + +## Useful Links + +- Documentation: [release](https://docs.rs/dav-server-opendalfs/) | [dev](https://opendal.apache.org/docs/dav-server-opendalfs/dav_server_opendalfs/) + +## Examples + +``` +use anyhow::Result; +use dav_server::davpath::DavPath; +use dav_server::fs::DavFileSystem; +use dav_server_opendalfs::OpendalFs; +use opendal::services::Memory; +use opendal::Operator; + +#[tokio::test] +async fn test() -> Result<()> { + let op = Operator::new(Memory::default())?.finish(); + + let webdavfs = OpendalFs::new(op); + + let metadata = webdavfs + .metadata(&DavPath::new("/").unwrap()) + .await + .unwrap(); + println!("{}", metadata.is_dir()); + + Ok(()) +} +``` + +## Branding + +The first and most prominent mentions must use the full form: **Apache OpenDALâ„¢** of the name for any individual usage (webpage, handout, slides, etc.) Depending on the context and writing style, you should use the full form of the name sufficiently often to ensure that readers clearly understand the association of both the OpenDAL project and the OpenDAL software product to the ASF as the parent organization. + +For more details, see the [Apache Product Name Usage Guide](https://www.apache.org/foundation/marks/guide). + +## License and Trademarks + +Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + +Apache OpenDAL, OpenDAL, and Apache are either registered trademarks or trademarks of the Apache Software Foundation. diff --git a/integrations/dav-server/src/dir.rs b/integrations/dav-server/src/dir.rs new file mode 100644 index 000000000000..cb71a0304a48 --- /dev/null +++ b/integrations/dav-server/src/dir.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use dav_server::fs::{DavDirEntry, DavMetaData}; +use futures::StreamExt; +use futures::{FutureExt, Stream}; +use opendal::Operator; +use opendal::{Entry, Lister}; +use std::pin::Pin; +use std::task::Poll::Ready; +use std::task::{ready, Context, Poll}; + +use super::metadata::OpendalMetaData; +use super::utils::*; + +/// OpendalStream is a stream of `DavDirEntry` that is used to list the contents of a directory. +pub struct OpendalStream { + op: Operator, + lister: Lister, +} + +impl OpendalStream { + /// Create a new opendal stream. + pub fn new(op: Operator, lister: Lister) -> Self { + OpendalStream { op, lister } + } +} + +impl Stream for OpendalStream { + type Item = Box; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let dav_stream = self.get_mut(); + match ready!(dav_stream.lister.poll_next_unpin(cx)) { + Some(entry) => { + let webdav_entry = OpendalDirEntry::new(dav_stream.op.clone(), entry.unwrap()); + Ready(Some(Box::new(webdav_entry) as Box)) + } + None => Ready(None), + } + } +} + +/// OpendalDirEntry is a `DavDirEntry` implementation for opendal. +pub struct OpendalDirEntry { + op: Operator, + dir_entry: Entry, +} + +impl OpendalDirEntry { + /// Create a new opendal dir entry. + pub fn new(op: Operator, dir_entry: Entry) -> Self { + OpendalDirEntry { dir_entry, op } + } +} + +impl DavDirEntry for OpendalDirEntry { + fn name(&self) -> Vec { + self.dir_entry.name().as_bytes().to_vec() + } + + fn metadata(&self) -> dav_server::fs::FsFuture> { + async move { + self.op + .stat(self.dir_entry.path()) + .await + .map(|metadata| Box::new(OpendalMetaData::new(metadata)) as Box) + .map_err(convert_error) + } + .boxed() + } +} diff --git a/integrations/dav-server/src/dir_entry.rs b/integrations/dav-server/src/dir_entry.rs deleted file mode 100644 index 955266546cf4..000000000000 --- a/integrations/dav-server/src/dir_entry.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use dav_server::fs::DavDirEntry; -use futures::FutureExt; -use opendal::Entry; -use opendal::Operator; - -use super::file::convert_error; -use super::metadata::WebdavMetaData; - -pub struct WebDAVDirEntry { - dir_entry: Entry, - op: Operator, -} - -impl DavDirEntry for WebDAVDirEntry { - fn name(&self) -> Vec { - self.dir_entry.name().as_bytes().to_vec() - } - - fn metadata(&self) -> dav_server::fs::FsFuture> { - async move { - self.op - .stat(self.dir_entry.path()) - .await - .map(|metadata| { - Box::new(WebdavMetaData::new(metadata)) as Box - }) - .map_err(convert_error) - } - .boxed() - } -} - -impl WebDAVDirEntry { - pub fn new(dir_entry: Entry, op: Operator) -> Self { - WebDAVDirEntry { dir_entry, op } - } -} diff --git a/integrations/dav-server/src/file.rs b/integrations/dav-server/src/file.rs index 2e19a6ec7ece..d20a32c30e39 100644 --- a/integrations/dav-server/src/file.rs +++ b/integrations/dav-server/src/file.rs @@ -15,85 +15,157 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::{Debug, Formatter}; use std::io::SeekFrom; -use bytes::Bytes; +use bytes::{Buf, Bytes, BytesMut}; use dav_server::davpath::DavPath; -use dav_server::fs::DavFile; -use dav_server::fs::DavMetaData; -use dav_server::fs::FsFuture; +use dav_server::fs::{DavFile, OpenOptions}; +use dav_server::fs::{DavMetaData, FsResult}; +use dav_server::fs::{FsError, FsFuture}; use futures::FutureExt; -use opendal::Operator; +use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use opendal::{FuturesAsyncReader, FuturesAsyncWriter, Operator}; -use super::metadata::WebdavMetaData; +use super::metadata::OpendalMetaData; +use super::utils::*; -#[derive(Debug)] -pub struct WebdavFile { +/// OpendalFile is a `DavFile` implementation for opendal. +pub struct OpendalFile { op: Operator, path: DavPath, + state: State, + buf: BytesMut, } -impl WebdavFile { - pub fn new(op: Operator, path: DavPath) -> Self { - Self { op, path } +impl Debug for OpendalFile { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpendalFile") + .field("path", &self.path) + .field( + "state", + match &self.state { + State::Read { .. } => &"read", + State::Write(_) => &"write", + }, + ) + .finish() } } -impl DavFile for WebdavFile { - fn read_bytes(&mut self, count: usize) -> FsFuture { - async move { - let file_path = self.path.as_url_string(); - let buf = self - .op - .read_with(&file_path) - .range(0..count as u64) +enum State { + Read(FuturesAsyncReader), + Write(FuturesAsyncWriter), +} + +impl OpendalFile { + /// Create a new opendal file. + pub async fn open(op: Operator, path: DavPath, options: OpenOptions) -> FsResult { + let state = if options.read { + let r = op + .reader(path.as_url_string().as_str()) + .await + .map_err(convert_error)? + .into_futures_async_read(..) .await .map_err(convert_error)?; - Ok(buf.to_bytes()) - } - .boxed() + State::Read(r) + } else if options.write { + let w = op + .writer_with(path.as_url_string().as_str()) + .append(options.append) + .await + .map_err(convert_error)? + .into_futures_async_write(); + State::Write(w) + } else { + return Err(FsError::NotImplemented); + }; + + Ok(Self { + op, + path, + state, + buf: BytesMut::new(), + }) } +} +impl DavFile for OpendalFile { fn metadata(&mut self) -> FsFuture> { async move { self.op .stat(self.path.as_url_string().as_str()) .await .map(|opendal_metadata| { - Box::new(WebdavMetaData::new(opendal_metadata)) as Box + Box::new(OpendalMetaData::new(opendal_metadata)) as Box }) .map_err(convert_error) } .boxed() } - fn write_buf(&mut self, buf: Box) -> FsFuture<()> { - self.write_bytes(Bytes::copy_from_slice(buf.chunk())) + fn write_buf(&mut self, mut buf: Box) -> FsFuture<()> { + async move { + let State::Write(w) = &mut self.state else { + return Err(FsError::GeneralFailure); + }; + + w.write_all(&buf.copy_to_bytes(buf.remaining())) + .await + .map_err(|_| FsError::GeneralFailure)?; + Ok(()) + } + .boxed() } fn write_bytes(&mut self, buf: Bytes) -> FsFuture<()> { async move { - let file_path = self.path.as_url_string(); - self.op.write(&file_path, buf).await.map_err(convert_error) + let State::Write(w) = &mut self.state else { + return Err(FsError::GeneralFailure); + }; + + w.write_all(&buf).await.map_err(|_| FsError::GeneralFailure) + } + .boxed() + } + + fn read_bytes(&mut self, count: usize) -> FsFuture { + async move { + let State::Read(r) = &mut self.state else { + return Err(FsError::GeneralFailure); + }; + + self.buf.resize(count, 0); + let len = r + .read(&mut self.buf) + .await + .map_err(|_| FsError::GeneralFailure)?; + Ok(self.buf.split_to(len).freeze()) } .boxed() } - fn seek(&mut self, _pos: SeekFrom) -> FsFuture { - futures_util::future::err(dav_server::fs::FsError::NotImplemented).boxed() + fn seek(&mut self, pos: SeekFrom) -> FsFuture { + async move { + let State::Read(r) = &mut self.state else { + return Err(FsError::GeneralFailure); + }; + + r.seek(pos).await.map_err(|_| FsError::GeneralFailure) + } + .boxed() } fn flush(&mut self) -> FsFuture<()> { - futures_util::future::ok(()).boxed() - } -} + async move { + let State::Write(w) = &mut self.state else { + return Err(FsError::GeneralFailure); + }; -pub fn convert_error(opendal_error: opendal::Error) -> dav_server::fs::FsError { - match opendal_error.kind() { - opendal::ErrorKind::AlreadyExists | opendal::ErrorKind::IsSameFile => { - dav_server::fs::FsError::Exists + w.flush().await.map_err(|_| FsError::GeneralFailure)?; + w.close().await.map_err(|_| FsError::GeneralFailure) } - opendal::ErrorKind::NotFound => dav_server::fs::FsError::NotFound, - _ => dav_server::fs::FsError::GeneralFailure, + .boxed() } } diff --git a/integrations/dav-server/src/opendalfs.rs b/integrations/dav-server/src/fs.rs similarity index 66% rename from integrations/dav-server/src/opendalfs.rs rename to integrations/dav-server/src/fs.rs index 92a2d7783749..fda9575c79fc 100644 --- a/integrations/dav-server/src/opendalfs.rs +++ b/integrations/dav-server/src/fs.rs @@ -15,40 +15,67 @@ // specific language governing permissions and limitations // under the License. -use std::path::Path; -use std::task::ready; -use std::task::Poll::Ready; - use dav_server::davpath::DavPath; -use dav_server::fs::DavDirEntry; -use dav_server::fs::DavFile; -use dav_server::fs::DavFileSystem; use dav_server::fs::DavMetaData; use dav_server::fs::FsError; +use dav_server::fs::{DavDirEntry, FsFuture}; +use dav_server::fs::{DavFile, FsStream}; +use dav_server::fs::{DavFileSystem, ReadDirMeta}; use futures::FutureExt; -use futures_util::Stream; -use futures_util::StreamExt; -use opendal::Lister; +use futures::StreamExt; use opendal::Operator; +use std::path::Path; -use super::file::convert_error; -use super::file::WebdavFile; -use super::metadata::WebdavMetaData; -use crate::dir_entry::WebDAVDirEntry; - +use super::dir::OpendalStream; +use super::file::OpendalFile; +use super::metadata::OpendalMetaData; +use super::utils::convert_error; + +/// OpendalFs is a `DavFileSystem` implementation for opendal. +/// +/// ``` +/// use anyhow::Result; +/// use dav_server::davpath::DavPath; +/// use dav_server::fs::DavFileSystem; +/// use dav_server_opendalfs::OpendalFs; +/// use opendal::services::Memory; +/// use opendal::Operator; +/// +/// #[tokio::test] +/// async fn test() -> Result<()> { +/// let op = Operator::new(Memory::default())?.finish(); +/// +/// let webdavfs = OpendalFs::new(op); +/// +/// let metadata = webdavfs +/// .metadata(&DavPath::new("/").unwrap()) +/// .await +/// .unwrap(); +/// println!("{}", metadata.is_dir()); +/// +/// Ok(()) +/// } +/// ``` #[derive(Clone)] pub struct OpendalFs { pub op: Operator, } +impl OpendalFs { + /// Create a new `OpendalFs` instance. + pub fn new(op: Operator) -> Box { + Box::new(OpendalFs { op }) + } +} + impl DavFileSystem for OpendalFs { fn open<'a>( &'a self, - path: &'a dav_server::davpath::DavPath, - _options: dav_server::fs::OpenOptions, - ) -> dav_server::fs::FsFuture> { + path: &'a DavPath, + options: dav_server::fs::OpenOptions, + ) -> FsFuture> { async move { - let file = WebdavFile::new(self.op.clone(), path.clone()); + let file = OpendalFile::open(self.op.clone(), path.clone(), options).await?; Ok(Box::new(file) as Box) } .boxed() @@ -56,29 +83,25 @@ impl DavFileSystem for OpendalFs { fn read_dir<'a>( &'a self, - path: &'a dav_server::davpath::DavPath, - _meta: dav_server::fs::ReadDirMeta, - ) -> dav_server::fs::FsFuture>> - { + path: &'a DavPath, + _meta: ReadDirMeta, + ) -> FsFuture>> { async move { self.op .lister(path.as_url_string().as_str()) .await - .map(|lister| DavStream::new(self.op.clone(), lister).boxed()) + .map(|lister| OpendalStream::new(self.op.clone(), lister).boxed()) .map_err(convert_error) } .boxed() } - fn metadata<'a>( - &'a self, - path: &'a dav_server::davpath::DavPath, - ) -> dav_server::fs::FsFuture> { + fn metadata<'a>(&'a self, path: &'a DavPath) -> FsFuture> { async move { let opendal_metadata = self.op.stat(path.as_url_string().as_str()).await; match opendal_metadata { Ok(metadata) => { - let webdav_metadata = WebdavMetaData::new(metadata); + let webdav_metadata = OpendalMetaData::new(metadata); Ok(Box::new(webdav_metadata) as Box) } Err(e) => Err(convert_error(e)), @@ -87,7 +110,7 @@ impl DavFileSystem for OpendalFs { .boxed() } - fn create_dir<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { + fn create_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<()> { async move { let path = path.as_url_string(); @@ -98,7 +121,7 @@ impl DavFileSystem for OpendalFs { match self.op.is_exist(parent.to_str().unwrap()).await { Ok(exist) => { if !exist && parent != Path::new("/") { - return Err(dav_server::fs::FsError::NotFound); + return Err(FsError::NotFound); } } Err(e) => { @@ -111,7 +134,7 @@ impl DavFileSystem for OpendalFs { let exist = self.op.is_exist(path).await; match exist { Ok(exist) => match exist { - true => Err(dav_server::fs::FsError::Exists), + true => Err(FsError::Exists), false => { let res = self.op.create_dir(path).await; match res { @@ -126,7 +149,11 @@ impl DavFileSystem for OpendalFs { .boxed() } - fn remove_file<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { + fn remove_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<()> { + self.remove_file(path) + } + + fn remove_file<'a>(&'a self, path: &'a DavPath) -> FsFuture<()> { async move { self.op .delete(path.as_url_string().as_str()) @@ -136,75 +163,36 @@ impl DavFileSystem for OpendalFs { .boxed() } - fn remove_dir<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { - self.remove_file(path) - } - - fn copy<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> dav_server::fs::FsFuture<()> { + fn rename<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> FsFuture<()> { async move { let from_path = from .as_rel_ospath() .to_str() .ok_or(FsError::GeneralFailure)?; let to_path = to.as_rel_ospath().to_str().ok_or(FsError::GeneralFailure)?; + if from.is_collection() { + let _ = self.remove_file(to).await; + } self.op - .copy(from_path, to_path) + .rename(from_path, to_path) .await .map_err(convert_error) } .boxed() } - fn rename<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> dav_server::fs::FsFuture<()> { + fn copy<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> FsFuture<()> { async move { let from_path = from .as_rel_ospath() .to_str() .ok_or(FsError::GeneralFailure)?; let to_path = to.as_rel_ospath().to_str().ok_or(FsError::GeneralFailure)?; - if from.is_collection() { - let _ = self.remove_file(to).await; - } self.op - .rename(from_path, to_path) + .copy(from_path, to_path) .await .map_err(convert_error) } .boxed() } } - -impl OpendalFs { - pub fn new(op: Operator) -> Box { - Box::new(OpendalFs { op }) - } -} - -struct DavStream { - op: Operator, - lister: Lister, -} - -impl Stream for DavStream { - type Item = Box; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let dav_stream = self.get_mut(); - match ready!(dav_stream.lister.poll_next_unpin(cx)) { - Some(entry) => { - let webdav_entry = WebDAVDirEntry::new(entry.unwrap(), dav_stream.op.clone()); - Ready(Some(Box::new(webdav_entry) as Box)) - } - None => Ready(None), - } - } -} - -impl DavStream { - fn new(op: Operator, lister: Lister) -> Self { - DavStream { op, lister } - } -} diff --git a/integrations/dav-server/src/lib.rs b/integrations/dav-server/src/lib.rs index 6436fd7370b4..e818b2c128bc 100644 --- a/integrations/dav-server/src/lib.rs +++ b/integrations/dav-server/src/lib.rs @@ -15,9 +15,38 @@ // specific language governing permissions and limitations // under the License. -mod dir_entry; +//! dav-server-opendalfs is an dav-server implementation using opendal. +//! +//! This crate can help you to access ANY storage services with the same webdav API. +//! +//! ``` +//! use anyhow::Result; +//! use dav_server::davpath::DavPath; +//! use dav_server::fs::DavFileSystem; +//! use dav_server_opendalfs::OpendalFs; +//! use opendal::services::Memory; +//! use opendal::Operator; +//! +//! #[tokio::test] +//! async fn test() -> Result<()> { +//! let op = Operator::new(Memory::default())?.finish(); +//! +//! let webdavfs = OpendalFs::new(op); +//! +//! let metadata = webdavfs +//! .metadata(&DavPath::new("/").unwrap()) +//! .await +//! .unwrap(); +//! println!("{}", metadata.is_dir()); +//! +//! Ok(()) +//! } +//! ``` + +mod dir; mod file; mod metadata; -mod opendalfs; +mod utils; -pub use opendalfs::OpendalFs; +mod fs; +pub use fs::OpendalFs; diff --git a/integrations/dav-server/src/metadata.rs b/integrations/dav-server/src/metadata.rs index 03295125d5ee..16099efd392d 100644 --- a/integrations/dav-server/src/metadata.rs +++ b/integrations/dav-server/src/metadata.rs @@ -15,27 +15,30 @@ // specific language governing permissions and limitations // under the License. -use dav_server::fs::DavMetaData; use dav_server::fs::FsError; +use dav_server::fs::{DavMetaData, FsResult}; use opendal::Metadata; +use std::time::SystemTime; +/// OpendalMetaData is a `DavMetaData` implementation for opendal. #[derive(Debug, Clone)] -pub struct WebdavMetaData { +pub struct OpendalMetaData { metadata: Metadata, } -impl WebdavMetaData { +impl OpendalMetaData { + /// Create a new opendal metadata. pub fn new(metadata: Metadata) -> Self { - WebdavMetaData { metadata } + OpendalMetaData { metadata } } } -impl DavMetaData for WebdavMetaData { +impl DavMetaData for OpendalMetaData { fn len(&self) -> u64 { self.metadata.content_length() } - fn modified(&self) -> dav_server::fs::FsResult { + fn modified(&self) -> FsResult { match self.metadata.last_modified() { Some(t) => Ok(t.into()), None => Err(FsError::GeneralFailure), @@ -46,15 +49,15 @@ impl DavMetaData for WebdavMetaData { self.metadata.is_dir() } - fn is_file(&self) -> bool { - self.metadata.is_file() - } - fn etag(&self) -> Option { self.metadata.etag().map(|s| s.to_string()) } - fn status_changed(&self) -> dav_server::fs::FsResult { + fn is_file(&self) -> bool { + self.metadata.is_file() + } + + fn status_changed(&self) -> FsResult { self.metadata .last_modified() .map_or(Err(FsError::GeneralFailure), |t| Ok(t.into())) diff --git a/integrations/dav-server/src/utils.rs b/integrations/dav-server/src/utils.rs new file mode 100644 index 000000000000..132712f26c3e --- /dev/null +++ b/integrations/dav-server/src/utils.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub fn convert_error(opendal_error: opendal::Error) -> dav_server::fs::FsError { + match opendal_error.kind() { + opendal::ErrorKind::AlreadyExists | opendal::ErrorKind::IsSameFile => { + dav_server::fs::FsError::Exists + } + opendal::ErrorKind::NotFound => dav_server::fs::FsError::NotFound, + _ => dav_server::fs::FsError::GeneralFailure, + } +} diff --git a/integrations/object_store/README.md b/integrations/object_store/README.md index 2bf5b6a19895..24465ed95bb2 100644 --- a/integrations/object_store/README.md +++ b/integrations/object_store/README.md @@ -14,10 +14,10 @@ This crate can help you to access 30 more storage services with the same object_store API. - + ## Useful Links -- Documentation: [stable](https://docs.rs/object_store_opendal/) +- Documentation: [release](https://docs.rs/object_store_opendal/) | [dev](https://opendal.apache.org/docs/object-store-opendal/object_store_opendal/) ## Examples