Skip to content

Commit

Permalink
Add Bytes::from_owner (#742)
Browse files Browse the repository at this point in the history
  • Loading branch information
amunra authored Oct 29, 2024
1 parent c45697c commit 30ee8e9
Show file tree
Hide file tree
Showing 2 changed files with 326 additions and 3 deletions.
177 changes: 174 additions & 3 deletions src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::iter::FromIterator;
use core::mem::{self, ManuallyDrop};
use core::ops::{Deref, RangeBounds};
use core::ptr::NonNull;
use core::{cmp, fmt, hash, ptr, slice, usize};

use alloc::{
Expand Down Expand Up @@ -200,6 +201,94 @@ impl Bytes {
}
}

/// Create [Bytes] with a buffer whose lifetime is controlled
/// via an explicit owner.
///
/// A common use case is to zero-copy construct from mapped memory.
///
/// ```
/// # struct File;
/// #
/// # impl File {
/// # pub fn open(_: &str) -> Result<Self, ()> {
/// # Ok(Self)
/// # }
/// # }
/// #
/// # mod memmap2 {
/// # pub struct Mmap;
/// #
/// # impl Mmap {
/// # pub unsafe fn map(_file: &super::File) -> Result<Self, ()> {
/// # Ok(Self)
/// # }
/// # }
/// #
/// # impl AsRef<[u8]> for Mmap {
/// # fn as_ref(&self) -> &[u8] {
/// # b"buf"
/// # }
/// # }
/// # }
/// use bytes::Bytes;
/// use memmap2::Mmap;
///
/// # fn main() -> Result<(), ()> {
/// let file = File::open("upload_bundle.tar.gz")?;
/// let mmap = unsafe { Mmap::map(&file) }?;
/// let b = Bytes::from_owner(mmap);
/// # Ok(())
/// # }
/// ```
///
/// The `owner` will be transferred to the constructed [Bytes] object, which
/// will ensure it is dropped once all remaining clones of the constructed
/// object are dropped. The owner will then be responsible for dropping the
/// specified region of memory as part of its [Drop] implementation.
///
/// Note that converting [Bytes] constructed from an owner into a [BytesMut]
/// will always create a deep copy of the buffer into newly allocated memory.
pub fn from_owner<T>(owner: T) -> Self
where
T: AsRef<[u8]> + Send + 'static,
{
// Safety & Miri:
// The ownership of `owner` is first transferred to the `Owned` wrapper and `Bytes` object.
// This ensures that the owner is pinned in memory, allowing us to call `.as_ref()` safely
// since the lifetime of the owner is controlled by the lifetime of the new `Bytes` object,
// and the lifetime of the resulting borrowed `&[u8]` matches that of the owner.
// Note that this remains safe so long as we only call `.as_ref()` once.
//
// There are some additional special considerations here:
// * We rely on Bytes's Drop impl to clean up memory should `.as_ref()` panic.
// * Setting the `ptr` and `len` on the bytes object last (after moving the owner to
// Bytes) allows Miri checks to pass since it avoids obtaining the `&[u8]` slice
// from a stack-owned Box.
// More details on this: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813375863
// and: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813316032

let owned = Box::into_raw(Box::new(Owned {
lifetime: OwnedLifetime {
ref_cnt: AtomicUsize::new(1),
drop: owned_box_and_drop::<T>,
},
owner,
}));

let mut ret = Bytes {
ptr: NonNull::dangling().as_ptr(),
len: 0,
data: AtomicPtr::new(owned.cast()),
vtable: &OWNED_VTABLE,
};

let buf = unsafe { &*owned }.owner.as_ref();
ret.ptr = buf.as_ptr();
ret.len = buf.len();

ret
}

/// Returns the number of bytes contained in this `Bytes`.
///
/// # Examples
Expand Down Expand Up @@ -230,14 +319,16 @@ impl Bytes {
self.len == 0
}

/// Returns true if this is the only reference to the data.
/// Returns true if this is the only reference to the data and
/// `Into<BytesMut>` would avoid cloning the underlying buffer.
///
/// Always returns false if the data is backed by a static slice.
/// Always returns false if the data is backed by a [static slice](Bytes::from_static),
/// or an [owner](Bytes::from_owner).
///
/// The result of this method may be invalidated immediately if another
/// thread clones this value while this is being called. Ensure you have
/// unique access to this value (`&mut Bytes`) first if you need to be
/// certain the result is valid (i.e. for safety reasons)
/// certain the result is valid (i.e. for safety reasons).
/// # Examples
///
/// ```
Expand Down Expand Up @@ -536,6 +627,9 @@ impl Bytes {
/// If `self` is not unique for the entire original buffer, this will fail
/// and return self.
///
/// This will also always fail if the buffer was constructed via either
/// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static).
///
/// # Examples
///
/// ```
Expand Down Expand Up @@ -1012,6 +1106,83 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
// nothing to drop for &'static [u8]
}

// ===== impl OwnedVtable =====

#[repr(C)]
struct OwnedLifetime {
ref_cnt: AtomicUsize,
drop: unsafe fn(*mut ()),
}

#[repr(C)]
struct Owned<T> {
lifetime: OwnedLifetime,
owner: T,
}

unsafe fn owned_box_and_drop<T>(ptr: *mut ()) {
let b: Box<Owned<T>> = Box::from_raw(ptr as _);
drop(b);
}

unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
let owned = data.load(Ordering::Relaxed);
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed);
if old_cnt > usize::MAX >> 1 {
crate::abort()
}

Bytes {
ptr,
len,
data: AtomicPtr::new(owned as _),
vtable: &OWNED_VTABLE,
}
}

unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
let slice = slice::from_raw_parts(ptr, len);
slice.to_vec()
}

unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len));
owned_drop_impl(data.load(Ordering::Relaxed));
bytes_mut
}

unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool {
false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
let lifetime = owned.cast::<OwnedLifetime>();
let ref_cnt = &(*lifetime).ref_cnt;

let old_cnt = ref_cnt.fetch_sub(1, Ordering::Release);
if old_cnt != 1 {
return;
}
ref_cnt.load(Ordering::Acquire);

let drop_fn = &(*lifetime).drop;
drop_fn(owned)
}

unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
let owned = data.load(Ordering::Relaxed);
owned_drop_impl(owned);
}

static OWNED_VTABLE: Vtable = Vtable {
clone: owned_clone,
to_vec: owned_to_vec,
to_mut: owned_to_mut,
is_unique: owned_is_unique,
drop: owned_drop,
};

// ===== impl PromotableVtable =====

static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
Expand Down
152 changes: 152 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#![warn(rust_2018_idioms)]

use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use std::panic::{self, AssertUnwindSafe};
use std::usize;

const LONG: &[u8] = b"mary had a little lamb, little lamb, little lamb";
Expand Down Expand Up @@ -1479,3 +1482,152 @@ fn split_to_empty_addr_mut() {
let _ = &empty_start[..];
let _ = &buf[..];
}

#[derive(Clone)]
struct SharedAtomicCounter(Arc<AtomicUsize>);

impl SharedAtomicCounter {
pub fn new() -> Self {
SharedAtomicCounter(Arc::new(AtomicUsize::new(0)))
}

pub fn increment(&self) {
self.0.fetch_add(1, Ordering::AcqRel);
}

pub fn get(&self) -> usize {
self.0.load(Ordering::Acquire)
}
}

#[derive(Clone)]
struct OwnedTester<const L: usize> {
buf: [u8; L],
drop_count: SharedAtomicCounter,
pub panic_as_ref: bool,
}

impl<const L: usize> OwnedTester<L> {
fn new(buf: [u8; L], drop_count: SharedAtomicCounter) -> Self {
Self {
buf,
drop_count,
panic_as_ref: false,
}
}
}

impl<const L: usize> AsRef<[u8]> for OwnedTester<L> {
fn as_ref(&self) -> &[u8] {
if self.panic_as_ref {
panic!("test-triggered panic in `AsRef<[u8]> for OwnedTester`");
}
self.buf.as_slice()
}
}

impl<const L: usize> Drop for OwnedTester<L> {
fn drop(&mut self) {
self.drop_count.increment();
}
}

#[test]
fn owned_is_unique_always_false() {
let b1 = Bytes::from_owner([1, 2, 3, 4, 5, 6, 7]);
assert!(!b1.is_unique()); // even if ref_cnt == 1
let b2 = b1.clone();
assert!(!b1.is_unique());
assert!(!b2.is_unique());
drop(b1);
assert!(!b2.is_unique()); // even if ref_cnt == 1
}

#[test]
fn owned_buf_sharing() {
let buf = [1, 2, 3, 4, 5, 6, 7];
let b1 = Bytes::from_owner(buf);
let b2 = b1.clone();
assert_eq!(&buf[..], &b1[..]);
assert_eq!(&buf[..], &b2[..]);
assert_eq!(b1.as_ptr(), b2.as_ptr());
assert_eq!(b1.len(), b2.len());
assert_eq!(b1.len(), buf.len());
}

#[test]
fn owned_buf_slicing() {
let b1 = Bytes::from_owner(SHORT);
assert_eq!(SHORT, &b1[..]);
let b2 = b1.slice(1..(b1.len() - 1));
assert_eq!(&SHORT[1..(SHORT.len() - 1)], b2);
assert_eq!(unsafe { SHORT.as_ptr().add(1) }, b2.as_ptr());
assert_eq!(SHORT.len() - 2, b2.len());
}

#[test]
fn owned_dropped_exactly_once() {
let buf: [u8; 5] = [1, 2, 3, 4, 5];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
let b2 = b1.clone();
assert_eq!(drop_counter.get(), 0);
drop(b1);
assert_eq!(drop_counter.get(), 0);
let b3 = b2.slice(1..b2.len() - 1);
drop(b2);
assert_eq!(drop_counter.get(), 0);
drop(b3);
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_to_mut() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);

// Holding an owner will fail converting to a BytesMut,
// even when the bytes instance has a ref_cnt == 1.
let b1 = b1.try_into_mut().unwrap_err();

// That said, it's still possible, just not cheap.
let bm1: BytesMut = b1.into();
let new_buf = &bm1[..];
assert_eq!(new_buf, &buf[..]);

// `.into::<BytesMut>()` has correctly dropped the owner
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_to_vec() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);

let v1 = b1.to_vec();
assert_eq!(&v1[..], &buf[..]);
assert_eq!(&v1[..], &b1[..]);

drop(b1);
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_safe_drop_on_as_ref_panic() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let mut owner = OwnedTester::new(buf, drop_counter.clone());
owner.panic_as_ref = true;

let result = panic::catch_unwind(AssertUnwindSafe(|| {
let _ = Bytes::from_owner(owner);
}));

assert!(result.is_err());
assert_eq!(drop_counter.get(), 1);
}

0 comments on commit 30ee8e9

Please sign in to comment.