Skip to content

Commit

Permalink
wtf
Browse files Browse the repository at this point in the history
  • Loading branch information
avantgardnerio committed Dec 3, 2024
1 parent c5ea1d0 commit fc51b10
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 14 deletions.
27 changes: 19 additions & 8 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,9 +638,12 @@ impl S3Client {
location: &Path,
opts: PutMultipartOpts,
) -> Result<MultipartId> {
let response = self
.request(Method::POST, location)
.header("x-amz-checksum-algorithm", "SHA256")
let mut req = self
.request(Method::POST, location);
if opts.checksum {
req = req.header("x-amz-checksum-algorithm", "SHA256");
}
let response = req
.query(&[("uploads", "")])
.with_encryption_headers()
.with_attributes(opts.attributes)
Expand Down Expand Up @@ -678,11 +681,17 @@ impl S3Client {
.idempotent(true);

request = match data {
PutPartPayload::Part(payload) => request.with_payload(payload),
PutPartPayload::Copy(path) => request.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
),
PutPartPayload::Part(payload) => {
println!("put_part payload={}", payload.content_length());
request.with_payload(payload)
},
PutPartPayload::Copy(path) => {
println!("put_part path={path:?}");
request.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
)
},
};

if self
Expand Down Expand Up @@ -721,11 +730,13 @@ impl S3Client {
}

pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> {
println!("Aborting");
self.request(Method::DELETE, location)
.query(&[("uploadId", upload_id)])
.with_encryption_headers()
.send()
.await?;
println!("Aborted");

Ok(())
}
Expand Down
21 changes: 17 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,29 +309,37 @@ impl ObjectStore for AmazonS3 {
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
println!("copy_if_not_exists from={from:?} to={to:?}");
let (k, v, status) = match &self.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
Some(S3CopyIfNotExists::Multipart) => {
let opts = PutMultipartOpts {
tags: Default::default(),
attributes: Default::default(),
checksum: false,
};
let upload_id = self
.client
.create_multipart(to, PutMultipartOpts::default())
.create_multipart(to, opts)
.await?;

let res = async {
let part = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
match self
println!("copy_if_not_exists part={part:#?}");
let res = self
.client
.complete_multipart(
to,
&upload_id,
vec![part],
CompleteMultipartMode::Create,
)
.await
.await;
match res
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
path: to.to_string(),
Expand Down Expand Up @@ -442,8 +450,13 @@ impl MultipartUpload for S3MultiPartUpload {
#[async_trait]
impl MultipartStore for AmazonS3 {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
let opts = PutMultipartOpts {
tags: Default::default(),
attributes: Default::default(),
checksum: true,
};
self.client
.create_multipart(path, PutMultipartOpts::default())
.create_multipart(path, opts)
.await
}

Expand Down
2 changes: 2 additions & 0 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ impl BufWriter {
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
checksum: true,
};
let upload = self.store.put_multipart_opts(&path, opts).await?;
let mut chunked =
Expand Down Expand Up @@ -384,6 +385,7 @@ impl AsyncWrite for BufWriter {
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
checksum: true,
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/client/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ impl From<Vec<PartId>> for CompleteMultipartUpload {
let part = value
.into_iter()
.enumerate()
.map(|(part_number, part)| MultipartPart {
.map(|(idx, part)| MultipartPart {
e_tag: part.content_id,
part_number: part_number + 1,
part_number: idx + 1,
checksum_sha256: None,
})
.collect();
Expand Down
2 changes: 2 additions & 0 deletions object_store/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,9 @@ pub async fn copy_if_not_exists(storage: &DynObjectStore) {

// copy_if_not_exists() copies contents and allows deleting original
storage.delete(&path2).await.unwrap();
println!("copy_if_not_exists");
storage.copy_if_not_exists(&path1, &path2).await.unwrap();
println!("copied_if_not_exists");
storage.delete(&path1).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
Expand Down
1 change: 1 addition & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,7 @@ pub struct PutMultipartOpts {
///
/// Implementations that don't support an attribute should return an error
pub attributes: Attributes,
pub checksum: bool,
}

impl From<TagSet> for PutMultipartOpts {
Expand Down

0 comments on commit fc51b10

Please sign in to comment.