From c2839a5f0c668c5728d9b1d151006c3ed9d755f7 Mon Sep 17 00:00:00 2001 From: Andrew Varnon Date: Wed, 11 Dec 2024 13:26:25 -0500 Subject: [PATCH] fixup! Use randomized content ID for Azure multipart uploads --- object_store/src/azure/mod.rs | 65 +++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 177bffb653ae..38a1ff87a7b6 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -295,10 +295,14 @@ impl MultipartStore for MicrosoftAzure { #[cfg(test)] mod tests { + use core::str; + use super::*; use crate::integration::*; use crate::tests::*; use bytes::Bytes; + use rand::thread_rng; + use rand::Rng; #[tokio::test] async fn azure_blob_test() { @@ -392,4 +396,65 @@ mod tests { azure_storage_token ); } + + #[tokio::test] + async fn azure_parallel_put_multipart_test() { + maybe_skip_integration!(); + let integration = MicrosoftAzureBuilder::from_env().build().unwrap(); + + let rng = thread_rng(); + let suffix = String::from_utf8( + rng.sample_iter(rand::distributions::Alphanumeric) + .take(32) + .collect(), + ) + .unwrap(); + let path = Path::from(format!("put_multipart_{suffix}")); + + let mut multipart_upload_1 = integration.put_multipart(&path).await.unwrap(); + let mut multipart_upload_2 = integration.put_multipart(&path).await.unwrap(); + + for i in 0..5 { + multipart_upload_1 + .put_part(Bytes::from(format!("1:{},", i)).into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from(format!("2:{},", i)).into()) + .await + .unwrap(); + } + + multipart_upload_1.complete().await.unwrap(); + let err = multipart_upload_2.complete().await.unwrap_err(); + + assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); + + if let crate::Error::Generic { source, store } = err as crate::Error { + assert_eq!(store, STORE); + + if let Some(crate::client::retry::Error::Client { status, body, .. }) = + source.downcast_ref::() + { + assert_eq!(status.clone(), http::StatusCode::BAD_REQUEST); + + let body = body.clone().unwrap(); + if !body.contains("InvalidBlockListThe specified block list is invalid.") { + panic!( + "assertion failed: `{body:?}` is not an InvalidBlockList response", + ); + } + } else { + panic!("Not a Client error") + } + } else { + panic!("Not a Generic error") + } + + let get_result = integration.get(&path).await.unwrap(); + let bytes = get_result.bytes().await.unwrap(); + let string_contents = str::from_utf8(&bytes).unwrap(); + + assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); + } }