From 3e22cbc13da3a4cfe26cd7417e22041be359f3cb Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Mon, 9 Dec 2024 16:07:28 -0500 Subject: [PATCH 01/10] Use randomized content ID for Azure multipart uploads --- object_store/src/azure/client.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 76dedd71aa5..6c122a34882 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -36,6 +36,7 @@ use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use hyper::http::HeaderName; +use rand::Rng as _; use reqwest::{ header::{HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}, Client as ReqwestClient, Method, RequestBuilder, Response, @@ -556,10 +557,11 @@ impl AzureClient { pub(crate) async fn put_block( &self, path: &Path, - part_idx: usize, + _part_idx: usize, payload: PutPayload, ) -> Result { - let content_id = format!("{part_idx:20}"); + let part_idx = u128::from_be_bytes(rand::thread_rng().gen()); + let content_id = format!("{part_idx:40}"); let block_id = BASE64_STANDARD.encode(&content_id); self.put_request(path, payload) From 4181cde1572550460c5e898f9a4a455b853fc39f Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Wed, 11 Dec 2024 09:45:52 -0500 Subject: [PATCH 02/10] Update object_store/src/azure/client.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- object_store/src/azure/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 6c122a34882..69ff39526be 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -561,7 +561,7 @@ impl AzureClient { payload: PutPayload, ) -> Result { let part_idx = u128::from_be_bytes(rand::thread_rng().gen()); - let content_id = format!("{part_idx:40}"); + let content_id = format!("{part_idx:032x}"); let block_id = BASE64_STANDARD.encode(&content_id); self.put_request(path, payload) From c2839a5f0c668c5728d9b1d151006c3ed9d755f7 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Wed, 11 Dec 2024 13:26:25 -0500 Subject: [PATCH 03/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/azure/mod.rs | 65 +++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 177bffb653a..38a1ff87a7b 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -295,10 +295,14 @@ impl MultipartStore for MicrosoftAzure { #[cfg(test)] mod tests { + use core::str; + use super::*; use crate::integration::*; use crate::tests::*; use bytes::Bytes; + use rand::thread_rng; + use rand::Rng; #[tokio::test] async fn azure_blob_test() { @@ -392,4 +396,65 @@ mod tests { azure_storage_token ); } + + #[tokio::test] + async fn azure_parallel_put_multipart_test() { + maybe_skip_integration!(); + let integration = MicrosoftAzureBuilder::from_env().build().unwrap(); + + let rng = thread_rng(); + let suffix = String::from_utf8( + rng.sample_iter(rand::distributions::Alphanumeric) + .take(32) + .collect(), + ) + .unwrap(); + let path = Path::from(format!("put_multipart_{suffix}")); + + let mut multipart_upload_1 = integration.put_multipart(&path).await.unwrap(); + let mut multipart_upload_2 = integration.put_multipart(&path).await.unwrap(); + + for i in 0..5 { + multipart_upload_1 + .put_part(Bytes::from(format!("1:{},", i)).into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from(format!("2:{},", i)).into()) + .await + .unwrap(); + } + + multipart_upload_1.complete().await.unwrap(); + let err = multipart_upload_2.complete().await.unwrap_err(); + + assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); + + if let crate::Error::Generic { source, store } = err as crate::Error { + assert_eq!(store, STORE); + + if let Some(crate::client::retry::Error::Client { status, body, .. }) = + source.downcast_ref::() + { + assert_eq!(status.clone(), http::StatusCode::BAD_REQUEST); + + let body = body.clone().unwrap(); + if !body.contains("InvalidBlockListThe specified block list is invalid.") { + panic!( + "assertion failed: `{body:?}` is not an InvalidBlockList response", + ); + } + } else { + panic!("Not a Client error") + } + } else { + panic!("Not a Generic error") + } + + let get_result = integration.get(&path).await.unwrap(); + let bytes = get_result.bytes().await.unwrap(); + let string_contents = str::from_utf8(&bytes).unwrap(); + + assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); + } } From a4e8628f7edc62235497801bcdb64597b85250a4 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Wed, 11 Dec 2024 13:53:05 -0500 Subject: [PATCH 04/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/azure/mod.rs | 54 ++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 38a1ff87a7b..22b61702be3 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -414,16 +414,50 @@ mod tests { let mut multipart_upload_1 = integration.put_multipart(&path).await.unwrap(); let mut multipart_upload_2 = integration.put_multipart(&path).await.unwrap(); - for i in 0..5 { - multipart_upload_1 - .put_part(Bytes::from(format!("1:{},", i)).into()) - .await - .unwrap(); - multipart_upload_2 - .put_part(Bytes::from(format!("2:{},", i)).into()) - .await - .unwrap(); - } + multipart_upload_1 + .put_part(Bytes::from("1:0,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:0,").into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from("2:1,").into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from("1:1,").into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:2,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:2,").into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from("2:3,").into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from("1:3,").into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:4,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:4,").into()) + .await + .unwrap(); multipart_upload_1.complete().await.unwrap(); let err = multipart_upload_2.complete().await.unwrap_err(); From d11eb215c87368ea7d107147e00420d5c53b3f94 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Thu, 12 Dec 2024 16:05:39 -0500 Subject: [PATCH 05/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/azure/mod.rs | 1 + object_store/src/integration.rs | 66 +++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 22b61702be3..304fcd4a715 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -318,6 +318,7 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; + multipart_race_condition(&integration).await; signing(&integration).await; let validate = !integration.client.config().disable_tagging; diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 30177878306..863b03ab9b0 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -24,6 +24,8 @@ //! //! They are intended solely for testing purposes. +use core::str; + use crate::multipart::MultipartStore; use crate::path::Path; use crate::{ @@ -1109,3 +1111,67 @@ async fn delete_fixtures(storage: &DynObjectStore) { .await .unwrap(); } + +/// Tests a race condition where 2 threads are performing multipart writes to the same path +pub async fn multipart_race_condition(storage: &dyn ObjectStore) { + let path = Path::from("test_multipart_race_condition"); + + let mut multipart_upload_1 = storage.put_multipart(&path).await.unwrap(); + let mut multipart_upload_2 = storage.put_multipart(&path).await.unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:0,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:0,").into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from("2:1,").into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from("1:1,").into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:2,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:2,").into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from("2:3,").into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from("1:3,").into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:4,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:4,").into()) + .await + .unwrap(); + + multipart_upload_1.complete().await.unwrap(); + let err = multipart_upload_2.complete().await.unwrap_err(); + + assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); + + let get_result = storage.get(&path).await.unwrap(); + let bytes = get_result.bytes().await.unwrap(); + let string_contents = str::from_utf8(&bytes).unwrap(); + + assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); +} From 5edc040deb729b68f1fbc89b3ac008430401fab3 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Thu, 12 Dec 2024 16:18:38 -0500 Subject: [PATCH 06/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/azure/mod.rs | 99 ----------------------------------- 1 file changed, 99 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 304fcd4a715..469f15b2d9d 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -295,14 +295,10 @@ impl MultipartStore for MicrosoftAzure { #[cfg(test)] mod tests { - use core::str; - use super::*; use crate::integration::*; use crate::tests::*; use bytes::Bytes; - use rand::thread_rng; - use rand::Rng; #[tokio::test] async fn azure_blob_test() { @@ -397,99 +393,4 @@ mod tests { azure_storage_token ); } - - #[tokio::test] - async fn azure_parallel_put_multipart_test() { - maybe_skip_integration!(); - let integration = MicrosoftAzureBuilder::from_env().build().unwrap(); - - let rng = thread_rng(); - let suffix = String::from_utf8( - rng.sample_iter(rand::distributions::Alphanumeric) - .take(32) - .collect(), - ) - .unwrap(); - let path = Path::from(format!("put_multipart_{suffix}")); - - let mut multipart_upload_1 = integration.put_multipart(&path).await.unwrap(); - let mut multipart_upload_2 = integration.put_multipart(&path).await.unwrap(); - - multipart_upload_1 - .put_part(Bytes::from("1:0,").into()) - .await - .unwrap(); - multipart_upload_2 - .put_part(Bytes::from("2:0,").into()) - .await - .unwrap(); - - multipart_upload_2 - .put_part(Bytes::from("2:1,").into()) - .await - .unwrap(); - multipart_upload_1 - .put_part(Bytes::from("1:1,").into()) - .await - .unwrap(); - - multipart_upload_1 - .put_part(Bytes::from("1:2,").into()) - .await - .unwrap(); - multipart_upload_2 - .put_part(Bytes::from("2:2,").into()) - .await - .unwrap(); - - multipart_upload_2 - .put_part(Bytes::from("2:3,").into()) - .await - .unwrap(); - multipart_upload_1 - .put_part(Bytes::from("1:3,").into()) - .await - .unwrap(); - - multipart_upload_1 - .put_part(Bytes::from("1:4,").into()) - .await - .unwrap(); - multipart_upload_2 - .put_part(Bytes::from("2:4,").into()) - .await - .unwrap(); - - multipart_upload_1.complete().await.unwrap(); - let err = multipart_upload_2.complete().await.unwrap_err(); - - assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); - - if let crate::Error::Generic { source, store } = err as crate::Error { - assert_eq!(store, STORE); - - if let Some(crate::client::retry::Error::Client { status, body, .. }) = - source.downcast_ref::() - { - assert_eq!(status.clone(), http::StatusCode::BAD_REQUEST); - - let body = body.clone().unwrap(); - if !body.contains("InvalidBlockListThe specified block list is invalid.") { - panic!( - "assertion failed: `{body:?}` is not an InvalidBlockList response", - ); - } - } else { - panic!("Not a Client error") - } - } else { - panic!("Not a Generic error") - } - - let get_result = integration.get(&path).await.unwrap(); - let bytes = get_result.bytes().await.unwrap(); - let string_contents = str::from_utf8(&bytes).unwrap(); - - assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); - } } From 7ca06431dcffbcb5e2d91d94ebd2738d2cd55e95 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Thu, 12 Dec 2024 16:23:14 -0500 Subject: [PATCH 07/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/aws/mod.rs | 1 + object_store/src/azure/mod.rs | 2 +- object_store/src/gcp/mod.rs | 1 + object_store/src/integration.rs | 11 ++++++++--- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index a7f9264a681..7f449c49963 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -570,6 +570,7 @@ mod tests { rename_and_copy(&integration).await; stream_get(&integration).await; multipart(&integration, &integration).await; + multipart_race_condition(&integration, true).await; signing(&integration).await; s3_encryption(&integration).await; put_get_attributes(&integration).await; diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 469f15b2d9d..81b6667bc05 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -314,7 +314,7 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; - multipart_race_condition(&integration).await; + multipart_race_condition(&integration, false).await; signing(&integration).await; let validate = !integration.client.config().disable_tagging; diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 039ec46b68c..5199135ba6b 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -297,6 +297,7 @@ mod test { // https://github.com/fsouza/fake-gcs-server/issues/852 stream_get(&integration).await; multipart(&integration, &integration).await; + multipart_race_condition(&integration, true).await; // Fake GCS server doesn't currently honor preconditions get_opts(&integration).await; put_opts(&integration, true).await; diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 863b03ab9b0..1a4907b9742 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -1113,7 +1113,7 @@ async fn delete_fixtures(storage: &DynObjectStore) { } /// Tests a race condition where 2 threads are performing multipart writes to the same path -pub async fn multipart_race_condition(storage: &dyn ObjectStore) { +pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_wins: bool) { let path = Path::from("test_multipart_race_condition"); let mut multipart_upload_1 = storage.put_multipart(&path).await.unwrap(); @@ -1165,9 +1165,14 @@ pub async fn multipart_race_condition(storage: &dyn ObjectStore) { .unwrap(); multipart_upload_1.complete().await.unwrap(); - let err = multipart_upload_2.complete().await.unwrap_err(); - assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); + if last_writer_wins { + multipart_upload_2.complete().await.unwrap(); + } else { + let err = multipart_upload_2.complete().await.unwrap_err(); + + assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); + } let get_result = storage.get(&path).await.unwrap(); let bytes = get_result.bytes().await.unwrap(); From b4fd82503cc08197a23cf824552f72e34dce73ea Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Thu, 12 Dec 2024 16:25:22 -0500 Subject: [PATCH 08/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/integration.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 1a4907b9742..74f5180f28d 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -1178,5 +1178,9 @@ pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_win let bytes = get_result.bytes().await.unwrap(); let string_contents = str::from_utf8(&bytes).unwrap(); - assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); + if last_writer_wins { + assert_eq!("2:0,2:1,2:2,2:3,2:4,", string_contents); + } else { + assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); + } } From fae706b1ba13161b58ab3b98f1ac4206e72ab722 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Thu, 12 Dec 2024 16:33:41 -0500 Subject: [PATCH 09/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/integration.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 74f5180f28d..a4d61880240 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -1164,6 +1164,19 @@ pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_win .await .unwrap(); + // This is to satisy AWS S3's minimum allowed upload size of 5242880 + let last_chunk_size = 5 * 1024 * 1024; + let last_chunk = get_chunk(last_chunk_size); + + multipart_upload_1 + .put_part(last_chunk.clone().into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(last_chunk.into()) + .await + .unwrap(); + multipart_upload_1.complete().await.unwrap(); if last_writer_wins { @@ -1179,8 +1192,8 @@ pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_win let string_contents = str::from_utf8(&bytes).unwrap(); if last_writer_wins { - assert_eq!("2:0,2:1,2:2,2:3,2:4,", string_contents); + assert!(string_contents.starts_with("2:0,2:1,2:2,2:3,2:4,")); } else { - assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); + assert!(string_contents.starts_with("1:0,1:1,1:2,1:3,1:4,")); } } From e6a4c74b4bce7757a0d1c61173e1e28189af67a0 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Thu, 12 Dec 2024 16:51:14 -0500 Subject: [PATCH 10/10] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/integration.rs | 49 ++++++++++++++++----------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index a4d61880240..20e95fddc47 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -1120,60 +1120,47 @@ pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_win let mut multipart_upload_2 = storage.put_multipart(&path).await.unwrap(); multipart_upload_1 - .put_part(Bytes::from("1:0,").into()) + .put_part(Bytes::from(format!("1:{:05300000},", 0)).into()) .await .unwrap(); multipart_upload_2 - .put_part(Bytes::from("2:0,").into()) + .put_part(Bytes::from(format!("2:{:05300000},", 0)).into()) .await .unwrap(); multipart_upload_2 - .put_part(Bytes::from("2:1,").into()) + .put_part(Bytes::from(format!("2:{:05300000},", 1)).into()) .await .unwrap(); multipart_upload_1 - .put_part(Bytes::from("1:1,").into()) + .put_part(Bytes::from(format!("1:{:05300000},", 1)).into()) .await .unwrap(); multipart_upload_1 - .put_part(Bytes::from("1:2,").into()) + .put_part(Bytes::from(format!("1:{:05300000},", 2)).into()) .await .unwrap(); multipart_upload_2 - .put_part(Bytes::from("2:2,").into()) + .put_part(Bytes::from(format!("2:{:05300000},", 2)).into()) .await .unwrap(); multipart_upload_2 - .put_part(Bytes::from("2:3,").into()) + .put_part(Bytes::from(format!("2:{:05300000},", 3)).into()) .await .unwrap(); multipart_upload_1 - .put_part(Bytes::from("1:3,").into()) + .put_part(Bytes::from(format!("1:{:05300000},", 3)).into()) .await .unwrap(); multipart_upload_1 - .put_part(Bytes::from("1:4,").into()) + .put_part(Bytes::from(format!("1:{:05300000},", 4)).into()) .await .unwrap(); multipart_upload_2 - .put_part(Bytes::from("2:4,").into()) - .await - .unwrap(); - - // This is to satisy AWS S3's minimum allowed upload size of 5242880 - let last_chunk_size = 5 * 1024 * 1024; - let last_chunk = get_chunk(last_chunk_size); - - multipart_upload_1 - .put_part(last_chunk.clone().into()) - .await - .unwrap(); - multipart_upload_2 - .put_part(last_chunk.into()) + .put_part(Bytes::from(format!("2:{:05300000},", 4)).into()) .await .unwrap(); @@ -1192,8 +1179,20 @@ pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_win let string_contents = str::from_utf8(&bytes).unwrap(); if last_writer_wins { - assert!(string_contents.starts_with("2:0,2:1,2:2,2:3,2:4,")); + assert!(string_contents.starts_with( + format!( + "2:{:05300000},2:{:05300000},2:{:05300000},2:{:05300000},2:{:05300000},", + 0, 1, 2, 3, 4 + ) + .as_str() + )); } else { - assert!(string_contents.starts_with("1:0,1:1,1:2,1:3,1:4,")); + assert!(string_contents.starts_with( + format!( + "1:{:05300000},1:{:05300000},1:{:05300000},1:{:05300000},1:{:05300000},", + 0, 1, 2, 3, 4 + ) + .as_str() + )); } }