Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Allocator type param to MutableBuffer #6336

Closed
wants to merge 14 commits into from
8 changes: 4 additions & 4 deletions .github/workflows/arrow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ jobs:
submodules: true
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
- name: Test arrow-buffer with all features
run: cargo test -p arrow-buffer --all-features
- name: Test arrow-buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found it confusing at first why this line doesn't follow the model of the rest of the tests in this workflow which run with --all-features

I am also concerned this may remove coverage for some of the other features such as prettyprint, ffi and pyarrow

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrow-buffer only has one feature gate allocator_api from this PR. This specific change won't affect other tests like prettyprint as the command -p arrow-buffer only runs on this sub-crate. But I agree it's easy to forget when the new feature comes in the future... Sadly, cargo seems not to support opt-out one feature in CLI 😢

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, the same problem comes to the main arrow lib because I added a delegate feature allocator_api = ["arrow-buffer/allocator_api"]. Looks like we have to enumerate all available features except allocator_api in CI?

run: cargo test -p arrow-buffer
- name: Test arrow-data with all features
run: cargo test -p arrow-data --all-features
- name: Test arrow-schema with all features
Expand Down Expand Up @@ -163,8 +163,8 @@ jobs:
uses: ./.github/actions/setup-builder
- name: Setup Clippy
run: rustup component add clippy
- name: Clippy arrow-buffer with all features
run: cargo clippy -p arrow-buffer --all-targets --all-features -- -D warnings
- name: Clippy arrow-buffer
run: cargo clippy -p arrow-buffer --all-targets -- -D warnings
- name: Clippy arrow-data with all features
run: cargo clippy -p arrow-data --all-targets --all-features -- -D warnings
- name: Clippy arrow-schema with all features
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/miri.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ cargo miri setup
cargo clean

echo "Starting Arrow MIRI run..."
cargo miri test -p arrow-buffer
cargo miri test -p arrow-buffer --features allocator_api
cargo miri test -p arrow-data --features ffi
cargo miri test -p arrow-schema --features ffi
cargo miri test -p arrow-ord
Expand Down
3 changes: 3 additions & 0 deletions arrow-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ name = "arrow_buffer"
path = "src/lib.rs"
bench = false

[features]
allocator_api = []

[dependencies]
bytes = { version = "1.4" }
num = { version = "0.4", default-features = false, features = ["std"] }
Expand Down
227 changes: 197 additions & 30 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

#[cfg(feature = "allocator_api")]
use std::alloc::{Allocator, Global};

use std::alloc::{handle_alloc_error, Layout};
use std::mem;
use std::ptr::NonNull;
Expand All @@ -28,6 +31,25 @@ use crate::{

use super::Buffer;

#[cfg(not(feature = "allocator_api"))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's a good idea to introduce allocator-api2 as a compatibility layer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your information. It looks like allocator-api2 provides many things we don't need. And I'm hesitant to add a new dep for the normal case. We can add it in the future if it turns out that allocator-api2 suits our use case well. What we need here is

  • Placeholders to standard library things that are not available without unstable features (Allocator and Global)
  • Wrapped alloc and dealloc methods

As for now we can define them by ourselves in few lines.

#[doc = "Placeholder trait for std::alloc::Allocator. To appoint a real allocator,"]
#[doc = "please enable `allocator_api` feature with nightly toolchain"]
pub trait Allocator: private::Sealed {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first glance, it's somewhat confusing to define a sealed trait here. Maybe add some comments here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, please take a look


#[cfg(not(feature = "allocator_api"))]
impl Allocator for Global {}

#[cfg(not(feature = "allocator_api"))]
#[derive(Debug)]
pub struct Global;

#[cfg(not(feature = "allocator_api"))]
mod private {
pub trait Sealed {}

impl Sealed for super::Global {}
}

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
///
/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned
Expand All @@ -50,13 +72,28 @@ use super::Buffer;
/// let buffer: Buffer = buffer.into();
/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0])
/// ```
///
/// # Customize [`Allocator`]
///
/// To customize the allocator for the buffer, enable the `allocator_api` feature and use either
/// methods like [`MutableBuffer::new_in`] or [`MutableBuffer::with_capacity_in`], or inherit the
/// allocator from a type like [`Vec`] using [`MutableBuffer::from`]. A example can be found in
/// the [allocator_api example](https://github.com/apache/arrow-rs/tree/master/arrow/examples).
#[derive(Debug)]
pub struct MutableBuffer {
pub struct MutableBuffer<
#[cfg(feature = "allocator_api")] A: Allocator = Global,
#[cfg(not(feature = "allocator_api"))] A: Allocator = Global,
> {
// dangling iff capacity = 0
data: NonNull<u8>,
// invariant: len <= capacity
len: usize,
layout: Layout,
#[cfg(feature = "allocator_api")]
allocator: A,
#[cfg(not(feature = "allocator_api"))]
#[doc = "Placeholder for allocator API"]
allocator: A,
}

impl MutableBuffer {
Expand All @@ -83,14 +120,14 @@ impl MutableBuffer {
0 => dangling_ptr(),
_ => {
// Safety: Verified size != 0
let raw_ptr = unsafe { std::alloc::alloc(layout) };
NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
unsafe { Self::alloc(&Global, layout) }
}
};
Self {
data,
len: 0,
layout,
allocator: Global,
}
}

Expand All @@ -115,7 +152,12 @@ impl MutableBuffer {
NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
}
};
Self { data, len, layout }
Self {
data,
len,
layout,
allocator: Global,
}
}

/// Create a [`MutableBuffer`] from the provided [`Vec`] without copying
Expand All @@ -136,7 +178,12 @@ impl MutableBuffer {
let data = bytes.ptr();
mem::forget(bytes);

Ok(Self { data, len, layout })
Ok(Self {
data,
len,
layout,
allocator: Global,
})
}

/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
Expand Down Expand Up @@ -204,28 +251,6 @@ impl MutableBuffer {
}
}

#[cold]
fn reallocate(&mut self, capacity: usize) {
let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
if new_layout.size() == 0 {
if self.layout.size() != 0 {
// Safety: data was allocated with layout
unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
self.layout = new_layout
}
return;
}

let data = match self.layout.size() {
// Safety: new_layout is not empty
0 => unsafe { std::alloc::alloc(new_layout) },
// Safety: verified new layout is valid and not empty
_ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
};
self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
self.layout = new_layout;
}

/// Truncates this buffer to `len` bytes
///
/// If `len` is greater than the buffer's current length, this has no effect
Expand Down Expand Up @@ -483,6 +508,138 @@ impl MutableBuffer {
}
}

#[cfg(feature = "allocator_api")]
impl<A: Allocator> MutableBuffer<A> {
/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`
/// in the given allocator.
///
/// See [`MutableBuffer::with_capacity_in`].
#[inline]
pub fn new_in(allocator: A, capacity: usize) -> Self {
Self::with_capacity_in(allocator, capacity)
}

/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
/// in the given allocator
///
/// # Panics
///
/// If `capacity`, when rounded up to the nearest multiple of [`ALIGNMENT`], is greater
/// then `isize::MAX`, then this function will panic.
#[inline]
pub fn with_capacity_in(allocator: A, capacity: usize) -> Self {
let capacity = bit_util::round_upto_multiple_of_64(capacity);
let layout = Layout::from_size_align(capacity, ALIGNMENT)
.expect("failed to create layout for MutableBuffer");
let data = match layout.size() {
0 => dangling_ptr(),
_ => {
// Safety: Verified size != 0
unsafe { Self::alloc(&allocator, layout) }
}
};
Self {
data,
len: 0,
layout,
allocator,
}
}
}

/// `allocator_api` related internal methods
impl<A: Allocator> MutableBuffer<A> {
#[inline]
unsafe fn alloc(alloc: &A, layout: Layout) -> NonNull<u8> {
#[cfg(feature = "allocator_api")]
{
alloc
.allocate(layout)
.unwrap_or_else(|_| handle_alloc_error(layout))
.cast()
}

#[cfg(not(feature = "allocator_api"))]
{
waynexia marked this conversation as resolved.
Show resolved Hide resolved
let _ = alloc;
let data = std::alloc::alloc(layout);
NonNull::new(data).unwrap_or_else(|| handle_alloc_error(layout))
}
}

#[inline]
unsafe fn dealloc(alloc: &A, ptr: NonNull<u8>, layout: Layout) {
#[cfg(feature = "allocator_api")]
{
alloc.deallocate(ptr, layout)
}

#[cfg(not(feature = "allocator_api"))]
{
let _ = alloc;
std::alloc::dealloc(ptr.as_ptr(), layout)
}
}

#[cold]
fn reallocate(&mut self, capacity: usize) {
let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();

// shrink to zero
if new_layout.size() == 0 {
if self.layout.size() != 0 {
// Safety: data was allocated with layout
unsafe { Self::dealloc(&self.allocator, self.data, self.layout) };
self.layout = new_layout
}
return;
}

#[cfg(feature = "allocator_api")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire MutableBuffer has been gated under allocator_api. Do we still need it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may misread this. Only those new public APIs are limited under the feature gate.

match new_layout.size().cmp(&self.layout.size()) {
std::cmp::Ordering::Equal => {
// no action needed
}
std::cmp::Ordering::Less => {
// shrink to new capacity
let new_data = unsafe {
self.allocator
.shrink(self.data, self.layout, new_layout)
.unwrap_or_else(|_| handle_alloc_error(new_layout))
.cast()
};
self.layout = new_layout;
self.data = new_data;
}
std::cmp::Ordering::Greater => {
// grow to new capacity
let new_data = unsafe {
self.allocator
.grow(self.data, self.layout, new_layout)
.unwrap_or_else(|_| handle_alloc_error(new_layout))
.cast()
};
self.layout = new_layout;
self.data = new_data;
}
}

#[cfg(not(feature = "allocator_api"))]
{
self.data = match self.layout.size() {
// Safety: new_layout is not empty
0 => unsafe { Self::alloc(&self.allocator, new_layout) },
// Safety: verified new layout is valid and not empty
_ => unsafe {
let new_data = std::alloc::realloc(self.data.as_ptr(), self.layout, capacity);
NonNull::new(new_data).unwrap_or_else(|| handle_alloc_error(new_layout))
},
};
self.layout = new_layout;
}
}
}

#[inline]
fn dangling_ptr() -> NonNull<u8> {
// SAFETY: ALIGNMENT is a non-zero usize which is then casted
Expand Down Expand Up @@ -517,8 +674,18 @@ impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
// Vec guaranteed to have a valid layout matching that of `Layout::array`
// This is based on `RawVec::current_memory`
let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
let zelf = Self {
data,
len,
layout,
#[cfg(not(feature = "allocator_api"))]
allocator: Global,
#[cfg(feature = "allocator_api")]
allocator: *value.allocator(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The From impl would benefit from having a type parameter for the allocator, otherwise the allocator here would always implicitly be the global one. That probably requires two separate impl blocks depending on the feature flag.

The same would be useful for Buffer::from_vec(Vec).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will default to Global without allocator_api, and inherit from the vector via Vec::allocator() API. Is this behavior ideal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got what you mean https://github.com/apache/arrow-rs/pull/6336/files#diff-371342744df1b634b0bd9d90f4fe38c1eb0096df322fd3cc2fbc513f3428046cR692-R696

We should have two impl blocks indeed for different type parameters

};

mem::forget(value);
Self { data, len, layout }
zelf
}
}

Expand Down Expand Up @@ -688,11 +855,11 @@ impl std::ops::DerefMut for MutableBuffer {
}
}

impl Drop for MutableBuffer {
impl<A: Allocator> Drop for MutableBuffer<A> {
fn drop(&mut self) {
if self.layout.size() != 0 {
// Safety: data was allocated with standard allocator with given layout
unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
unsafe { Self::dealloc(&self.allocator, self.data, self.layout) };
}
}
}
Expand Down
1 change: 1 addition & 0 deletions arrow-buffer/src/builder/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::{BooleanBufferBuilder, MutableBuffer, NullBuffer};

/// Builder for creating the null bit buffer.
///
/// This builder only materializes the buffer when we append `false`.
/// If you only append `true`s to the builder, what you get will be
/// `None` when calling [`finish`](#method.finish).
Expand Down
1 change: 1 addition & 0 deletions arrow-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

// used by [`buffer::mutable::dangling_ptr`]
#![cfg_attr(miri, feature(strict_provenance))]
#![cfg_attr(feature = "allocator_api", feature(allocator_api))]

pub mod alloc;
pub mod buffer;
Expand Down
8 changes: 5 additions & 3 deletions arrow-buffer/src/util/bit_mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
use crate::bit_chunk_iterator::BitChunks;
use crate::bit_util::{ceil, get_bit, set_bit};

/// Sets all bits on `write_data` in the range `[offset_write..offset_write+len]` to be equal to the
/// bits in `data` in the range `[offset_read..offset_read+len]`
/// returns the number of `0` bits `data[offset_read..offset_read+len]`
/// Sets bits by range.
///
/// Sets all bits on `write_data` in the range `[offset_write..offset_write+len]`
/// to be equal to the bits in `data` in the range `[offset_read..offset_read+len]`.
/// Returns the number of `0` bits `data[offset_read..offset_read+len]`.
pub fn set_bits(
write_data: &mut [u8],
data: &[u8],
Expand Down
Loading
Loading