Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bytes::from_owner #742

Merged
merged 28 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 174 additions & 3 deletions src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::iter::FromIterator;
use core::mem::{self, ManuallyDrop};
use core::ops::{Deref, RangeBounds};
use core::ptr::NonNull;
use core::{cmp, fmt, hash, ptr, slice, usize};

use alloc::{
Expand Down Expand Up @@ -200,6 +201,94 @@ impl Bytes {
}
}

/// Create [Bytes] with a buffer whose lifetime is controlled
/// via an explicit owner.
///
/// A common use case is to zero-copy construct from mapped memory.
///
/// ```
/// # struct File;
/// #
/// # impl File {
/// # pub fn open(_: &str) -> Result<Self, ()> {
/// # Ok(Self)
/// # }
/// # }
/// #
/// # mod memmap2 {
/// # pub struct Mmap;
/// #
/// # impl Mmap {
/// # pub unsafe fn map(_file: &super::File) -> Result<Self, ()> {
/// # Ok(Self)
/// # }
/// # }
/// #
/// # impl AsRef<[u8]> for Mmap {
/// # fn as_ref(&self) -> &[u8] {
/// # b"buf"
/// # }
/// # }
/// # }
/// use bytes::Bytes;
/// use memmap2::Mmap;
///
/// # fn main() -> Result<(), ()> {
/// let file = File::open("upload_bundle.tar.gz")?;
/// let mmap = unsafe { Mmap::map(&file) }?;
/// let b = Bytes::from_owner(mmap);
/// # Ok(())
/// # }
/// ```
///
/// The `owner` will be transferred to the constructed [Bytes] object, which
/// will ensure it is dropped once all remaining clones of the constructed
/// object are dropped. The owner will then be responsible for dropping the
/// specified region of memory as part of its [Drop] implementation.
///
/// Note that converting [Bytes] constructed from an owner into a [BytesMut]
/// will always create a deep copy of the buffer into newly allocated memory.
pub fn from_owner<T>(owner: T) -> Self
where
T: AsRef<[u8]> + Send + 'static,
{
// Safety & Miri:
// The ownership of `owner` is first transferred to the `Owned` wrapper and `Bytes` object.
// This ensures that the owner is pinned in memory, allowing us to call `.as_ref()` safely
// since the lifetime of the owner is controlled by the lifetime of the new `Bytes` object,
// and the lifetime of the resulting borrowed `&[u8]` matches that of the owner.
// Note that this remains safe so long as we only call `.as_ref()` once.
//
// There are some additional special considerations here:
// * We rely on Bytes's Drop impl to clean up memory should `.as_ref()` panic.
// * Setting the `ptr` and `len` on the bytes object last (after moving the owner to
// Bytes) allows Miri checks to pass since it avoids obtaining the `&[u8]` slice
// from a stack-owned Box.
// More details on this: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813375863
// and: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813316032

let owned = Box::into_raw(Box::new(Owned {
lifetime: OwnedLifetime {
ref_cnt: AtomicUsize::new(1),
drop: owned_box_and_drop::<T>,
},
owner,
}));

let mut ret = Bytes {
ptr: NonNull::dangling().as_ptr(),
len: 0,
data: AtomicPtr::new(owned.cast()),
vtable: &OWNED_VTABLE,
};

let buf = unsafe { &*owned }.owner.as_ref();
ret.ptr = buf.as_ptr();
ret.len = buf.len();

ret
}

/// Returns the number of bytes contained in this `Bytes`.
///
/// # Examples
Expand Down Expand Up @@ -230,14 +319,16 @@ impl Bytes {
self.len == 0
}

/// Returns true if this is the only reference to the data.
/// Returns true if this is the only reference to the data and
/// `Into<BytesMut>` would avoid cloning the underlying buffer.
///
/// Always returns false if the data is backed by a static slice.
/// Always returns false if the data is backed by a [static slice](Bytes::from_static),
/// or an [owner](Bytes::from_owner).
///
/// The result of this method may be invalidated immediately if another
/// thread clones this value while this is being called. Ensure you have
/// unique access to this value (`&mut Bytes`) first if you need to be
/// certain the result is valid (i.e. for safety reasons)
/// certain the result is valid (i.e. for safety reasons).
/// # Examples
///
/// ```
Expand Down Expand Up @@ -536,6 +627,9 @@ impl Bytes {
/// If `self` is not unique for the entire original buffer, this will fail
/// and return self.
///
/// This will also always fail if the buffer was constructed via either
/// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static).
///
/// # Examples
///
/// ```
Expand Down Expand Up @@ -1012,6 +1106,83 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
// nothing to drop for &'static [u8]
}

// ===== impl OwnedVtable =====

#[repr(C)]
struct OwnedLifetime {
ref_cnt: AtomicUsize,
drop: unsafe fn(*mut ()),
}

#[repr(C)]
struct Owned<T> {
lifetime: OwnedLifetime,
owner: T,
}

unsafe fn owned_box_and_drop<T>(ptr: *mut ()) {
let b: Box<Owned<T>> = Box::from_raw(ptr as _);
drop(b);
}

unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
let owned = data.load(Ordering::Relaxed);
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed);
if old_cnt > usize::MAX >> 1 {
crate::abort()
}

Bytes {
ptr,
len,
data: AtomicPtr::new(owned as _),
vtable: &OWNED_VTABLE,
}
}

unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
let slice = slice::from_raw_parts(ptr, len);
slice.to_vec()
}

unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len));
owned_drop_impl(data.load(Ordering::Relaxed));
bytes_mut
}

unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool {
false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
let lifetime = owned.cast::<OwnedLifetime>();
let ref_cnt = &(*lifetime).ref_cnt;

let old_cnt = ref_cnt.fetch_sub(1, Ordering::Release);
amunra marked this conversation as resolved.
Show resolved Hide resolved
if old_cnt != 1 {
return;
}
ref_cnt.load(Ordering::Acquire);

let drop_fn = &(*lifetime).drop;
drop_fn(owned)
}

unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
let owned = data.load(Ordering::Relaxed);
owned_drop_impl(owned);
}

static OWNED_VTABLE: Vtable = Vtable {
clone: owned_clone,
to_vec: owned_to_vec,
to_mut: owned_to_mut,
is_unique: owned_is_unique,
drop: owned_drop,
};

// ===== impl PromotableVtable =====

static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
Expand Down
152 changes: 152 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#![warn(rust_2018_idioms)]

use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use std::panic::{self, AssertUnwindSafe};
use std::usize;

const LONG: &[u8] = b"mary had a little lamb, little lamb, little lamb";
Expand Down Expand Up @@ -1479,3 +1482,152 @@ fn split_to_empty_addr_mut() {
let _ = &empty_start[..];
let _ = &buf[..];
}

#[derive(Clone)]
struct SharedAtomicCounter(Arc<AtomicUsize>);

impl SharedAtomicCounter {
pub fn new() -> Self {
SharedAtomicCounter(Arc::new(AtomicUsize::new(0)))
}

pub fn increment(&self) {
self.0.fetch_add(1, Ordering::AcqRel);
}

pub fn get(&self) -> usize {
self.0.load(Ordering::Acquire)
}
}

#[derive(Clone)]
struct OwnedTester<const L: usize> {
buf: [u8; L],
drop_count: SharedAtomicCounter,
pub panic_as_ref: bool,
}

impl<const L: usize> OwnedTester<L> {
fn new(buf: [u8; L], drop_count: SharedAtomicCounter) -> Self {
Self {
buf,
drop_count,
panic_as_ref: false,
}
}
}

impl<const L: usize> AsRef<[u8]> for OwnedTester<L> {
fn as_ref(&self) -> &[u8] {
if self.panic_as_ref {
panic!("test-triggered panic in `AsRef<[u8]> for OwnedTester`");
}
self.buf.as_slice()
}
}

impl<const L: usize> Drop for OwnedTester<L> {
fn drop(&mut self) {
self.drop_count.increment();
}
}

#[test]
fn owned_is_unique_always_false() {
let b1 = Bytes::from_owner([1, 2, 3, 4, 5, 6, 7]);
assert!(!b1.is_unique()); // even if ref_cnt == 1
let b2 = b1.clone();
assert!(!b1.is_unique());
assert!(!b2.is_unique());
drop(b1);
assert!(!b2.is_unique()); // even if ref_cnt == 1
}

#[test]
fn owned_buf_sharing() {
let buf = [1, 2, 3, 4, 5, 6, 7];
let b1 = Bytes::from_owner(buf);
let b2 = b1.clone();
assert_eq!(&buf[..], &b1[..]);
assert_eq!(&buf[..], &b2[..]);
assert_eq!(b1.as_ptr(), b2.as_ptr());
assert_eq!(b1.len(), b2.len());
assert_eq!(b1.len(), buf.len());
}

#[test]
fn owned_buf_slicing() {
let b1 = Bytes::from_owner(SHORT);
assert_eq!(SHORT, &b1[..]);
let b2 = b1.slice(1..(b1.len() - 1));
assert_eq!(&SHORT[1..(SHORT.len() - 1)], b2);
assert_eq!(unsafe { SHORT.as_ptr().add(1) }, b2.as_ptr());
assert_eq!(SHORT.len() - 2, b2.len());
}

#[test]
fn owned_dropped_exactly_once() {
let buf: [u8; 5] = [1, 2, 3, 4, 5];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
let b2 = b1.clone();
assert_eq!(drop_counter.get(), 0);
drop(b1);
assert_eq!(drop_counter.get(), 0);
let b3 = b2.slice(1..b2.len() - 1);
drop(b2);
assert_eq!(drop_counter.get(), 0);
drop(b3);
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_to_mut() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);

// Holding an owner will fail converting to a BytesMut,
// even when the bytes instance has a ref_cnt == 1.
let b1 = b1.try_into_mut().unwrap_err();

// That said, it's still possible, just not cheap.
let bm1: BytesMut = b1.into();
let new_buf = &bm1[..];
assert_eq!(new_buf, &buf[..]);

// `.into::<BytesMut>()` has correctly dropped the owner
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_to_vec() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);

let v1 = b1.to_vec();
assert_eq!(&v1[..], &buf[..]);
assert_eq!(&v1[..], &b1[..]);

drop(b1);
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_safe_drop_on_as_ref_panic() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let mut owner = OwnedTester::new(buf, drop_counter.clone());
owner.panic_as_ref = true;

let result = panic::catch_unwind(AssertUnwindSafe(|| {
let _ = Bytes::from_owner(owner);
}));

assert!(result.is_err());
assert_eq!(drop_counter.get(), 1);
}