From 3c9ffb25dd4ed6b89e091c5dccbf282e4281f906 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Fri, 4 Oct 2024 12:56:59 +0200 Subject: [PATCH 01/19] logs --- object_store/src/aws/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index f5204a5365ed..cba6abdd6709 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -34,6 +34,7 @@ use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; +use tracing::info; use url::Url; use crate::aws::client::{RequestError, S3Client}; @@ -337,18 +338,21 @@ impl MultipartUpload for S3MultiPartUpload { let idx = self.part_idx; self.part_idx += 1; let state = Arc::clone(&self.state); + info!(?idx, path = ?state.location, "uploading part"); Box::pin(async move { let part = state .client .put_part(&state.location, &state.upload_id, idx, data) .await?; state.parts.put(idx, part); + info!(?idx, path = ?state.location, upload_id = state.upload_id, "uploaded part"); Ok(()) }) } async fn complete(&mut self) -> Result { let parts = self.state.parts.finish(self.part_idx)?; + info!(upload_id = self.state.upload_id, part_idx = self.part_idx, ?parts, location = ?self.state.location, "completing multipart upload"); self.state .client @@ -357,6 +361,7 @@ impl MultipartUpload for S3MultiPartUpload { } async fn abort(&mut self) -> Result<()> { + info!(upload_id = self.state.upload_id, part_idx = self.part_idx, location = ?self.state.location, "aborting multipart upload"); self.state .client .request(Method::DELETE, &self.state.location) From 3362fba2b8e2bea3e3806a02f6b4ea0a8edabb3f Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 14 Oct 2024 14:48:57 +0200 Subject: [PATCH 02/19] printlns --- object_store/src/aws/mod.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index cba6abdd6709..cd06cf98d177 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -34,7 +34,6 @@ use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; -use tracing::info; use url::Url; use crate::aws::client::{RequestError, S3Client}; @@ -338,21 +337,27 @@ impl MultipartUpload for S3MultiPartUpload { let idx = self.part_idx; self.part_idx += 1; let state = Arc::clone(&self.state); - info!(?idx, path = ?state.location, "uploading part"); + println!("uploading part: {}, location: {:?}", idx, state.location); Box::pin(async move { let part = state .client .put_part(&state.location, &state.upload_id, idx, data) .await?; state.parts.put(idx, part); - info!(?idx, path = ?state.location, upload_id = state.upload_id, "uploaded part"); + println!( + "uploaded part: {}, location: {:?}, upload_id: {}", + idx, state.location, state.upload_id + ); Ok(()) }) } async fn complete(&mut self) -> Result { let parts = self.state.parts.finish(self.part_idx)?; - info!(upload_id = self.state.upload_id, part_idx = self.part_idx, ?parts, location = ?self.state.location, "completing multipart upload"); + println!( + "completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}", + self.state.upload_id, self.part_idx, self.state.location, parts + ); self.state .client @@ -361,7 +366,10 @@ impl MultipartUpload for S3MultiPartUpload { } async fn abort(&mut self) -> Result<()> { - info!(upload_id = self.state.upload_id, part_idx = self.part_idx, location = ?self.state.location, "aborting multipart upload"); + println!( + "aborting multipart upload, upload_id: {}, part_id: {}, location: {:?}", + self.state.upload_id, self.part_idx, self.state.location + ); self.state .client .request(Method::DELETE, &self.state.location) From fd846a82c35e8204e3210844931935dbaa1f1787 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 14 Oct 2024 15:01:58 +0200 Subject: [PATCH 03/19] more logs --- object_store/src/aws/mod.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index cd06cf98d177..f9ee83b87294 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -334,10 +334,14 @@ struct UploadState { #[async_trait] impl MultipartUpload for S3MultiPartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { + let len = data.content_length(); let idx = self.part_idx; self.part_idx += 1; let state = Arc::clone(&self.state); - println!("uploading part: {}, location: {:?}", idx, state.location); + println!( + "uploading part: {}, location: {:?}, size: {}", + idx, state.location, len + ); Box::pin(async move { let part = state .client @@ -345,8 +349,8 @@ impl MultipartUpload for S3MultiPartUpload { .await?; state.parts.put(idx, part); println!( - "uploaded part: {}, location: {:?}, upload_id: {}", - idx, state.location, state.upload_id + "uploaded part: {}, location: {:?}, upload_id: {}, size: {}", + idx, state.location, state.upload_id, len ); Ok(()) }) From 2b306cad2c8434b6b9c9e4298a44e11b38d7ee9b Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 15 Oct 2024 11:44:49 +0200 Subject: [PATCH 04/19] sort by size too --- object_store/src/aws/client.rs | 3 +- object_store/src/aws/mod.rs | 4 +-- object_store/src/azure/client.rs | 3 +- object_store/src/client/parts.rs | 54 +++++++++++++++++++++++++++++++- object_store/src/gcp/client.rs | 2 ++ object_store/src/memory.rs | 2 ++ object_store/src/multipart.rs | 2 ++ 7 files changed, 65 insertions(+), 5 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index ab4da86f504b..0e1ca67c422e 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -561,6 +561,7 @@ impl S3Client { data: PutPayload, ) -> Result { let part = (part_idx + 1).to_string(); + let size = data.content_length(); let response = self .request(Method::PUT, path) @@ -571,7 +572,7 @@ impl S3Client { .await?; let content_id = get_etag(response.headers()).context(MetadataSnafu)?; - Ok(PartId { content_id }) + Ok(PartId { content_id, size }) } pub async fn complete_multipart( diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index f9ee83b87294..7ef295b02e59 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -339,8 +339,8 @@ impl MultipartUpload for S3MultiPartUpload { self.part_idx += 1; let state = Arc::clone(&self.state); println!( - "uploading part: {}, location: {:?}, size: {}", - idx, state.location, len + "uploading part: {}, location: {:?}, size: {}, upload_id: {}", + idx, state.location, len, state.upload_id ); Box::pin(async move { let part = state diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index b5e82c2a8585..1d962e5c624e 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -312,6 +312,7 @@ impl AzureClient { ) -> Result { let content_id = format!("{part_idx:20}"); let block_id = BASE64_STANDARD.encode(&content_id); + let size = payload.content_length(); self.put_request(path, payload) .query(&[("comp", "block"), ("blockid", &block_id)]) @@ -319,7 +320,7 @@ impl AzureClient { .send() .await?; - Ok(PartId { content_id }) + Ok(PartId { content_id, size }) } /// PUT a block list diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs index 9fc301edcf81..b6f603f0d20e 100644 --- a/object_store/src/client/parts.rs +++ b/object_store/src/client/parts.rs @@ -42,7 +42,59 @@ impl Parts { source: "Missing part".to_string().into(), }); } - parts.sort_unstable_by_key(|(idx, _)| *idx); + sort(&mut parts); Ok(parts.drain(..).map(|(_, v)| v).collect()) } } + +fn sort(parts: &mut [(usize, PartId)]) { + parts.sort_unstable_by(|a, b| match (a, b) { + ((idx_a, part_a), (idx_b, part_b)) if part_a.size == part_b.size => idx_a.cmp(idx_b), + ((_, part_a), (_, part_b)) => part_b.size.cmp(&part_a.size), + }); +} + +#[cfg(test)] +mod tests { + use crate::multipart::PartId; + + #[test] + fn test_sort() { + let mut parts = vec![ + ( + 1, + PartId { + content_id: "1".to_string(), + size: 100, + }, + ), + ( + 2, + PartId { + content_id: "2".to_string(), + size: 50, + }, + ), + ( + 3, + PartId { + content_id: "3".to_string(), + size: 100, + }, + ), + ( + 4, + PartId { + content_id: "4".to_string(), + size: 100, + }, + ), + ]; + super::sort(&mut parts); + + assert_eq!(parts[0].1.content_id, "1"); + assert_eq!(parts[1].1.content_id, "3"); + assert_eq!(parts[2].1.content_id, "4"); + assert_eq!(parts[3].1.content_id, "2"); + } +} diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 0045383ee000..9d8ea6214c61 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -409,6 +409,7 @@ impl GoogleCloudStorageClient { ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), ]; + let size = data.content_length(); let result = self .request(Method::PUT, path) .with_payload(data) @@ -419,6 +420,7 @@ impl GoogleCloudStorageClient { Ok(PartId { content_id: result.e_tag.unwrap(), + size, }) } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 0d72983b0495..61ad219cba59 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -416,12 +416,14 @@ impl MultipartStore for InMemory { ) -> Result { let mut storage = self.storage.write(); let upload = storage.upload_mut(id)?; + let size = payload.content_length(); if part_idx <= upload.parts.len() { upload.parts.resize(part_idx + 1, None); } upload.parts[part_idx] = Some(payload.into()); Ok(PartId { content_id: Default::default(), + size, }) } diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index d94e7f150513..f08b752a9df6 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -31,6 +31,8 @@ use crate::{MultipartId, PutPayload, PutResult, Result}; pub struct PartId { /// Id of this part pub content_id: String, + /// Size of this part + pub size: usize, } /// A low-level interface for interacting with multipart upload APIs From 083ebe8ee2a188eead86c2eec33cc8df5d78ae3c Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 15 Oct 2024 12:13:21 +0200 Subject: [PATCH 05/19] sort parts --- object_store/src/aws/mod.rs | 52 +++++++++++++++++++++++- object_store/src/client/parts.rs | 70 ++++++++------------------------ 2 files changed, 68 insertions(+), 54 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 7ef295b02e59..5227f8276c9e 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -33,6 +33,7 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; +use std::cmp::Ordering; use std::{sync::Arc, time::Duration}; use url::Url; @@ -357,7 +358,7 @@ impl MultipartUpload for S3MultiPartUpload { } async fn complete(&mut self) -> Result { - let parts = self.state.parts.finish(self.part_idx)?; + let parts = self.state.parts.finish_sorted_by(self.part_idx, sort)?; println!( "completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}", self.state.upload_id, self.part_idx, self.state.location, parts @@ -423,6 +424,15 @@ impl MultipartStore for AmazonS3 { } } +// smaller parts last, everything else by index +fn sort(a: &(usize, PartId), b: &(usize, PartId)) -> Ordering { + if a.1.size == b.1.size { + a.0.cmp(&b.0) + } else { + b.1.size.cmp(&a.1.size) + } +} + #[cfg(test)] mod tests { use super::*; @@ -622,4 +632,44 @@ mod tests { store.delete(location).await.unwrap(); } } + + #[test] + fn test_sort() { + let mut parts = [ + ( + 1, + PartId { + content_id: "1".to_string(), + size: 100, + }, + ), + ( + 2, + PartId { + content_id: "2".to_string(), + size: 50, + }, + ), + ( + 3, + PartId { + content_id: "3".to_string(), + size: 100, + }, + ), + ( + 4, + PartId { + content_id: "4".to_string(), + size: 100, + }, + ), + ]; + parts.sort_unstable_by(sort); + + assert_eq!(parts[0].1.content_id, "1"); + assert_eq!(parts[1].1.content_id, "3"); + assert_eq!(parts[2].1.content_id, "4"); + assert_eq!(parts[3].1.content_id, "2"); + } } diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs index b6f603f0d20e..3553f6eb8353 100644 --- a/object_store/src/client/parts.rs +++ b/object_store/src/client/parts.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::Ordering; + use crate::multipart::PartId; use parking_lot::Mutex; @@ -35,6 +37,20 @@ impl Parts { /// /// `expected` is the number of parts expected in the final result pub(crate) fn finish(&self, expected: usize) -> crate::Result> { + self.finish_sorted_by(expected, |(idx_a, _), (idx_b, _)| idx_a.cmp(idx_b)) + } + + /// Produce the final list of [`PartId`] ordered by sorting function + /// + /// `expected` is the number of parts expected in the final result + pub(crate) fn finish_sorted_by( + &self, + expected: usize, + sort_by: F, + ) -> crate::Result> + where + F: FnMut(&(usize, PartId), &(usize, PartId)) -> Ordering, + { let mut parts = self.0.lock(); if parts.len() != expected { return Err(crate::Error::Generic { @@ -42,59 +58,7 @@ impl Parts { source: "Missing part".to_string().into(), }); } - sort(&mut parts); + parts.sort_unstable_by(sort_by); Ok(parts.drain(..).map(|(_, v)| v).collect()) } } - -fn sort(parts: &mut [(usize, PartId)]) { - parts.sort_unstable_by(|a, b| match (a, b) { - ((idx_a, part_a), (idx_b, part_b)) if part_a.size == part_b.size => idx_a.cmp(idx_b), - ((_, part_a), (_, part_b)) => part_b.size.cmp(&part_a.size), - }); -} - -#[cfg(test)] -mod tests { - use crate::multipart::PartId; - - #[test] - fn test_sort() { - let mut parts = vec![ - ( - 1, - PartId { - content_id: "1".to_string(), - size: 100, - }, - ), - ( - 2, - PartId { - content_id: "2".to_string(), - size: 50, - }, - ), - ( - 3, - PartId { - content_id: "3".to_string(), - size: 100, - }, - ), - ( - 4, - PartId { - content_id: "4".to_string(), - size: 100, - }, - ), - ]; - super::sort(&mut parts); - - assert_eq!(parts[0].1.content_id, "1"); - assert_eq!(parts[1].1.content_id, "3"); - assert_eq!(parts[2].1.content_id, "4"); - assert_eq!(parts[3].1.content_id, "2"); - } -} From b69b1ca82333d51860dd5e8bf97d53a867f34cc3 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 16 Oct 2024 10:24:20 +0200 Subject: [PATCH 06/19] more printlns --- object_store/src/buffered.rs | 2 ++ object_store/src/upload.rs | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index fcd7e064e7c1..a1bd0475ed11 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -363,6 +363,7 @@ impl AsyncWrite for BufWriter { ) -> Poll> { let cap = self.capacity; let max_concurrency = self.max_concurrency; + println!("BufWriter::poll_write writing chunk of {} bytes", buf.len()); loop { return match &mut self.state { BufWriterState::Write(Some(write)) => { @@ -390,6 +391,7 @@ impl AsyncWrite for BufWriter { let upload = store.put_multipart_opts(&path, opts).await?; let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap); for chunk in buffer.freeze() { + println!("putting {} bytes", chunk.len()); chunked.put(chunk); } Ok(chunked) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 4df4d8fd46ad..9b519f00ad31 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -176,6 +176,10 @@ impl WriteMultipart { /// Back pressure can optionally be applied to producers by calling /// [`Self::wait_for_capacity`] prior to calling this method pub fn write(&mut self, mut buf: &[u8]) { + println!( + "WriteMultipart::write writting chunk of {} bytes", + buf.len() + ); while !buf.is_empty() { let remaining = self.chunk_size - self.buffer.content_length(); let to_read = buf.len().min(remaining); @@ -196,6 +200,7 @@ impl WriteMultipart { /// /// See [`Self::write`] for information on backpressure pub fn put(&mut self, mut bytes: Bytes) { + println!("WriteMultipart::put putting chunk of {} bytes", bytes.len()); while !bytes.is_empty() { let remaining = self.chunk_size - self.buffer.content_length(); if bytes.len() < remaining { @@ -221,6 +226,10 @@ impl WriteMultipart { /// Flush final chunk, and await completion of all in-flight requests pub async fn finish(mut self) -> Result { if !self.buffer.is_empty() { + println!( + "WriteMultipart::finish: flushing final chunk of {} bytes", + self.buffer.content_length() + ); let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) } From 5e2c81c01c8e91c1fd037bb1df58926c47241dbb Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 16 Oct 2024 13:29:17 +0200 Subject: [PATCH 07/19] small tweaks --- object_store/src/aws/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 5227f8276c9e..8831a16e2877 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -335,9 +335,9 @@ struct UploadState { #[async_trait] impl MultipartUpload for S3MultiPartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { - let len = data.content_length(); let idx = self.part_idx; self.part_idx += 1; + let len = data.content_length(); let state = Arc::clone(&self.state); println!( "uploading part: {}, location: {:?}, size: {}, upload_id: {}", @@ -358,7 +358,7 @@ impl MultipartUpload for S3MultiPartUpload { } async fn complete(&mut self) -> Result { - let parts = self.state.parts.finish_sorted_by(self.part_idx, sort)?; + let parts = self.state.parts.finish(self.part_idx)?; println!( "completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}", self.state.upload_id, self.part_idx, self.state.location, parts @@ -424,6 +424,7 @@ impl MultipartStore for AmazonS3 { } } +#[allow(unused)] // smaller parts last, everything else by index fn sort(a: &(usize, PartId), b: &(usize, PartId)) -> Ordering { if a.1.size == b.1.size { From 58c9e6e685fd0354db4784a33163655c45858c10 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 16 Oct 2024 14:50:35 +0200 Subject: [PATCH 08/19] handle indexes outsize of the upload --- object_store/src/aws/mod.rs | 20 ++++++++------------ object_store/src/azure/mod.rs | 10 +++------- object_store/src/gcp/mod.rs | 10 +++------- object_store/src/integration.rs | 10 +++++----- object_store/src/lib.rs | 4 ++-- object_store/src/limit.rs | 8 ++++---- object_store/src/local.rs | 16 ++++++++-------- object_store/src/memory.rs | 4 ++-- object_store/src/throttle.rs | 8 ++++---- object_store/src/upload.rs | 25 +++++++++++++++---------- 10 files changed, 54 insertions(+), 61 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 8831a16e2877..e8a56830b9ee 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -216,7 +216,6 @@ impl ObjectStore for AmazonS3 { let upload_id = self.client.create_multipart(location, opts).await?; Ok(Box::new(S3MultiPartUpload { - part_idx: 0, state: Arc::new(UploadState { client: Arc::clone(&self.client), location: location.clone(), @@ -320,7 +319,6 @@ impl ObjectStore for AmazonS3 { #[derive(Debug)] struct S3MultiPartUpload { - part_idx: usize, state: Arc, } @@ -334,9 +332,7 @@ struct UploadState { #[async_trait] impl MultipartUpload for S3MultiPartUpload { - fn put_part(&mut self, data: PutPayload) -> UploadPart { - let idx = self.part_idx; - self.part_idx += 1; + fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { let len = data.content_length(); let state = Arc::clone(&self.state); println!( @@ -357,11 +353,11 @@ impl MultipartUpload for S3MultiPartUpload { }) } - async fn complete(&mut self) -> Result { - let parts = self.state.parts.finish(self.part_idx)?; + async fn complete(&mut self, idx: usize) -> Result { + let parts = self.state.parts.finish(idx)?; println!( "completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}", - self.state.upload_id, self.part_idx, self.state.location, parts + self.state.upload_id, idx, self.state.location, parts ); self.state @@ -372,8 +368,8 @@ impl MultipartUpload for S3MultiPartUpload { async fn abort(&mut self) -> Result<()> { println!( - "aborting multipart upload, upload_id: {}, part_id: {}, location: {:?}", - self.state.upload_id, self.part_idx, self.state.location + "aborting multipart upload, upload_id: {}, location: {:?}", + self.state.upload_id, self.state.location ); self.state .client @@ -613,8 +609,8 @@ mod tests { store.copy(&locations[0], &locations[1]).await.unwrap(); let mut upload = store.put_multipart(&locations[2]).await.unwrap(); - upload.put_part(data.clone()).await.unwrap(); - upload.complete().await.unwrap(); + upload.put_part(1, data.clone()).await.unwrap(); + upload.complete(1).await.unwrap(); for location in &locations { let res = store diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index f89a184f9523..e57b51f9486e 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -101,7 +101,6 @@ impl ObjectStore for MicrosoftAzure { opts: PutMultipartOpts, ) -> Result> { Ok(Box::new(AzureMultiPartUpload { - part_idx: 0, opts, state: Arc::new(UploadState { client: Arc::clone(&self.client), @@ -199,7 +198,6 @@ impl Signer for MicrosoftAzure { /// abort -> No equivalent; blocks are simply dropped after 7 days #[derive(Debug)] struct AzureMultiPartUpload { - part_idx: usize, state: Arc, opts: PutMultipartOpts, } @@ -213,9 +211,7 @@ struct UploadState { #[async_trait] impl MultipartUpload for AzureMultiPartUpload { - fn put_part(&mut self, data: PutPayload) -> UploadPart { - let idx = self.part_idx; - self.part_idx += 1; + fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { let state = Arc::clone(&self.state); Box::pin(async move { let part = state.client.put_block(&state.location, idx, data).await?; @@ -224,8 +220,8 @@ impl MultipartUpload for AzureMultiPartUpload { }) } - async fn complete(&mut self) -> Result { - let parts = self.state.parts.finish(self.part_idx)?; + async fn complete(&mut self, idx: usize) -> Result { + let parts = self.state.parts.finish(idx)?; self.state .client diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 039ec46b68c2..5b483e803ef8 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -102,7 +102,6 @@ impl GoogleCloudStorage { #[derive(Debug)] struct GCSMultipartUpload { state: Arc, - part_idx: usize, } #[derive(Debug)] @@ -115,9 +114,7 @@ struct UploadState { #[async_trait] impl MultipartUpload for GCSMultipartUpload { - fn put_part(&mut self, payload: PutPayload) -> UploadPart { - let idx = self.part_idx; - self.part_idx += 1; + fn put_part(&mut self, idx: usize, payload: PutPayload) -> UploadPart { let state = Arc::clone(&self.state); Box::pin(async move { let part = state @@ -129,8 +126,8 @@ impl MultipartUpload for GCSMultipartUpload { }) } - async fn complete(&mut self) -> Result { - let parts = self.state.parts.finish(self.part_idx)?; + async fn complete(&mut self, idx: usize) -> Result { + let parts = self.state.parts.finish(idx)?; self.state .client @@ -165,7 +162,6 @@ impl ObjectStore for GoogleCloudStorage { let upload_id = self.client.multipart_initiate(location, opts).await?; Ok(Box::new(GCSMultipartUpload { - part_idx: 0, state: Arc::new(UploadState { client: Arc::clone(&self.client), path: location.clone(), diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 89b21bc61696..5634e13926a3 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -497,8 +497,8 @@ pub async fn put_get_attributes(integration: &dyn ObjectStore) { let opts = attributes.clone().into(); match integration.put_multipart_opts(&path, opts).await { Ok(mut w) => { - w.put_part("foo".into()).await.unwrap(); - w.complete().await.unwrap(); + w.put_part(0, "foo".into()).await.unwrap(); + w.complete(1).await.unwrap(); let r = integration.get(&path).await.unwrap(); assert_eq!(r.attributes, attributes); @@ -755,7 +755,7 @@ pub async fn stream_get(storage: &DynObjectStore) { let data = get_chunks(5 * 1024 * 1024, 3); let bytes_expected = data.concat(); let mut upload = storage.put_multipart(&location).await.unwrap(); - let uploads = data.into_iter().map(|x| upload.put_part(x.into())); + let uploads = data.into_iter().map(|x| upload.put_part(0, x.into())); futures::future::try_join_all(uploads).await.unwrap(); // Object should not yet exist in store @@ -772,7 +772,7 @@ pub async fn stream_get(storage: &DynObjectStore) { let result = storage.list_with_delimiter(None).await.unwrap(); assert_eq!(&result.objects, &[]); - upload.complete().await.unwrap(); + upload.complete(1).await.unwrap(); let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, bytes_written); @@ -826,7 +826,7 @@ pub async fn stream_get(storage: &DynObjectStore) { // We can abort an in-progress write let mut upload = storage.put_multipart(&location).await.unwrap(); upload - .put_part(data.first().unwrap().clone().into()) + .put_part(0, data.first().unwrap().clone().into()) .await .unwrap(); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index efbfe0bd4763..6abf342b4511 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1391,8 +1391,8 @@ mod tests { .await .unwrap(); - write.put_part("foo".into()).await.unwrap(); - write.complete().await.unwrap(); + write.put_part(0, "foo".into()).await.unwrap(); + write.complete(1).await.unwrap(); let buf_path = Path::from("tag_test_buf"); let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set); diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index 64b96ad1a96c..311a8213bb31 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -251,8 +251,8 @@ impl LimitUpload { #[async_trait] impl MultipartUpload for LimitUpload { - fn put_part(&mut self, data: PutPayload) -> UploadPart { - let upload = self.upload.put_part(data); + fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { + let upload = self.upload.put_part(idx, data); let s = Arc::clone(&self.semaphore); Box::pin(async move { let _permit = s.acquire().await.unwrap(); @@ -260,9 +260,9 @@ impl MultipartUpload for LimitUpload { }) } - async fn complete(&mut self) -> Result { + async fn complete(&mut self, num_parts: usize) -> Result { let _permit = self.semaphore.acquire().await.unwrap(); - self.upload.complete().await + self.upload.complete(num_parts).await } async fn abort(&mut self) -> Result<()> { diff --git a/object_store/src/local.rs b/object_store/src/local.rs index db4b4b05031e..42df6fae9c9b 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -790,7 +790,7 @@ impl LocalUpload { #[async_trait] impl MultipartUpload for LocalUpload { - fn put_part(&mut self, data: PutPayload) -> UploadPart { + fn put_part(&mut self, _idx: usize, data: PutPayload) -> UploadPart { let offset = self.offset; self.offset += data.content_length() as u64; @@ -809,7 +809,7 @@ impl MultipartUpload for LocalUpload { .boxed() } - async fn complete(&mut self) -> Result { + async fn complete(&mut self, _num_parts: usize) -> Result { let src = self.src.take().context(AbortedSnafu)?; let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { @@ -1097,9 +1097,9 @@ mod tests { // Can't use stream_get test as WriteMultipart uses a tokio JoinSet let p = Path::from("manual_upload"); let mut upload = integration.put_multipart(&p).await.unwrap(); - upload.put_part("123".into()).await.unwrap(); - upload.put_part("45678".into()).await.unwrap(); - let r = upload.complete().await.unwrap(); + upload.put_part(0, "123".into()).await.unwrap(); + upload.put_part(1, "45678".into()).await.unwrap(); + let r = upload.complete(2).await.unwrap(); let get = integration.get(&p).await.unwrap(); assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap()); @@ -1406,10 +1406,10 @@ mod tests { let data = PutPayload::from("arbitrary data"); let mut u1 = integration.put_multipart(&location).await.unwrap(); - u1.put_part(data.clone()).await.unwrap(); + u1.put_part(0, data.clone()).await.unwrap(); let mut u2 = integration.put_multipart(&location).await.unwrap(); - u2.put_part(data).await.unwrap(); + u2.put_part(1, data).await.unwrap(); let list = flatten_list_stream(&integration, None).await.unwrap(); assert_eq!(list.len(), 0); @@ -1566,7 +1566,7 @@ mod not_wasm_tests { let location = Path::from("some_file"); let data = PutPayload::from_static(b"hello"); let mut upload = integration.put_multipart(&location).await.unwrap(); - upload.put_part(data).await.unwrap(); + upload.put_part(0, data).await.unwrap(); let file_count = std::fs::read_dir(root.path()).unwrap().count(); assert_eq!(file_count, 1); diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 61ad219cba59..f27835f5f18d 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -501,12 +501,12 @@ struct InMemoryUpload { #[async_trait] impl MultipartUpload for InMemoryUpload { - fn put_part(&mut self, payload: PutPayload) -> UploadPart { + fn put_part(&mut self, _idx: usize, payload: PutPayload) -> UploadPart { self.parts.push(payload); Box::pin(futures::future::ready(Ok(()))) } - async fn complete(&mut self) -> Result { + async fn complete(&mut self, _num_parts: usize) -> Result { let cap = self.parts.iter().map(|x| x.content_length()).sum(); let mut buf = Vec::with_capacity(cap); let parts = self.parts.iter().flatten(); diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index d07276c3dcad..04829c576d25 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -377,17 +377,17 @@ struct ThrottledUpload { #[async_trait] impl MultipartUpload for ThrottledUpload { - fn put_part(&mut self, data: PutPayload) -> UploadPart { + fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { let duration = self.sleep; - let put = self.upload.put_part(data); + let put = self.upload.put_part(idx, data); Box::pin(async move { sleep(duration).await; put.await }) } - async fn complete(&mut self) -> Result { - self.upload.complete().await + async fn complete(&mut self, num_parts: usize) -> Result { + self.upload.complete(num_parts).await } async fn abort(&mut self) -> Result<()> { diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 9b519f00ad31..c9b0686a6c55 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -64,7 +64,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// ``` /// /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations - fn put_part(&mut self, data: PutPayload) -> UploadPart; + fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart; /// Complete the multipart upload /// @@ -72,7 +72,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally, /// it is implementation defined behaviour to call [`MultipartUpload::complete`] /// on an already completed or aborted [`MultipartUpload`]. - async fn complete(&mut self) -> Result; + async fn complete(&mut self, num_parts: usize) -> Result; /// Abort the multipart upload /// @@ -94,12 +94,12 @@ pub trait MultipartUpload: Send + std::fmt::Debug { #[async_trait] impl MultipartUpload for Box { - fn put_part(&mut self, data: PutPayload) -> UploadPart { - (**self).put_part(data) + fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { + (**self).put_part(idx, data) } - async fn complete(&mut self) -> Result { - (**self).complete().await + async fn complete(&mut self, num_parts: usize) -> Result { + (**self).complete(num_parts).await } async fn abort(&mut self) -> Result<()> { @@ -118,6 +118,8 @@ impl MultipartUpload for Box { /// [`Sink`]: futures::sink::Sink #[derive(Debug)] pub struct WriteMultipart { + idx: usize, + upload: Box, buffer: PutPayloadMut, @@ -136,6 +138,7 @@ impl WriteMultipart { /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks pub fn new_with_chunk_size(upload: Box, chunk_size: usize) -> Self { Self { + idx: 0, upload, chunk_size, buffer: PutPayloadMut::new(), @@ -214,7 +217,9 @@ impl WriteMultipart { } pub(crate) fn put_part(&mut self, part: PutPayload) { - self.tasks.spawn(self.upload.put_part(part)); + let idx = self.idx; + self.idx += 1; + self.tasks.spawn(self.upload.put_part(idx, part)); } /// Abort this upload, attempting to clean up any successfully uploaded parts @@ -236,7 +241,7 @@ impl WriteMultipart { self.wait_for_capacity(0).await?; - match self.upload.complete().await { + match self.upload.complete(self.idx).await { Err(e) => { self.tasks.shutdown().await; self.upload.abort().await?; @@ -290,12 +295,12 @@ mod tests { #[async_trait] impl MultipartUpload for InstrumentedUpload { - fn put_part(&mut self, data: PutPayload) -> UploadPart { + fn put_part(&mut self, _idx: usize, data: PutPayload) -> UploadPart { self.chunks.lock().push(data); futures::future::ready(Ok(())).boxed() } - async fn complete(&mut self) -> Result { + async fn complete(&mut self, _idx: usize) -> Result { Ok(PutResult { e_tag: None, version: None, From 45101d392acbf1320694fa28b2a043b7ae63cf86 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 16 Oct 2024 14:55:36 +0200 Subject: [PATCH 09/19] remove size --- object_store/src/aws/client.rs | 3 +- object_store/src/aws/mod.rs | 51 -------------------------------- object_store/src/azure/client.rs | 3 +- object_store/src/client/parts.rs | 18 +---------- object_store/src/gcp/client.rs | 2 -- object_store/src/memory.rs | 2 -- object_store/src/multipart.rs | 2 -- 7 files changed, 3 insertions(+), 78 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 0e1ca67c422e..ab4da86f504b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -561,7 +561,6 @@ impl S3Client { data: PutPayload, ) -> Result { let part = (part_idx + 1).to_string(); - let size = data.content_length(); let response = self .request(Method::PUT, path) @@ -572,7 +571,7 @@ impl S3Client { .await?; let content_id = get_etag(response.headers()).context(MetadataSnafu)?; - Ok(PartId { content_id, size }) + Ok(PartId { content_id }) } pub async fn complete_multipart( diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index e8a56830b9ee..518b68380557 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -33,7 +33,6 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; -use std::cmp::Ordering; use std::{sync::Arc, time::Duration}; use url::Url; @@ -420,16 +419,6 @@ impl MultipartStore for AmazonS3 { } } -#[allow(unused)] -// smaller parts last, everything else by index -fn sort(a: &(usize, PartId), b: &(usize, PartId)) -> Ordering { - if a.1.size == b.1.size { - a.0.cmp(&b.0) - } else { - b.1.size.cmp(&a.1.size) - } -} - #[cfg(test)] mod tests { use super::*; @@ -629,44 +618,4 @@ mod tests { store.delete(location).await.unwrap(); } } - - #[test] - fn test_sort() { - let mut parts = [ - ( - 1, - PartId { - content_id: "1".to_string(), - size: 100, - }, - ), - ( - 2, - PartId { - content_id: "2".to_string(), - size: 50, - }, - ), - ( - 3, - PartId { - content_id: "3".to_string(), - size: 100, - }, - ), - ( - 4, - PartId { - content_id: "4".to_string(), - size: 100, - }, - ), - ]; - parts.sort_unstable_by(sort); - - assert_eq!(parts[0].1.content_id, "1"); - assert_eq!(parts[1].1.content_id, "3"); - assert_eq!(parts[2].1.content_id, "4"); - assert_eq!(parts[3].1.content_id, "2"); - } } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 1d962e5c624e..b5e82c2a8585 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -312,7 +312,6 @@ impl AzureClient { ) -> Result { let content_id = format!("{part_idx:20}"); let block_id = BASE64_STANDARD.encode(&content_id); - let size = payload.content_length(); self.put_request(path, payload) .query(&[("comp", "block"), ("blockid", &block_id)]) @@ -320,7 +319,7 @@ impl AzureClient { .send() .await?; - Ok(PartId { content_id, size }) + Ok(PartId { content_id }) } /// PUT a block list diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs index 3553f6eb8353..9fc301edcf81 100644 --- a/object_store/src/client/parts.rs +++ b/object_store/src/client/parts.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::Ordering; - use crate::multipart::PartId; use parking_lot::Mutex; @@ -37,20 +35,6 @@ impl Parts { /// /// `expected` is the number of parts expected in the final result pub(crate) fn finish(&self, expected: usize) -> crate::Result> { - self.finish_sorted_by(expected, |(idx_a, _), (idx_b, _)| idx_a.cmp(idx_b)) - } - - /// Produce the final list of [`PartId`] ordered by sorting function - /// - /// `expected` is the number of parts expected in the final result - pub(crate) fn finish_sorted_by( - &self, - expected: usize, - sort_by: F, - ) -> crate::Result> - where - F: FnMut(&(usize, PartId), &(usize, PartId)) -> Ordering, - { let mut parts = self.0.lock(); if parts.len() != expected { return Err(crate::Error::Generic { @@ -58,7 +42,7 @@ impl Parts { source: "Missing part".to_string().into(), }); } - parts.sort_unstable_by(sort_by); + parts.sort_unstable_by_key(|(idx, _)| *idx); Ok(parts.drain(..).map(|(_, v)| v).collect()) } } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 9d8ea6214c61..0045383ee000 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -409,7 +409,6 @@ impl GoogleCloudStorageClient { ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), ]; - let size = data.content_length(); let result = self .request(Method::PUT, path) .with_payload(data) @@ -420,7 +419,6 @@ impl GoogleCloudStorageClient { Ok(PartId { content_id: result.e_tag.unwrap(), - size, }) } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index f27835f5f18d..09baf3ebfc73 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -416,14 +416,12 @@ impl MultipartStore for InMemory { ) -> Result { let mut storage = self.storage.write(); let upload = storage.upload_mut(id)?; - let size = payload.content_length(); if part_idx <= upload.parts.len() { upload.parts.resize(part_idx + 1, None); } upload.parts[part_idx] = Some(payload.into()); Ok(PartId { content_id: Default::default(), - size, }) } diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index f08b752a9df6..d94e7f150513 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -31,8 +31,6 @@ use crate::{MultipartId, PutPayload, PutResult, Result}; pub struct PartId { /// Id of this part pub content_id: String, - /// Size of this part - pub size: usize, } /// A low-level interface for interacting with multipart upload APIs From f1e2968c1c5bb9773b32be2a5cbb742f62e9e007 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 16 Oct 2024 15:07:55 +0200 Subject: [PATCH 10/19] naming --- object_store/src/aws/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 518b68380557..01e4e21d795f 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -352,11 +352,11 @@ impl MultipartUpload for S3MultiPartUpload { }) } - async fn complete(&mut self, idx: usize) -> Result { - let parts = self.state.parts.finish(idx)?; + async fn complete(&mut self, num_parts: usize) -> Result { + let parts = self.state.parts.finish(num_parts)?; println!( "completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}", - self.state.upload_id, idx, self.state.location, parts + self.state.upload_id, num_parts, self.state.location, parts ); self.state From 4ba38e63c2c4eb5aef4fad2b4c8f590ef7495cc5 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 17 Oct 2024 12:27:08 +0200 Subject: [PATCH 11/19] more logs --- object_store/src/buffered.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index a1bd0475ed11..909c85c17c12 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -419,6 +419,7 @@ impl AsyncWrite for BufWriter { } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + println!("BufWriter::poll_shutdown"); loop { match &mut self.state { BufWriterState::Prepare(f) => { From 2960f04a389665806fe88b1bf2694b562dab17dc Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 17 Oct 2024 13:18:41 +0200 Subject: [PATCH 12/19] Revert "handle indexes outsize of the upload" This reverts commit 58c9e6e685fd0354db4784a33163655c45858c10. --- object_store/src/aws/mod.rs | 20 ++++++++++++-------- object_store/src/azure/mod.rs | 10 +++++++--- object_store/src/gcp/mod.rs | 10 +++++++--- object_store/src/integration.rs | 10 +++++----- object_store/src/lib.rs | 4 ++-- object_store/src/limit.rs | 8 ++++---- object_store/src/local.rs | 16 ++++++++-------- object_store/src/memory.rs | 4 ++-- object_store/src/throttle.rs | 8 ++++---- object_store/src/upload.rs | 25 ++++++++++--------------- 10 files changed, 61 insertions(+), 54 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 01e4e21d795f..dfbd1a72453a 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -215,6 +215,7 @@ impl ObjectStore for AmazonS3 { let upload_id = self.client.create_multipart(location, opts).await?; Ok(Box::new(S3MultiPartUpload { + part_idx: 0, state: Arc::new(UploadState { client: Arc::clone(&self.client), location: location.clone(), @@ -318,6 +319,7 @@ impl ObjectStore for AmazonS3 { #[derive(Debug)] struct S3MultiPartUpload { + part_idx: usize, state: Arc, } @@ -331,7 +333,9 @@ struct UploadState { #[async_trait] impl MultipartUpload for S3MultiPartUpload { - fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; let len = data.content_length(); let state = Arc::clone(&self.state); println!( @@ -352,11 +356,11 @@ impl MultipartUpload for S3MultiPartUpload { }) } - async fn complete(&mut self, num_parts: usize) -> Result { - let parts = self.state.parts.finish(num_parts)?; + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; println!( "completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}", - self.state.upload_id, num_parts, self.state.location, parts + self.state.upload_id, self.part_idx, self.state.location, parts ); self.state @@ -367,8 +371,8 @@ impl MultipartUpload for S3MultiPartUpload { async fn abort(&mut self) -> Result<()> { println!( - "aborting multipart upload, upload_id: {}, location: {:?}", - self.state.upload_id, self.state.location + "aborting multipart upload, upload_id: {}, part_id: {}, location: {:?}", + self.state.upload_id, self.part_idx, self.state.location ); self.state .client @@ -598,8 +602,8 @@ mod tests { store.copy(&locations[0], &locations[1]).await.unwrap(); let mut upload = store.put_multipart(&locations[2]).await.unwrap(); - upload.put_part(1, data.clone()).await.unwrap(); - upload.complete(1).await.unwrap(); + upload.put_part(data.clone()).await.unwrap(); + upload.complete().await.unwrap(); for location in &locations { let res = store diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index e57b51f9486e..f89a184f9523 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -101,6 +101,7 @@ impl ObjectStore for MicrosoftAzure { opts: PutMultipartOpts, ) -> Result> { Ok(Box::new(AzureMultiPartUpload { + part_idx: 0, opts, state: Arc::new(UploadState { client: Arc::clone(&self.client), @@ -198,6 +199,7 @@ impl Signer for MicrosoftAzure { /// abort -> No equivalent; blocks are simply dropped after 7 days #[derive(Debug)] struct AzureMultiPartUpload { + part_idx: usize, state: Arc, opts: PutMultipartOpts, } @@ -211,7 +213,9 @@ struct UploadState { #[async_trait] impl MultipartUpload for AzureMultiPartUpload { - fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; let state = Arc::clone(&self.state); Box::pin(async move { let part = state.client.put_block(&state.location, idx, data).await?; @@ -220,8 +224,8 @@ impl MultipartUpload for AzureMultiPartUpload { }) } - async fn complete(&mut self, idx: usize) -> Result { - let parts = self.state.parts.finish(idx)?; + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; self.state .client diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 5b483e803ef8..039ec46b68c2 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -102,6 +102,7 @@ impl GoogleCloudStorage { #[derive(Debug)] struct GCSMultipartUpload { state: Arc, + part_idx: usize, } #[derive(Debug)] @@ -114,7 +115,9 @@ struct UploadState { #[async_trait] impl MultipartUpload for GCSMultipartUpload { - fn put_part(&mut self, idx: usize, payload: PutPayload) -> UploadPart { + fn put_part(&mut self, payload: PutPayload) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; let state = Arc::clone(&self.state); Box::pin(async move { let part = state @@ -126,8 +129,8 @@ impl MultipartUpload for GCSMultipartUpload { }) } - async fn complete(&mut self, idx: usize) -> Result { - let parts = self.state.parts.finish(idx)?; + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; self.state .client @@ -162,6 +165,7 @@ impl ObjectStore for GoogleCloudStorage { let upload_id = self.client.multipart_initiate(location, opts).await?; Ok(Box::new(GCSMultipartUpload { + part_idx: 0, state: Arc::new(UploadState { client: Arc::clone(&self.client), path: location.clone(), diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 5634e13926a3..89b21bc61696 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -497,8 +497,8 @@ pub async fn put_get_attributes(integration: &dyn ObjectStore) { let opts = attributes.clone().into(); match integration.put_multipart_opts(&path, opts).await { Ok(mut w) => { - w.put_part(0, "foo".into()).await.unwrap(); - w.complete(1).await.unwrap(); + w.put_part("foo".into()).await.unwrap(); + w.complete().await.unwrap(); let r = integration.get(&path).await.unwrap(); assert_eq!(r.attributes, attributes); @@ -755,7 +755,7 @@ pub async fn stream_get(storage: &DynObjectStore) { let data = get_chunks(5 * 1024 * 1024, 3); let bytes_expected = data.concat(); let mut upload = storage.put_multipart(&location).await.unwrap(); - let uploads = data.into_iter().map(|x| upload.put_part(0, x.into())); + let uploads = data.into_iter().map(|x| upload.put_part(x.into())); futures::future::try_join_all(uploads).await.unwrap(); // Object should not yet exist in store @@ -772,7 +772,7 @@ pub async fn stream_get(storage: &DynObjectStore) { let result = storage.list_with_delimiter(None).await.unwrap(); assert_eq!(&result.objects, &[]); - upload.complete(1).await.unwrap(); + upload.complete().await.unwrap(); let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, bytes_written); @@ -826,7 +826,7 @@ pub async fn stream_get(storage: &DynObjectStore) { // We can abort an in-progress write let mut upload = storage.put_multipart(&location).await.unwrap(); upload - .put_part(0, data.first().unwrap().clone().into()) + .put_part(data.first().unwrap().clone().into()) .await .unwrap(); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 6abf342b4511..efbfe0bd4763 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1391,8 +1391,8 @@ mod tests { .await .unwrap(); - write.put_part(0, "foo".into()).await.unwrap(); - write.complete(1).await.unwrap(); + write.put_part("foo".into()).await.unwrap(); + write.complete().await.unwrap(); let buf_path = Path::from("tag_test_buf"); let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set); diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index 311a8213bb31..64b96ad1a96c 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -251,8 +251,8 @@ impl LimitUpload { #[async_trait] impl MultipartUpload for LimitUpload { - fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { - let upload = self.upload.put_part(idx, data); + fn put_part(&mut self, data: PutPayload) -> UploadPart { + let upload = self.upload.put_part(data); let s = Arc::clone(&self.semaphore); Box::pin(async move { let _permit = s.acquire().await.unwrap(); @@ -260,9 +260,9 @@ impl MultipartUpload for LimitUpload { }) } - async fn complete(&mut self, num_parts: usize) -> Result { + async fn complete(&mut self) -> Result { let _permit = self.semaphore.acquire().await.unwrap(); - self.upload.complete(num_parts).await + self.upload.complete().await } async fn abort(&mut self) -> Result<()> { diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 42df6fae9c9b..db4b4b05031e 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -790,7 +790,7 @@ impl LocalUpload { #[async_trait] impl MultipartUpload for LocalUpload { - fn put_part(&mut self, _idx: usize, data: PutPayload) -> UploadPart { + fn put_part(&mut self, data: PutPayload) -> UploadPart { let offset = self.offset; self.offset += data.content_length() as u64; @@ -809,7 +809,7 @@ impl MultipartUpload for LocalUpload { .boxed() } - async fn complete(&mut self, _num_parts: usize) -> Result { + async fn complete(&mut self) -> Result { let src = self.src.take().context(AbortedSnafu)?; let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { @@ -1097,9 +1097,9 @@ mod tests { // Can't use stream_get test as WriteMultipart uses a tokio JoinSet let p = Path::from("manual_upload"); let mut upload = integration.put_multipart(&p).await.unwrap(); - upload.put_part(0, "123".into()).await.unwrap(); - upload.put_part(1, "45678".into()).await.unwrap(); - let r = upload.complete(2).await.unwrap(); + upload.put_part("123".into()).await.unwrap(); + upload.put_part("45678".into()).await.unwrap(); + let r = upload.complete().await.unwrap(); let get = integration.get(&p).await.unwrap(); assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap()); @@ -1406,10 +1406,10 @@ mod tests { let data = PutPayload::from("arbitrary data"); let mut u1 = integration.put_multipart(&location).await.unwrap(); - u1.put_part(0, data.clone()).await.unwrap(); + u1.put_part(data.clone()).await.unwrap(); let mut u2 = integration.put_multipart(&location).await.unwrap(); - u2.put_part(1, data).await.unwrap(); + u2.put_part(data).await.unwrap(); let list = flatten_list_stream(&integration, None).await.unwrap(); assert_eq!(list.len(), 0); @@ -1566,7 +1566,7 @@ mod not_wasm_tests { let location = Path::from("some_file"); let data = PutPayload::from_static(b"hello"); let mut upload = integration.put_multipart(&location).await.unwrap(); - upload.put_part(0, data).await.unwrap(); + upload.put_part(data).await.unwrap(); let file_count = std::fs::read_dir(root.path()).unwrap().count(); assert_eq!(file_count, 1); diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 09baf3ebfc73..0d72983b0495 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -499,12 +499,12 @@ struct InMemoryUpload { #[async_trait] impl MultipartUpload for InMemoryUpload { - fn put_part(&mut self, _idx: usize, payload: PutPayload) -> UploadPart { + fn put_part(&mut self, payload: PutPayload) -> UploadPart { self.parts.push(payload); Box::pin(futures::future::ready(Ok(()))) } - async fn complete(&mut self, _num_parts: usize) -> Result { + async fn complete(&mut self) -> Result { let cap = self.parts.iter().map(|x| x.content_length()).sum(); let mut buf = Vec::with_capacity(cap); let parts = self.parts.iter().flatten(); diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 04829c576d25..d07276c3dcad 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -377,17 +377,17 @@ struct ThrottledUpload { #[async_trait] impl MultipartUpload for ThrottledUpload { - fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { + fn put_part(&mut self, data: PutPayload) -> UploadPart { let duration = self.sleep; - let put = self.upload.put_part(idx, data); + let put = self.upload.put_part(data); Box::pin(async move { sleep(duration).await; put.await }) } - async fn complete(&mut self, num_parts: usize) -> Result { - self.upload.complete(num_parts).await + async fn complete(&mut self) -> Result { + self.upload.complete().await } async fn abort(&mut self) -> Result<()> { diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index c9b0686a6c55..9b519f00ad31 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -64,7 +64,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// ``` /// /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations - fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart; + fn put_part(&mut self, data: PutPayload) -> UploadPart; /// Complete the multipart upload /// @@ -72,7 +72,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug { /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally, /// it is implementation defined behaviour to call [`MultipartUpload::complete`] /// on an already completed or aborted [`MultipartUpload`]. - async fn complete(&mut self, num_parts: usize) -> Result; + async fn complete(&mut self) -> Result; /// Abort the multipart upload /// @@ -94,12 +94,12 @@ pub trait MultipartUpload: Send + std::fmt::Debug { #[async_trait] impl MultipartUpload for Box { - fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart { - (**self).put_part(idx, data) + fn put_part(&mut self, data: PutPayload) -> UploadPart { + (**self).put_part(data) } - async fn complete(&mut self, num_parts: usize) -> Result { - (**self).complete(num_parts).await + async fn complete(&mut self) -> Result { + (**self).complete().await } async fn abort(&mut self) -> Result<()> { @@ -118,8 +118,6 @@ impl MultipartUpload for Box { /// [`Sink`]: futures::sink::Sink #[derive(Debug)] pub struct WriteMultipart { - idx: usize, - upload: Box, buffer: PutPayloadMut, @@ -138,7 +136,6 @@ impl WriteMultipart { /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks pub fn new_with_chunk_size(upload: Box, chunk_size: usize) -> Self { Self { - idx: 0, upload, chunk_size, buffer: PutPayloadMut::new(), @@ -217,9 +214,7 @@ impl WriteMultipart { } pub(crate) fn put_part(&mut self, part: PutPayload) { - let idx = self.idx; - self.idx += 1; - self.tasks.spawn(self.upload.put_part(idx, part)); + self.tasks.spawn(self.upload.put_part(part)); } /// Abort this upload, attempting to clean up any successfully uploaded parts @@ -241,7 +236,7 @@ impl WriteMultipart { self.wait_for_capacity(0).await?; - match self.upload.complete(self.idx).await { + match self.upload.complete().await { Err(e) => { self.tasks.shutdown().await; self.upload.abort().await?; @@ -295,12 +290,12 @@ mod tests { #[async_trait] impl MultipartUpload for InstrumentedUpload { - fn put_part(&mut self, _idx: usize, data: PutPayload) -> UploadPart { + fn put_part(&mut self, data: PutPayload) -> UploadPart { self.chunks.lock().push(data); futures::future::ready(Ok(())).boxed() } - async fn complete(&mut self, _idx: usize) -> Result { + async fn complete(&mut self) -> Result { Ok(PutResult { e_tag: None, version: None, From 35bea5c5c227b1dbea94382dabcea16b1aab6242 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 17 Oct 2024 13:26:18 +0200 Subject: [PATCH 13/19] more logs --- object_store/src/upload.rs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 9b519f00ad31..23c533045ff6 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -154,6 +154,10 @@ impl WriteMultipart { while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency { ready!(self.tasks.poll_join_next(cx)).unwrap()?? } + println!( + "WriteMultipart::poll_for_capacity ready for {}", + max_concurrency + ); Poll::Ready(Ok(())) } @@ -176,10 +180,6 @@ impl WriteMultipart { /// Back pressure can optionally be applied to producers by calling /// [`Self::wait_for_capacity`] prior to calling this method pub fn write(&mut self, mut buf: &[u8]) { - println!( - "WriteMultipart::write writting chunk of {} bytes", - buf.len() - ); while !buf.is_empty() { let remaining = self.chunk_size - self.buffer.content_length(); let to_read = buf.len().min(remaining); @@ -190,6 +190,7 @@ impl WriteMultipart { } buf = &buf[to_read..] } + println!("WriteMultipart::write write chunk of {} bytes", buf.len()); } /// Put a chunk of data into this [`WriteMultipart`] without copying @@ -200,7 +201,6 @@ impl WriteMultipart { /// /// See [`Self::write`] for information on backpressure pub fn put(&mut self, mut bytes: Bytes) { - println!("WriteMultipart::put putting chunk of {} bytes", bytes.len()); while !bytes.is_empty() { let remaining = self.chunk_size - self.buffer.content_length(); if bytes.len() < remaining { @@ -211,10 +211,16 @@ impl WriteMultipart { let buffer = std::mem::take(&mut self.buffer); self.put_part(buffer.into()) } + println!("WriteMultipart::put put chunk of {} bytes", bytes.len()); } pub(crate) fn put_part(&mut self, part: PutPayload) { + let len = part.content_length(); self.tasks.spawn(self.upload.put_part(part)); + println!( + "WriteMultipart::put_part spawned task for part of size: {}", + len + ); } /// Abort this upload, attempting to clean up any successfully uploaded parts @@ -226,12 +232,13 @@ impl WriteMultipart { /// Flush final chunk, and await completion of all in-flight requests pub async fn finish(mut self) -> Result { if !self.buffer.is_empty() { + let len = self.buffer.content_length(); + let part = std::mem::take(&mut self.buffer); + self.put_part(part.into()); println!( "WriteMultipart::finish: flushing final chunk of {} bytes", - self.buffer.content_length() + len ); - let part = std::mem::take(&mut self.buffer); - self.put_part(part.into()) } self.wait_for_capacity(0).await?; @@ -242,7 +249,10 @@ impl WriteMultipart { self.upload.abort().await?; Err(e) } - Ok(result) => Ok(result), + Ok(result) => { + println!("WriteMultipart::finish: done"); + Ok(result) + } } } } From 7b1a1bff57ce543048e7a6bc679ac7cedbe8660f Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 17 Oct 2024 14:31:43 +0200 Subject: [PATCH 14/19] fix log --- object_store/src/upload.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 23c533045ff6..1cea198f7042 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -180,6 +180,7 @@ impl WriteMultipart { /// Back pressure can optionally be applied to producers by calling /// [`Self::wait_for_capacity`] prior to calling this method pub fn write(&mut self, mut buf: &[u8]) { + let len = buf.len(); while !buf.is_empty() { let remaining = self.chunk_size - self.buffer.content_length(); let to_read = buf.len().min(remaining); @@ -190,7 +191,7 @@ impl WriteMultipart { } buf = &buf[to_read..] } - println!("WriteMultipart::write write chunk of {} bytes", buf.len()); + println!("WriteMultipart::write write chunk of {} bytes", len); } /// Put a chunk of data into this [`WriteMultipart`] without copying From ee7c455ef7772f85af55731ca4e8bae9be29d8a3 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 17 Oct 2024 14:45:18 +0200 Subject: [PATCH 15/19] more fixes --- object_store/src/upload.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 1cea198f7042..85c08b6103da 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -202,6 +202,7 @@ impl WriteMultipart { /// /// See [`Self::write`] for information on backpressure pub fn put(&mut self, mut bytes: Bytes) { + let len = bytes.len(); while !bytes.is_empty() { let remaining = self.chunk_size - self.buffer.content_length(); if bytes.len() < remaining { @@ -212,7 +213,7 @@ impl WriteMultipart { let buffer = std::mem::take(&mut self.buffer); self.put_part(buffer.into()) } - println!("WriteMultipart::put put chunk of {} bytes", bytes.len()); + println!("WriteMultipart::put put chunk of {} bytes", len); } pub(crate) fn put_part(&mut self, part: PutPayload) { From ef999c40efc1a33435747f95103a5afe2e4d3e78 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 17 Oct 2024 14:50:08 +0200 Subject: [PATCH 16/19] measure some time too --- object_store/src/aws/mod.rs | 11 ++++++++--- object_store/src/upload.rs | 9 +++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index dfbd1a72453a..349490cfb473 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -33,6 +33,7 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; +use std::time::SystemTime; use std::{sync::Arc, time::Duration}; use url::Url; @@ -336,12 +337,16 @@ impl MultipartUpload for S3MultiPartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { let idx = self.part_idx; self.part_idx += 1; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_micros(); let len = data.content_length(); - let state = Arc::clone(&self.state); println!( - "uploading part: {}, location: {:?}, size: {}, upload_id: {}", - idx, state.location, len, state.upload_id + "uploading part: {}, location: {:?}, size: {}, upload_id: {}, timestamp: {}", + idx, self.state.location, len, self.state.upload_id, now ); + let state = Arc::clone(&self.state); Box::pin(async move { let part = state .client diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 85c08b6103da..0a8147073e73 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -16,6 +16,7 @@ // under the License. use std::task::{Context, Poll}; +use std::time::SystemTime; use crate::{PutPayload, PutPayloadMut, PutResult, Result}; use async_trait::async_trait; @@ -218,10 +219,14 @@ impl WriteMultipart { pub(crate) fn put_part(&mut self, part: PutPayload) { let len = part.content_length(); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_micros(); self.tasks.spawn(self.upload.put_part(part)); println!( - "WriteMultipart::put_part spawned task for part of size: {}", - len + "WriteMultipart::put_part spawned task for part of size: {}, time: {}", + len, now ); } From 572d570066af1ee0dce89ac0b31fce120e474f08 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 17 Oct 2024 15:08:42 +0200 Subject: [PATCH 17/19] calc index before spawn --- object_store/src/upload.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 0a8147073e73..29b143ad4c26 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -223,7 +223,8 @@ impl WriteMultipart { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_micros(); - self.tasks.spawn(self.upload.put_part(part)); + let fut = self.upload.put_part(part); + self.tasks.spawn(fut); println!( "WriteMultipart::put_part spawned task for part of size: {}, time: {}", len, now From 2bf23ed1cf257b820f4b611d95157ce096c8c877 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Fri, 18 Oct 2024 10:37:18 +0200 Subject: [PATCH 18/19] commit --- object_store/src/aws/mod.rs | 4 ++-- object_store/src/upload.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 349490cfb473..136b1f640d69 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -335,12 +335,12 @@ struct UploadState { #[async_trait] impl MultipartUpload for S3MultiPartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { - let idx = self.part_idx; - self.part_idx += 1; let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_micros(); + let idx = self.part_idx; + self.part_idx += 1; let len = data.content_length(); println!( "uploading part: {}, location: {:?}, size: {}, upload_id: {}, timestamp: {}", diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 29b143ad4c26..c88e1164da20 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -218,11 +218,11 @@ impl WriteMultipart { } pub(crate) fn put_part(&mut self, part: PutPayload) { - let len = part.content_length(); let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_micros(); + let len = part.content_length(); let fut = self.upload.put_part(part); self.tasks.spawn(fut); println!( From 4e73e08b3426028d329fd30ee9214260e62ba11f Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Sun, 20 Oct 2024 18:49:16 +0200 Subject: [PATCH 19/19] use atomic --- object_store/src/aws/mod.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 136b1f640d69..9f25ad750182 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -33,6 +33,7 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::SystemTime; use std::{sync::Arc, time::Duration}; use url::Url; @@ -216,7 +217,7 @@ impl ObjectStore for AmazonS3 { let upload_id = self.client.create_multipart(location, opts).await?; Ok(Box::new(S3MultiPartUpload { - part_idx: 0, + part_idx: AtomicUsize::new(0), state: Arc::new(UploadState { client: Arc::clone(&self.client), location: location.clone(), @@ -320,7 +321,7 @@ impl ObjectStore for AmazonS3 { #[derive(Debug)] struct S3MultiPartUpload { - part_idx: usize, + part_idx: AtomicUsize, state: Arc, } @@ -339,8 +340,7 @@ impl MultipartUpload for S3MultiPartUpload { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_micros(); - let idx = self.part_idx; - self.part_idx += 1; + let idx = self.part_idx.fetch_add(1, Ordering::AcqRel); let len = data.content_length(); println!( "uploading part: {}, location: {:?}, size: {}, upload_id: {}, timestamp: {}", @@ -362,10 +362,11 @@ impl MultipartUpload for S3MultiPartUpload { } async fn complete(&mut self) -> Result { - let parts = self.state.parts.finish(self.part_idx)?; + let idx = self.part_idx.load(Ordering::Acquire); + let parts = self.state.parts.finish(idx)?; println!( "completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}", - self.state.upload_id, self.part_idx, self.state.location, parts + self.state.upload_id, idx, self.state.location, parts ); self.state @@ -375,9 +376,10 @@ impl MultipartUpload for S3MultiPartUpload { } async fn abort(&mut self) -> Result<()> { + let idx = self.part_idx.load(Ordering::Acquire); println!( "aborting multipart upload, upload_id: {}, part_id: {}, location: {:?}", - self.state.upload_id, self.part_idx, self.state.location + self.state.upload_id, idx, self.state.location ); self.state .client