Skip to content

Commit

Permalink
Use unary() for array conversion in Parquet array readers, speed up…
Browse files Browse the repository at this point in the history
… `Decimal128`, `Decimal256` and `Float16` (#6252)

* add unary to FixedSizeBinaryArray; use unary for transformations

* clean up documentation some

* add from_unary to PrimitiveArray

* use from_unary for converting byte array to decimal

* rework from_unary to skip vector initialization

* add example to from_unary docstring

* fix broken link

* add comments per review suggestion
  • Loading branch information
etseidl authored Aug 22, 2024
1 parent 2795b94 commit 6dd4a5f
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 112 deletions.
30 changes: 30 additions & 0 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
PrimitiveArray::new(values, Some(nulls))
}

/// Applies a unary infallible function to each value in an array, producing a
/// new primitive array.
///
/// # Null Handling
///
/// See [`Self::unary`] for more information on null handling.
///
/// # Example: create an [`Int16Array`] from an [`ArrayAccessor`] with item type `&[u8]`
/// ```
/// use arrow_array::{Array, FixedSizeBinaryArray, Int16Array};
/// let input_arg = vec![ vec![1, 0], vec![2, 0], vec![3, 0] ];
/// let arr = FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap();
/// let c = Int16Array::from_unary(&arr, |x| i16::from_le_bytes(x[..2].try_into().unwrap()));
/// assert_eq!(c, Int16Array::from(vec![Some(1i16), Some(2i16), Some(3i16)]));
/// ```
pub fn from_unary<U: ArrayAccessor, F>(left: U, mut op: F) -> Self
where
F: FnMut(U::Item) -> T::Native,
{
let nulls = left.logical_nulls();
let buffer = unsafe {
// SAFETY: i in range 0..left.len()
let iter = (0..left.len()).map(|i| op(left.value_unchecked(i)));
// SAFETY: upper bound is trusted because `iter` is over a range
Buffer::from_trusted_len_iter(iter)
};

PrimitiveArray::new(buffer.into(), nulls)
}

/// Returns a `PrimitiveBuilder` for this array, suitable for mutating values
/// in place.
///
Expand Down
29 changes: 17 additions & 12 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,31 @@ impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
self.record_reader.reset();

let array: ArrayRef = match self.data_type {
// Apply conversion to all elements regardless of null slots as the conversions
// are infallible. This improves performance by avoiding a branch in the inner
// loop (see docs for `PrimitiveArray::from_unary`).
ArrowType::Decimal128(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal128Array>()
.with_precision_and_scale(p, s)?;

// Null slots will have 0 length, so we need to check for that in the lambda
// or sign_extend_be will panic.
let decimal = Decimal128Array::from_unary(binary, |x| match x.len() {
0 => i128::default(),
_ => i128::from_be_bytes(sign_extend_be(x)),
})
.with_precision_and_scale(p, s)?;
Arc::new(decimal)
}
ArrowType::Decimal256(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal256Array>()
.with_precision_and_scale(p, s)?;

// Null slots will have 0 length, so we need to check for that in the lambda
// or sign_extend_be will panic.
let decimal = Decimal256Array::from_unary(binary, |x| match x.len() {
0 => i256::default(),
_ => i256::from_be_bytes(sign_extend_be(x)),
})
.with_precision_and_scale(p, s)?;
Arc::new(decimal)
}
_ => buffer.into_array(null_buffer, self.data_type.clone()),
Expand Down
64 changes: 20 additions & 44 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
IntervalDayTimeArray, IntervalYearMonthArray,
};
use arrow_buffer::{i256, Buffer, IntervalDayTime};
Expand Down Expand Up @@ -163,69 +163,45 @@ impl ArrayReader for FixedLenByteArrayReader {
let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

// TODO: An improvement might be to do this conversion on read
// Note the conversions below apply to all elements regardless of null slots as the
// conversion lambdas are all infallible. This improves performance by avoiding a branch in
// the inner loop (see docs for `PrimitiveArray::from_unary`).
let array: ArrayRef = match &self.data_type {
ArrowType::Decimal128(p, s) => {
// We can simply reuse the null buffer from `binary` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
// The same applies to the transformations below.
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i128::from_be_bytes(sign_extend_be(b)),
None => i128::default(),
});
let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;
Arc::new(decimal)
let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
as ArrayRef
}
ArrowType::Decimal256(p, s) => {
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i256::from_be_bytes(sign_extend_be(b)),
None => i256::default(),
});
let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;
Arc::new(decimal)
let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
as ArrayRef
}
ArrowType::Interval(unit) => {
let nulls = binary.nulls().cloned();
// An interval is stored as 3x 32-bit unsigned integers storing months, days,
// and milliseconds
match unit {
IntervalUnit::YearMonth => {
let iter = binary.iter().map(|o| match o {
Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()),
None => i32::default(),
});
let interval =
IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::DayTime => {
let iter = binary.iter().map(|o| match o {
Some(b) => IntervalDayTime::new(
let f = |b: &[u8]| {
IntervalDayTime::new(
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()),
),
None => IntervalDayTime::default(),
});
let interval =
IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
)
};
Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
}
}
}
ArrowType::Float16 => {
let nulls = binary.nulls().cloned();
let f16s = binary.iter().map(|o| match o {
Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()),
None => f16::default(),
});
let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls);
Arc::new(f16s) as ArrayRef
let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
}
_ => Arc::new(binary) as ArrayRef,
};
Expand Down Expand Up @@ -488,8 +464,8 @@ mod tests {
use crate::arrow::ArrowWriter;
use arrow::datatypes::Field;
use arrow::error::Result as ArrowResult;
use arrow_array::RecordBatch;
use arrow_array::{Array, ListArray};
use arrow_array::{Decimal256Array, RecordBatch};
use bytes::Bytes;
use std::sync::Arc;

Expand Down
84 changes: 28 additions & 56 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,35 +217,22 @@ where
arrow_cast::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
// Apply conversion to all elements regardless of null slots as the conversion
// to `i128` is infallible. This improves performance by avoiding a branch in
// the inner loop (see docs for `PrimitiveArray::unary`).
let array = match array.data_type() {
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|i| i as i128)
as Decimal128Array,
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.unary(|i| i as i128)
as Decimal128Array,
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand All @@ -258,35 +245,20 @@ where
Arc::new(array) as ArrayRef
}
ArrowType::Decimal256(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
// See above comment. Conversion to `i256` is likewise infallible.
let array = match array.data_type() {
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|i| i256::from_i128(i as i128))
as Decimal256Array,
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.unary(|i| i256::from_i128(i as i128))
as Decimal256Array,
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand Down

0 comments on commit 6dd4a5f

Please sign in to comment.