Skip to content

Commit

Permalink
Add arrow(1)-interface on top of Loggable and ArrowBuffer (#8197)
Browse files Browse the repository at this point in the history
* [x] I checked this checkbox
* Part of #3741
  • Loading branch information
emilk authored Nov 22, 2024
1 parent 240033f commit 7b32378
Show file tree
Hide file tree
Showing 50 changed files with 435 additions and 369 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6342,6 +6342,7 @@ name = "re_types_core"
version = "0.21.0-alpha.1+dev"
dependencies = [
"anyhow",
"arrow",
"backtrace",
"bytemuck",
"criterion",
Expand Down
44 changes: 22 additions & 22 deletions crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
time::{Duration, Instant},
};

use arrow2::array::{Array as ArrowArray, PrimitiveArray as ArrowPrimitiveArray};
use arrow2::array::{Array as Arrow2Array, PrimitiveArray as Arrow2PrimitiveArray};
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

Expand Down Expand Up @@ -679,14 +679,14 @@ pub struct PendingRow {
/// The component data.
///
/// Each array is a single component, i.e. _not_ a list array.
pub components: BTreeMap<ComponentName, Box<dyn ArrowArray>>,
pub components: BTreeMap<ComponentName, Box<dyn Arrow2Array>>,
}

impl PendingRow {
#[inline]
pub fn new(
timepoint: TimePoint,
components: BTreeMap<ComponentName, Box<dyn ArrowArray>>,
components: BTreeMap<ComponentName, Box<dyn Arrow2Array>>,
) -> Self {
Self {
row_id: RowId::new(),
Expand Down Expand Up @@ -726,7 +726,7 @@ impl PendingRow {
let timelines = timepoint
.into_iter()
.map(|(timeline, time)| {
let times = ArrowPrimitiveArray::<i64>::from_vec(vec![time.as_i64()]);
let times = Arrow2PrimitiveArray::<i64>::from_vec(vec![time.as_i64()]);
let time_column = TimeColumn::new(Some(true), timeline, times);
(timeline, time_column)
})
Expand Down Expand Up @@ -799,7 +799,7 @@ impl PendingRow {
re_tracing::profile_scope!("iterate per timeline set");

// Then we split the micro batches even further -- one sub-batch per unique set of datatypes.
let mut per_datatype_set: IntMap<u64 /* ArrowDatatype set */, Vec<Self>> =
let mut per_datatype_set: IntMap<u64 /* Arrow2Datatype set */, Vec<Self>> =
Default::default();
{
re_tracing::profile_scope!("compute datatype sets");
Expand All @@ -826,7 +826,7 @@ impl PendingRow {

// Create all the logical list arrays that we're going to need, accounting for the
// possibility of sparse components in the data.
let mut all_components: IntMap<ComponentName, Vec<Option<&dyn ArrowArray>>> =
let mut all_components: IntMap<ComponentName, Vec<Option<&dyn Arrow2Array>>> =
IntMap::default();
for row in &rows {
for component_name in row.components.keys() {
Expand Down Expand Up @@ -893,7 +893,7 @@ impl PendingRow {
arrays.push(
row_components
.get(component_name)
.map(|array| &**array as &dyn ArrowArray),
.map(|array| &**array as &dyn Arrow2Array),
);
}
}
Expand Down Expand Up @@ -967,7 +967,7 @@ impl PendingTimeColumn {

TimeColumn {
timeline,
times: ArrowPrimitiveArray::<i64>::from_vec(times).to(timeline.datatype()),
times: Arrow2PrimitiveArray::<i64>::from_vec(times).to(timeline.datatype()),
is_sorted,
time_range,
}
Expand Down Expand Up @@ -1047,7 +1047,7 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![42, 43, 44]),
Arrow2PrimitiveArray::from_vec(vec![42, 43, 44]),
),
)];
let expected_components = [(
Expand Down Expand Up @@ -1203,7 +1203,7 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![42, 44]),
Arrow2PrimitiveArray::from_vec(vec![42, 44]),
),
)];
let expected_components = [(
Expand Down Expand Up @@ -1231,7 +1231,7 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![43]),
Arrow2PrimitiveArray::from_vec(vec![43]),
),
)];
let expected_components = [(
Expand Down Expand Up @@ -1318,7 +1318,7 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![42]),
Arrow2PrimitiveArray::from_vec(vec![42]),
),
)];
let expected_components = [(
Expand Down Expand Up @@ -1347,15 +1347,15 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![43, 44]),
Arrow2PrimitiveArray::from_vec(vec![43, 44]),
),
),
(
timeline2,
TimeColumn::new(
Some(true),
timeline2,
ArrowPrimitiveArray::from_vec(vec![1000, 1001]),
Arrow2PrimitiveArray::from_vec(vec![1000, 1001]),
),
),
];
Expand Down Expand Up @@ -1439,7 +1439,7 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![42, 44]),
Arrow2PrimitiveArray::from_vec(vec![42, 44]),
),
)];
let expected_components = [(
Expand Down Expand Up @@ -1467,7 +1467,7 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![43]),
Arrow2PrimitiveArray::from_vec(vec![43]),
),
)];
let expected_components = [(
Expand Down Expand Up @@ -1569,15 +1569,15 @@ mod tests {
TimeColumn::new(
Some(false),
timeline1,
ArrowPrimitiveArray::from_vec(vec![45, 42, 43, 44]),
Arrow2PrimitiveArray::from_vec(vec![45, 42, 43, 44]),
),
),
(
timeline2,
TimeColumn::new(
Some(false),
timeline2,
ArrowPrimitiveArray::from_vec(vec![1003, 1000, 1001, 1002]),
Arrow2PrimitiveArray::from_vec(vec![1003, 1000, 1001, 1002]),
),
),
];
Expand Down Expand Up @@ -1683,15 +1683,15 @@ mod tests {
TimeColumn::new(
Some(false),
timeline1,
ArrowPrimitiveArray::from_vec(vec![45, 42, 43]),
Arrow2PrimitiveArray::from_vec(vec![45, 42, 43]),
),
),
(
timeline2,
TimeColumn::new(
Some(false),
timeline2,
ArrowPrimitiveArray::from_vec(vec![1003, 1000, 1001]),
Arrow2PrimitiveArray::from_vec(vec![1003, 1000, 1001]),
),
),
];
Expand Down Expand Up @@ -1722,15 +1722,15 @@ mod tests {
TimeColumn::new(
Some(true),
timeline1,
ArrowPrimitiveArray::from_vec(vec![44]),
Arrow2PrimitiveArray::from_vec(vec![44]),
),
),
(
timeline2,
TimeColumn::new(
Some(true),
timeline2,
ArrowPrimitiveArray::from_vec(vec![1002]),
Arrow2PrimitiveArray::from_vec(vec![1002]),
),
),
];
Expand Down
14 changes: 7 additions & 7 deletions crates/store/re_chunk/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::BTreeMap;

use arrow2::{
array::{Array as ArrowArray, PrimitiveArray as ArrowPrimitiveArray},
datatypes::DataType as ArrowDatatype,
array::{Array as Arrow2Array, PrimitiveArray as Arrow2PrimitiveArray},
datatypes::DataType as Arrow2Datatype,
};
use itertools::Itertools;

Expand All @@ -23,7 +23,7 @@ pub struct ChunkBuilder {

row_ids: Vec<RowId>,
timelines: BTreeMap<Timeline, TimeColumnBuilder>,
components: BTreeMap<ComponentName, Vec<Option<Box<dyn ArrowArray>>>>,
components: BTreeMap<ComponentName, Vec<Option<Box<dyn Arrow2Array>>>>,
}

impl Chunk {
Expand Down Expand Up @@ -63,7 +63,7 @@ impl ChunkBuilder {
mut self,
row_id: RowId,
timepoint: impl Into<TimePoint>,
components: impl IntoIterator<Item = (ComponentName, Option<Box<dyn ArrowArray>>)>,
components: impl IntoIterator<Item = (ComponentName, Option<Box<dyn Arrow2Array>>)>,
) -> Self {
let components = components.into_iter().collect_vec();

Expand Down Expand Up @@ -107,7 +107,7 @@ impl ChunkBuilder {
self,
row_id: RowId,
timepoint: impl Into<TimePoint>,
components: impl IntoIterator<Item = (ComponentName, Box<dyn ArrowArray>)>,
components: impl IntoIterator<Item = (ComponentName, Box<dyn Arrow2Array>)>,
) -> Self {
self.with_sparse_row(
row_id,
Expand Down Expand Up @@ -258,7 +258,7 @@ impl ChunkBuilder {
#[inline]
pub fn build_with_datatypes(
self,
datatypes: &IntMap<ComponentName, ArrowDatatype>,
datatypes: &IntMap<ComponentName, Arrow2Datatype>,
) -> ChunkResult<Chunk> {
let Self {
id,
Expand Down Expand Up @@ -343,7 +343,7 @@ impl TimeColumnBuilder {
pub fn build(self) -> TimeColumn {
let Self { timeline, times } = self;

let times = ArrowPrimitiveArray::<i64>::from_vec(times).to(timeline.datatype());
let times = Arrow2PrimitiveArray::<i64>::from_vec(times).to(timeline.datatype());
TimeColumn::new(None, timeline, times)
}
}
Loading

0 comments on commit 7b32378

Please sign in to comment.