diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index a7f9264a6815..7f449c49963c 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -570,6 +570,7 @@ mod tests { rename_and_copy(&integration).await; stream_get(&integration).await; multipart(&integration, &integration).await; + multipart_race_condition(&integration, true).await; signing(&integration).await; s3_encryption(&integration).await; put_get_attributes(&integration).await; diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 76dedd71aa50..69ff39526bef 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -36,6 +36,7 @@ use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use hyper::http::HeaderName; +use rand::Rng as _; use reqwest::{ header::{HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}, Client as ReqwestClient, Method, RequestBuilder, Response, @@ -556,10 +557,11 @@ impl AzureClient { pub(crate) async fn put_block( &self, path: &Path, - part_idx: usize, + _part_idx: usize, payload: PutPayload, ) -> Result { - let content_id = format!("{part_idx:20}"); + let part_idx = u128::from_be_bytes(rand::thread_rng().gen()); + let content_id = format!("{part_idx:032x}"); let block_id = BASE64_STANDARD.encode(&content_id); self.put_request(path, payload) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 177bffb653ae..81b6667bc058 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -314,6 +314,7 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; + multipart_race_condition(&integration, false).await; signing(&integration).await; let validate = !integration.client.config().disable_tagging; diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 039ec46b68c2..5199135ba6b0 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -297,6 +297,7 @@ mod test { // https://github.com/fsouza/fake-gcs-server/issues/852 stream_get(&integration).await; multipart(&integration, &integration).await; + multipart_race_condition(&integration, true).await; // Fake GCS server doesn't currently honor preconditions get_opts(&integration).await; put_opts(&integration, true).await; diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 30177878306f..20e95fddc478 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -24,6 +24,8 @@ //! //! They are intended solely for testing purposes. +use core::str; + use crate::multipart::MultipartStore; use crate::path::Path; use crate::{ @@ -1109,3 +1111,88 @@ async fn delete_fixtures(storage: &DynObjectStore) { .await .unwrap(); } + +/// Tests a race condition where 2 threads are performing multipart writes to the same path +pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_wins: bool) { + let path = Path::from("test_multipart_race_condition"); + + let mut multipart_upload_1 = storage.put_multipart(&path).await.unwrap(); + let mut multipart_upload_2 = storage.put_multipart(&path).await.unwrap(); + + multipart_upload_1 + .put_part(Bytes::from(format!("1:{:05300000},", 0)).into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from(format!("2:{:05300000},", 0)).into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from(format!("2:{:05300000},", 1)).into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from(format!("1:{:05300000},", 1)).into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from(format!("1:{:05300000},", 2)).into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from(format!("2:{:05300000},", 2)).into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from(format!("2:{:05300000},", 3)).into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from(format!("1:{:05300000},", 3)).into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from(format!("1:{:05300000},", 4)).into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from(format!("2:{:05300000},", 4)).into()) + .await + .unwrap(); + + multipart_upload_1.complete().await.unwrap(); + + if last_writer_wins { + multipart_upload_2.complete().await.unwrap(); + } else { + let err = multipart_upload_2.complete().await.unwrap_err(); + + assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); + } + + let get_result = storage.get(&path).await.unwrap(); + let bytes = get_result.bytes().await.unwrap(); + let string_contents = str::from_utf8(&bytes).unwrap(); + + if last_writer_wins { + assert!(string_contents.starts_with( + format!( + "2:{:05300000},2:{:05300000},2:{:05300000},2:{:05300000},2:{:05300000},", + 0, 1, 2, 3, 4 + ) + .as_str() + )); + } else { + assert!(string_contents.starts_with( + format!( + "1:{:05300000},1:{:05300000},1:{:05300000},1:{:05300000},1:{:05300000},", + 0, 1, 2, 3, 4 + ) + .as_str() + )); + } +}