diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index db14845b08d9..521ef088e361 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -1016,6 +1016,36 @@ impl PrimitiveArray { PrimitiveArray::new(values, Some(nulls)) } + /// Applies a unary infallible function to each value in an array, producing a + /// new primitive array. + /// + /// # Null Handling + /// + /// See [`Self::unary`] for more information on null handling. + /// + /// # Example: create an [`Int16Array`] from an [`ArrayAccessor`] with item type `&[u8]` + /// ``` + /// use arrow_array::{Array, FixedSizeBinaryArray, Int16Array}; + /// let input_arg = vec![ vec![1, 0], vec![2, 0], vec![3, 0] ]; + /// let arr = FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap(); + /// let c = Int16Array::from_unary(&arr, |x| i16::from_le_bytes(x[..2].try_into().unwrap())); + /// assert_eq!(c, Int16Array::from(vec![Some(1i16), Some(2i16), Some(3i16)])); + /// ``` + pub fn from_unary(left: U, mut op: F) -> Self + where + F: FnMut(U::Item) -> T::Native, + { + let nulls = left.logical_nulls(); + let buffer = unsafe { + // SAFETY: i in range 0..left.len() + let iter = (0..left.len()).map(|i| op(left.value_unchecked(i))); + // SAFETY: upper bound is trusted because `iter` is over a range + Buffer::from_trusted_len_iter(iter) + }; + + PrimitiveArray::new(buffer.into(), nulls) + } + /// Returns a `PrimitiveBuilder` for this array, suitable for mutating values /// in place. /// diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index ed5961586d91..92583155605b 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -120,26 +120,31 @@ impl ArrayReader for ByteArrayReader { self.record_reader.reset(); let array: ArrayRef = match self.data_type { + // Apply conversion to all elements regardless of null slots as the conversions + // are infallible. This improves performance by avoiding a branch in the inner + // loop (see docs for `PrimitiveArray::from_unary`). ArrowType::Decimal128(p, s) => { let array = buffer.into_array(null_buffer, ArrowType::Binary); let binary = array.as_any().downcast_ref::().unwrap(); - let decimal = binary - .iter() - .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?)))) - .collect::() - .with_precision_and_scale(p, s)?; - + // Null slots will have 0 length, so we need to check for that in the lambda + // or sign_extend_be will panic. + let decimal = Decimal128Array::from_unary(binary, |x| match x.len() { + 0 => i128::default(), + _ => i128::from_be_bytes(sign_extend_be(x)), + }) + .with_precision_and_scale(p, s)?; Arc::new(decimal) } ArrowType::Decimal256(p, s) => { let array = buffer.into_array(null_buffer, ArrowType::Binary); let binary = array.as_any().downcast_ref::().unwrap(); - let decimal = binary - .iter() - .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?)))) - .collect::() - .with_precision_and_scale(p, s)?; - + // Null slots will have 0 length, so we need to check for that in the lambda + // or sign_extend_be will panic. + let decimal = Decimal256Array::from_unary(binary, |x| match x.len() { + 0 => i256::default(), + _ => i256::from_be_bytes(sign_extend_be(x)), + }) + .with_precision_and_scale(p, s)?; Arc::new(decimal) } _ => buffer.into_array(null_buffer, self.data_type.clone()), diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 3b2600c54795..01692c242713 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use arrow_array::{ - Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, + ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray, }; use arrow_buffer::{i256, Buffer, IntervalDayTime}; @@ -163,55 +163,36 @@ impl ArrayReader for FixedLenByteArrayReader { let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() }); // TODO: An improvement might be to do this conversion on read + // Note the conversions below apply to all elements regardless of null slots as the + // conversion lambdas are all infallible. This improves performance by avoiding a branch in + // the inner loop (see docs for `PrimitiveArray::from_unary`). let array: ArrayRef = match &self.data_type { ArrowType::Decimal128(p, s) => { - // We can simply reuse the null buffer from `binary` rather than recomputing it - // (as was the case when we simply used `collect` to produce the new array). - // The same applies to the transformations below. - let nulls = binary.nulls().cloned(); - let decimal = binary.iter().map(|o| match o { - Some(b) => i128::from_be_bytes(sign_extend_be(b)), - None => i128::default(), - }); - let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls) - .with_precision_and_scale(*p, *s)?; - Arc::new(decimal) + let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b)); + Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?) + as ArrayRef } ArrowType::Decimal256(p, s) => { - let nulls = binary.nulls().cloned(); - let decimal = binary.iter().map(|o| match o { - Some(b) => i256::from_be_bytes(sign_extend_be(b)), - None => i256::default(), - }); - let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls) - .with_precision_and_scale(*p, *s)?; - Arc::new(decimal) + let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b)); + Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?) + as ArrayRef } ArrowType::Interval(unit) => { - let nulls = binary.nulls().cloned(); // An interval is stored as 3x 32-bit unsigned integers storing months, days, // and milliseconds match unit { IntervalUnit::YearMonth => { - let iter = binary.iter().map(|o| match o { - Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()), - None => i32::default(), - }); - let interval = - IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls); - Arc::new(interval) as ArrayRef + let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap()); + Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef } IntervalUnit::DayTime => { - let iter = binary.iter().map(|o| match o { - Some(b) => IntervalDayTime::new( + let f = |b: &[u8]| { + IntervalDayTime::new( i32::from_le_bytes(b[4..8].try_into().unwrap()), i32::from_le_bytes(b[8..12].try_into().unwrap()), - ), - None => IntervalDayTime::default(), - }); - let interval = - IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls); - Arc::new(interval) as ArrayRef + ) + }; + Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef } IntervalUnit::MonthDayNano => { return Err(nyi_err!("MonthDayNano intervals not supported")); @@ -219,13 +200,8 @@ impl ArrayReader for FixedLenByteArrayReader { } } ArrowType::Float16 => { - let nulls = binary.nulls().cloned(); - let f16s = binary.iter().map(|o| match o { - Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()), - None => f16::default(), - }); - let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls); - Arc::new(f16s) as ArrayRef + let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap()); + Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef } _ => Arc::new(binary) as ArrayRef, }; @@ -488,8 +464,8 @@ mod tests { use crate::arrow::ArrowWriter; use arrow::datatypes::Field; use arrow::error::Result as ArrowResult; - use arrow_array::RecordBatch; use arrow_array::{Array, ListArray}; + use arrow_array::{Decimal256Array, RecordBatch}; use bytes::Bytes; use std::sync::Arc; diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 5e0e09212c7e..010e9c2eed3f 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -217,35 +217,22 @@ where arrow_cast::cast(&a, target_type)? } ArrowType::Decimal128(p, s) => { - // We can simply reuse the null buffer from `array` rather than recomputing it - // (as was the case when we simply used `collect` to produce the new array). - let nulls = array.nulls().cloned(); + // Apply conversion to all elements regardless of null slots as the conversion + // to `i128` is infallible. This improves performance by avoiding a branch in + // the inner loop (see docs for `PrimitiveArray::unary`). let array = match array.data_type() { - ArrowType::Int32 => { - let decimal = array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| match v { - Some(i) => i as i128, - None => i128::default(), - }); - Decimal128Array::from_iter_values_with_nulls(decimal, nulls) - } - - ArrowType::Int64 => { - let decimal = array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| match v { - Some(i) => i as i128, - None => i128::default(), - }); - Decimal128Array::from_iter_values_with_nulls(decimal, nulls) - } + ArrowType::Int32 => array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i as i128) + as Decimal128Array, + ArrowType::Int64 => array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i as i128) + as Decimal128Array, _ => { return Err(arrow_err!( "Cannot convert {:?} to decimal", @@ -258,35 +245,20 @@ where Arc::new(array) as ArrayRef } ArrowType::Decimal256(p, s) => { - // We can simply reuse the null buffer from `array` rather than recomputing it - // (as was the case when we simply used `collect` to produce the new array). - let nulls = array.nulls().cloned(); + // See above comment. Conversion to `i256` is likewise infallible. let array = match array.data_type() { - ArrowType::Int32 => { - let decimal = array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| match v { - Some(i) => i256::from_i128(i as i128), - None => i256::default(), - }); - Decimal256Array::from_iter_values_with_nulls(decimal, nulls) - } - - ArrowType::Int64 => { - let decimal = array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| match v { - Some(i) => i256::from_i128(i as i128), - None => i256::default(), - }); - Decimal256Array::from_iter_values_with_nulls(decimal, nulls) - } + ArrowType::Int32 => array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i256::from_i128(i as i128)) + as Decimal256Array, + ArrowType::Int64 => array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i256::from_i128(i as i128)) + as Decimal256Array, _ => { return Err(arrow_err!( "Cannot convert {:?} to decimal",