diff --git a/.github/services/cos/cos/action.yml b/.github/services/cos/cos/action.yml
index 6dc23ede7a1d..c310a6d14e15 100644
--- a/.github/services/cos/cos/action.yml
+++ b/.github/services/cos/cos/action.yml
@@ -16,7 +16,7 @@
 # under the License.
 
 name: cos
-description: 'Behavior test for COS. This service is sponsored by @datafuse_labs.'
+description: "Behavior test for COS. This service is sponsored by @datafuse_labs."
 
 runs:
   using: "composite"
@@ -30,3 +30,8 @@ runs:
         OPENDAL_COS_ENDPOINT: op://services/cos/endpoint
         OPENDAL_COS_SECRET_ID: op://services/cos/secret_id
         OPENDAL_COS_SECRET_KEY: op://services/cos/secret_key
+
+    - name: Add extra settings
+      shell: bash
+      run: |
+        echo "OPENDAL_COS_ENABLE_VERSIONING=true" >> "$GITHUB_ENV"
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 86d7e342ca36..4bbb658c8f48 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -29,9 +29,10 @@ use reqsign::TencentCosSigner;
 use super::core::*;
 use super::delete::CosDeleter;
 use super::error::parse_error;
-use super::lister::CosLister;
+use super::lister::{CosLister, CosListers, CosObjectVersionsLister};
 use super::writer::CosWriter;
 use super::writer::CosWriters;
+use crate::raw::oio::PageLister;
 use crate::raw::*;
 use crate::services::CosConfig;
 use crate::*;
@@ -123,6 +124,13 @@ impl CosBuilder {
         self
     }
 
+    /// Enable or disable object versioning support for this backend.
+    pub fn enable_versioning(mut self, enabled: bool) -> Self {
+        self.config.enable_versioning = enabled;
+
+        self
+    }
+
     /// Disable config load so that opendal will not load config from
     /// environment.
     ///
@@ -215,6 +223,7 @@ impl Builder for CosBuilder {
                 bucket: bucket.clone(),
                 root,
                 endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
+                enable_versioning: self.config.enable_versioning,
                 signer,
                 loader: cred_loader,
                 client,
@@ -232,7 +241,7 @@ pub struct CosBackend {
 impl Access for CosBackend {
     type Reader = HttpBody;
     type Writer = CosWriters;
-    type Lister = oio::PageLister<CosLister>;
+    type Lister = CosListers;
     type Deleter = oio::OneShotDeleter<CosDeleter>;
     type BlockingReader = ();
     type BlockingWriter = ();
@@ -253,15 +262,18 @@ impl Access for CosBackend {
                 stat_has_content_type: true,
                 stat_has_content_encoding: true,
                 stat_has_content_range: true,
+                stat_with_version: self.core.enable_versioning,
                 stat_has_etag: true,
                 stat_has_content_md5: true,
                 stat_has_last_modified: true,
                 stat_has_content_disposition: true,
+                stat_has_version: true,
 
                 read: true,
                 read_with_if_match: true,
                 read_with_if_none_match: true,
+                read_with_version: self.core.enable_versioning,
 
                 write: true,
                 write_can_empty: true,
@@ -270,8 +282,8 @@ impl Access for CosBackend {
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_with_content_disposition: true,
-                // TODO: set this to false while version has been enabled.
-                write_with_if_not_exists: true,
+                // COS cannot forbid overwrites once versioning has been enabled.
+                write_with_if_not_exists: !self.core.enable_versioning,
                 // The min multipart size of COS is 1 MiB.
                 //
                 // ref:
@@ -286,10 +298,13 @@ impl Access for CosBackend {
                 },
 
                 delete: true,
+                delete_with_version: self.core.enable_versioning,
                 copy: true,
 
                 list: true,
                 list_with_recursive: true,
+                list_with_versions: self.core.enable_versioning,
+                list_with_deleted: self.core.enable_versioning,
                 list_has_content_length: true,
 
                 presign: true,
@@ -311,7 +326,16 @@ impl Access for CosBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new),
+            StatusCode::OK => {
+                let headers = resp.headers();
+                let mut meta = parse_into_metadata(path, headers)?;
+
+                if let Some(v) = parse_header_to_str(headers, "x-cos-version-id")? {
+                    meta.set_version(v);
+                }
+
+                Ok(RpStat::new(meta))
+            }
             _ => Err(parse_error(resp)),
         }
     }
@@ -357,8 +381,22 @@ impl Access for CosBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        let l = CosLister::new(self.core.clone(), path, args.recursive(), args.limit());
-        Ok((RpList::default(), oio::PageLister::new(l)))
+        let l = if args.versions() || args.deleted() {
+            TwoWays::Two(PageLister::new(CosObjectVersionsLister::new(
+                self.core.clone(),
+                path,
+                args,
+            )))
+        } else {
+            TwoWays::One(PageLister::new(CosLister::new(
+                self.core.clone(),
+                path,
+                args.recursive(),
+                args.limit(),
+            )))
+        };
+
+        Ok((RpList::default(), l))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/cos/config.rs b/core/src/services/cos/config.rs
index 8c73f00ca8c1..87436a84b61f 100644
--- a/core/src/services/cos/config.rs
+++ b/core/src/services/cos/config.rs
@@ -35,6 +35,8 @@ pub struct CosConfig {
     pub secret_key: Option<String>,
     /// Bucket of this backend.
     pub bucket: Option<String>,
+    /// Whether object versioning is enabled for this bucket.
+    pub enable_versioning: bool,
     /// Disable config load so that opendal will not load config from
     pub disable_config_load: bool,
 }
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index 4d28b268895b..020b57babd19 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -17,6 +17,7 @@
 
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::fmt::Write;
 use std::time::Duration;
 
 use bytes::Bytes;
@@ -37,10 +38,15 @@ use serde::Serialize;
 use crate::raw::*;
 use crate::*;
 
+pub mod constants {
+    pub const COS_QUERY_VERSION_ID: &str = "versionId";
+}
+
 pub struct CosCore {
     pub bucket: String,
     pub root: String,
     pub endpoint: String,
+    pub enable_versioning: bool,
 
     pub signer: TencentCosSigner,
     pub loader: TencentCosCredentialLoader,
@@ -125,7 +131,19 @@ impl CosCore {
     ) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::COS_QUERY_VERSION_ID,
+                percent_encode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
 
         let mut req = Request::get(&url);
 
@@ -200,7 +218,19 @@ impl CosCore {
     pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::COS_QUERY_VERSION_ID,
+                percent_encode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
 
         let mut req = Request::head(&url);
 
@@ -217,10 +247,22 @@ impl CosCore {
         Ok(req)
     }
 
-    pub async fn cos_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
+    pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::COS_QUERY_VERSION_ID,
+                percent_encode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
 
         let req = Request::delete(&url);
 
@@ -434,6 +476,54 @@ impl CosCore {
         self.sign(&mut req).await?;
         self.send(req).await
     }
+
+    /// Send a signed `GET {endpoint}?versions` request listing object versions.
+    ///
+    /// `prefix` is resolved against `root`; `delimiter`, `key_marker` and
+    /// `version_id_marker` are omitted from the query when empty.
+    pub async fn cos_list_object_versions(
+        &self,
+        prefix: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+        key_marker: &str,
+        version_id_marker: &str,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, prefix);
+
+        let mut url = format!("{}?versions", self.endpoint);
+        if !p.is_empty() {
+            write!(url, "&prefix={}", percent_encode_path(p.as_str()))
+                .expect("write into string must succeed");
+        }
+        if !delimiter.is_empty() {
+            write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
+        }
+
+        if let Some(limit) = limit {
+            write!(url, "&max-keys={}", limit).expect("write into string must succeed");
+        }
+        if !key_marker.is_empty() {
+            write!(url, "&key-marker={}", percent_encode_path(key_marker))
+                .expect("write into string must succeed");
+        }
+        if !version_id_marker.is_empty() {
+            write!(
+                url,
+                "&version-id-marker={}",
+                percent_encode_path(version_id_marker)
+            )
+            .expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
 }
 
 /// Result of CreateMultipartUpload
@@ -511,6 +601,45 @@ pub struct ListObjectsOutputContent {
     pub size: u64,
 }
 
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct OutputCommonPrefix {
+    pub prefix: String,
+}
+
+/// Output of ListObjectVersions
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutput {
+    pub is_truncated: Option<bool>,
+    pub next_key_marker: Option<String>,
+    pub next_version_id_marker: Option<String>,
+    pub common_prefixes: Vec<OutputCommonPrefix>,
+    pub version: Vec<ListObjectVersionsOutputVersion>,
+    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutputVersion {
+    pub key: String,
+    pub version_id: String,
+    pub is_latest: bool,
+    pub size: u64,
+    pub last_modified: String,
+    #[serde(rename = "ETag")]
+    pub etag: Option<String>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutputDeleteMarker {
+    pub key: String,
+    pub version_id: String,
+    pub is_latest: bool,
+    pub last_modified: String,
+}
+
 #[cfg(test)]
 mod tests {
     use bytes::Buf;
diff --git a/core/src/services/cos/delete.rs b/core/src/services/cos/delete.rs
index 17319ba91cd1..bd29f8521d95 100644
--- a/core/src/services/cos/delete.rs
+++ b/core/src/services/cos/delete.rs
@@ -33,8 +33,8 @@ impl CosDeleter {
 }
 
 impl oio::OneShotDelete for CosDeleter {
-    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
-        let resp = self.core.cos_delete_object(&path).await?;
+    async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
+        let resp = self.core.cos_delete_object(&path, &args).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/cos/lister.rs b/core/src/services/cos/lister.rs
index 880544425008..465bf97bce31 100644
--- a/core/src/services/cos/lister.rs
+++ b/core/src/services/cos/lister.rs
@@ -22,11 +22,15 @@ use quick_xml::de;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::PageContext;
 use crate::raw::*;
 use crate::EntryMode;
+use crate::Error;
 use crate::Metadata;
 use crate::Result;
 
+pub type CosListers = TwoWays<oio::PageLister<CosLister>, oio::PageLister<CosObjectVersionsLister>>;
+
 pub struct CosLister {
     core: Arc<CosCore>,
     path: String,
@@ -95,3 +99,143 @@ impl oio::PageList for CosLister {
         Ok(())
     }
 }
+
+/// Page lister over object versions and delete markers of a COS bucket.
+///
+/// refer: https://cloud.tencent.com/document/product/436/35521
+pub struct CosObjectVersionsLister {
+    core: Arc<CosCore>,
+
+    prefix: String,
+    args: OpList,
+
+    delimiter: &'static str,
+    abs_start_after: Option<String>,
+}
+
+impl CosObjectVersionsLister {
+    pub fn new(core: Arc<CosCore>, path: &str, args: OpList) -> Self {
+        let delimiter = if args.recursive() { "" } else { "/" };
+        let abs_start_after = args
+            .start_after()
+            .map(|start_after| build_abs_path(&core.root, start_after));
+
+        Self {
+            core,
+            prefix: path.to_string(),
+            args,
+            delimiter,
+            abs_start_after,
+        }
+    }
+}
+
+impl oio::PageList for CosObjectVersionsLister {
+    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
+        // The page token is "<key marker> <version id marker>"; split on the last
+        // space (assumes version ids never contain one — TODO confirm).
+        let markers = ctx.token.rsplit_once(" ");
+        let (key_marker, version_id_marker) = if let Some(data) = markers {
+            data
+        } else if let Some(start_after) = &self.abs_start_after {
+            (start_after.as_str(), "")
+        } else {
+            ("", "")
+        };
+
+        let resp = self
+            .core
+            .cos_list_object_versions(
+                &self.prefix,
+                self.delimiter,
+                self.args.limit(),
+                key_marker,
+                version_id_marker,
+            )
+            .await?;
+        if resp.status() != http::StatusCode::OK {
+            return Err(parse_error(resp));
+        }
+
+        let body = resp.into_body();
+        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
+            .map_err(new_xml_deserialize_error)
+            // Allow Cos list to retry on XML deserialization errors.
+            //
+            // This is because the Cos list API may return incomplete XML data under high load.
+            // We are confident that our XML decoding logic is correct. When this error occurs,
+            // we allow retries to obtain the correct data.
+            .map_err(Error::set_temporary)?;
+
+        // A page that omits `IsTruncated` carries no continuation markers, so it is
+        // treated as the final page; reporting "not done" would restart the listing
+        // from the beginning (both markers empty) and never terminate.
+        ctx.done = !output.is_truncated.unwrap_or(false);
+
+        ctx.token = format!(
+            "{} {}",
+            output.next_key_marker.unwrap_or_default(),
+            output.next_version_id_marker.unwrap_or_default()
+        );
+
+        for prefix in output.common_prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix.prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+            ctx.entries.push_back(de);
+        }
+
+        for version_object in output.version {
+            // `list` must be additive, so we need to include the latest version object
+            // even if `versions` is not enabled.
+            //
+            // Here we skip all non-latest version objects if `versions` is not enabled.
+            if !(self.args.versions() || version_object.is_latest) {
+                continue;
+            }
+
+            let mut path = build_rel_path(&self.core.root, &version_object.key);
+            if path.is_empty() {
+                path = "/".to_owned();
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+            meta.set_version(&version_object.version_id);
+            meta.set_is_current(version_object.is_latest);
+            meta.set_content_length(version_object.size);
+            meta.set_last_modified(parse_datetime_from_rfc3339(
+                version_object.last_modified.as_str(),
+            )?);
+            if let Some(etag) = version_object.etag {
+                meta.set_etag(&etag);
+                meta.set_content_md5(etag.trim_matches('"'));
+            }
+
+            let entry = oio::Entry::new(&path, meta);
+            ctx.entries.push_back(entry);
+        }
+
+        if self.args.deleted() {
+            for delete_marker in output.delete_marker {
+                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
+                if path.is_empty() {
+                    path = "/".to_owned();
+                }
+
+                let mut meta = Metadata::new(EntryMode::FILE);
+                meta.set_version(&delete_marker.version_id);
+                meta.set_is_deleted(true);
+                meta.set_is_current(delete_marker.is_latest);
+                meta.set_last_modified(parse_datetime_from_rfc3339(
+                    delete_marker.last_modified.as_str(),
+                )?);
+
+                let entry = oio::Entry::new(&path, meta);
+                ctx.entries.push_back(entry);
+            }
+        }
+
+        Ok(())
+    }
+}