From dc2a634ab71d7a81cb9c84ff7d26540ed67adc9c Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Tue, 5 Nov 2024 17:32:59 -0500 Subject: [PATCH] Address review feedback --- object_store/src/aws/client.rs | 12 +++++- object_store/src/aws/mod.rs | 55 +++++++++++++++++----------- object_store/src/aws/precondition.rs | 14 +++++-- 3 files changed, 55 insertions(+), 26 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 2e04683e7b30..a610e635178d 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -641,7 +641,7 @@ impl S3Client { PutPartPayload::Part(payload) => request.with_payload(payload), PutPartPayload::Copy(path) => request.header( "x-amz-copy-source", - &format!("{}/{}", self.config.bucket, path), + &format!("{}/{}", self.config.bucket, encode_path(path)), ), }; @@ -671,6 +671,16 @@ impl S3Client { Ok(PartId { content_id }) } + pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> { + self.request(Method::DELETE, location) + .query(&[("uploadId", upload_id)]) + .with_encryption_headers() + .send() + .await?; + + Ok(()) + } + pub(crate) async fn complete_multipart( &self, location: &Path, diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index acfa05395901..21561320ec9b 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -302,27 +302,40 @@ impl ObjectStore for AmazonS3 { .client .create_multipart(to, PutMultipartOpts::default()) .await?; - let part_id = self - .client - .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) - .await?; - let res = match self - .client - .complete_multipart( - to, - &upload_id, - vec![part_id], - CompleteMultipartMode::Create, - ) - .await - { - Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { - path: to.to_string(), - source: Box::new(e), - }), - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - }; + + let res = async { + let part_id = self + .client + .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) + .await?; + match self + .client + .complete_multipart( + to, + &upload_id, + vec![part_id], + CompleteMultipartMode::Create, + ) + .await + { + Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { + path: to.to_string(), + source: Box::new(e), + }), + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } + } + .await; + + // If the multipart upload failed, make a best effort attempt to + // clean it up. It's the caller's responsibility to add a + // lifecycle rule if guaranteed cleanup is required, as we + // cannot protect against an ill-timed process crash. + if res.is_err() { + let _ = self.client.abort_multipart(to, &upload_id).await; + } + return res; } Some(S3CopyIfNotExists::Dynamo(lock)) => { diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index 80f3c1a03615..ff7f6d934316 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -47,11 +47,17 @@ pub enum S3CopyIfNotExists { /// Encoded as `header-with-status:::` ignoring whitespace HeaderWithStatus(String, String, reqwest::StatusCode), /// Native Amazon S3 supports copy if not exists through a multipart upload - /// where the upload copies an existing object and is completed only if - /// the new object does not already exist. + /// where the upload copies an existing object and is completed only if the + /// new object does not already exist. /// - /// WARNING: When using this mode, `copy_if_not_exists` does not copy - /// tags or attributes from the source object. + /// WARNING: When using this mode, `copy_if_not_exists` does not copy tags + /// or attributes from the source object. + /// + /// WARNING: When using this mode, `copy_if_not_exists` makes only a best + /// effort attempt to clean up the multipart upload if the copy operation + /// fails. Consider using a lifecycle rule to automatically clean up + /// abandoned multipart uploads. See [the module + /// docs](super#multipart-uploads) for details. /// /// Encoded as `multipart` ignoring whitespace. Multipart,