From b4dda8f3fad8364bafd44c496f17df435f4b7f6c Mon Sep 17 00:00:00 2001
From: Xuanwo <github@xuanwo.io>
Date: Fri, 28 Jun 2024 23:12:09 +0800
Subject: [PATCH] feat(core/gcs): Add concurrent write for gcs backend (#4820)

Signed-off-by: Xuanwo <github@xuanwo.io>
---
 core/src/services/gcs/backend.rs |  32 ++++----
 core/src/services/gcs/core.rs    | 124 ++++++++++++++++++++++++++++++-
 core/src/services/gcs/error.rs   |   9 +--
 core/src/services/gcs/lister.rs  |   2 +-
 core/src/services/gcs/writer.rs  | 121 +++++++++++++++++------------
 5 files changed, 218 insertions(+), 70 deletions(-)

diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 248de35916fa..208b6cb71b6b 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -357,12 +357,18 @@ impl Access for GcsBackend {
             write_can_empty: true,
             write_can_multi: true,
             write_with_content_type: true,
-            // The buffer size should be a multiple of 256 KiB (256 x 1024 bytes), unless it's the last chunk that completes the upload.
-            // Larger chunk sizes typically make uploads faster, but note that there's a tradeoff between speed and memory usage.
-            // It's recommended that you use at least 8 MiB for the chunk size.
+            // The min multipart size of Gcs is 5 MiB.
             //
-            // Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
-            write_multi_align_size: Some(8 * 1024 * 1024),
+            // ref: <https://cloud.google.com/storage/quotas#requests>
+            write_multi_min_size: Some(5 * 1024 * 1024),
+            // The max multipart size of Gcs is 5 GiB.
+            //
+            // ref: <https://cloud.google.com/storage/quotas#requests>
+            write_multi_max_size: if cfg!(target_pointer_width = "64") {
+                Some(5 * 1024 * 1024 * 1024)
+            } else {
+                Some(usize::MAX)
+            },

             delete: true,
             copy: true,
@@ -388,7 +394,7 @@ impl Access for GcsBackend {
         let resp = self.core.gcs_get_object_metadata(path, &args).await?;
 
         if !resp.status().is_success() {
-            return Err(parse_error(resp).await?);
+            return Err(parse_error(resp));
         }
 
         let slc = resp.into_body();
@@ -427,16 +433,16 @@ impl Access for GcsBackend {
             _ => {
                 let (part, mut body) = resp.into_parts();
                 let buf = body.to_buffer().await?;
-                Err(parse_error(Response::from_parts(part, buf)).await?)
+                Err(parse_error(Response::from_parts(part, buf)))
             }
         }
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let concurrent = args.concurrent();
         let executor = args.executor().cloned();
         let w = GcsWriter::new(self.core.clone(), path, args);
-        // Gcs can't support concurrent write, always use concurrent=1 for now.
-        let w = oio::RangeWriter::new(w, executor, 1);
+        let w = oio::MultipartWriter::new(w, executor, concurrent);
 
         Ok((RpWrite::default(), w))
     }
@@ -448,7 +454,7 @@ impl Access for GcsBackend {
         if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
             Ok(RpDelete::default())
         } else {
-            Err(parse_error(resp).await?)
+            Err(parse_error(resp))
         }
     }
 
@@ -470,7 +476,7 @@ impl Access for GcsBackend {
         if resp.status().is_success() {
             Ok(RpCopy::default())
         } else {
-            Err(parse_error(resp).await?)
+            Err(parse_error(resp))
        }
     }
 
@@ -544,7 +550,7 @@ impl Access for GcsBackend {
                 if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
                     batched_result.push((path, Ok(RpDelete::default().into())));
                 } else {
-                    batched_result.push((path, Err(parse_error(resp).await?)));
+                    batched_result.push((path, Err(parse_error(resp))));
                 }
             }
 
@@ -552,7 +558,7 @@ impl Access for GcsBackend {
         } else {
             // If the overall request isn't formatted correctly and Cloud Storage is unable to parse it into sub-requests, you receive a 400 error.
            // Otherwise, Cloud Storage returns a 200 status code, even if some or all of the sub-requests fail.
-            Err(parse_error(resp).await?)
+            Err(parse_error(resp))
         }
     }
 }
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 4a7f0b4657e3..47da3904f764 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -23,6 +23,7 @@ use std::time::Duration;
 
 use backon::ExponentialBuilder;
 use backon::Retryable;
+use bytes::Bytes;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_RANGE;
 use http::header::CONTENT_TYPE;
@@ -37,7 +38,7 @@ use reqsign::GoogleCredentialLoader;
 use reqsign::GoogleSigner;
 use reqsign::GoogleToken;
 use reqsign::GoogleTokenLoader;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use serde_json::json;
 
 use super::uri::percent_encode_path;
@@ -488,6 +489,20 @@ impl GcsCore {
         self.send(req).await
     }
 
+    pub async fn gcs_initiate_multipart_upload(&self, path: &str) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!("{}/{}/{}?uploads", self.endpoint, self.bucket, p);
+
+        let mut req = Request::post(&url)
+            .header(CONTENT_LENGTH, 0)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
     pub async fn gcs_initiate_resumable_upload(&self, path: &str) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
         let url = format!(
@@ -504,6 +519,90 @@ impl GcsCore {
         self.send(req).await
     }
 
+    pub async fn gcs_upload_part(
+        &self,
+        path: &str,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: Buffer,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}?partNumber={}&uploadId={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p),
+            part_number,
+            percent_encode_path(upload_id)
+        );
+
+        let mut req = Request::put(&url);
+
+        req = req.header(CONTENT_LENGTH, size);
+
+        let mut req = req.body(body).map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn gcs_complete_multipart_upload(
+        &self,
+        path: &str,
+        upload_id: &str,
+        parts: Vec<CompleteMultipartUploadRequestPart>,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}?uploadId={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p),
+            percent_encode_path(upload_id)
+        );
+
+        let req = Request::post(&url);
+
+        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
+            .map_err(new_xml_deserialize_error)?;
+        // Make sure the content length is set to avoid sending the POST with chunked transfer encoding.
+        let req = req.header(CONTENT_LENGTH, content.len());
+        // Set content-type to `application/xml` so the body is not mistaken for a form post.
+        let req = req.header(CONTENT_TYPE, "application/xml");
+
+        let mut req = req
+            .body(Buffer::from(Bytes::from(content)))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn gcs_abort_multipart_upload(
+        &self,
+        path: &str,
+        upload_id: &str,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}?uploadId={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p),
+            percent_encode_path(upload_id)
+        );
+
+        let mut req = Request::delete(&url)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
     pub fn gcs_upload_in_resumable_upload(
         &self,
         location: &str,
@@ -592,6 +691,29 @@ pub struct ListResponseItem {
     pub content_type: String,
 }
 
+/// Result of InitiateMultipartUpload
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct InitiateMultipartUploadResult {
+    pub upload_id: String,
+}
+
+/// Request of CompleteMultipartUpload
+#[derive(Default, Debug, Serialize)]
+#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
+pub struct CompleteMultipartUploadRequest {
+    pub part: Vec<CompleteMultipartUploadRequestPart>,
+}
+
+#[derive(Clone, Default, Debug, Serialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct CompleteMultipartUploadRequestPart {
+    #[serde(rename = "PartNumber")]
+    pub part_number: usize,
+    #[serde(rename = "ETag")]
+    pub etag: String,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/core/src/services/gcs/error.rs b/core/src/services/gcs/error.rs
index 4859615ebbfe..f3df0066ceeb 100644
--- a/core/src/services/gcs/error.rs
+++ b/core/src/services/gcs/error.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 use serde::Deserialize;
@@ -49,9 +48,9 @@ struct GcsErrorDetail {
 }
 
 /// Parse error response into Error.
-pub async fn parse_error(resp: Response<Buffer>) -> Result<Error> {
-    let (parts, mut body) = resp.into_parts();
-    let bs = body.copy_to_bytes(body.remaining());
+pub fn parse_error(resp: Response<Buffer>) -> Error {
+    let (parts, body) = resp.into_parts();
+    let bs = body.to_bytes();
 
     let (kind, retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
@@ -79,7 +78,7 @@ pub async fn parse_error(resp: Response<Buffer>) -> Result<Error> {
         err = err.set_temporary();
     }
 
-    Ok(err)
+    err
 }
 
 #[cfg(test)]
diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs
index b1674db5334e..ef37a767a6ac 100644
--- a/core/src/services/gcs/lister.rs
+++ b/core/src/services/gcs/lister.rs
@@ -78,7 +78,7 @@ impl oio::PageList for GcsLister {
             .await?;
 
         if !resp.status().is_success() {
-            return Err(parse_error(resp).await?);
+            return Err(parse_error(resp));
         }
 
         let bytes = resp.into_body();
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index bd2827b76475..c0d234b29177 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -15,16 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
+use bytes::Buf;
 use http::StatusCode;
+use std::sync::Arc;
 
-use super::core::GcsCore;
+use super::core::{CompleteMultipartUploadRequestPart, GcsCore, InitiateMultipartUploadResult};
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
-pub type GcsWriters = oio::RangeWriter<GcsWriter>;
+pub type GcsWriters = oio::MultipartWriter<GcsWriter>;
 
 pub struct GcsWriter {
     core: Arc<GcsCore>,
@@ -42,8 +42,8 @@ impl GcsWriter {
     }
 }
 
-impl oio::RangeWrite for GcsWriter {
-    async fn write_once(&self, body: Buffer) -> Result<()> {
+impl oio::MultipartWrite for GcsWriter {
+    async fn write_once(&self, _: u64, body: Buffer) -> Result<()> {
         let size = body.len() as u64;
         let mut req = self.core.gcs_insert_object_request(
             &percent_encode_path(&self.path),
@@ -60,69 +60,90 @@
 
         match status {
             StatusCode::CREATED | StatusCode::OK => Ok(()),
-            _ => Err(parse_error(resp).await?),
+            _ => Err(parse_error(resp)),
         }
     }
 
-    async fn initiate_range(&self) -> Result<String> {
-        let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?;
-        let status = resp.status();
+    async fn initiate_part(&self) -> Result<String> {
+        let resp = self
+            .core
+            .gcs_initiate_multipart_upload(&percent_encode_path(&self.path))
+            .await?;
 
-        match status {
-            StatusCode::OK => {
-                let bs = parse_location(resp.headers())?;
-                if let Some(location) = bs {
-                    Ok(location.to_string())
-                } else {
-                    Err(Error::new(
-                        ErrorKind::Unexpected,
-                        "location is not in the response header",
-                    ))
-                }
-            }
-            _ => Err(parse_error(resp).await?),
+        if !resp.status().is_success() {
+            return Err(parse_error(resp));
         }
-    }
 
-    async fn write_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> {
-        let size = body.len() as u64;
-        let mut req = self
-            .core
-            .gcs_upload_in_resumable_upload(location, size, written, body)?;
+        let buf = resp.into_body();
+        let upload_id: InitiateMultipartUploadResult =
+            quick_xml::de::from_reader(buf.reader()).map_err(new_xml_deserialize_error)?;
+        Ok(upload_id.upload_id)
+    }
 
-        self.core.sign(&mut req).await?;
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: Buffer,
+    ) -> Result<oio::MultipartPart> {
+        // Gcs requires part numbers to be within [1..=10000].
+        let part_number = part_number + 1;
 
-        let resp = self.core.send(req).await?;
+        let resp = self
+            .core
+            .gcs_upload_part(&self.path, upload_id, part_number, size, body)
+            .await?;
 
-        let status = resp.status();
-        match status {
-            StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()),
-            _ => Err(parse_error(resp).await?),
+        if !resp.status().is_success() {
+            return Err(parse_error(resp));
         }
+
+        let etag = parse_etag(resp.headers())?
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "ETag not present in returned response",
+                )
+            })?
+            .to_string();
+
+        Ok(oio::MultipartPart {
+            part_number,
+            etag,
+            checksum: None,
+        })
     }
 
-    async fn complete_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> {
-        let size = body.len() as u64;
+    async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> {
+        let parts = parts
+            .iter()
+            .map(|p| CompleteMultipartUploadRequestPart {
+                part_number: p.part_number,
+                etag: p.etag.clone(),
+            })
+            .collect();
+
         let resp = self
             .core
-            .gcs_complete_resumable_upload(location, written, size, body)
+            .gcs_complete_multipart_upload(&self.path, upload_id, parts)
             .await?;
 
-        let status = resp.status();
-        match status {
-            StatusCode::OK => Ok(()),
-            _ => Err(parse_error(resp).await?),
+        if !resp.status().is_success() {
+            return Err(parse_error(resp));
         }
+        Ok(())
     }
 
-    async fn abort_range(&self, location: &str) -> Result<()> {
-        let resp = self.core.gcs_abort_resumable_upload(location).await?;
-
-        match resp.status().as_u16() {
-            // gcs returns 499 if the upload aborted successfully
-            // reference: https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json
-            499 => Ok(()),
-            _ => Err(parse_error(resp).await?),
+    async fn abort_part(&self, upload_id: &str) -> Result<()> {
+        let resp = self
+            .core
+            .gcs_abort_multipart_upload(&self.path, upload_id)
+            .await?;
+        match resp.status() {
+            // gcs returns code 204 if abort succeeds.
+            StatusCode::NO_CONTENT => Ok(()),
+            _ => Err(parse_error(resp)),
         }
     }
 }
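
What this enables from the user side: the Gcs writer can now upload parts in parallel instead of being pinned to concurrent=1. A minimal sketch, assuming OpenDAL's writer builder exposes the usual `chunk` and `concurrent` options as it does for other multipart-capable services (the exact builder API may differ between versions, and the bucket and paths below are illustrative only):

    use opendal::services::Gcs;
    use opendal::Operator;

    #[tokio::main]
    async fn main() -> opendal::Result<()> {
        // Illustrative bucket and root; credentials come from the
        // default Google credential loaders at runtime.
        let mut builder = Gcs::default();
        builder.bucket("example-bucket");
        builder.root("/backup");
        let op = Operator::new(builder)?.finish();

        // Buffer 8 MiB chunks and keep up to 8 part uploads in flight.
        // Each chunk becomes one part; per the capability declared above,
        // parts must be between 5 MiB and 5 GiB (the last may be smaller).
        let mut w = op
            .writer_with("large-object.bin")
            .chunk(8 * 1024 * 1024)
            .concurrent(8)
            .await?;
        w.write(vec![0u8; 16 * 1024 * 1024]).await?;
        w.close().await?;
        Ok(())
    }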
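For reference, the completion body built by `gcs_complete_multipart_upload` uses the same XML shape as S3's CompleteMultipartUpload. A self-contained sketch of that serialization, with the structs mirrored from this patch (assumes quick_xml with its `serialize` feature enabled):

    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
    struct CompleteMultipartUploadRequest {
        part: Vec<CompleteMultipartUploadRequestPart>,
    }

    #[derive(Serialize)]
    #[serde(rename_all = "PascalCase")]
    struct CompleteMultipartUploadRequestPart {
        #[serde(rename = "PartNumber")]
        part_number: usize,
        #[serde(rename = "ETag")]
        etag: String,
    }

    fn main() {
        // Part numbers are the 1-based values sent to Gcs; ETags keep the
        // quotes they carry in the upload-part response headers.
        let body = CompleteMultipartUploadRequest {
            part: vec![
                CompleteMultipartUploadRequestPart {
                    part_number: 1,
                    etag: "\"etag-1\"".to_string(),
                },
                CompleteMultipartUploadRequestPart {
                    part_number: 2,
                    etag: "\"etag-2\"".to_string(),
                },
            ],
        };
        // Prints a single line shaped like:
        // <CompleteMultipartUpload><Part><PartNumber>1</PartNumber>
        //   <ETag>"etag-1"</ETag></Part>...</CompleteMultipartUpload>
        println!("{}", quick_xml::se::to_string(&body).unwrap());
    }

This is also why the request above sets an explicit Content-Length and an `application/xml` content type before signing.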