-
Notifications
You must be signed in to change notification settings - Fork 843
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add Allocator
type param to MutableBuffer
#6336
Changes from 5 commits
8be6078
c12e5f8
609d93c
227e667
05fd1cb
cbcbc51
b8d2924
4934043
d2fc15e
36eccf6
fa587a0
6b28b10
ceef772
02d73ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,9 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#[cfg(feature = "allocator_api")] | ||
use std::alloc::{Allocator, Global}; | ||
|
||
use std::alloc::{handle_alloc_error, Layout}; | ||
use std::mem; | ||
use std::ptr::NonNull; | ||
|
@@ -28,6 +31,23 @@ use crate::{ | |
|
||
use super::Buffer; | ||
|
||
#[cfg(not(feature = "allocator_api"))] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if it's a good idea to introduce allocator-api2 as a compatibility layer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your information. It looks like
As for now we can define them by ourselves in few lines. |
||
pub trait Allocator: private::Sealed {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At first glance, it's somewhat confusing to define a sealed trait here. Maybe add some comments here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added, please take a look |
||
|
||
#[cfg(not(feature = "allocator_api"))] | ||
impl Allocator for Global {} | ||
|
||
#[cfg(not(feature = "allocator_api"))] | ||
#[derive(Debug)] | ||
pub struct Global; | ||
|
||
#[cfg(not(feature = "allocator_api"))] | ||
mod private { | ||
pub trait Sealed {} | ||
|
||
impl Sealed for super::Global {} | ||
} | ||
|
||
/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items. | ||
/// | ||
/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned | ||
|
@@ -51,12 +71,20 @@ use super::Buffer; | |
/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]) | ||
/// ``` | ||
#[derive(Debug)] | ||
pub struct MutableBuffer { | ||
pub struct MutableBuffer< | ||
#[cfg(feature = "allocator_api")] A: Allocator = Global, | ||
#[cfg(not(feature = "allocator_api"))] A: Allocator = Global, | ||
> { | ||
// dangling iff capacity = 0 | ||
data: NonNull<u8>, | ||
// invariant: len <= capacity | ||
len: usize, | ||
layout: Layout, | ||
#[cfg(feature = "allocator_api")] | ||
allocator: A, | ||
#[cfg(not(feature = "allocator_api"))] | ||
#[doc = "Placeholder for allocator API"] | ||
allocator: A, | ||
} | ||
|
||
impl MutableBuffer { | ||
|
@@ -83,14 +111,14 @@ impl MutableBuffer { | |
0 => dangling_ptr(), | ||
_ => { | ||
// Safety: Verified size != 0 | ||
let raw_ptr = unsafe { std::alloc::alloc(layout) }; | ||
NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) | ||
unsafe { Self::alloc(&Global, layout) } | ||
} | ||
}; | ||
Self { | ||
data, | ||
len: 0, | ||
layout, | ||
allocator: Global, | ||
} | ||
} | ||
|
||
|
@@ -115,7 +143,12 @@ impl MutableBuffer { | |
NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) | ||
} | ||
}; | ||
Self { data, len, layout } | ||
Self { | ||
data, | ||
len, | ||
layout, | ||
allocator: Global, | ||
} | ||
} | ||
|
||
/// Create a [`MutableBuffer`] from the provided [`Vec`] without copying | ||
|
@@ -136,7 +169,12 @@ impl MutableBuffer { | |
let data = bytes.ptr(); | ||
mem::forget(bytes); | ||
|
||
Ok(Self { data, len, layout }) | ||
Ok(Self { | ||
data, | ||
len, | ||
layout, | ||
allocator: Global, | ||
}) | ||
} | ||
|
||
/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits. | ||
|
@@ -204,28 +242,6 @@ impl MutableBuffer { | |
} | ||
} | ||
|
||
#[cold] | ||
fn reallocate(&mut self, capacity: usize) { | ||
let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap(); | ||
if new_layout.size() == 0 { | ||
if self.layout.size() != 0 { | ||
// Safety: data was allocated with layout | ||
unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) }; | ||
self.layout = new_layout | ||
} | ||
return; | ||
} | ||
|
||
let data = match self.layout.size() { | ||
// Safety: new_layout is not empty | ||
0 => unsafe { std::alloc::alloc(new_layout) }, | ||
// Safety: verified new layout is valid and not empty | ||
_ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) }, | ||
}; | ||
self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); | ||
self.layout = new_layout; | ||
} | ||
|
||
/// Truncates this buffer to `len` bytes | ||
/// | ||
/// If `len` is greater than the buffer's current length, this has no effect | ||
|
@@ -483,6 +499,139 @@ impl MutableBuffer { | |
} | ||
} | ||
|
||
#[cfg(feature = "allocator_api")] | ||
impl<A: Allocator> MutableBuffer<A> { | ||
/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity` | ||
/// in the given allocator. | ||
/// | ||
/// See [`MutableBuffer::with_capacity_in`]. | ||
#[inline] | ||
pub fn new_in(allocator: A, capacity: usize) -> Self { | ||
Self::with_capacity_in(allocator, capacity) | ||
} | ||
|
||
/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. | ||
/// in the given allocator | ||
/// | ||
/// # Panics | ||
/// | ||
/// If `capacity`, when rounded up to the nearest multiple of [`ALIGNMENT`], is greater | ||
/// then `isize::MAX`, then this function will panic. | ||
#[inline] | ||
pub fn with_capacity_in(allocator: A, capacity: usize) -> Self { | ||
let capacity = bit_util::round_upto_multiple_of_64(capacity); | ||
let layout = Layout::from_size_align(capacity, ALIGNMENT) | ||
.expect("failed to create layout for MutableBuffer"); | ||
let data = match layout.size() { | ||
0 => dangling_ptr(), | ||
_ => { | ||
// Safety: Verified size != 0 | ||
unsafe { Self::alloc(&allocator, layout) } | ||
} | ||
}; | ||
Self { | ||
data, | ||
len: 0, | ||
layout, | ||
allocator, | ||
} | ||
} | ||
} | ||
|
||
/// `allocator_api` related internal methods | ||
impl<A: Allocator> MutableBuffer<A> { | ||
#[inline] | ||
unsafe fn alloc(_alloc: &A, layout: Layout) -> NonNull<u8> { | ||
#[cfg(feature = "allocator_api")] | ||
{ | ||
_alloc | ||
.allocate(layout) | ||
.unwrap_or_else(|_| handle_alloc_error(layout)) | ||
.cast() | ||
} | ||
|
||
#[cfg(not(feature = "allocator_api"))] | ||
{ | ||
waynexia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let data = std::alloc::alloc(layout); | ||
NonNull::new(data).unwrap_or_else(|| handle_alloc_error(layout)) | ||
} | ||
} | ||
|
||
#[inline] | ||
unsafe fn dealloc(_alloc: &A, ptr: NonNull<u8>, layout: Layout) { | ||
#[cfg(feature = "allocator_api")] | ||
{ | ||
_alloc.deallocate(ptr, layout) | ||
} | ||
|
||
#[cfg(not(feature = "allocator_api"))] | ||
{ | ||
std::alloc::dealloc(ptr.as_ptr(), layout) | ||
} | ||
} | ||
|
||
#[cold] | ||
fn reallocate(&mut self, capacity: usize) { | ||
let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap(); | ||
|
||
// shrink to zero | ||
if new_layout.size() == 0 { | ||
if self.layout.size() != 0 { | ||
// Safety: data was allocated with layout | ||
unsafe { Self::dealloc(&self.allocator, self.data, self.layout) }; | ||
self.layout = new_layout | ||
} | ||
return; | ||
} | ||
|
||
#[cfg(feature = "allocator_api")] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The entire There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may misread this. Only those new public APIs are limited under the feature gate. |
||
match new_layout.size().cmp(&self.layout.size()) { | ||
std::cmp::Ordering::Equal => { | ||
// no action needed | ||
return; | ||
} | ||
std::cmp::Ordering::Less => { | ||
// shrink to new capacity | ||
let new_data = unsafe { | ||
self.allocator | ||
.shrink(self.data, self.layout, new_layout) | ||
.unwrap_or_else(|_| handle_alloc_error(new_layout)) | ||
.cast() | ||
}; | ||
self.layout = new_layout; | ||
self.data = new_data; | ||
return; | ||
} | ||
std::cmp::Ordering::Greater => { | ||
// grow to new capacity | ||
let new_data = unsafe { | ||
self.allocator | ||
.grow(self.data, self.layout, new_layout) | ||
.unwrap_or_else(|_| handle_alloc_error(new_layout)) | ||
.cast() | ||
}; | ||
self.layout = new_layout; | ||
self.data = new_data; | ||
} | ||
} | ||
|
||
#[cfg(not(feature = "allocator_api"))] | ||
{ | ||
self.data = match self.layout.size() { | ||
// Safety: new_layout is not empty | ||
0 => unsafe { Self::alloc(&self.allocator, new_layout) }, | ||
// Safety: verified new layout is valid and not empty | ||
_ => unsafe { | ||
let new_data = std::alloc::realloc(self.data.as_ptr(), self.layout, capacity); | ||
NonNull::new(new_data).unwrap_or_else(|| handle_alloc_error(new_layout)) | ||
}, | ||
}; | ||
// self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); | ||
waynexia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.layout = new_layout; | ||
} | ||
} | ||
} | ||
|
||
#[inline] | ||
fn dangling_ptr() -> NonNull<u8> { | ||
// SAFETY: ALIGNMENT is a non-zero usize which is then casted | ||
|
@@ -517,8 +666,18 @@ impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer { | |
// Vec guaranteed to have a valid layout matching that of `Layout::array` | ||
// This is based on `RawVec::current_memory` | ||
let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() }; | ||
let zelf = Self { | ||
data, | ||
len, | ||
layout, | ||
#[cfg(not(feature = "allocator_api"))] | ||
allocator: Global, | ||
#[cfg(feature = "allocator_api")] | ||
allocator: *value.allocator(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The The same would be useful for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will default to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I got what you mean https://github.com/apache/arrow-rs/pull/6336/files#diff-371342744df1b634b0bd9d90f4fe38c1eb0096df322fd3cc2fbc513f3428046cR692-R696 We should have two impl blocks indeed for different type parameters |
||
}; | ||
|
||
mem::forget(value); | ||
Self { data, len, layout } | ||
zelf | ||
} | ||
} | ||
|
||
|
@@ -688,11 +847,11 @@ impl std::ops::DerefMut for MutableBuffer { | |
} | ||
} | ||
|
||
impl Drop for MutableBuffer { | ||
impl<A: Allocator> Drop for MutableBuffer<A> { | ||
fn drop(&mut self) { | ||
if self.layout.size() != 0 { | ||
// Safety: data was allocated with standard allocator with given layout | ||
unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) }; | ||
unsafe { Self::dealloc(&self.allocator, self.data, self.layout) }; | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found it confusing at first why this line doesn't follow the model of the rest of the tests in this workflow which run with
--all-features
I am also concerned this may remove coverage for some of the other features such as prettyprint, ffi and pyarrow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
arrow-buffer
only has one feature gateallocator_api
from this PR. This specific change won't affect other tests likeprettyprint
as the command-p arrow-buffer
only runs on this sub-crate. But I agree it's easy to forget when the new feature comes in the future... Sadly, cargo seems not to support opt-out one feature in CLI 😢There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, the same problem comes to the main
arrow
lib because I added a delegate featureallocator_api = ["arrow-buffer/allocator_api"]
. Looks like we have to enumerate all available features exceptallocator_api
in CI?