From b84903485c8b207dfbf8d973cc4565345e8ed6cb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 28 Feb 2024 10:07:25 +0800 Subject: [PATCH] refactor: Rewrite webdav to improve code quality (#4280) --- .github/services/webdav/jfrog/action.yml | 5 +- core/src/services/webdav/backend.rs | 452 ++---------- core/src/services/webdav/core.rs | 848 +++++++++++++++++++++++ core/src/services/webdav/lister.rs | 557 ++------------- core/src/services/webdav/mod.rs | 1 + core/src/services/webdav/writer.rs | 11 +- 6 files changed, 946 insertions(+), 928 deletions(-) create mode 100644 core/src/services/webdav/core.rs diff --git a/.github/services/webdav/jfrog/action.yml b/.github/services/webdav/jfrog/action.yml index aaa264ea61f5..b575fd532a45 100644 --- a/.github/services/webdav/jfrog/action.yml +++ b/.github/services/webdav/jfrog/action.yml @@ -24,10 +24,7 @@ runs: - name: Setup webdav in jfrog shell: bash working-directory: fixtures/webdav - run: | - # Can we remove this? - # touch artifactory/etc/system.yaml - docker compose -f docker-compose-webdav-jfrog.yml up -d --wait + run: docker compose -f docker-compose-webdav-jfrog.yml up -d --wait - name: Setup shell: bash diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index c2e14436fc0f..ed358b943f5c 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -16,22 +16,18 @@ // under the License. use std::collections::HashMap; -use std::collections::VecDeque; use std::fmt::Debug; use std::fmt::Formatter; +use std::str::FromStr; +use std::sync::Arc; use async_trait::async_trait; -use bytes::Buf; -use http::header; -use http::HeaderMap; -use http::Request; -use http::Response; use http::StatusCode; use log::debug; use serde::Deserialize; +use super::core::*; use super::error::parse_error; -use super::lister::Multistatus; use super::lister::WebdavLister; use super::writer::WebdavWriter; use crate::raw::*; @@ -177,6 +173,16 @@ impl Builder for WebdavBuilder { .with_context("service", Scheme::Webdav)); } }; + // Some services might return the path with suffix `/remote.php/webdav/`, we need to trim them. + let server_path = http::Uri::from_str(endpoint) + .map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid") + .with_context("service", Scheme::Webdav) + .set_source(err) + })? + .path() + .trim_end_matches('/') + .to_string(); let root = normalize_root(&self.config.root.clone().unwrap_or_default()); debug!("backend use root {}", root); @@ -190,45 +196,41 @@ impl Builder for WebdavBuilder { })? }; - let mut auth = None; + let mut authorization = None; if let Some(username) = &self.config.username { - auth = Some(format_authorization_by_basic( + authorization = Some(format_authorization_by_basic( username, self.config.password.as_deref().unwrap_or_default(), )?); } if let Some(token) = &self.config.token { - auth = Some(format_authorization_by_bearer(token)?) + authorization = Some(format_authorization_by_bearer(token)?) } debug!("backend build finished: {:?}", &self); - Ok(WebdavBackend { + + let core = Arc::new(WebdavCore { endpoint: endpoint.to_string(), - authorization: auth, + server_path, + authorization, disable_copy: self.config.disable_copy, root, client, - }) + }); + Ok(WebdavBackend { core }) } } /// Backend is used to serve `Accessor` support for http. #[derive(Clone)] pub struct WebdavBackend { - endpoint: String, - root: String, - client: HttpClient, - disable_copy: bool, - - authorization: Option, + core: Arc, } impl Debug for WebdavBackend { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Backend") - .field("endpoint", &self.endpoint) - .field("root", &self.root) - .field("client", &self.client) + f.debug_struct("WebdavBackend") + .field("core", &self.core) .finish() } } @@ -237,7 +239,7 @@ impl Debug for WebdavBackend { impl Accessor for WebdavBackend { type Reader = IncomingAsyncBody; type Writer = oio::OneShotWriter; - type Lister = Option>; + type Lister = oio::PageLister; type BlockingReader = (); type BlockingWriter = (); type BlockingLister = (); @@ -245,7 +247,7 @@ impl Accessor for WebdavBackend { fn info(&self) -> AccessorInfo { let mut ma = AccessorInfo::default(); ma.set_scheme(Scheme::Webdav) - .set_root(&self.root) + .set_root(&self.core.root) .set_native_capability(Capability { stat: true, @@ -259,12 +261,13 @@ impl Accessor for WebdavBackend { create_dir: true, delete: true, - copy: !self.disable_copy, + copy: !self.core.disable_copy, rename: true, list: true, - + // We already support recursive list but some details still need to polish. + // list_with_recursive: true, ..Default::default() }); @@ -272,60 +275,17 @@ impl Accessor for WebdavBackend { } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - self.ensure_parent_path(path).await?; - self.create_dir_internal(path).await?; - + self.core.webdav_mkcol(path).await?; Ok(RpCreateDir::default()) } async fn stat(&self, path: &str, _: OpStat) -> Result { - let mut header_map = HeaderMap::new(); - // not include children - header_map.insert("Depth", "0".parse().unwrap()); - header_map.insert(header::ACCEPT, "application/xml".parse().unwrap()); - - let resp = self.webdav_propfind(path, Some(header_map)).await?; - - let status = resp.status(); - - if !status.is_success() { - return Err(parse_error(resp).await?); - } - - let bs = resp.into_body().bytes().await?; - let s = String::from_utf8_lossy(&bs); - - // Make sure the string is escaped. - // Related to - // - // This is a temporary solution, we should find a better way to handle this. - let s = s.replace("&()_+-=;", "%26%28%29_%2B-%3D%3B"); - let result: Multistatus = quick_xml::de::from_str(&s).map_err(new_xml_deserialize_error)?; - - let response = match result.response { - Some(v) => v, - None => { - return Err(Error::new( - ErrorKind::NotFound, - "Failed getting item stat: response field was not found", - )); - } - }; - - let item = response - .first() - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "Failed getting item stat: bad response", - ) - })? - .parse_into_metadata()?; - Ok(RpStat::new(item)) + let metadata = self.core.webdav_stat(path).await?; + Ok(RpStat::new(metadata)) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.webdav_get(path, args).await?; + let resp = self.core.webdav_get(path, args).await?; let status = resp.status(); match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => { @@ -345,21 +305,19 @@ impl Accessor for WebdavBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.ensure_parent_path(path).await?; - - let p = build_abs_path(&self.root, path); + // Ensure parent path exists + self.core.webdav_mkcol(get_parent(path)).await?; Ok(( RpWrite::default(), - oio::OneShotWriter::new(WebdavWriter::new(self.clone(), args, p)), + oio::OneShotWriter::new(WebdavWriter::new(self.core.clone(), args, path.to_string())), )) } async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.webdav_delete(path).await?; + let resp = self.core.webdav_delete(path).await?; let status = resp.status(); - match status { StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(RpDelete::default()), _ => Err(parse_error(resp).await?), @@ -367,57 +325,14 @@ impl Accessor for WebdavBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - if args.recursive() { - return Err(Error::new( - ErrorKind::Unsupported, - "webdav doesn't support list with recursive", - )); - } - - let mut header_map = HeaderMap::new(); - header_map.insert("Depth", "1".parse().unwrap()); - header_map.insert(header::CONTENT_TYPE, "application/xml".parse().unwrap()); - let resp = self.webdav_propfind(path, Some(header_map)).await?; - let status = resp.status(); - - match status { - StatusCode::OK | StatusCode::MULTI_STATUS => { - let bs = resp.into_body().bytes().await?; - let result: Multistatus = - quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; - - let result = match result.response { - Some(v) => v, - None => { - return Ok((RpList::default(), None)); - } - }; - - let l = WebdavLister::new(&self.endpoint, &self.root, path, result); - - Ok((RpList::default(), Some(oio::PageLister::new(l)))) - } - StatusCode::NOT_FOUND if path.ends_with('/') => Ok((RpList::default(), None)), - _ => Err(parse_error(resp).await?), - } + Ok(( + RpList::default(), + oio::PageLister::new(WebdavLister::new(self.core.clone(), path, args)), + )) } - /// # Notes - /// - /// There is a strange dead lock issues when copying a non-exist file, so we will check - /// if the source exists first. - /// - /// For example: async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { - if let Err(err) = self.stat(from, OpStat::default()).await { - if err.kind() == ErrorKind::NotFound { - return Err(err); - } - } - - self.ensure_parent_path(to).await?; - - let resp = self.webdav_copy(from, to).await?; + let resp = self.core.webdav_copy(from, to).await?; let status = resp.status(); @@ -428,12 +343,9 @@ impl Accessor for WebdavBackend { } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { - self.ensure_parent_path(to).await?; - - let resp = self.webdav_move(from, to).await?; + let resp = self.core.webdav_move(from, to).await?; let status = resp.status(); - match status { StatusCode::CREATED | StatusCode::NO_CONTENT | StatusCode::OK => { Ok(RpRename::default()) @@ -442,279 +354,3 @@ impl Accessor for WebdavBackend { } } } - -impl WebdavBackend { - async fn webdav_get(&self, path: &str, args: OpRead) -> Result> { - let p = build_rooted_abs_path(&self.root, path); - let url: String = format!("{}{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::get(&url); - - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth.clone()) - } - - let range = args.range(); - if !range.is_full() { - req = req.header(header::RANGE, range.to_header()); - } - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - pub async fn webdav_put( - &self, - abs_path: &str, - size: Option, - args: &OpWrite, - body: AsyncBody, - ) -> Result> { - let url = format!("{}/{}", self.endpoint, percent_encode_path(abs_path)); - - let mut req = Request::put(&url); - - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth.clone()) - } - - if let Some(size) = size { - req = req.header(header::CONTENT_LENGTH, size) - } - - if let Some(mime) = args.content_type() { - req = req.header(header::CONTENT_TYPE, mime) - } - - if let Some(cd) = args.content_disposition() { - req = req.header(header::CONTENT_DISPOSITION, cd) - } - - // Set body - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn webdav_mkcol_absolute_path(&self, path: &str) -> Result> { - debug_assert!(path.starts_with('/'), "path must be absolute path"); - let url = format!("{}{}", self.endpoint, percent_encode_path(path)); - - let mut req = Request::builder().method("MKCOL").uri(&url); - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth); - } - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn webdav_propfind( - &self, - path: &str, - headers: Option, - ) -> Result> { - let p = build_rooted_abs_path(&self.root, path); - - self.webdav_propfind_absolute_path(&p, headers).await - } - - async fn webdav_propfind_absolute_path( - &self, - path: &str, - headers: Option, - ) -> Result> { - debug_assert!(path.starts_with('/'), "path must be absolute path"); - - let url = format!("{}{}", self.endpoint, percent_encode_path(path)); - let mut req = Request::builder().method("PROPFIND").uri(&url); - - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth); - } - - if let Some(headers) = headers { - for (name, value) in headers { - // all key should be not None, otherwise panic - req = req.header(name.unwrap(), value); - } - } - - // rfc4918 9.1: retrieve all properties define in specification - let body; - { - req = req.header(header::CONTENT_TYPE, "application/xml"); - // XML body must start without a new line. Otherwise, the server will panic: `xmlParseChunk() failed` - let all_prop_xml_body = r#" - - - - "#; - body = AsyncBody::Bytes(bytes::Bytes::from(all_prop_xml_body)); - } - - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn webdav_delete(&self, path: &str) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::delete(&url); - - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth.clone()) - } - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn webdav_copy(&self, from: &str, to: &str) -> Result> { - let source = build_abs_path(&self.root, from); - let target = build_abs_path(&self.root, to); - // Make sure target's dir is exist. - self.ensure_parent_path(&target).await?; - - let source = format!("{}/{}", self.endpoint, percent_encode_path(&source)); - let target = format!("{}/{}", self.endpoint, percent_encode_path(&target)); - - let mut req = Request::builder().method("COPY").uri(&source); - - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth); - } - - req = req.header("Destination", target); - - // We always specific "T" for keeping to overwrite the destination. - req = req.header("Overwrite", "T"); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn webdav_move(&self, from: &str, to: &str) -> Result> { - // Check if the source exists first. - self.stat(from, OpStat::new()).await?; - - let source = build_abs_path(&self.root, from); - let target = build_abs_path(&self.root, to); - // Make sure target's dir is exist. - self.ensure_parent_path(&target).await?; - - let source = format!("{}/{}", self.endpoint, percent_encode_path(&source)); - let target = format!("{}/{}", self.endpoint, percent_encode_path(&target)); - - let mut req = Request::builder().method("MOVE").uri(&source); - - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth); - } - - req = req.header("Destination", target); - - // We always specific "T" for keeping to overwrite the destination. - req = req.header("Overwrite", "T"); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn create_dir_internal(&self, path: &str) -> Result<()> { - let p = build_rooted_abs_path(&self.root, path); - self.create_dir_internal_absolute_path(&p).await - } - - async fn create_dir_internal_absolute_path(&self, path: &str) -> Result<()> { - debug_assert!(path.starts_with('/'), "path must be absolute path"); - - let resp = self.webdav_mkcol_absolute_path(path).await?; - - let status = resp.status(); - - match status { - StatusCode::CREATED - // Allow multiple status - | StatusCode::MULTI_STATUS - // The MKCOL method can only be performed on a deleted or non-existent resource. - // This error means the directory already exists which is allowed by create_dir. - | StatusCode::METHOD_NOT_ALLOWED => { - resp.into_body().consume().await?; - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } - - async fn ensure_parent_path(&self, path: &str) -> Result<()> { - let path = build_rooted_abs_path(&self.root, path); - let mut path = path.as_str(); - - let mut dirs = VecDeque::default(); - - loop { - // check path first. - let parent = get_parent(path); - - let mut header_map = HeaderMap::new(); - // not include children - header_map.insert("Depth", "0".parse().unwrap()); - header_map.insert(header::ACCEPT, "application/xml".parse().unwrap()); - - let resp = self - .webdav_propfind_absolute_path(parent, Some(header_map)) - .await?; - match resp.status() { - StatusCode::OK => { - break; - } - StatusCode::MULTI_STATUS => { - let bs = resp.into_body().bytes().await?; - let s = String::from_utf8_lossy(&bs); - let result: Multistatus = - quick_xml::de::from_str(&s).map_err(new_xml_deserialize_error)?; - - if result.response.is_some() { - break; - } - - dirs.push_front(parent); - path = parent - } - StatusCode::NOT_FOUND => { - dirs.push_front(parent); - path = parent - } - _ => return Err(parse_error(resp).await?), - } - - if path == "/" { - break; - } - } - - for dir in dirs { - self.create_dir_internal_absolute_path(dir).await?; - } - Ok(()) - } -} diff --git a/core/src/services/webdav/core.rs b/core/src/services/webdav/core.rs new file mode 100644 index 000000000000..126f1e3f9df8 --- /dev/null +++ b/core/src/services/webdav/core.rs @@ -0,0 +1,848 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::error::parse_error; +use crate::raw::*; +use crate::*; +use bytes::Bytes; +use http::{header, Request, Response, StatusCode}; +use serde::Deserialize; +use std::collections::VecDeque; +use std::fmt; +use std::fmt::{Debug, Formatter}; + +/// The request to query all properties of a file or directory. +/// +/// rfc4918 9.1: retrieve all properties define in specification +static PROPFIND_REQUEST: &str = r#""#; + +/// The header to specify the depth of the query. +/// +/// Valid values are `0`, `1`, `infinity`. +/// +/// - `0`: only to the resource itself. +/// - `1`: to the resource and its internal members only. +/// - `infinity`: to the resource and all its members. +/// +/// reference: [RFC4918: 10.2. Depth Header](https://datatracker.ietf.org/doc/html/rfc4918#section-10.2) +static HEADER_DEPTH: &str = "Depth"; +/// The header to specify the destination of the query. +/// +/// The Destination request header specifies the URI that identifies a +/// destination resource for methods such as COPY and MOVE, which take +/// two URIs as parameters. +/// +/// reference: [RFC4918: 10.3. Destination Header](https://datatracker.ietf.org/doc/html/rfc4918#section-10.3) +static HEADER_DESTINATION: &str = "Destination"; +/// The header to specify the overwrite behavior of the query +/// +/// The Overwrite request header specifies whether the server should +/// overwrite a resource mapped to the destination URL during a COPY or +/// MOVE. +/// +/// Valid values are `T` and `F`. +/// +/// A value of "F" states that the server must not perform the COPY or MOVE operation +/// if the destination URL does map to a resource. +/// +/// reference: [RFC4918: 10.6. Overwrite Header](https://datatracker.ietf.org/doc/html/rfc4918#section-10.6) +static HEADER_OVERWRITE: &str = "Overwrite"; + +pub struct WebdavCore { + pub endpoint: String, + pub server_path: String, + pub root: String, + pub disable_copy: bool, + pub authorization: Option, + + pub client: HttpClient, +} + +impl Debug for WebdavCore { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("WebdavCore") + .field("endpoint", &self.endpoint) + .field("root", &self.root) + .finish_non_exhaustive() + } +} + +impl WebdavCore { + pub async fn webdav_stat(&self, path: &str) -> Result { + let path = build_rooted_abs_path(&self.root, path); + self.webdav_stat_rooted_abs_path(&path).await + } + + /// Input path must be `rooted_abs_path`. + async fn webdav_stat_rooted_abs_path(&self, rooted_abs_path: &str) -> Result { + let url = format!("{}{}", self.endpoint, percent_encode_path(rooted_abs_path)); + let mut req = Request::builder().method("PROPFIND").uri(url); + + req = req.header(header::CONTENT_TYPE, "application/xml"); + req = req.header(header::CONTENT_LENGTH, PROPFIND_REQUEST.len()); + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + + // Only stat the resource itself. + req = req.header(HEADER_DEPTH, "0"); + + let req = req + .body(AsyncBody::Bytes(Bytes::from(PROPFIND_REQUEST))) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + if !resp.status().is_success() { + return Err(parse_error(resp).await?); + } + + let bs = resp.into_body().bytes().await?; + + let result: Multistatus = deserialize_multistatus(&bs)?; + let propfind_resp = result.response.first().ok_or_else(|| { + Error::new( + ErrorKind::NotFound, + "propfind response is empty, the resource is not exist", + ) + })?; + + let metadata = parse_propstat(&propfind_resp.propstat)?; + Ok(metadata) + } + + pub async fn webdav_get( + &self, + path: &str, + args: OpRead, + ) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + let url: String = format!("{}{}", self.endpoint, percent_encode_path(&path)); + + let mut req = Request::get(&url); + + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth.clone()) + } + + let range = args.range(); + if !range.is_full() { + req = req.header(header::RANGE, range.to_header()); + } + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn webdav_put( + &self, + path: &str, + size: Option, + args: &OpWrite, + body: AsyncBody, + ) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + let url = format!("{}{}", self.endpoint, percent_encode_path(&path)); + + let mut req = Request::put(&url); + + if let Some(v) = &self.authorization { + req = req.header(header::AUTHORIZATION, v) + } + + if let Some(v) = size { + req = req.header(header::CONTENT_LENGTH, v) + } + + if let Some(v) = args.content_type() { + req = req.header(header::CONTENT_TYPE, v) + } + + if let Some(v) = args.content_disposition() { + req = req.header(header::CONTENT_DISPOSITION, v) + } + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn webdav_delete(&self, path: &str) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + let url = format!("{}{}", self.endpoint, percent_encode_path(&path)); + + let mut req = Request::delete(&url); + + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth.clone()) + } + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn webdav_copy(&self, from: &str, to: &str) -> Result> { + // Check if source file exists. + let _ = self.webdav_stat(from).await?; + // Make sure target's dir is exist. + self.webdav_mkcol(get_parent(to)).await?; + + let source = build_rooted_abs_path(&self.root, from); + let source_uri = format!("{}{}", self.endpoint, percent_encode_path(&source)); + + let target = build_rooted_abs_path(&self.root, to); + let target_uri = format!("{}{}", self.endpoint, percent_encode_path(&target)); + + let mut req = Request::builder().method("COPY").uri(&source_uri); + + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + + req = req.header(HEADER_DESTINATION, target_uri); + req = req.header(HEADER_OVERWRITE, "T"); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn webdav_move(&self, from: &str, to: &str) -> Result> { + // Check if source file exists. + let _ = self.webdav_stat(from).await?; + // Make sure target's dir is exist. + self.webdav_mkcol(get_parent(to)).await?; + + let source = build_rooted_abs_path(&self.root, from); + let source_uri = format!("{}{}", self.endpoint, percent_encode_path(&source)); + + let target = build_rooted_abs_path(&self.root, to); + let target_uri = format!("{}{}", self.endpoint, percent_encode_path(&target)); + + let mut req = Request::builder().method("MOVE").uri(&source_uri); + + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + + req = req.header(HEADER_DESTINATION, target_uri); + req = req.header(HEADER_OVERWRITE, "T"); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn webdav_list( + &self, + path: &str, + args: &OpList, + ) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + let url = format!("{}{}", self.endpoint, percent_encode_path(&path)); + + let mut req = Request::builder().method("PROPFIND").uri(&url); + + req = req.header(header::CONTENT_TYPE, "application/xml"); + req = req.header(header::CONTENT_LENGTH, PROPFIND_REQUEST.len()); + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + + if args.recursive() { + req = req.header(HEADER_DEPTH, "infinity"); + } else { + req = req.header(HEADER_DEPTH, "1"); + } + + let req = req + .body(AsyncBody::Bytes(Bytes::from(PROPFIND_REQUEST))) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + /// Create dir recursively for given path. + /// + /// # Notes + /// + /// We only expose this method to the backend since there are dependencies on input path. + pub async fn webdav_mkcol(&self, path: &str) -> Result<()> { + let path = build_rooted_abs_path(&self.root, path); + let mut path = path.as_str(); + + let mut dirs = VecDeque::default(); + + loop { + match self.webdav_stat_rooted_abs_path(path).await { + // Dir is exist, break the loop. + Ok(_) => { + break; + } + // Dir not found, keep going. + Err(err) if err.kind() == ErrorKind::NotFound => { + dirs.push_front(path); + path = get_parent(path); + } + // Unexpected error found, return it. + Err(err) => return Err(err), + } + + if path == "/" { + break; + } + } + + for dir in dirs { + self.webdav_mkcol_rooted_abs_path(dir).await?; + } + Ok(()) + } + + /// Create a dir + /// + /// Input path must be `rooted_abs_path` + /// + /// Reference: [RFC4918: 9.3.1. MKCOL Status Codes](https://datatracker.ietf.org/doc/html/rfc4918#section-9.3.1) + async fn webdav_mkcol_rooted_abs_path(&self, rooted_abs_path: &str) -> Result<()> { + let url = format!("{}{}", self.endpoint, percent_encode_path(rooted_abs_path)); + + let mut req = Request::builder().method("MKCOL").uri(&url); + + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth.clone()) + } + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + + match status { + // 201 (Created) - The collection was created. + StatusCode::CREATED + // 405 (Method Not Allowed) - MKCOL can only be executed on an unmapped URL. + // + // The MKCOL method can only be performed on a deleted or non-existent resource. + // This error means the directory already exists which is allowed by create_dir. + | StatusCode::METHOD_NOT_ALLOWED => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} + +pub fn deserialize_multistatus(bs: &[u8]) -> Result { + let s = String::from_utf8_lossy(bs); + // HACKS! HACKS! HACKS! + // + // Make sure the string is escaped. + // Related to + // + // This is a temporary solution, we should find a better way to handle this. + let s = s.replace("&()_+-=;", "%26%28%29_%2B-%3D%3B"); + + quick_xml::de::from_str(&s).map_err(new_xml_deserialize_error) +} + +pub fn parse_propstat(propstat: &Propstat) -> Result { + let Propstat { + prop: + Prop { + getlastmodified, + getcontentlength, + getcontenttype, + getetag, + resourcetype, + .. + }, + status, + } = propstat; + + if let [_, code, text] = status.splitn(3, ' ').collect::>()[..3] { + // As defined in https://tools.ietf.org/html/rfc2068#section-6.1 + let code = code.parse::().unwrap(); + if code >= 400 { + return Err(Error::new( + ErrorKind::Unexpected, + &format!("propfind response is unexpected: {} {}", code, text), + )); + } + } + + let mode: EntryMode = if resourcetype.value == Some(ResourceType::Collection) { + EntryMode::DIR + } else { + EntryMode::FILE + }; + let mut m = Metadata::new(mode); + + if let Some(v) = getcontentlength { + m.set_content_length(v.parse::().unwrap()); + } + + if let Some(v) = getcontenttype { + m.set_content_type(v); + } + + if let Some(v) = getetag { + m.set_etag(v); + } + + // https://www.rfc-editor.org/rfc/rfc4918#section-14.18 + m.set_last_modified(parse_datetime_from_rfc2822(getlastmodified)?); + + // the storage services have returned all the properties + Ok(m.with_metakey(Metakey::Complete)) +} + +#[derive(Deserialize, Debug, PartialEq, Eq, Clone, Default)] +#[serde(default)] +pub struct Multistatus { + pub response: Vec, +} + +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct PropfindResponse { + pub href: String, + pub propstat: Propstat, +} + +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct Propstat { + pub status: String, + pub prop: Prop, +} + +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct Prop { + pub getlastmodified: String, + pub getetag: Option, + pub getcontentlength: Option, + pub getcontenttype: Option, + pub resourcetype: ResourceTypeContainer, +} + +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct ResourceTypeContainer { + #[serde(rename = "$value")] + pub value: Option, +} + +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] +#[serde(rename_all = "lowercase")] +pub enum ResourceType { + Collection, +} + +#[cfg(test)] +mod tests { + use quick_xml::de::from_str; + + use super::*; + + #[test] + fn test_propstat() { + let xml = r#" + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + "#; + + let propstat = from_str::(xml).unwrap(); + assert_eq!( + propstat.prop.getlastmodified, + "Tue, 01 May 2022 06:39:47 GMT" + ); + assert_eq!( + propstat.prop.resourcetype.value.unwrap(), + ResourceType::Collection + ); + + assert_eq!(propstat.status, "HTTP/1.1 200 OK"); + } + + #[test] + fn test_response_simple() { + let xml = r#" + / + + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + + "#; + + let response = from_str::(xml).unwrap(); + assert_eq!(response.href, "/"); + + assert_eq!( + response.propstat.prop.getlastmodified, + "Tue, 01 May 2022 06:39:47 GMT" + ); + assert_eq!( + response.propstat.prop.resourcetype.value.unwrap(), + ResourceType::Collection + ); + assert_eq!(response.propstat.status, "HTTP/1.1 200 OK"); + } + + #[test] + fn test_response_file() { + let xml = r#" + /test_file + + + test_file + 1 + Tue, 07 May 2022 05:52:22 GMT + + + + + + + + + + + + + + HTTP/1.1 200 OK + + "#; + + let response = from_str::(xml).unwrap(); + assert_eq!(response.href, "/test_file"); + assert_eq!( + response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 05:52:22 GMT" + ); + assert_eq!(response.propstat.prop.getcontentlength.unwrap(), "1"); + assert_eq!(response.propstat.prop.resourcetype.value, None); + assert_eq!(response.propstat.status, "HTTP/1.1 200 OK"); + } + + #[test] + fn test_with_multiple_items_simple() { + let xml = r#" + + / + + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + + + + / + + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + + + "#; + + let multistatus = from_str::(xml).unwrap(); + + let response = multistatus.response; + assert_eq!(response.len(), 2); + assert_eq!(response[0].href, "/"); + assert_eq!( + response[0].propstat.prop.getlastmodified, + "Tue, 01 May 2022 06:39:47 GMT" + ); + } + + #[test] + fn test_with_multiple_items_mixed() { + let xml = r#" + + + / + + + / + Tue, 07 May 2022 06:39:47 GMT + + + + + + + + + + + + + + + + HTTP/1.1 200 OK + + + + /testdir/ + + + testdir + Tue, 07 May 2022 06:40:10 GMT + + + + + + + + + + + + + + + + HTTP/1.1 200 OK + + + + /test_file + + + test_file + 1 + Tue, 07 May 2022 05:52:22 GMT + + + + + + + + + + + + + + HTTP/1.1 200 OK + + + "#; + + let multistatus = from_str::(xml).unwrap(); + + let response = multistatus.response; + assert_eq!(response.len(), 3); + let first_response = &response[0]; + assert_eq!(first_response.href, "/"); + assert_eq!( + first_response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 06:39:47 GMT" + ); + + let second_response = &response[1]; + assert_eq!(second_response.href, "/testdir/"); + assert_eq!( + second_response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 06:40:10 GMT" + ); + + let third_response = &response[2]; + assert_eq!(third_response.href, "/test_file"); + assert_eq!( + third_response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 05:52:22 GMT" + ); + } + + #[test] + fn test_with_multiple_items_mixed_nginx() { + let xml = r#" + + + / + + + Fri, 17 Feb 2023 03:37:22 GMT + + + + + HTTP/1.1 200 OK + + + + /test_file_75 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_36 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_38 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_59 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_9 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_93 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_43 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_95 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + "#; + + let multistatus: Multistatus = from_str(xml).unwrap(); + + let response = multistatus.response; + assert_eq!(response.len(), 9); + + let first_response = &response[0]; + assert_eq!(first_response.href, "/"); + assert_eq!( + first_response.propstat.prop.getlastmodified, + "Fri, 17 Feb 2023 03:37:22 GMT" + ); + } +} diff --git a/core/src/services/webdav/lister.rs b/core/src/services/webdav/lister.rs index 84f52f65eeb6..b69efb61b559 100644 --- a/core/src/services/webdav/lister.rs +++ b/core/src/services/webdav/lister.rs @@ -16,33 +16,27 @@ // under the License. use async_trait::async_trait; -use serde::Deserialize; -use std::str::FromStr; +use http::StatusCode; +use std::sync::Arc; +use super::core::*; +use super::error::*; use crate::raw::*; use crate::*; pub struct WebdavLister { - server_path: String, - root: String, + core: Arc, + path: String, - response: Vec, + args: OpList, } impl WebdavLister { - /// TODO: sending request in `next_page` instead of in `new`. - pub fn new(endpoint: &str, root: &str, path: &str, response: Vec) -> Self { - // Some services might return the path with suffix `/remote.php/webdav/`, we need to trim them. - let server_path = http::Uri::from_str(endpoint) - .expect("must be valid http uri") - .path() - .trim_end_matches('/') - .to_string(); + pub fn new(core: Arc, path: &str, args: OpList) -> Self { Self { - server_path, - root: root.into(), - path: path.into(), - response, + core, + path: path.to_string(), + args, } } } @@ -50,14 +44,37 @@ impl WebdavLister { #[async_trait] impl oio::PageList for WebdavLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { - for res in &self.response { + let resp = self.core.webdav_list(&self.path, &self.args).await?; + + // jfrog artifactory's webdav services have some strange behavior. + // We add this flag to check if the server is jfrog artifactory. + // + // Example: `"x-jfrog-version": "Artifactory/7.77.5 77705900"` + let is_jfrog_artifactory = if let Some(v) = resp.headers().get("x-jfrog-version") { + v.to_str().unwrap_or_default().starts_with("Artifactory") + } else { + false + }; + + let bs = if resp.status().is_success() { + resp.into_body().bytes().await? + } else if resp.status() == StatusCode::NOT_FOUND && self.path.ends_with('/') { + ctx.done = true; + return Ok(()); + } else { + return Err(parse_error(resp).await?); + }; + + let result: Multistatus = deserialize_multistatus(&bs)?; + + for res in result.response { let mut path = res .href - .strip_prefix(&self.server_path) + .strip_prefix(&self.core.server_path) .unwrap_or(&res.href) .to_string(); - let meta = res.parse_into_metadata()?; + let meta = parse_propstat(&res.propstat)?; // Append `/` to path if it's a dir if !path.ends_with('/') && meta.is_dir() { @@ -65,25 +82,24 @@ impl oio::PageList for WebdavLister { } // Ignore the root path itself. - if self.root == path { + if self.core.root == path { continue; } - let normalized_path = build_rel_path(&self.root, &path); - let decoded_path = percent_decode_path(normalized_path.as_str()); + let normalized_path = build_rel_path(&self.core.root, &path); + let decoded_path = percent_decode_path(&normalized_path); if normalized_path == self.path || decoded_path == self.path { - // WebDav server may return the current path as an entry. + // WebDAV server may return the current path as an entry. continue; } - // Mark files complete if it's an `application/x-checksum` file. + // HACKS! HACKS! HACKS! // - // AFAIK, this content type is only used by jfrog artifactory. And this file is - // a shadow file that can't be stat, so we mark it as complete. - if meta.contains_metakey(Metakey::ContentType) - && meta.content_type() == Some("application/x-checksum") - { + // jfrog artifactory will generate a virtual checksum file for each file. + // The checksum file can't be stated, but can be listed and read. + // We ignore the checksum files to avoid listing unexpected files. + if is_jfrog_artifactory && meta.content_type() == Some("application/x-checksum") { continue; } @@ -94,484 +110,3 @@ impl oio::PageList for WebdavLister { Ok(()) } } - -#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct Multistatus { - pub response: Option>, -} - -#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct ListOpResponse { - pub href: String, - pub propstat: Propstat, -} - -impl ListOpResponse { - pub fn parse_into_metadata(&self) -> Result { - let ListOpResponse { - propstat: - Propstat { - prop: - Prop { - getlastmodified, - getcontentlength, - getcontenttype, - getetag, - resourcetype, - .. - }, - status, - }, - .. - } = self; - if let [_, code, text] = status.split(' ').collect::>()[..3] { - // As defined in https://tools.ietf.org/html/rfc2068#section-6.1 - let code = code.parse::().unwrap(); - if code >= 400 { - return Err(Error::new( - ErrorKind::Unexpected, - &format!("Invalid response: {} {}", code, text), - )); - } - } - - let mode: EntryMode = if resourcetype.value == Some(ResourceType::Collection) { - EntryMode::DIR - } else { - EntryMode::FILE - }; - let mut m = Metadata::new(mode); - - if let Some(v) = getcontentlength { - m.set_content_length(v.parse::().unwrap()); - } - - if let Some(v) = getcontenttype { - m.set_content_type(v); - } - - if let Some(v) = getetag { - m.set_etag(v); - } - - // https://www.rfc-editor.org/rfc/rfc4918#section-14.18 - m.set_last_modified(parse_datetime_from_rfc2822(getlastmodified)?); - Ok(m) - } -} - -#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct Propstat { - pub prop: Prop, - pub status: String, -} - -#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct Prop { - #[serde(default)] - pub displayname: String, - pub getlastmodified: String, - pub getetag: Option, - pub getcontentlength: Option, - pub getcontenttype: Option, - pub resourcetype: ResourceTypeContainer, -} - -#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct ResourceTypeContainer { - #[serde(rename = "$value")] - pub value: Option, -} - -#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] -#[serde(rename_all = "lowercase")] -pub enum ResourceType { - Collection, -} - -#[cfg(test)] -mod tests { - use quick_xml::de::from_str; - - use super::*; - - #[test] - fn test_propstat() { - let xml = r#" - - / - Tue, 01 May 2022 06:39:47 GMT - - - - - - - - - - HTTP/1.1 200 OK - "#; - - let propstat = from_str::(xml).unwrap(); - assert_eq!( - propstat.prop.getlastmodified, - "Tue, 01 May 2022 06:39:47 GMT" - ); - assert_eq!( - propstat.prop.resourcetype.value.unwrap(), - ResourceType::Collection - ); - - assert_eq!(propstat.status, "HTTP/1.1 200 OK"); - } - - #[test] - fn test_response_simple() { - let xml = r#" - / - - - / - Tue, 01 May 2022 06:39:47 GMT - - - - - - - - - - HTTP/1.1 200 OK - - "#; - - let response = from_str::(xml).unwrap(); - assert_eq!(response.href, "/"); - - assert_eq!(response.propstat.prop.displayname, "/"); - - assert_eq!( - response.propstat.prop.getlastmodified, - "Tue, 01 May 2022 06:39:47 GMT" - ); - assert_eq!( - response.propstat.prop.resourcetype.value.unwrap(), - ResourceType::Collection - ); - assert_eq!(response.propstat.status, "HTTP/1.1 200 OK"); - } - - #[test] - fn test_response_file() { - let xml = r#" - /test_file - - - test_file - 1 - Tue, 07 May 2022 05:52:22 GMT - - - - - - - - - - - - - - HTTP/1.1 200 OK - - "#; - - let response = from_str::(xml).unwrap(); - assert_eq!(response.href, "/test_file"); - assert_eq!( - response.propstat.prop.getlastmodified, - "Tue, 07 May 2022 05:52:22 GMT" - ); - assert_eq!(response.propstat.prop.getcontentlength.unwrap(), "1"); - assert_eq!(response.propstat.prop.resourcetype.value, None); - assert_eq!(response.propstat.status, "HTTP/1.1 200 OK"); - } - - #[test] - fn test_with_multiple_items_simple() { - let xml = r#" - - / - - - / - Tue, 01 May 2022 06:39:47 GMT - - - - - - - - - - HTTP/1.1 200 OK - - - - / - - - / - Tue, 01 May 2022 06:39:47 GMT - - - - - - - - - - HTTP/1.1 200 OK - - - "#; - - let multistatus = from_str::(xml).unwrap(); - - let response = multistatus.response.unwrap(); - assert_eq!(response.len(), 2); - assert_eq!(response[0].href, "/"); - assert_eq!( - response[0].propstat.prop.getlastmodified, - "Tue, 01 May 2022 06:39:47 GMT" - ); - } - - #[test] - fn test_with_multiple_items_mixed() { - let xml = r#" - - - / - - - / - Tue, 07 May 2022 06:39:47 GMT - - - - - - - - - - - - - - - - HTTP/1.1 200 OK - - - - /testdir/ - - - testdir - Tue, 07 May 2022 06:40:10 GMT - - - - - - - - - - - - - - - - HTTP/1.1 200 OK - - - - /test_file - - - test_file - 1 - Tue, 07 May 2022 05:52:22 GMT - - - - - - - - - - - - - - HTTP/1.1 200 OK - - - "#; - - let multistatus = from_str::(xml).unwrap(); - - let response = multistatus.response.unwrap(); - assert_eq!(response.len(), 3); - let first_response = &response[0]; - assert_eq!(first_response.href, "/"); - assert_eq!( - first_response.propstat.prop.getlastmodified, - "Tue, 07 May 2022 06:39:47 GMT" - ); - - let second_response = &response[1]; - assert_eq!(second_response.href, "/testdir/"); - assert_eq!( - second_response.propstat.prop.getlastmodified, - "Tue, 07 May 2022 06:40:10 GMT" - ); - - let third_response = &response[2]; - assert_eq!(third_response.href, "/test_file"); - assert_eq!( - third_response.propstat.prop.getlastmodified, - "Tue, 07 May 2022 05:52:22 GMT" - ); - } - - #[test] - fn test_with_multiple_items_mixed_nginx() { - let xml = r#" - - - / - - - Fri, 17 Feb 2023 03:37:22 GMT - - - - - HTTP/1.1 200 OK - - - - /test_file_75 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - /test_file_36 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - /test_file_38 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - /test_file_59 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - /test_file_9 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - /test_file_93 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - /test_file_43 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - /test_file_95 - - - 1 - Fri, 17 Feb 2023 03:36:54 GMT - - - HTTP/1.1 200 OK - - - - "#; - - let multistatus: Multistatus = from_str(xml).unwrap(); - - let response = multistatus.response.unwrap(); - assert_eq!(response.len(), 9); - - let first_response = &response[0]; - assert_eq!(first_response.href, "/"); - assert_eq!( - first_response.propstat.prop.getlastmodified, - "Fri, 17 Feb 2023 03:37:22 GMT" - ); - } -} diff --git a/core/src/services/webdav/mod.rs b/core/src/services/webdav/mod.rs index f3002413717f..1e18871a98c8 100644 --- a/core/src/services/webdav/mod.rs +++ b/core/src/services/webdav/mod.rs @@ -19,6 +19,7 @@ mod backend; pub use backend::WebdavBuilder as Webdav; pub use backend::WebdavConfig; +mod core; mod error; mod lister; mod writer; diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 7bf365b00f95..72ec6c098e1f 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -17,23 +17,24 @@ use async_trait::async_trait; use http::StatusCode; +use std::sync::Arc; -use super::backend::WebdavBackend; +use super::core::*; use super::error::parse_error; use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; pub struct WebdavWriter { - backend: WebdavBackend, + core: Arc, op: OpWrite, path: String, } impl WebdavWriter { - pub fn new(backend: WebdavBackend, op: OpWrite, path: String) -> Self { - WebdavWriter { backend, op, path } + pub fn new(core: Arc, op: OpWrite, path: String) -> Self { + WebdavWriter { core, op, path } } } @@ -43,7 +44,7 @@ impl oio::OneShotWrite for WebdavWriter { let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let resp = self - .backend + .core .webdav_put( &self.path, Some(bs.len() as u64),