Skip to content

Commit

Permalink
feat(object_store): Add buffered::BufUploader
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Jun 3, 2024
1 parent 198af7a commit e266896
Showing 1 changed file with 162 additions and 3 deletions.
165 changes: 162 additions & 3 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
use crate::path::Path;
use crate::{
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet,
WriteMultipart,
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, PutResult,
TagSet, WriteMultipart,
};
use bytes::Bytes;
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use std::cmp::Ordering;
Expand Down Expand Up @@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
}
}

/// A buffered multipart uploader.
///
/// This uploader adaptively uses [`ObjectStore::put`] or
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
/// been written.
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
/// streamed using [`ObjectStore::put_multipart`]
///
/// # TODO
///
/// Add attributes and tags support.
pub struct BufUploader {
store: Arc<dyn ObjectStore>,
path: Path,

chunk_size: usize,
max_concurrency: usize,

buffer: PutPayloadMut,
write_multipart: Option<WriteMultipart>,
}

impl std::fmt::Debug for BufUploader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufUploader")
.field("chunk_size", &self.chunk_size)
.finish()
}
}

impl BufUploader {
/// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`]
pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
Self::with_chunk_size(store, path, 5 * 1024 * 1024)
}

/// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity`
pub fn with_chunk_size(store: Arc<dyn ObjectStore>, path: Path, chunk_size: usize) -> Self {
Self {
store,
path,
chunk_size,
max_concurrency: 8,
buffer: PutPayloadMut::new(),
write_multipart: None,
}
}

/// Override the maximum number of in-flight requests for this writer
///
/// Defaults to 8
pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
Self {
max_concurrency,
..self
}
}

/// Write data to the uploader
pub async fn put(&mut self, mut bytes: Bytes) -> crate::Result<()> {
// If we have have write_multipart in progress, put into `WriteMultipart` directly.
if let Some(writer) = self.write_multipart.as_mut() {
writer.wait_for_capacity(self.max_concurrency).await?;
writer.put(bytes);
return Ok(());
}

// If we have not started a multipart upload, and the buffer is not full, push the bytes.
if self.buffer.content_length() + bytes.len() <= self.chunk_size {
self.buffer.push(bytes);
return Ok(());
}

// Otherwise, we need to create a WriteMultipart and push all data in.
let mut write_multipart = WriteMultipart::new_with_chunk_size(
self.store.put_multipart(&self.path).await?,
self.chunk_size,
);

// NOTE: buffer length must be less than chunk_size, already checked.
let in_buffer = bytes.split_to(self.chunk_size - self.buffer.content_length());
self.buffer.push(in_buffer);
write_multipart.put_part(std::mem::take(&mut self.buffer).freeze());

// It's possible that we have more data to write.
if bytes.has_remaining() {
write_multipart.put(bytes);
}

self.write_multipart = Some(write_multipart);
Ok(())
}

/// Finish the upload and create the file.
pub async fn finish(&mut self) -> crate::Result<PutResult> {
match self.write_multipart.take() {
Some(w) => w.finish().await,
None => {
self.store
.put(&self.path, std::mem::take(&mut self.buffer).freeze())
.await
}
}
}

/// Abort the upload and cleanup all data.
pub async fn abort(&mut self) -> crate::Result<()> {
// take and drop all content in buffer.
std::mem::take(&mut self.buffer);

if let Some(write_multipart) = self.write_multipart.take() {
write_multipart.abort().await?;
}
Ok(())
}
}

/// Port of standardised function as requires Rust 1.66
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
Expand Down Expand Up @@ -547,4 +666,44 @@ mod tests {
assert_eq!(response.meta.size, 40);
assert_eq!(response.attributes, attributes);
}

#[tokio::test]
async fn test_buf_uploader() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let path = Path::from("file.txt");

// Test put
let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30);
writer.put(Bytes::from(vec![0; 20])).await.unwrap();
writer.put(Bytes::from(vec![0; 5])).await.unwrap();
writer.finish().await.unwrap();
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 25);

// Test multipart
let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30);
writer.put(Bytes::from(vec![0; 20])).await.unwrap();
writer.put(Bytes::from(vec![0; 20])).await.unwrap();
writer.finish().await.unwrap();
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 40);
}
}

0 comments on commit e266896

Please sign in to comment.