object_store: Support for streaming body in put/put_opts #6934

Open
thinkharderdev opened this issue Jan 3, 2025 · 8 comments
Labels: enhancement (Any new improvement worthy of an entry in the changelog)

Comments

@thinkharderdev (Contributor)

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently, put/put_opts take a PutPayload, which is just a wrapper around an array of Bytes. It would be useful (for us) to be able to stream the body without having to do a multipart upload.

Describe the solution you'd like

The underlying http library reqwest already supports this. So I think we can change the definition of PutPayload to an enum like so:

type StreamingBody = Box<
    dyn futures::stream::Stream<
            Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>,
        > + Send
        + 'static,
>;

pub enum PutPayload {
    Bytes(Arc<[Bytes]>),
    Streaming(StreamingBody, usize),
}

where PutPayload::Streaming holds the stream and the total content length (which must be known ahead of time).

The (unfortunate) requirement of the S3 API to know the content-length up front means this isn't fully general, but it can be useful in certain situations, such as buffering a large-ish file on disk before uploading. In that case, a streaming body would avoid having to load the entire object into memory in order to upload it.
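
To make that concrete, here is a rough sketch of how the proposed variant might be fed from a file on disk. This is purely illustrative: PutPayload::Streaming does not exist today, and the put_from_file helper (and the use of tokio_util's ReaderStream) is just an assumption for the example.

use futures::TryStreamExt;
use tokio_util::io::ReaderStream;

// Hypothetical helper built on the proposed enum above.
async fn put_from_file(
    store: &dyn object_store::ObjectStore,
    location: &object_store::path::Path,
    file: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // The content length must be known up front (S3 requires it for a plain PUT).
    let len = tokio::fs::metadata(file).await?.len() as usize;
    let stream = ReaderStream::new(tokio::fs::File::open(file).await?)
        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
    // Stream the body instead of buffering the whole file in memory.
    store
        .put(location, PutPayload::Streaming(Box::new(stream), len)) // proposed variant, not in the current API
        .await?;
    Ok(())
}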

Describe alternatives you've considered

This can already be done with put_multipart, but multi-part upload is complicated and may have some performance implications for reads.
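
For reference, the existing route looks roughly like this (a sketch using the WriteMultipart helper; treat the details as approximate, and the chunks argument as a stand-in data source):

use object_store::{path::Path, ObjectStore, WriteMultipart};

async fn upload(store: &dyn ObjectStore, location: &Path, chunks: Vec<Vec<u8>>) -> object_store::Result<()> {
    // Start a multipart upload and wrap it in the helper that drives parts concurrently.
    let upload = store.put_multipart(location).await?;
    let mut write = WriteMultipart::new(upload);
    for chunk in chunks {
        // Data is buffered and uploaded as parts in the order it is written.
        write.write(&chunk);
    }
    // Wait for in-flight parts and complete the upload.
    write.finish().await?;
    Ok(())
}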

Additional context

@thinkharderdev added the enhancement label on Jan 3, 2025
@tustvold (Contributor) commented Jan 3, 2025

This can already be done with put_multipart, but multi-part upload is complicated and may have some performance implications for reads.

Could you expand on this a bit? The multi-part upload is typically more performant for large uploads (>10MB). Not only can it upload chunks in parallel, it also has much better reliability, as it can retry just the failed parts instead of the entire upload.

I have debated adding this in the past, but the protocol for supporting it is complicated, and it is unclear to me that it would yield any tangible benefits.

To be honest I had viewed the streaming upload feature as largely having been superseded by the multipart upload feature. This is certainly the way it was marketed and AWS tools like S3 Transfer Manager recommend using multipart uploads to improve performance.

S3 Transfer Manager can take advantage of performance improvements such as the multipart upload API and byte-range fetches

Edit: FYI the BufWriter::put API should take care of most of the complexities of doing streaming uploads.
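
For example, something along these lines (a rough sketch: chunks is just a stand-in for whatever produces the data, store/path are an Arc<dyn ObjectStore> and a Path, and I'm assuming put takes a Bytes chunk):

use object_store::buffered::BufWriter;
use tokio::io::AsyncWriteExt; // for shutdown

let mut writer = BufWriter::new(store, path);
while let Some(chunk) = chunks.next().await {
    // Small writes are buffered in memory; past the buffer capacity BufWriter
    // transparently switches to a multipart upload.
    writer.put(chunk?).await?;
}
// Completes either the buffered put or the multipart upload.
writer.shutdown().await?;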

@thinkharderdev (Contributor, Author)

Could you expand on this a bit? The multi-part upload is typically more performant for large uploads (>10MB). Not only can it upload chunks in parallel, it also has much better reliability, as it can retry just the failed parts instead of the entire upload.

My understanding (which may be wrong) is that read latency can be higher when doing range fetches for multi-part files that cross part boundaries since S3 stores the parts separately.

On a separate note, I think there are some concurrency issues in BufWriter which I am investigating. We have seen issues where the parts are in the wrong order when uploaded (and it stops happening when we set the max concurrency in BufWriter to 1).
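
(Something like the following, if I'm remembering the builder method correctly:)

let mut writer = BufWriter::new(store, path).with_max_concurrency(1); // cap at one in-flight part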

@tustvold (Contributor) commented Jan 3, 2025

My understanding (which may be wrong) is that read latency can be higher when doing range fetches for multi-part files that cross part boundaries since S3 stores the parts separately.

I've not seen any AWS material documenting this as a thing, and even if it were, S3 latencies are so noisy I suspect it would be rendered largely irrelevant.

As for the concurrency issues, I suggest reading through #6460, filed by your coworker; from his description it sounded like a bug in your code.

@thinkharderdev (Contributor, Author)

As for the concurrency issues, I suggest reading through #6460, filed by your coworker; from his description it sounded like a bug in your code.

I've read through that one, and this is actually a different issue. Nothing fails, but the files are corrupted because the parts are in the wrong order. This happens consistently (as in, it happens to a certain percentage of uploads consistently; no single case I've found is actually reproducible). I'm trying to find a way to reproduce it and will create an issue if I can find something.

@tustvold (Contributor) commented Jan 3, 2025

Nothing fails, but the files are corrupted because the parts are in the wrong order.

That sounds to me like exactly the same issue. I'm not familiar with how you are integrating the multipart upload, but my suspicion is that whatever is sinking to it is accidentally deferring the put_part to a spawned future, or something similar.

Correct

tokio::spawn(upload.put_part(x));
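// put_part is called synchronously here, so the part index is assigned in call
// order; only the already-created future runs on the spawned task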

Incorrect

tokio::spawn(async move {
    upload.lock().put_part(x);
})
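// here the put_part call itself is deferred into the spawned task, so the order
// in which parts are created depends on which task grabs the lock first (and the
// returned future is never awaited)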

I'd be very suspicious of anything that is using RwLock or similar

@thinkharderdev (Contributor, Author) commented Jan 3, 2025

Nothing fails, but the files are corrupted because the parts are in the wrong order.

That sounds to me like exactly the same issue. I'm not familiar with how you are integrating the multipart upload, but my suspicion is that whatever is sinking to it is accidentally deferring the put_part to a spawned future, or something similar.

There is nothing like that. I can't share the code here because it is internal, but I can assure you there is nothing weird going on: no spawned tasks, no interior mutability, just a plain-vanilla usage of BufWriter as an AsyncWrite. Conceptually it's just:

Edit: more accurate version

let mut writer = BufWriter::new(store, path);

let mut buf = vec![];
while something {
   buf.extend_from_slice(&some_data);

   if buf.len() > SOME_SIZE {
      writer.write_all(&buf).await?;
      buf.clear();
   }
}

writer.flush().await?;
writer.shutdown().await?;

I didn't want to create an issue until I have something concrete (and am doubly sure it's not a bug on our end, even though I am quite confident it is not), so I will report something when/if I have it.

@Xuanwo (Member) commented Jan 3, 2025

Edit: more accurate version

let mut writer = BufWriter::new(store, path);

let mut buf = vec![];
while something {
   buf.extend_from_slice(&some_data);

   if buf.len() > SOME_SIZE {
      writer.write_all(&buf).await?;
      buf.clear();
   }
}

writer.flush().await?;
writer.shutdown().await?;

Hi, if the code here is accurate, it's possible that buf still has content left after the while loop finishes.
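
Something along these lines before the final flush would cover it (reusing the variables from the snippet above):

// write out whatever is left in buf once the loop is done
if !buf.is_empty() {
    writer.write_all(&buf).await?;
}
writer.flush().await?;
writer.shutdown().await?;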

@thinkharderdev (Contributor, Author)

Hi, if the code here is accurate, it's possible that buf still has content left after the while loop finishes.

That code is simplified for demonstration, but the real code does write the remaining data. The problem we are experiencing, though, is not that the files are too short but that they are jumbled because the parts are in the wrong order. The total file size is correct.
