Skip to content

Commit

Permalink
simplify gc
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Jun 13, 2024
1 parent f5e964d commit 92d6de5
Showing 1 changed file with 13 additions and 277 deletions.
290 changes: 13 additions & 277 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down Expand Up @@ -267,25 +266,6 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
}
}

/// Returns whether the buffers are compact
pub(self) fn compact_check(&self) -> Vec<bool> {
let mut checkers: Vec<_> = self
.buffers
.iter()
.map(|b| CompactChecker::new(b.len()))
.collect();

for (i, view) in self.views.iter().enumerate() {
let view = ByteView::from(*view);
if self.is_null(i) || view.length <= 12 {
continue;
}
checkers[view.buffer_index as usize]
.accumulate(view.offset as usize, view.length as usize);
}
checkers.into_iter().map(|c| c.finish()).collect()
}

/// Returns a buffer compact version of this array
///
/// The original array will *not* be modified
Expand Down Expand Up @@ -320,60 +300,20 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
///
/// 2 views
/// ```
/// This method will compact the data buffers to only include the data
/// that is pointed to by the views,
/// and return a new array with the compacted data buffers.
/// The original array will be left as is.
/// This method will compact the data buffers by recreating the view array and only include the data
/// that is pointed to by the views.
///
/// Note that it will copy the array regardless of whether the original array is compact.
/// Use with caution as this can be an expensive operation, only use it when you are sure that the view
/// array is significantly smaller than when it is originally created, e.g., after filtering or slicing.
pub fn gc(&self) -> Self {
let check_result = self.compact_check();
let mut builder = GenericByteViewBuilder::<T>::with_capacity(self.len());

if check_result.iter().all(|x| *x) {
return self.clone();
for v in self.iter() {
builder.append_option(v);
}

let mut new_views = Vec::with_capacity(self.views.len());
let mut new_bufs: Vec<Vec<u8>> = vec![vec![]; self.buffers.len()];
for (i, view) in self.views.iter().enumerate() {
let mut bv = ByteView::from(*view);
let idx = bv.buffer_index as usize;
if self.is_null(i) || bv.length <= 12 || check_result[idx] {
new_views.push(*view);
continue;
}
// copy data to new buffer
let data = self.buffers.get(idx).unwrap();
let offset = new_bufs[idx].len();
let len = bv.length as usize;
new_bufs[idx].extend_from_slice(
data.get(bv.offset as usize..bv.offset as usize + len)
.unwrap(),
);
// update view
bv.offset = offset as u32;

new_views.push(bv.into());
}
let new_bufs: Vec<_> = new_bufs.into_iter().map(Buffer::from_vec).collect();

let new_views = ScalarBuffer::from(new_views);

let new_buffers = self
.buffers
.iter()
.enumerate()
.map(|(idx, buf)| {
if check_result[idx] {
buf.clone()
} else {
new_bufs[idx].clone()
}
})
.collect();

let mut compacted = self.clone();
compacted.buffers = new_buffers;
compacted.views = new_views;
compacted
builder.finish()
}
}

Expand Down Expand Up @@ -592,67 +532,6 @@ impl From<Vec<Option<String>>> for StringViewArray {
}
}

/// A helper struct that used to check if the array is compact view
///
/// The checker is lazy and will not check the array until `finish` is called.
///
/// This is based on the assumption that the array will most likely to be not compact,
/// so it is likely to scan the entire array.
///
/// Then it is better to do the check at once, rather than doing it for each accumulate operation.
struct CompactChecker {
length: usize,
intervals: BTreeMap<usize, usize>,
}

impl CompactChecker {
/// Create a new checker with the expected length of the buffer
pub fn new(length: usize) -> Self {
Self {
length,
intervals: BTreeMap::new(),
}
}

/// Accumulate a new covered interval to the checker
pub fn accumulate(&mut self, offset: usize, length: usize) {
if length == 0 {
return;
}
let end = offset + length;
if end > self.length {
panic!(
"Invalid interval: offset {} length {} is out of bound of length {}",
offset, length, self.length
);
}
if let Some(val) = self.intervals.get_mut(&offset) {
if *val < end {
*val = end;
}
} else {
self.intervals.insert(offset, end);
}
}

/// Check if the checker is fully covered
pub fn finish(self) -> bool {
// check if the coverage is continuous and full
let mut last_end = 0;

for (start, end) in self.intervals.iter() {
if *start > last_end {
return false;
}
if *end > last_end {
last_end = *end;
}
}

last_end == self.length
}
}

#[cfg(test)]
mod tests {
use crate::builder::{BinaryViewBuilder, StringViewBuilder};
Expand Down Expand Up @@ -817,136 +696,21 @@ mod tests {
StringViewArray::new(views, buffers, None);
}

#[test]
#[should_panic(expected = "Invalid interval: offset 0 length 13 is out of bound of length 12")]
fn test_compact_checker() {
use super::CompactChecker;

// single coverage, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 10);
assert!(checker.finish());

// single coverage, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
assert!(!checker.finish());

// multiple coverage, no overlapping, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 4);
assert!(!checker.finish());

//multiple coverage, no overlapping, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 5);
assert!(checker.finish());

//multiple coverage, overlapping, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(4, 5);
assert!(!checker.finish());

//multiple coverage, overlapping, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(4, 6);
assert!(checker.finish());

//mutiple coverage, no overlapping, full, out of order
let mut checker = CompactChecker::new(10);
checker.accumulate(4, 6);
checker.accumulate(0, 4);
assert!(checker.finish());

// multiple coverage, overlapping, full, out of order
let mut checker = CompactChecker::new(10);
checker.accumulate(4, 6);
checker.accumulate(0, 4);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(5, 5);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(4, 6);
checker.accumulate(5, 5);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
//
// this case is for attacking those implementation that only check
// the lower-bound of the interval
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(1, 9);
checker.accumulate(2, 3);
checker.accumulate(3, 1);
checker.accumulate(9, 1);
assert!(checker.finish());

// panic case, out of bound
let mut checker = CompactChecker::new(12);
checker.accumulate(0, 13);
checker.finish();
}

#[test]
fn test_gc() {
// ---------------------------------------------------------------------
// test compact on compacted data

let array = {
let mut builder = StringViewBuilder::new();
builder.append_value("I look at you all");
builder.append_option(Some("see the love there that's sleeping"));
builder.finish()
};
let compacted = array.gc();
// verify it is a shallow copy
assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());

// ---------------------------------------------------------------------
// test compact on non-compacted data

let mut array = {
let mut builder = StringViewBuilder::new();
builder.append_value("while my guitar gently weeps");
builder.finish()
};
let mut array = StringViewArray::from_iter_values(["while my guitar gently weeps"]);
// shrink the view
let mut view = ByteView::from(array.views[0]);
view.length = 15;
let new_views = ScalarBuffer::from(vec![view.into()]);
array.views = new_views;
let compacted = array.gc();
// verify it is a deep copy
assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
// verify content
assert_eq!(array.value(0), compacted.value(0));
// verify compacted
assert!(compacted.compact_check().iter().all(|x| *x));

// ---------------------------------------------------------------------
// test compact on array containing null

let mut array = {
let mut builder = StringViewBuilder::new();
builder.append_null();
builder.append_option(Some("I don't know why nobody told you"));
builder.finish()
};
let mut array =
StringViewArray::from_iter([None, Some("I don't know why nobody told you")]);

let mut view = ByteView::from(array.views[1]);
view.length = 15;
Expand All @@ -958,33 +722,5 @@ mod tests {
assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
assert_eq!(array.value(0), compacted.value(0));
assert_eq!(array.value(1), compacted.value(1));
assert!(compacted.compact_check().iter().all(|x| *x));

// ---------------------------------------------------------------------
// test compact on multiple buffers

let mut array = {
let mut builder = StringViewBuilder::new().with_block_size(15);
builder.append_value("how to unfold your love");
builder.append_option(Some("I don't know how someone controlled you"));
builder.finish()
};

// verify it's not same buffer
assert_eq!(array.buffers.len(), 2);
// shrink the view

let mut view = ByteView::from(array.views[1]);
view.length = 15;
let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]);
array.views = new_views;

let compacted = array.gc();
assert_eq!(compacted.buffers.len(), 2);
assert_eq!(array.value(0), compacted.value(0));
assert_eq!(array.value(1), compacted.value(1));
assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
assert_ne!(array.buffers[1].as_ptr(), compacted.buffers[1].as_ptr());
assert!(compacted.compact_check().iter().all(|x| *x));
}
}

0 comments on commit 92d6de5

Please sign in to comment.