From b9e78dfd8e5e4a4b35cf9fc2b862e465f31ae6e0 Mon Sep 17 00:00:00 2001 From: Frank Wang <1454884738@qq.com> Date: Wed, 18 Dec 2024 16:37:40 -0600 Subject: [PATCH] feat(services/s3): add append support --- core/src/services/s3/backend.rs | 20 ++++-- core/src/services/s3/config.rs | 1 + core/src/services/s3/core.rs | 111 +++++++++++++++++++++----------- core/src/services/s3/docs.md | 1 + core/src/services/s3/writer.rs | 36 ++++++++++- 5 files changed, 128 insertions(+), 41 deletions(-) diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index eb9a4370b089..31d0f722b7aa 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -942,6 +942,12 @@ impl Access for S3Backend { write: true, write_can_empty: true, write_can_multi: true, + // Only S3 Express One Zone storage class supports append. + write_can_append: self + .core + .default_storage_class + .as_ref() + .is_some_and(|v| v == "EXPRESS_ONEZONE"), write_with_cache_control: true, write_with_content_type: true, write_with_content_encoding: true, @@ -1029,11 +1035,17 @@ impl Access for S3Backend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let concurrent = args.concurrent(); - let executor = args.executor().cloned(); - let writer = S3Writer::new(self.core.clone(), path, args); + let writer = S3Writer::new(self.core.clone(), path, args.clone()); - let w = oio::MultipartWriter::new(writer, executor, concurrent); + let w = if args.append() { + S3Writers::Two(oio::AppendWriter::new(writer)) + } else { + S3Writers::One(oio::MultipartWriter::new( + writer, + args.executor().cloned(), + args.concurrent(), + )) + }; Ok((RpWrite::default(), w)) } diff --git a/core/src/services/s3/config.rs b/core/src/services/s3/config.rs index 41899e9e4b90..2628f0d7e01b 100644 --- a/core/src/services/s3/config.rs +++ b/core/src/services/s3/config.rs @@ -143,6 +143,7 @@ pub struct S3Config { /// - `GLACIER_IR` /// - `INTELLIGENT_TIERING` /// - `ONEZONE_IA` + /// - `EXPRESS_ONEZONE` /// - `OUTPOSTS` /// - `REDUCED_REDUNDANCY` /// - `STANDARD` diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 19bdbfdf7209..3ae098822a7f 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -70,6 +70,8 @@ pub mod constants { pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = "x-amz-copy-source-server-side-encryption-customer-key-md5"; + pub const X_AMZ_WRITE_OFFSET_BYTES: &str = "x-amz-write-offset-bytes"; + pub const X_AMZ_META_PREFIX: &str = "x-amz-meta-"; pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition"; @@ -289,6 +291,54 @@ impl S3Core { } req } + + pub fn insert_metadata_headers( + &self, + mut req: http::request::Builder, + size: Option, + args: &OpWrite, + ) -> http::request::Builder { + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size.to_string()) + } + + if let Some(mime) = args.content_type() { + req = req.header(CONTENT_TYPE, mime) + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos) + } + + if let Some(encoding) = args.content_encoding() { + req = req.header(CONTENT_ENCODING, encoding); + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(CACHE_CONTROL, cache_control) + } + + if let Some(if_match) = args.if_match() { + req = req.header(IF_MATCH, if_match); + } + + if args.if_not_exists() { + req = req.header(IF_NONE_MATCH, "*"); + } + + // Set storage class header + if let Some(v) = &self.default_storage_class { + req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); + } + + // Set user metadata headers. + if let Some(user_metadata) = args.user_metadata() { + for (key, value) in user_metadata { + req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value) + } + } + req + } } impl S3Core { @@ -441,55 +491,44 @@ impl S3Core { let mut req = Request::put(&url); - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size.to_string()) - } + req = self.insert_metadata_headers(req, size, args); - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime) - } + // Set SSE headers. + req = self.insert_sse_headers(req, true); - if let Some(pos) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, pos) + // Calculate Checksum. + if let Some(checksum) = self.calculate_checksum(&body) { + // Set Checksum header. + req = self.insert_checksum_header(req, &checksum); } - if let Some(encoding) = args.content_encoding() { - req = req.header(CONTENT_ENCODING, encoding); - } + // Set body + let req = req.body(body).map_err(new_request_build_error)?; - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } + Ok(req) + } - if let Some(if_match) = args.if_match() { - req = req.header(IF_MATCH, if_match); - } + pub async fn s3_append_object_request( + &self, + path: &str, + position: u64, + size: u64, + args: &OpWrite, + body: Buffer, + ) -> Result> { + let p = build_abs_path(&self.root, path); - if args.if_not_exists() { - req = req.header(IF_NONE_MATCH, "*"); - } + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - // Set storage class header - if let Some(v) = &self.default_storage_class { - req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); - } + let mut req = Request::put(&url); - // Set user metadata headers. - if let Some(user_metadata) = args.user_metadata() { - for (key, value) in user_metadata { - req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value) - } - } + req = self.insert_metadata_headers(req, Some(size), args); + + req = req.header(constants::X_AMZ_WRITE_OFFSET_BYTES, position.to_string()); // Set SSE headers. req = self.insert_sse_headers(req, true); - // Calculate Checksum. - if let Some(checksum) = self.calculate_checksum(&body) { - // Set Checksum header. - req = self.insert_checksum_header(req, &checksum); - } - // Set body let req = req.body(body).map_err(new_request_build_error)?; diff --git a/core/src/services/s3/docs.md b/core/src/services/s3/docs.md index 1bae73dedfcd..6386aaf5779d 100644 --- a/core/src/services/s3/docs.md +++ b/core/src/services/s3/docs.md @@ -5,6 +5,7 @@ This service can be used to: - [x] stat - [x] read - [x] write +- [x] append - [x] create_dir - [x] delete - [x] copy diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index e0c7c5084b59..e3471bd7b844 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -27,7 +27,7 @@ use super::error::S3Error; use crate::raw::*; use crate::*; -pub type S3Writers = oio::MultipartWriter; +pub type S3Writers = TwoWays, oio::AppendWriter>; pub struct S3Writer { core: Arc, @@ -188,3 +188,37 @@ impl oio::MultipartWrite for S3Writer { } } } + +impl oio::AppendWrite for S3Writer { + async fn offset(&self) -> Result { + let resp = self + .core + .s3_head_object(&self.path, OpStat::default()) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()), + StatusCode::NOT_FOUND => Ok(0), + _ => Err(parse_error(resp)), + } + } + + async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> { + let mut req = self + .core + .s3_append_object_request(&self.path, offset, size, &self.op, body)?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp)), + } + } +}