diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 59501b5addfe..85e159fe843c 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -141,6 +141,7 @@ jobs: echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket + aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key") diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 47249685b7bb..af54b8a0d4ce 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -32,7 +32,7 @@ use crate::client::s3::{ InitiateMultipartUploadResult, ListResponse, }; use crate::client::GetOptionsExt; -use crate::multipart::PartId; +use crate::multipart::MultipartInfo; use crate::path::DELIMITER; use crate::{ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path, @@ -186,7 +186,7 @@ impl From for Error { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct S3Config { pub region: String, pub endpoint: Option, @@ -462,6 +462,27 @@ impl S3Client { } } + #[allow(dead_code)] + pub(crate) fn request_with_config<'a>( + &'a self, + method: Method, + path: &'a Path, + config: &'a S3Config, + ) -> Request<'a> { + let url = self.config.path_url(path); + Request { + path, + builder: self.client.request(method, url), + payload: None, + payload_sha256: None, + config, + use_session_creds: true, + idempotent: false, + retry_on_conflict: false, + retry_error_body: false, + } + } + /// Make an S3 Delete Objects request /// /// Produces a vector of results, one for each path in the input vector. If @@ -619,6 +640,7 @@ impl S3Client { ) -> Result { let response = self .request(Method::POST, location) + .header("x-amz-checksum-algorithm", "SHA256") .query(&[("uploads", "")]) .with_encryption_headers() .with_attributes(opts.attributes) @@ -642,12 +664,16 @@ impl S3Client { upload_id: &MultipartId, part_idx: usize, data: PutPartPayload<'_>, - ) -> Result { + ) -> Result { let is_copy = matches!(data, PutPartPayload::Copy(_)); let part = (part_idx + 1).to_string(); + let config = S3Config { + checksum: Some(Checksum::SHA256), + ..self.config.clone() + }; let mut request = self - .request(Method::PUT, path) + .request_with_config(Method::PUT, path, &config) .query(&[("partNumber", &part), ("uploadId", upload_id)]) .idempotent(true); @@ -669,6 +695,10 @@ impl S3Client { request = request.with_encryption_headers(); } let response = request.send().await?; + let checksum_sha256 = response + .headers() + .get("x-amz-checksum-sha256") + .map(|v| v.to_str().unwrap().to_string()); let content_id = match is_copy { false => get_etag(response.headers()).context(MetadataSnafu)?, @@ -682,7 +712,12 @@ impl S3Client { response.e_tag } }; - Ok(PartId { content_id }) + let part = MultipartInfo { + e_tag: content_id, + part_number: part_idx + 1, + checksum_sha256, + }; + Ok(part) } pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> { @@ -699,7 +734,7 @@ impl S3Client { &self, location: &Path, upload_id: &str, - parts: Vec, + parts: Vec, mode: CompleteMultipartMode, ) -> Result { let parts = if parts.is_empty() { diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index d7c8c9b546eb..9716c103b9ef 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -40,7 +40,7 @@ use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3 use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -use crate::multipart::{MultipartStore, PartId}; +use crate::multipart::{MultipartInfo, MultipartStore}; use crate::signer::Signer; use crate::util::STRICT_ENCODE_SET; use crate::{ @@ -319,7 +319,7 @@ impl ObjectStore for AmazonS3 { .await?; let res = async { - let part_id = self + let part = self .client .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) .await?; @@ -328,7 +328,7 @@ impl ObjectStore for AmazonS3 { .complete_multipart( to, &upload_id, - vec![part_id], + vec![part], CompleteMultipartMode::Create, ) .await @@ -407,7 +407,7 @@ impl MultipartUpload for S3MultiPartUpload { PutPartPayload::Part(data), ) .await?; - state.parts.put(idx, part); + state.parts.put(part); Ok(()) }) } @@ -453,7 +453,7 @@ impl MultipartStore for AmazonS3 { id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { self.client .put_part(path, id, part_idx, PutPartPayload::Part(data)) .await @@ -463,7 +463,7 @@ impl MultipartStore for AmazonS3 { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { self.client .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite) @@ -493,6 +493,66 @@ mod tests { const NON_EXISTENT_NAME: &str = "nonexistentname"; + #[tokio::test] + async fn write_multipart_file_with_signature() { + maybe_skip_integration!(); + + let store = AmazonS3Builder::from_env() + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + upload + .put_part(PutPayload::from(vec![0u8; 10_000_000])) + .await + .unwrap(); + upload + .put_part(PutPayload::from(vec![0u8; 5_000_000])) + .await + .unwrap(); + + let res = upload.complete().await.unwrap(); + assert!(res.e_tag.is_some(), "Should have valid etag"); + + store.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn write_multipart_file_with_signature_object_lock() { + maybe_skip_integration!(); + + let bucket = "test-object-lock"; + let store = AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + upload + .put_part(PutPayload::from(vec![0u8; 10_000_000])) + .await + .unwrap(); + upload + .put_part(PutPayload::from(vec![0u8; 5_000_000])) + .await + .unwrap(); + + let res = upload.complete().await.unwrap(); + assert!(res.e_tag.is_some(), "Should have valid etag"); + + store.delete(&path).await.unwrap(); + } + #[tokio::test] async fn s3_test() { maybe_skip_integration!(); diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 76dedd71aa50..a03cf263b00f 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -23,7 +23,7 @@ use crate::client::header::{get_put_result, HeaderConfig}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; -use crate::multipart::PartId; +use crate::multipart::{MultipartInfo, PartId}; use crate::path::DELIMITER; use crate::util::{deserialize_rfc1123, GetRange}; use crate::{ @@ -558,7 +558,7 @@ impl AzureClient { path: &Path, part_idx: usize, payload: PutPayload, - ) -> Result { + ) -> Result { let content_id = format!("{part_idx:20}"); let block_id = BASE64_STANDARD.encode(&content_id); @@ -568,7 +568,12 @@ impl AzureClient { .send() .await?; - Ok(PartId { content_id }) + let part = MultipartInfo { + e_tag: content_id, + part_number: part_idx + 1, + checksum_sha256: None, + }; + Ok(part) } /// PUT a block list diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 177bffb653ae..60829ebe0f28 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -23,11 +23,9 @@ //! //! Unused blocks will automatically be dropped after 7 days. use crate::{ - multipart::{MultipartStore, PartId}, - path::Path, - signer::Signer, - GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, + multipart::MultipartStore, path::Path, signer::Signer, GetOptions, GetResult, ListResult, + MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, + PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use futures::stream::{BoxStream, StreamExt, TryStreamExt}; @@ -50,6 +48,7 @@ mod credential; pub type AzureCredentialProvider = Arc>; use crate::azure::client::AzureClient; use crate::client::parts::Parts; +use crate::multipart::MultipartInfo; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -239,13 +238,14 @@ impl MultipartUpload for AzureMultiPartUpload { let state = Arc::clone(&self.state); Box::pin(async move { let part = state.client.put_block(&state.location, idx, data).await?; - state.parts.put(idx, part); + state.parts.put(part); Ok(()) }) } async fn complete(&mut self) -> Result { let parts = self.state.parts.finish(self.part_idx)?; + let parts = parts.into_iter().map(|part| part.into()).collect(); self.state .client @@ -271,7 +271,7 @@ impl MultipartStore for MicrosoftAzure { _: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { self.client.put_block(path, part_idx, data).await } @@ -279,8 +279,9 @@ impl MultipartStore for MicrosoftAzure { &self, path: &Path, _: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { + let parts = parts.into_iter().map(|p| p.into()).collect(); self.client .put_block_list(path, parts, Default::default()) .await diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs index 9fc301edcf81..964031c2b277 100644 --- a/object_store/src/client/parts.rs +++ b/object_store/src/client/parts.rs @@ -15,26 +15,26 @@ // specific language governing permissions and limitations // under the License. -use crate::multipart::PartId; +use crate::multipart::MultipartInfo; use parking_lot::Mutex; /// An interior mutable collection of upload parts and their corresponding part index #[derive(Debug, Default)] -pub(crate) struct Parts(Mutex>); +pub(crate) struct Parts(Mutex>); impl Parts { - /// Record the [`PartId`] for a given index + /// Record the [`MultipartInfo`] for a given index /// /// Note: calling this method multiple times with the same `part_idx` - /// will result in multiple [`PartId`] in the final output - pub(crate) fn put(&self, part_idx: usize, id: PartId) { - self.0.lock().push((part_idx, id)) + /// will result in multiple [`MultipartInfo`] in the final output + pub(crate) fn put(&self, part: MultipartInfo) { + self.0.lock().push(part) } - /// Produce the final list of [`PartId`] ordered by `part_idx` + /// Produce the final list of [`MultipartInfo`] ordered by `part_idx` /// /// `expected` is the number of parts expected in the final result - pub(crate) fn finish(&self, expected: usize) -> crate::Result> { + pub(crate) fn finish(&self, expected: usize) -> crate::Result> { let mut parts = self.0.lock(); if parts.len() != expected { return Err(crate::Error::Generic { @@ -42,7 +42,7 @@ impl Parts { source: "Missing part".to_string().into(), }); } - parts.sort_unstable_by_key(|(idx, _)| *idx); - Ok(parts.drain(..).map(|(_, v)| v).collect()) + parts.sort_unstable_by_key(|part| part.part_number); + Ok(parts.drain(..).collect()) } } diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index dba752cb1251..f131adb532ad 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -16,7 +16,7 @@ //! The list and multipart API used by both GCS and S3 -use crate::multipart::PartId; +use crate::multipart::{MultipartInfo, PartId}; use crate::path::Path; use crate::{ListResult, ObjectMeta, Result}; use chrono::{DateTime, Utc}; @@ -114,6 +114,21 @@ impl From> for CompleteMultipartUpload { .map(|(part_number, part)| MultipartPart { e_tag: part.content_id, part_number: part_number + 1, + checksum_sha256: None, + }) + .collect(); + Self { part } + } +} + +impl From> for CompleteMultipartUpload { + fn from(value: Vec) -> Self { + let part = value + .into_iter() + .map(|part| MultipartPart { + e_tag: part.e_tag, + part_number: part.part_number, + checksum_sha256: part.checksum_sha256, }) .collect(); Self { part } @@ -126,6 +141,8 @@ pub(crate) struct MultipartPart { pub e_tag: String, #[serde(rename = "PartNumber")] pub part_number: usize, + #[serde(rename = "ChecksumSHA256")] + pub checksum_sha256: Option, } #[derive(Debug, Deserialize)] diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index ccc9c341f2fe..8c8160b67ef2 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -25,7 +25,7 @@ use crate::client::s3::{ }; use crate::client::GetOptionsExt; use crate::gcp::{GcpCredential, GcpCredentialProvider, GcpSigningCredentialProvider, STORE}; -use crate::multipart::PartId; +use crate::multipart::{MultipartInfo, PartId}; use crate::path::{Path, DELIMITER}; use crate::util::hex_encode; use crate::{ @@ -411,7 +411,7 @@ impl GoogleCloudStorageClient { upload_id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { let query = &[ ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), @@ -424,9 +424,12 @@ impl GoogleCloudStorageClient { .do_put() .await?; - Ok(PartId { - content_id: result.e_tag.unwrap(), - }) + let part = MultipartInfo { + e_tag: result.e_tag.unwrap(), + part_number: part_idx + 1, + checksum_sha256: None, + }; + Ok(part) } /// Initiate a multipart upload diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 039ec46b68c2..31dfac938298 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -41,9 +41,8 @@ use crate::client::CredentialProvider; use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{ - multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, - UploadPart, + path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use client::GoogleCloudStorageClient; @@ -54,7 +53,7 @@ use url::Url; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::parts::Parts; -use crate::multipart::MultipartStore; +use crate::multipart::{MultipartInfo, MultipartStore}; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey}; @@ -124,13 +123,14 @@ impl MultipartUpload for GCSMultipartUpload { .client .put_part(&state.path, &state.multipart_id, idx, payload) .await?; - state.parts.put(idx, part); + state.parts.put(part); Ok(()) }) } async fn complete(&mut self) -> Result { let parts = self.state.parts.finish(self.part_idx)?; + let parts = parts.into_iter().map(|p| p.into()).collect(); self.state .client @@ -222,7 +222,7 @@ impl MultipartStore for GoogleCloudStorage { id: &MultipartId, part_idx: usize, payload: PutPayload, - ) -> Result { + ) -> Result { self.client.put_part(path, id, part_idx, payload).await } @@ -230,8 +230,9 @@ impl MultipartStore for GoogleCloudStorage { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { + let parts = parts.into_iter().map(|p| p.into()).collect(); self.client.multipart_complete(path, id, parts).await } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index a467e3b88a26..85d3075c26cf 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -27,7 +27,7 @@ use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; -use crate::multipart::{MultipartStore, PartId}; +use crate::multipart::{MultipartInfo, MultipartStore}; use crate::util::InvalidGetRange; use crate::{ path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, @@ -412,23 +412,26 @@ impl MultipartStore for InMemory { id: &MultipartId, part_idx: usize, payload: PutPayload, - ) -> Result { + ) -> Result { let mut storage = self.storage.write(); let upload = storage.upload_mut(id)?; if part_idx <= upload.parts.len() { upload.parts.resize(part_idx + 1, None); } upload.parts[part_idx] = Some(payload.into()); - Ok(PartId { - content_id: Default::default(), - }) + let part = MultipartInfo { + e_tag: "".to_string(), + part_number: 0, + checksum_sha256: None, + }; + Ok(part) } async fn complete_multipart( &self, path: &Path, id: &MultipartId, - _parts: Vec, + _parts: Vec, ) -> Result { let mut storage = self.storage.write(); let upload = storage.remove_upload(id)?; diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index d94e7f150513..41259829c7e7 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -21,18 +21,36 @@ //! cloud storage services. It's designed to offer efficient, non-blocking operations, //! especially useful when dealing with large files or high-throughput systems. -use async_trait::async_trait; - use crate::path::Path; use crate::{MultipartId, PutPayload, PutResult, Result}; +use async_trait::async_trait; -/// Represents a part of a file that has been successfully uploaded in a multipart upload process. +/// An etag of a part of a file that has been successfully uploaded in a multipart upload process. #[derive(Debug, Clone)] pub struct PartId { /// Id of this part pub content_id: String, } +/// Represents a part of a file that has been successfully uploaded in a multipart upload process. +#[derive(Debug)] +pub struct MultipartInfo { + /// Id of this part + pub e_tag: String, + /// Index of the part + 1 + pub part_number: usize, + /// 256 bit SHA of the contents + pub checksum_sha256: Option, +} + +impl From for PartId { + fn from(value: MultipartInfo) -> Self { + Self { + content_id: value.e_tag.clone(), + } + } +} + /// A low-level interface for interacting with multipart upload APIs /// /// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more @@ -64,7 +82,7 @@ pub trait MultipartStore: Send + Sync + 'static { id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result; + ) -> Result; /// Completes a multipart upload /// @@ -76,7 +94,7 @@ pub trait MultipartStore: Send + Sync + 'static { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result; /// Aborts a multipart upload diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index d07276c3dcad..d50449ef6f7f 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -20,7 +20,7 @@ use parking_lot::Mutex; use std::ops::Range; use std::{convert::TryInto, sync::Arc}; -use crate::multipart::{MultipartStore, PartId}; +use crate::multipart::{MultipartInfo, MultipartStore}; use crate::{ path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, @@ -350,7 +350,7 @@ impl MultipartStore for ThrottledStore { id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { sleep(self.config().wait_put_per_call).await; self.inner.put_part(path, id, part_idx, data).await } @@ -359,7 +359,7 @@ impl MultipartStore for ThrottledStore { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { self.inner.complete_multipart(path, id, parts).await }