From e266896ed5c5b4a2a70350d2caefa8ce68e584b4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 3 Jun 2024 17:36:19 +0800 Subject: [PATCH 1/5] feat(object_store): Add buffered::BufUploader Signed-off-by: Xuanwo --- object_store/src/buffered.rs | 165 ++++++++++++++++++++++++++++++++++- 1 file changed, 162 insertions(+), 3 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index feb84d4d0bc3..1f598fdf050b 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -19,10 +19,10 @@ use crate::path::Path; use crate::{ - Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet, - WriteMultipart, + Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, PutResult, + TagSet, WriteMultipart, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; use std::cmp::Ordering; @@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter { } } +/// A buffered multipart uploader. +/// +/// This uploader adaptively uses [`ObjectStore::put`] or +/// [`ObjectStore::put_multipart`] depending on the amount of data that has +/// been written. +/// +/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown +/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be +/// streamed using [`ObjectStore::put_multipart`] +/// +/// # TODO +/// +/// Add attributes and tags support. 
+pub struct BufUploader { + store: Arc, + path: Path, + + chunk_size: usize, + max_concurrency: usize, + + buffer: PutPayloadMut, + write_multipart: Option, +} + +impl std::fmt::Debug for BufUploader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BufUploader") + .field("chunk_size", &self.chunk_size) + .finish() + } +} + +impl BufUploader { + /// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`] + pub fn new(store: Arc, path: Path) -> Self { + Self::with_chunk_size(store, path, 5 * 1024 * 1024) + } + + /// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity` + pub fn with_chunk_size(store: Arc, path: Path, chunk_size: usize) -> Self { + Self { + store, + path, + chunk_size, + max_concurrency: 8, + buffer: PutPayloadMut::new(), + write_multipart: None, + } + } + + /// Override the maximum number of in-flight requests for this writer + /// + /// Defaults to 8 + pub fn with_max_concurrency(self, max_concurrency: usize) -> Self { + Self { + max_concurrency, + ..self + } + } + + /// Write data to the uploader + pub async fn put(&mut self, mut bytes: Bytes) -> crate::Result<()> { + // If we have have write_multipart in progress, put into `WriteMultipart` directly. + if let Some(writer) = self.write_multipart.as_mut() { + writer.wait_for_capacity(self.max_concurrency).await?; + writer.put(bytes); + return Ok(()); + } + + // If we have not started a multipart upload, and the buffer is not full, push the bytes. + if self.buffer.content_length() + bytes.len() <= self.chunk_size { + self.buffer.push(bytes); + return Ok(()); + } + + // Otherwise, we need to create a WriteMultipart and push all data in. + let mut write_multipart = WriteMultipart::new_with_chunk_size( + self.store.put_multipart(&self.path).await?, + self.chunk_size, + ); + + // NOTE: buffer length must be less than chunk_size, already checked. 
+ let in_buffer = bytes.split_to(self.chunk_size - self.buffer.content_length()); + self.buffer.push(in_buffer); + write_multipart.put_part(std::mem::take(&mut self.buffer).freeze()); + + // It's possible that we have more data to write. + if bytes.has_remaining() { + write_multipart.put(bytes); + } + + self.write_multipart = Some(write_multipart); + Ok(()) + } + + /// Finish the upload and create the file. + pub async fn finish(&mut self) -> crate::Result { + match self.write_multipart.take() { + Some(w) => w.finish().await, + None => { + self.store + .put(&self.path, std::mem::take(&mut self.buffer).freeze()) + .await + } + } + } + + /// Abort the upload and cleanup all data. + pub async fn abort(&mut self) -> crate::Result<()> { + // take and drop all content in buffer. + std::mem::take(&mut self.buffer); + + if let Some(write_multipart) = self.write_multipart.take() { + write_multipart.abort().await?; + } + Ok(()) + } +} + /// Port of standardised function as requires Rust 1.66 /// /// @@ -547,4 +666,44 @@ mod tests { assert_eq!(response.meta.size, 40); assert_eq!(response.attributes, attributes); } + + #[tokio::test] + async fn test_buf_uploader() { + let store = Arc::new(InMemory::new()) as Arc; + let path = Path::from("file.txt"); + + // Test put + let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30); + writer.put(Bytes::from(vec![0; 20])).await.unwrap(); + writer.put(Bytes::from(vec![0; 5])).await.unwrap(); + writer.finish().await.unwrap(); + let response = store + .get_opts( + &path, + GetOptions { + head: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.meta.size, 25); + + // Test multipart + let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30); + writer.put(Bytes::from(vec![0; 20])).await.unwrap(); + writer.put(Bytes::from(vec![0; 20])).await.unwrap(); + writer.finish().await.unwrap(); + let response = store + .get_opts( + &path, + GetOptions { + head: true, + 
..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.meta.size, 40); + } } From 38da11fc7dc5eb1afc4bc4f83d57731ea9bcac22 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Jun 2024 14:37:07 +0800 Subject: [PATCH 2/5] Polish tests Signed-off-by: Xuanwo --- object_store/src/buffered.rs | 47 ++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 1f598fdf050b..b753ac72e37a 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -410,9 +410,8 @@ impl AsyncWrite for BufWriter { /// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be /// streamed using [`ObjectStore::put_multipart`] /// -/// # TODO -/// -/// Add attributes and tags support. +/// Notes: Attribute and tag support are not yet implemented +#[derive(Debug)] pub struct BufUploader { store: Arc, path: Path, @@ -424,22 +423,9 @@ pub struct BufUploader { write_multipart: Option, } -impl std::fmt::Debug for BufUploader { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BufUploader") - .field("chunk_size", &self.chunk_size) - .finish() - } -} - impl BufUploader { /// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`] - pub fn new(store: Arc, path: Path) -> Self { - Self::with_chunk_size(store, path, 5 * 1024 * 1024) - } - - /// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity` - pub fn with_chunk_size(store: Arc, path: Path, chunk_size: usize) -> Self { + pub fn new(store: Arc, path: Path, chunk_size: usize) -> Self { Self { store, path, @@ -535,6 +521,7 @@ mod tests { use crate::memory::InMemory; use crate::path::Path; use crate::{Attribute, GetOptions}; + use itertools::Itertools; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; #[tokio::test] @@ -673,9 +660,15 @@ mod tests { let path = 
Path::from("file.txt"); // Test put - let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30); - writer.put(Bytes::from(vec![0; 20])).await.unwrap(); - writer.put(Bytes::from(vec![0; 5])).await.unwrap(); + let mut writer = BufUploader::new(Arc::clone(&store), path.clone(), 30); + writer + .put(Bytes::from((0..20).collect_vec())) + .await + .unwrap(); + writer + .put(Bytes::from((20..25).collect_vec())) + .await + .unwrap(); writer.finish().await.unwrap(); let response = store .get_opts( @@ -688,11 +681,18 @@ mod tests { .await .unwrap(); assert_eq!(response.meta.size, 25); + assert_eq!(response.bytes().await.unwrap(), (0..25).collect_vec()); // Test multipart - let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30); - writer.put(Bytes::from(vec![0; 20])).await.unwrap(); - writer.put(Bytes::from(vec![0; 20])).await.unwrap(); + let mut writer = BufUploader::new(Arc::clone(&store), path.clone(), 30); + writer + .put(Bytes::from((0..20).collect_vec())) + .await + .unwrap(); + writer + .put(Bytes::from((20..40).collect_vec())) + .await + .unwrap(); writer.finish().await.unwrap(); let response = store .get_opts( @@ -705,5 +705,6 @@ mod tests { .await .unwrap(); assert_eq!(response.meta.size, 40); + assert_eq!(response.bytes().await.unwrap(), (0..40).collect_vec()); } } From 591a70b79e3284f1c01c5d5da005d1a701c48f67 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Jun 2024 18:53:34 +0800 Subject: [PATCH 3/5] Merge BufUploader into BufWriter Signed-off-by: Xuanwo --- object_store/src/buffered.rs | 173 ++++++++++++----------------------- 1 file changed, 57 insertions(+), 116 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index b753ac72e37a..6c098f2aa2c6 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -19,10 +19,10 @@ use crate::path::Path; use crate::{ - Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, PutResult, 
- TagSet, WriteMultipart, + Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet, + WriteMultipart, }; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; use std::cmp::Ordering; @@ -238,11 +238,11 @@ enum BufWriterState { /// Buffer up to capacity bytes Buffer(Path, PutPayloadMut), /// [`ObjectStore::put_multipart`] - Prepare(BoxFuture<'static, std::io::Result>), + Prepare(BoxFuture<'static, crate::Result>), /// Write to a multipart upload Write(Option), /// [`ObjectStore::put`] - Flush(BoxFuture<'static, std::io::Result<()>>), + Flush(BoxFuture<'static, crate::Result<()>>), } impl BufWriter { @@ -289,6 +289,52 @@ impl BufWriter { } } + /// Write data to the writer in [`Bytes`]. + /// + /// Unlike [`AsyncWrite::write`], put can write data without extra copying. + /// + /// This API is recommended while the data source generates [`Bytes`]. + pub async fn put(&mut self, bytes: Bytes) -> crate::Result<()> { + loop { + return match &mut self.state { + BufWriterState::Write(Some(write)) => { + write.wait_for_capacity(self.max_concurrency).await?; + write.put(bytes); + Ok(()) + } + BufWriterState::Write(None) | BufWriterState::Flush(_) => { + panic!("Already shut down") + } + BufWriterState::Prepare(f) => { + self.state = BufWriterState::Write(f.await?.into()); + continue; + } + BufWriterState::Buffer(path, b) => { + if b.content_length().saturating_add(bytes.len()) < self.capacity { + b.push(bytes); + Ok(()) + } else { + let buffer = std::mem::take(b); + let path = std::mem::take(path); + let opts = PutMultipartOpts { + attributes: self.attributes.take().unwrap_or_default(), + tags: self.tags.take().unwrap_or_default(), + }; + let upload = self.store.put_multipart_opts(&path, opts).await?; + let mut chunked = + WriteMultipart::new_with_chunk_size(upload, self.capacity); + for chunk in buffer.freeze() { + chunked.put(chunk); + } + chunked.put(bytes); + self.state = 
BufWriterState::Write(Some(chunked)); + Ok(()) + } + } + }; + } + } + /// Abort this writer, cleaning up any partially uploaded state /// /// # Panic @@ -384,7 +430,7 @@ impl AsyncWrite for BufWriter { Ok(()) })); } - BufWriterState::Flush(f) => return f.poll_unpin(cx), + BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from), BufWriterState::Write(x) => { let upload = x.take().unwrap(); self.state = BufWriterState::Flush( @@ -400,111 +446,6 @@ impl AsyncWrite for BufWriter { } } -/// A buffered multipart uploader. -/// -/// This uploader adaptively uses [`ObjectStore::put`] or -/// [`ObjectStore::put_multipart`] depending on the amount of data that has -/// been written. -/// -/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown -/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be -/// streamed using [`ObjectStore::put_multipart`] -/// -/// Notes: Attribute and tag support are not yet implemented -#[derive(Debug)] -pub struct BufUploader { - store: Arc, - path: Path, - - chunk_size: usize, - max_concurrency: usize, - - buffer: PutPayloadMut, - write_multipart: Option, -} - -impl BufUploader { - /// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`] - pub fn new(store: Arc, path: Path, chunk_size: usize) -> Self { - Self { - store, - path, - chunk_size, - max_concurrency: 8, - buffer: PutPayloadMut::new(), - write_multipart: None, - } - } - - /// Override the maximum number of in-flight requests for this writer - /// - /// Defaults to 8 - pub fn with_max_concurrency(self, max_concurrency: usize) -> Self { - Self { - max_concurrency, - ..self - } - } - - /// Write data to the uploader - pub async fn put(&mut self, mut bytes: Bytes) -> crate::Result<()> { - // If we have have write_multipart in progress, put into `WriteMultipart` directly. 
- if let Some(writer) = self.write_multipart.as_mut() { - writer.wait_for_capacity(self.max_concurrency).await?; - writer.put(bytes); - return Ok(()); - } - - // If we have not started a multipart upload, and the buffer is not full, push the bytes. - if self.buffer.content_length() + bytes.len() <= self.chunk_size { - self.buffer.push(bytes); - return Ok(()); - } - - // Otherwise, we need to create a WriteMultipart and push all data in. - let mut write_multipart = WriteMultipart::new_with_chunk_size( - self.store.put_multipart(&self.path).await?, - self.chunk_size, - ); - - // NOTE: buffer length must be less than chunk_size, already checked. - let in_buffer = bytes.split_to(self.chunk_size - self.buffer.content_length()); - self.buffer.push(in_buffer); - write_multipart.put_part(std::mem::take(&mut self.buffer).freeze()); - - // It's possible that we have more data to write. - if bytes.has_remaining() { - write_multipart.put(bytes); - } - - self.write_multipart = Some(write_multipart); - Ok(()) - } - - /// Finish the upload and create the file. - pub async fn finish(&mut self) -> crate::Result { - match self.write_multipart.take() { - Some(w) => w.finish().await, - None => { - self.store - .put(&self.path, std::mem::take(&mut self.buffer).freeze()) - .await - } - } - } - - /// Abort the upload and cleanup all data. - pub async fn abort(&mut self) -> crate::Result<()> { - // take and drop all content in buffer. 
- std::mem::take(&mut self.buffer); - - if let Some(write_multipart) = self.write_multipart.take() { - write_multipart.abort().await?; - } - Ok(()) - } -} - /// Port of standardised function as requires Rust 1.66 /// /// @@ -655,12 +596,12 @@ mod tests { } #[tokio::test] - async fn test_buf_uploader() { + async fn test_buf_writer_with_put() { let store = Arc::new(InMemory::new()) as Arc; let path = Path::from("file.txt"); // Test put - let mut writer = BufUploader::new(Arc::clone(&store), path.clone(), 30); + let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30); writer .put(Bytes::from((0..20).collect_vec())) .await @@ -669,7 +610,7 @@ mod tests { .put(Bytes::from((20..25).collect_vec())) .await .unwrap(); - writer.finish().await.unwrap(); + writer.shutdown().await.unwrap(); let response = store .get_opts( &path, @@ -684,7 +625,7 @@ mod tests { assert_eq!(response.bytes().await.unwrap(), (0..25).collect_vec()); // Test multipart - let mut writer = BufUploader::new(Arc::clone(&store), path.clone(), 30); + let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30); writer .put(Bytes::from((0..20).collect_vec())) .await @@ -693,7 +634,7 @@ mod tests { .put(Bytes::from((20..40).collect_vec())) .await .unwrap(); - writer.finish().await.unwrap(); + writer.shutdown().await.unwrap(); let response = store .get_opts( &path, From 27cbb33a6ddf0e60751da5fd153e527266fff93c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Jun 2024 18:56:23 +0800 Subject: [PATCH 4/5] Fix docs Signed-off-by: Xuanwo --- object_store/src/buffered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 6c098f2aa2c6..9c81cc5fe19e 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -291,7 +291,7 @@ impl BufWriter { /// Write data to the writer in [`Bytes`]. /// - /// Unlike [`AsyncWrite::write`], put can write data without extra copying. 
+ /// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra copying. /// /// This API is recommended while the data source generates [`Bytes`]. pub async fn put(&mut self, bytes: Bytes) -> crate::Result<()> { From 56fc2a57119a1807666ece930095524ad77be947 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Jun 2024 19:02:27 +0800 Subject: [PATCH 5/5] Add comment Signed-off-by: Xuanwo --- object_store/src/buffered.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 9c81cc5fe19e..c7b71aa1cc2d 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -305,6 +305,12 @@ impl BufWriter { BufWriterState::Write(None) | BufWriterState::Flush(_) => { panic!("Already shut down") } + // NOTE + // + // This case should never happen in practice, but the Rust async API does + // make it possible for users to call `put` before `poll_write` returns `Ready`. + // + // We allow such usage by `await`ing the future and continuing the loop. BufWriterState::Prepare(f) => { self.state = BufWriterState::Write(f.await?.into()); continue;