diff --git a/src/bytes.rs b/src/bytes.rs index 8d7abd040..96f834f43 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -1,6 +1,7 @@ use core::iter::FromIterator; use core::mem::{self, ManuallyDrop}; use core::ops::{Deref, RangeBounds}; +use core::ptr::NonNull; use core::{cmp, fmt, hash, ptr, slice, usize}; use alloc::{ @@ -200,6 +201,94 @@ impl Bytes { } } + /// Create [Bytes] with a buffer whose lifetime is controlled + /// via an explicit owner. + /// + /// A common use case is to zero-copy construct from mapped memory. + /// + /// ``` + /// # struct File; + /// # + /// # impl File { + /// # pub fn open(_: &str) -> Result { + /// # Ok(Self) + /// # } + /// # } + /// # + /// # mod memmap2 { + /// # pub struct Mmap; + /// # + /// # impl Mmap { + /// # pub unsafe fn map(_file: &super::File) -> Result { + /// # Ok(Self) + /// # } + /// # } + /// # + /// # impl AsRef<[u8]> for Mmap { + /// # fn as_ref(&self) -> &[u8] { + /// # b"buf" + /// # } + /// # } + /// # } + /// use bytes::Bytes; + /// use memmap2::Mmap; + /// + /// # fn main() -> Result<(), ()> { + /// let file = File::open("upload_bundle.tar.gz")?; + /// let mmap = unsafe { Mmap::map(&file) }?; + /// let b = Bytes::from_owner(mmap); + /// # Ok(()) + /// # } + /// ``` + /// + /// The `owner` will be transferred to the constructed [Bytes] object, which + /// will ensure it is dropped once all remaining clones of the constructed + /// object are dropped. The owner will then be responsible for dropping the + /// specified region of memory as part of its [Drop] implementation. + /// + /// Note that converting [Bytes] constructed from an owner into a [BytesMut] + /// will always create a deep copy of the buffer into newly allocated memory. + pub fn from_owner(owner: T) -> Self + where + T: AsRef<[u8]> + Send + 'static, + { + // Safety & Miri: + // The ownership of `owner` is first transferred to the `Owned` wrapper and `Bytes` object. + // This ensures that the owner is pinned in memory, allowing us to call `.as_ref()` safely + // since the lifetime of the owner is controlled by the lifetime of the new `Bytes` object, + // and the lifetime of the resulting borrowed `&[u8]` matches that of the owner. + // Note that this remains safe so long as we only call `.as_ref()` once. + // + // There are some additional special considerations here: + // * We rely on Bytes's Drop impl to clean up memory should `.as_ref()` panic. + // * Setting the `ptr` and `len` on the bytes object last (after moving the owner to + // Bytes) allows Miri checks to pass since it avoids obtaining the `&[u8]` slice + // from a stack-owned Box. + // More details on this: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813375863 + // and: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813316032 + + let owned = Box::into_raw(Box::new(Owned { + lifetime: OwnedLifetime { + ref_cnt: AtomicUsize::new(1), + drop: owned_box_and_drop::, + }, + owner, + })); + + let mut ret = Bytes { + ptr: NonNull::dangling().as_ptr(), + len: 0, + data: AtomicPtr::new(owned.cast()), + vtable: &OWNED_VTABLE, + }; + + let buf = unsafe { &*owned }.owner.as_ref(); + ret.ptr = buf.as_ptr(); + ret.len = buf.len(); + + ret + } + /// Returns the number of bytes contained in this `Bytes`. /// /// # Examples @@ -230,14 +319,16 @@ impl Bytes { self.len == 0 } - /// Returns true if this is the only reference to the data. + /// Returns true if this is the only reference to the data and + /// `Into` would avoid cloning the underlying buffer. /// - /// Always returns false if the data is backed by a static slice. + /// Always returns false if the data is backed by a [static slice](Bytes::from_static), + /// or an [owner](Bytes::from_owner). /// /// The result of this method may be invalidated immediately if another /// thread clones this value while this is being called. Ensure you have /// unique access to this value (`&mut Bytes`) first if you need to be - /// certain the result is valid (i.e. for safety reasons) + /// certain the result is valid (i.e. for safety reasons). /// # Examples /// /// ``` @@ -536,6 +627,9 @@ impl Bytes { /// If `self` is not unique for the entire original buffer, this will fail /// and return self. /// + /// This will also always fail if the buffer was constructed via either + /// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static). + /// /// # Examples /// /// ``` @@ -1012,6 +1106,83 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) { // nothing to drop for &'static [u8] } +// ===== impl OwnedVtable ===== + +#[repr(C)] +struct OwnedLifetime { + ref_cnt: AtomicUsize, + drop: unsafe fn(*mut ()), +} + +#[repr(C)] +struct Owned { + lifetime: OwnedLifetime, + owner: T, +} + +unsafe fn owned_box_and_drop(ptr: *mut ()) { + let b: Box> = Box::from_raw(ptr as _); + drop(b); +} + +unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { + let owned = data.load(Ordering::Relaxed); + let ref_cnt = &(*owned.cast::()).ref_cnt; + let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed); + if old_cnt > usize::MAX >> 1 { + crate::abort() + } + + Bytes { + ptr, + len, + data: AtomicPtr::new(owned as _), + vtable: &OWNED_VTABLE, + } +} + +unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + let slice = slice::from_raw_parts(ptr, len); + slice.to_vec() +} + +unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut { + let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len)); + owned_drop_impl(data.load(Ordering::Relaxed)); + bytes_mut +} + +unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool { + false +} + +unsafe fn owned_drop_impl(owned: *mut ()) { + let lifetime = owned.cast::(); + let ref_cnt = &(*lifetime).ref_cnt; + + let old_cnt = ref_cnt.fetch_sub(1, Ordering::Release); + if old_cnt != 1 { + return; + } + ref_cnt.load(Ordering::Acquire); + + let drop_fn = &(*lifetime).drop; + drop_fn(owned) +} + +unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { + let owned = data.load(Ordering::Relaxed); + owned_drop_impl(owned); +} + +static OWNED_VTABLE: Vtable = Vtable { + clone: owned_clone, + to_vec: owned_to_vec, + to_mut: owned_to_mut, + is_unique: owned_is_unique, + drop: owned_drop, +}; + // ===== impl PromotableVtable ===== static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable { diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 71f0e6681..fdc36ce8d 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -1,7 +1,10 @@ #![warn(rust_2018_idioms)] use bytes::{Buf, BufMut, Bytes, BytesMut}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::panic::{self, AssertUnwindSafe}; use std::usize; const LONG: &[u8] = b"mary had a little lamb, little lamb, little lamb"; @@ -1479,3 +1482,152 @@ fn split_to_empty_addr_mut() { let _ = &empty_start[..]; let _ = &buf[..]; } + +#[derive(Clone)] +struct SharedAtomicCounter(Arc); + +impl SharedAtomicCounter { + pub fn new() -> Self { + SharedAtomicCounter(Arc::new(AtomicUsize::new(0))) + } + + pub fn increment(&self) { + self.0.fetch_add(1, Ordering::AcqRel); + } + + pub fn get(&self) -> usize { + self.0.load(Ordering::Acquire) + } +} + +#[derive(Clone)] +struct OwnedTester { + buf: [u8; L], + drop_count: SharedAtomicCounter, + pub panic_as_ref: bool, +} + +impl OwnedTester { + fn new(buf: [u8; L], drop_count: SharedAtomicCounter) -> Self { + Self { + buf, + drop_count, + panic_as_ref: false, + } + } +} + +impl AsRef<[u8]> for OwnedTester { + fn as_ref(&self) -> &[u8] { + if self.panic_as_ref { + panic!("test-triggered panic in `AsRef<[u8]> for OwnedTester`"); + } + self.buf.as_slice() + } +} + +impl Drop for OwnedTester { + fn drop(&mut self) { + self.drop_count.increment(); + } +} + +#[test] +fn owned_is_unique_always_false() { + let b1 = Bytes::from_owner([1, 2, 3, 4, 5, 6, 7]); + assert!(!b1.is_unique()); // even if ref_cnt == 1 + let b2 = b1.clone(); + assert!(!b1.is_unique()); + assert!(!b2.is_unique()); + drop(b1); + assert!(!b2.is_unique()); // even if ref_cnt == 1 +} + +#[test] +fn owned_buf_sharing() { + let buf = [1, 2, 3, 4, 5, 6, 7]; + let b1 = Bytes::from_owner(buf); + let b2 = b1.clone(); + assert_eq!(&buf[..], &b1[..]); + assert_eq!(&buf[..], &b2[..]); + assert_eq!(b1.as_ptr(), b2.as_ptr()); + assert_eq!(b1.len(), b2.len()); + assert_eq!(b1.len(), buf.len()); +} + +#[test] +fn owned_buf_slicing() { + let b1 = Bytes::from_owner(SHORT); + assert_eq!(SHORT, &b1[..]); + let b2 = b1.slice(1..(b1.len() - 1)); + assert_eq!(&SHORT[1..(SHORT.len() - 1)], b2); + assert_eq!(unsafe { SHORT.as_ptr().add(1) }, b2.as_ptr()); + assert_eq!(SHORT.len() - 2, b2.len()); +} + +#[test] +fn owned_dropped_exactly_once() { + let buf: [u8; 5] = [1, 2, 3, 4, 5]; + let drop_counter = SharedAtomicCounter::new(); + let owner = OwnedTester::new(buf, drop_counter.clone()); + let b1 = Bytes::from_owner(owner); + let b2 = b1.clone(); + assert_eq!(drop_counter.get(), 0); + drop(b1); + assert_eq!(drop_counter.get(), 0); + let b3 = b2.slice(1..b2.len() - 1); + drop(b2); + assert_eq!(drop_counter.get(), 0); + drop(b3); + assert_eq!(drop_counter.get(), 1); +} + +#[test] +fn owned_to_mut() { + let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let drop_counter = SharedAtomicCounter::new(); + let owner = OwnedTester::new(buf, drop_counter.clone()); + let b1 = Bytes::from_owner(owner); + + // Holding an owner will fail converting to a BytesMut, + // even when the bytes instance has a ref_cnt == 1. + let b1 = b1.try_into_mut().unwrap_err(); + + // That said, it's still possible, just not cheap. + let bm1: BytesMut = b1.into(); + let new_buf = &bm1[..]; + assert_eq!(new_buf, &buf[..]); + + // `.into::()` has correctly dropped the owner + assert_eq!(drop_counter.get(), 1); +} + +#[test] +fn owned_to_vec() { + let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let drop_counter = SharedAtomicCounter::new(); + let owner = OwnedTester::new(buf, drop_counter.clone()); + let b1 = Bytes::from_owner(owner); + + let v1 = b1.to_vec(); + assert_eq!(&v1[..], &buf[..]); + assert_eq!(&v1[..], &b1[..]); + + drop(b1); + assert_eq!(drop_counter.get(), 1); +} + +#[test] +fn owned_safe_drop_on_as_ref_panic() { + let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let drop_counter = SharedAtomicCounter::new(); + let mut owner = OwnedTester::new(buf, drop_counter.clone()); + owner.panic_as_ref = true; + + let result = panic::catch_unwind(AssertUnwindSafe(|| { + let _ = Bytes::from_owner(owner); + })); + + assert!(result.is_err()); + assert_eq!(drop_counter.get(), 1); +}