feat(object_store): Add put API for buffered::BufWriter #5835

Merged
merged 6 commits into from
Jun 13, 2024
113 changes: 110 additions & 3 deletions object_store/src/buffered.rs
@@ -238,11 +238,11 @@ enum BufWriterState {
/// Buffer up to capacity bytes
Buffer(Path, PutPayloadMut),
/// [`ObjectStore::put_multipart`]
- Prepare(BoxFuture<'static, std::io::Result<WriteMultipart>>),
+ Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
/// Write to a multipart upload
Write(Option<WriteMultipart>),
/// [`ObjectStore::put`]
- Flush(BoxFuture<'static, std::io::Result<()>>),
+ Flush(BoxFuture<'static, crate::Result<()>>),
}

impl BufWriter {
@@ -289,6 +289,58 @@ impl BufWriter {
}
}

/// Write data to the writer in [`Bytes`].
///
/// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra copying.
///
/// This API is recommended while the data source generates [`Bytes`].
pub async fn put(&mut self, bytes: Bytes) -> crate::Result<()> {
loop {
return match &mut self.state {
BufWriterState::Write(Some(write)) => {
write.wait_for_capacity(self.max_concurrency).await?;
write.put(bytes);
Ok(())
}
BufWriterState::Write(None) | BufWriterState::Flush(_) => {
panic!("Already shut down")
Contributor

Since this function returns a `Result`, is there any reason to panic here rather than returning `Err`?

Member Author

This follows the existing behavior in `poll_write`. `put` should never reach this state.

I'm fine with returning an error in both cases if we prefer that.
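For reference, the alternative raised in this thread could look roughly like the sketch below. It is a minimal, synchronous illustration only: `State`, `WriterError`, and this free-standing `put` are hypothetical stand-ins, not the crate's `BufWriterState`, `crate::Error`, or `BufWriter::put`.

```rust
use std::fmt;

/// Hypothetical stand-in for the writer's state (not the crate's `BufWriterState`).
#[derive(Debug)]
enum State {
    Buffer(Vec<u8>),
    ShutDown,
}

/// Hypothetical error type used only in this sketch.
#[derive(Debug, PartialEq)]
enum WriterError {
    AlreadyShutDown,
}

impl fmt::Display for WriterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WriterError::AlreadyShutDown => write!(f, "writer already shut down"),
        }
    }
}

impl std::error::Error for WriterError {}

/// Appends `bytes` while the writer is open. Once it is shut down,
/// the caller gets an `Err` back instead of a panic.
fn put(state: &mut State, bytes: &[u8]) -> Result<(), WriterError> {
    match state {
        State::Buffer(buf) => {
            buf.extend_from_slice(bytes);
            Ok(())
        }
        State::ShutDown => Err(WriterError::AlreadyShutDown),
    }
}
```

The trade-off is the one discussed above: an `Err` lets callers recover from misuse, while a panic keeps `put` consistent with what `poll_write` does today.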

}
// NOTE
//
// This case should never happen in practice, but rust async API does
// make it possible for users to call `put` before `poll_write` returns `Ready`.
//
// We allow such usage by `await` the future and continue the loop.
BufWriterState::Prepare(f) => {
self.state = BufWriterState::Write(f.await?.into());
continue;
}
BufWriterState::Buffer(path, b) => {
Contributor

I took a look at combining this code with the very similar code in `poll_write`, but I couldn't come up with anything particularly good.

Member Author

Yes, it's a bit clumsy to refactor the same code to serve both the poll-based and the async/await paths just to save 10 lines.

Member Author

@Xuanwo, Jun 12, 2024

The problem here is that I want to avoid the extra cost of `Box::pin(fut)` by awaiting the future in place.
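To illustrate the cost being avoided, here is a small std-only sketch under stated assumptions: `start_upload`, `State`, `begin_poll_style`, `begin_async_style`, and the toy `block_on` executor are all hypothetical and only model the shape of the two approaches, not the crate's code.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for starting a multipart upload.
async fn start_upload(id: u32) -> Result<u32, String> {
    Ok(id)
}

type BoxedUpload = Pin<Box<dyn Future<Output = Result<u32, String>>>>;

/// Hypothetical writer state for this sketch.
enum State {
    Idle,
    Prepare(BoxedUpload),
    Write(u32),
}

/// Poll style: the future must outlive this call, so it is boxed
/// (one heap allocation) and stored in the state for a later poll.
fn begin_poll_style(state: &mut State) {
    *state = State::Prepare(Box::pin(start_upload(7)));
}

/// async/await style: the future is stored inside the caller's own
/// future and awaited in place, so there is no separate boxing step.
async fn begin_async_style(state: &mut State) -> Result<(), String> {
    let upload = start_upload(7).await?;
    *state = State::Write(upload);
    Ok(())
}

/// Waker that does nothing; sufficient for the busy-polling loop below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor that polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Both paths end up with the same upload; the difference is only where the intermediate future lives, which is why sharing the code between `poll_write` and `put` is awkward.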

if b.content_length().saturating_add(bytes.len()) < self.capacity {
b.push(bytes);
Ok(())
} else {
let buffer = std::mem::take(b);
let path = std::mem::take(path);
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
};
let upload = self.store.put_multipart_opts(&path, opts).await?;
let mut chunked =
WriteMultipart::new_with_chunk_size(upload, self.capacity);
for chunk in buffer.freeze() {
chunked.put(chunk);
}
chunked.put(bytes);
self.state = BufWriterState::Write(Some(chunked));
Ok(())
}
}
};
}
}

/// Abort this writer, cleaning up any partially uploaded state
///
/// # Panic
@@ -384,7 +436,7 @@ impl AsyncWrite for BufWriter {
Ok(())
}));
}
- BufWriterState::Flush(f) => return f.poll_unpin(cx),
+ BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from),
BufWriterState::Write(x) => {
let upload = x.take().unwrap();
self.state = BufWriterState::Flush(
@@ -416,6 +468,7 @@ mod tests {
use crate::memory::InMemory;
use crate::path::Path;
use crate::{Attribute, GetOptions};
use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

#[tokio::test]
@@ -547,4 +600,58 @@ mod tests {
assert_eq!(response.meta.size, 40);
assert_eq!(response.attributes, attributes);
}

#[tokio::test]
async fn test_buf_writer_with_put() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let path = Path::from("file.txt");

// Test put
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
writer
.put(Bytes::from((0..20).collect_vec()))
.await
.unwrap();
writer
.put(Bytes::from((20..25).collect_vec()))
.await
.unwrap();
writer.shutdown().await.unwrap();
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 25);
assert_eq!(response.bytes().await.unwrap(), (0..25).collect_vec());

// Test multipart
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
writer
.put(Bytes::from((0..20).collect_vec()))
.await
.unwrap();
writer
.put(Bytes::from((20..40).collect_vec()))
.await
.unwrap();
writer.shutdown().await.unwrap();
let response = store
.get_opts(
&path,
GetOptions {
head: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(response.meta.size, 40);
assert_eq!(response.bytes().await.unwrap(), (0..40).collect_vec());
}
}
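
The two cases in `test_buf_writer_with_put` hinge on the capacity check in `put`. The sketch below models only that decision with byte counts; `Mode` and this free-standing `put` are hypothetical and do no I/O.

```rust
/// Hypothetical, synchronous model of the two write paths (not the crate's types).
#[derive(Debug, PartialEq)]
enum Mode {
    /// Everything so far fits in the in-memory buffer.
    Buffered(usize),
    /// Capacity was reached, so data goes to a multipart upload.
    Multipart(usize),
}

/// Mirrors the check `content_length + bytes.len() < capacity` from the diff:
/// stay buffered while strictly below capacity, otherwise switch to multipart.
fn put(mode: Mode, len: usize, capacity: usize) -> Mode {
    match mode {
        Mode::Buffered(n) if n.saturating_add(len) < capacity => {
            Mode::Buffered(n.saturating_add(len))
        }
        Mode::Buffered(n) | Mode::Multipart(n) => Mode::Multipart(n.saturating_add(len)),
    }
}
```

With a capacity of 30, writing 20 and then 5 bytes stays buffered (25 < 30), while writing 20 and then 20 bytes crosses the threshold and switches to multipart, matching the two halves of the test.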