diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 22b61702be3..304fcd4a715 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -318,6 +318,7 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; + multipart_race_condition(&integration).await; signing(&integration).await; let validate = !integration.client.config().disable_tagging; diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 30177878306..863b03ab9b0 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -24,6 +24,8 @@ //! //! They are intended solely for testing purposes. +use core::str; + use crate::multipart::MultipartStore; use crate::path::Path; use crate::{ @@ -1109,3 +1111,67 @@ async fn delete_fixtures(storage: &DynObjectStore) { .await .unwrap(); } + +/// Tests a race condition where 2 threads are performing multipart writes to the same path +pub async fn multipart_race_condition(storage: &dyn ObjectStore) { + let path = Path::from("test_multipart_race_condition"); + + let mut multipart_upload_1 = storage.put_multipart(&path).await.unwrap(); + let mut multipart_upload_2 = storage.put_multipart(&path).await.unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:0,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:0,").into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from("2:1,").into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from("1:1,").into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:2,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:2,").into()) + .await + .unwrap(); + + multipart_upload_2 + .put_part(Bytes::from("2:3,").into()) + .await + .unwrap(); + multipart_upload_1 + .put_part(Bytes::from("1:3,").into()) + .await + .unwrap(); + + multipart_upload_1 + .put_part(Bytes::from("1:4,").into()) + .await + .unwrap(); + multipart_upload_2 + .put_part(Bytes::from("2:4,").into()) + .await + .unwrap(); + + multipart_upload_1.complete().await.unwrap(); + let err = multipart_upload_2.complete().await.unwrap_err(); + + assert!(matches!(err, crate::Error::Generic { .. }), "{err}"); + + let get_result = storage.get(&path).await.unwrap(); + let bytes = get_result.bytes().await.unwrap(); + let string_contents = str::from_utf8(&bytes).unwrap(); + + assert_eq!("1:0,1:1,1:2,1:3,1:4,", string_contents); +}