Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use arrow-rs in ArrowBuffer #8201

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5990,6 +5990,7 @@ name = "re_space_view"
version = "0.21.0-alpha.1+dev"
dependencies = [
"ahash",
"arrow",
"bytemuck",
"egui",
"glam",
Expand Down Expand Up @@ -6080,6 +6081,7 @@ version = "0.21.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"bitflags 2.6.0",
"bytemuck",
"criterion",
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_chunk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ serde = [
]

## Enable conversion to and from arrow-rs types
arrow = ["arrow2/arrow", "dep:arrow"]
arrow = ["arrow2/arrow"]


[dependencies]
Expand All @@ -49,6 +49,7 @@ re_types_core.workspace = true
# External
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
arrow2 = { workspace = true, features = [
"compute_concatenate",
"compute_filter",
Expand All @@ -64,7 +65,6 @@ thiserror.workspace = true

# Optional dependencies:
serde = { workspace = true, optional = true, features = ["derive", "rc"] }
arrow = { workspace = true, optional = true }

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ impl Chunk {
/// * [`Self::iter_string`].
/// * [`Self::iter_component_arrays`].
/// * [`Self::iter_component`].
pub fn iter_buffer<T: arrow2::types::NativeType>(
pub fn iter_buffer<T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>(
&self,
component_name: &ComponentName,
) -> impl Iterator<Item = Vec<ArrowBuffer<T>>> + '_ {
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_types_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ anyhow.workspace = true
arrow.workspace = true
arrow2 = { workspace = true, features = [
"arrow",
"compute_concatenate",
"io_ipc",
"io_print",
"compute_concatenate",
] }
backtrace.workspace = true
bytemuck.workspace = true
Expand Down
94 changes: 58 additions & 36 deletions crates/store/re_types_core/src/arrow_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
/// Convenience-wrapper around an [`arrow2::buffer::Buffer`] that is known to contain a
use arrow::datatypes::ArrowNativeType;

/// Convenience-wrapper around an [`arrow::buffer::ScalarBuffer`] that is known to contain a
/// primitive type.
///
/// The [`ArrowBuffer`] object is internally reference-counted and can be
/// easily converted back to a `&[T]` referencing the underlying storage.
/// This avoids some of the lifetime complexities that would otherwise
/// arise from returning a `&[T]` directly, but is significantly more
/// performant than doing the full allocation necessary to return a `Vec<T>`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ArrowBuffer<T>(arrow2::buffer::Buffer<T>);
#[derive(Clone, Debug, PartialEq)]
pub struct ArrowBuffer<T: ArrowNativeType>(arrow::buffer::ScalarBuffer<T>);

/// The default [`ArrowBuffer`] holds zero instances of `T`.
impl<T: ArrowNativeType> Default for ArrowBuffer<T> {
    fn default() -> Self {
        // Start from an empty `Vec` and hand it to the scalar buffer.
        let no_values: Vec<T> = Vec::new();
        Self(arrow::buffer::ScalarBuffer::<T>::from(no_values))
    }
}

impl<T: crate::SizeBytes> crate::SizeBytes for ArrowBuffer<T> {
impl<T: crate::SizeBytes + ArrowNativeType> crate::SizeBytes for ArrowBuffer<T> {
#[inline]
fn heap_size_bytes(&self) -> u64 {
let Self(buf) = self;
std::mem::size_of_val(buf.as_slice()) as _
std::mem::size_of_val(self.as_slice()) as _
}
}

impl<T> ArrowBuffer<T> {
impl<T: ArrowNativeType> ArrowBuffer<T> {
/// The number of instances of T stored in this buffer.
#[inline]
pub fn num_instances(&self) -> usize {
// WARNING: If you are touching this code, make sure you know what len() actually does.
//
// There is ambiguity in how arrow2 and arrow-rs talk about buffer lengths, including
// some incorrect documentation: https://github.com/jorgecarleitao/arrow2/issues/1430
//
// Arrow2 `Buffer<T>` is typed and `len()` is the number of units of `T`, but the documentation
// is currently incorrect.
// Arrow-rs `Buffer` is untyped and len() is in bytes, but `ScalarBuffer`s are in units of T.
self.0.len()
self.as_slice().len()
}

/// The number of bytes stored in this buffer
Expand All @@ -45,7 +44,7 @@ impl<T> ArrowBuffer<T> {

#[inline]
pub fn as_slice(&self) -> &[T] {
self.0.as_slice()
self.0.as_ref()
}

/// Returns a new [`ArrowBuffer`] that is a slice of this buffer covering `range`.
Expand All @@ -56,19 +55,19 @@ impl<T> ArrowBuffer<T> {
/// Panics iff `range.start + range.len()` is larger than the number of instances in this buffer.
#[inline]
pub fn sliced(self, range: std::ops::Range<usize>) -> Self {
Self(self.0.sliced(range.start, range.len()))
Self(self.0.slice(range.start, range.len()))
}
}

impl<T: bytemuck::Pod> ArrowBuffer<T> {
impl<T: bytemuck::Pod + ArrowNativeType> ArrowBuffer<T> {
/// Cast POD (plain-old-data) types to another POD type.
///
/// For instance: cast a buffer of `u8` to a buffer of `f32`.
#[inline]
pub fn cast_pod<Target: bytemuck::Pod>(
pub fn cast_pod<Target: bytemuck::Pod + ArrowNativeType>(
&self,
) -> Result<ArrowBuffer<Target>, bytemuck::PodCastError> {
// TODO(emilk): when we switch from arrow2, see if we can make this function zero-copy
// TODO(#2978): when we switch from arrow2, see if we can make this function zero-copy
re_tracing::profile_function!();
let target_slice: &[Target] = bytemuck::try_cast_slice(self.as_slice())?;
Ok(ArrowBuffer::from(target_slice.to_vec()))
Expand All @@ -84,56 +83,79 @@ impl<T: bytemuck::Pod> ArrowBuffer<T> {
}
}

impl<T: Eq> Eq for ArrowBuffer<T> {}
impl<T: Eq + ArrowNativeType> Eq for ArrowBuffer<T> {}

impl<T: Clone> ArrowBuffer<T> {
impl<T: ArrowNativeType> ArrowBuffer<T> {
#[inline]
pub fn to_vec(&self) -> Vec<T> {
self.0.as_slice().to_vec()
self.as_slice().to_vec()
}
}

impl<T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>
From<arrow::buffer::ScalarBuffer<T>> for ArrowBuffer<T>
impl<T: ArrowNativeType + arrow2::types::NativeType> From<arrow::buffer::ScalarBuffer<T>>
for ArrowBuffer<T>
{
#[inline]
fn from(value: arrow::buffer::ScalarBuffer<T>) -> Self {
Self(value.into_inner().into())
}
}

impl<T> From<arrow2::buffer::Buffer<T>> for ArrowBuffer<T> {
impl<T: ArrowNativeType + arrow2::types::NativeType> From<arrow2::buffer::Buffer<T>>
for ArrowBuffer<T>
{
#[inline]
fn from(value: arrow2::buffer::Buffer<T>) -> Self {
Self(value)
fn from(arrow2_buffer: arrow2::buffer::Buffer<T>) -> Self {
let num_elements = arrow2_buffer.len();
let arrow1_buffer = arrow::buffer::Buffer::from(arrow2_buffer);
let scalar_buffer = arrow::buffer::ScalarBuffer::new(arrow1_buffer, 0, num_elements);
Self(scalar_buffer)
}
}

impl<T> From<Vec<T>> for ArrowBuffer<T> {
impl<T: ArrowNativeType> From<Vec<T>> for ArrowBuffer<T> {
#[inline]
fn from(value: Vec<T>) -> Self {
Self(value.into())
}
}

impl<T: Clone> From<&[T]> for ArrowBuffer<T> {
impl<T: ArrowNativeType> From<&[T]> for ArrowBuffer<T> {
#[inline]
fn from(value: &[T]) -> Self {
Self(value.iter().cloned().collect()) // TODO(emilk): avoid extra clones
Self(value.iter().copied().collect())
}
}

impl<T> FromIterator<T> for ArrowBuffer<T> {
impl<T: ArrowNativeType> FromIterator<T> for ArrowBuffer<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self(arrow2::buffer::Buffer::from_iter(iter))
Self(arrow::buffer::ScalarBuffer::from_iter(iter))
}
}

impl<T> std::ops::Deref for ArrowBuffer<T> {
impl<T: ArrowNativeType> std::ops::Deref for ArrowBuffer<T> {
type Target = [T];

#[inline]
fn deref(&self) -> &[T] {
self.0.as_slice()
self.as_slice()
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A sliced arrow2 buffer converted into an [`ArrowBuffer`] must expose
    /// exactly the sliced elements, not the whole backing allocation.
    #[test]
    fn test_arrow2_compatibility() {
        let full = arrow2::buffer::Buffer::<f32>::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        assert_eq!(full.as_slice(), &[1.0, 2.0, 3.0, 4.0, 5.0]);

        // Keep three elements, starting at index 1.
        let window = full.sliced(1, 3);
        assert_eq!(window.as_slice(), &[2.0, 3.0, 4.0]);

        // The conversion has to carry over both the offset and the length of the slice.
        let converted = ArrowBuffer::<f32>::from(window);
        assert_eq!(converted.num_instances(), 3);
        assert_eq!(converted.as_slice(), &[2.0, 3.0, 4.0]);
    }
}
1 change: 1 addition & 0 deletions crates/viewer/re_space_view/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ re_viewer_context.workspace = true
re_viewport_blueprint.workspace = true

ahash.workspace = true
arrow.workspace = true
bytemuck.workspace = true
egui.workspace = true
glam.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/viewer/re_space_view/src/results_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ impl<'a> HybridResultsChunkIter<'a> {
/// Iterate as indexed buffers.
///
/// See [`Chunk::iter_buffer`] for more information.
pub fn buffer<T: arrow2::types::NativeType>(
pub fn buffer<T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>(
&'a self,
) -> impl Iterator<Item = ((TimeInt, RowId), Vec<re_types_core::ArrowBuffer<T>>)> + 'a {
self.chunks.iter().flat_map(|chunk| {
Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_space_view_spatial/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ re_video.workspace = true
re_viewer_context.workspace = true
re_viewport_blueprint.workspace = true

arrow.workspace = true
arrow2.workspace = true
ahash.workspace = true
anyhow.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ pub fn iter_string<'a>(
///
/// See [`Chunk::iter_buffer`] for more information.
#[allow(unused)]
pub fn iter_buffer<'a, T: arrow2::types::NativeType>(
pub fn iter_buffer<'a, T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>(
chunks: &'a std::borrow::Cow<'a, [Chunk]>,
timeline: Timeline,
component_name: ComponentName,
Expand Down
Loading