Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(object_store): Add put API for buffered::BufWriter #5835

Merged
merged 6 commits into from
Jun 13, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 162 additions & 3 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

use crate::path::Path;
use crate::{
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet,
WriteMultipart,
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, PutResult,
TagSet, WriteMultipart,
};
use bytes::Bytes;
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use std::cmp::Ordering;
Expand Down Expand Up @@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
}
}

/// A buffered multipart uploader.
///
/// This uploader adaptively uses [`ObjectStore::put`] or
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
/// been written.
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
/// streamed using [`ObjectStore::put_multipart`]
///
/// # TODO
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
///
/// Add attributes and tags support.
pub struct BufUploader {
store: Arc<dyn ObjectStore>,
path: Path,

chunk_size: usize,
max_concurrency: usize,

buffer: PutPayloadMut,
write_multipart: Option<WriteMultipart>,
}

impl std::fmt::Debug for BufUploader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
f.debug_struct("BufUploader")
.field("chunk_size", &self.chunk_size)
.finish()
}
}

impl BufUploader {
/// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`]
pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
Self::with_chunk_size(store, path, 5 * 1024 * 1024)
}

/// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity`
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_chunk_size(store: Arc<dyn ObjectStore>, path: Path, chunk_size: usize) -> Self {
Self {
store,
path,
chunk_size,
max_concurrency: 8,
buffer: PutPayloadMut::new(),
write_multipart: None,
}
}

/// Override the maximum number of in-flight requests for this writer
///
/// Defaults to 8
pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
Self {
max_concurrency,
..self
}
}

/// Write data to the uploader
pub async fn put(&mut self, mut bytes: Bytes) -> crate::Result<()> {
// If we have have write_multipart in progress, put into `WriteMultipart` directly.
if let Some(writer) = self.write_multipart.as_mut() {
writer.wait_for_capacity(self.max_concurrency).await?;
writer.put(bytes);
return Ok(());
}

// If we have not started a multipart upload, and the buffer is not full, push the bytes.
if self.buffer.content_length() + bytes.len() <= self.chunk_size {
self.buffer.push(bytes);
return Ok(());
}

// Otherwise, we need to create a WriteMultipart and push all data in.
let mut write_multipart = WriteMultipart::new_with_chunk_size(
self.store.put_multipart(&self.path).await?,
self.chunk_size,
);

// NOTE: buffer length must be less than chunk_size, already checked.
let in_buffer = bytes.split_to(self.chunk_size - self.buffer.content_length());
self.buffer.push(in_buffer);
write_multipart.put_part(std::mem::take(&mut self.buffer).freeze());

// It's possible that we have more data to write.
if bytes.has_remaining() {
write_multipart.put(bytes);
}

self.write_multipart = Some(write_multipart);
Ok(())
}

/// Finish the upload and create the file.
pub async fn finish(&mut self) -> crate::Result<PutResult> {
match self.write_multipart.take() {
Some(w) => w.finish().await,
None => {
self.store
.put(&self.path, std::mem::take(&mut self.buffer).freeze())
.await
}
}
}

/// Abort the upload and cleanup all data.
pub async fn abort(&mut self) -> crate::Result<()> {
// take and drop all content in buffer.
std::mem::take(&mut self.buffer);

if let Some(write_multipart) = self.write_multipart.take() {
write_multipart.abort().await?;
}
Ok(())
}
}

/// Port of standardised function as requires Rust 1.66
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
Expand Down Expand Up @@ -547,4 +666,44 @@ mod tests {
assert_eq!(response.meta.size, 40);
assert_eq!(response.attributes, attributes);
}

#[tokio::test]
async fn test_buf_uploader() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let path = Path::from("file.txt");

// Test put
let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30);
writer.put(Bytes::from(vec![0; 20])).await.unwrap();
writer.put(Bytes::from(vec![0; 5])).await.unwrap();
writer.finish().await.unwrap();
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 25);
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved

// Test multipart
let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30);
writer.put(Bytes::from(vec![0; 20])).await.unwrap();
writer.put(Bytes::from(vec![0; 20])).await.unwrap();
writer.finish().await.unwrap();
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 40);
}
}
Loading