diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 01666c0af4e6..debe0d6109eb 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -19,7 +19,6 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::bit_util::sign_extend_be;
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
@@ -77,7 +76,7 @@ pub fn make_byte_array_reader(
 }
 
 /// An [`ArrayReader`] for variable length byte arrays
-struct ByteArrayReader<I: OffsetSizeTrait + ScalarValue> {
+struct ByteArrayReader<I: OffsetSizeTrait> {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
     def_levels_buffer: Option<Buffer>,
@@ -85,14 +84,11 @@ struct ByteArrayReader<I: OffsetSizeTrait + ScalarValue> {
     record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ByteArrayReader<I> {
     fn new(
         pages: Box<dyn PageIterator>,
         data_type: ArrowType,
-        record_reader: GenericRecordReader<
-            OffsetBuffer<I>,
-            ByteArrayColumnValueDecoder<I>,
-        >,
+        record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
     ) -> Self {
         Self {
             data_type,
@@ -104,7 +100,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ByteArrayReader<I> {
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -167,15 +163,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
 }
 
 /// A [`ColumnValueDecoder`] for variable length byte arrays
-struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait + ScalarValue> {
+struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
     dict: Option<OffsetBuffer<I>>,
     decoder: Option<ByteArrayDecoder>,
     validate_utf8: bool,
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
-    for ByteArrayColumnValueDecoder<I>
-{
+impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
     type Slice = OffsetBuffer<I>;
 
     fn new(desc: &ColumnDescPtr) -> Self {
@@ -275,17 +269,15 @@ impl ByteArrayDecoder {
                 num_values,
                 validate_utf8,
             )),
-            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
-                ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new(
-                    data, num_levels, num_values,
-                ))
-            }
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary(
+                ByteArrayDecoderDictionary::new(data, num_levels, num_values),
+            ),
             Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
                 ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
             ),
-            Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray(
-                ByteArrayDecoderDelta::new(data, validate_utf8)?,
-            ),
+            Encoding::DELTA_BYTE_ARRAY => {
+                ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data, validate_utf8)?)
+            }
             _ => {
                 return Err(general_err!(
                     "unsupported encoding for byte array: {}",
@@ -298,7 +290,7 @@ impl ByteArrayDecoder {
     }
 
     /// Read up to `len` values to `out` with the optional dictionary
-    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+    pub fn read<I: OffsetSizeTrait>(
         &mut self,
         out: &mut OffsetBuffer<I>,
         len: usize,
@@ -307,8 +299,8 @@ impl ByteArrayDecoder {
         match self {
             ByteArrayDecoder::Plain(d) => d.read(out, len),
             ByteArrayDecoder::Dictionary(d) => {
-                let dict = dict
-                    .ok_or_else(|| general_err!("missing dictionary page for column"))?;
+                let dict =
+                    dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;
 
                 d.read(out, dict, len)
             }
@@ -318,7 +310,7 @@ impl ByteArrayDecoder {
     }
 
     /// Skip `len` values
-    pub fn skip<I: OffsetSizeTrait + ScalarValue>(
+    pub fn skip<I: OffsetSizeTrait>(
         &mut self,
         len: usize,
         dict: Option<&OffsetBuffer<I>>,
@@ -326,8 +318,8 @@ impl ByteArrayDecoder {
         match self {
             ByteArrayDecoder::Plain(d) => d.skip(len),
             ByteArrayDecoder::Dictionary(d) => {
-                let dict = dict
-                    .ok_or_else(|| general_err!("missing dictionary page for column"))?;
+                let dict =
+                    dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;
 
                 d.skip(dict, len)
             }
@@ -363,7 +355,7 @@ impl ByteArrayDecoderPlain {
         }
     }
 
-    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+    pub fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         len: usize,
@@ -392,8 +384,7 @@ impl ByteArrayDecoderPlain {
             if self.offset + 4 > buf.len() {
                 return Err(ParquetError::EOF("eof decoding byte array".into()));
             }
-            let len_bytes: [u8; 4] =
-                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
             let len = u32::from_le_bytes(len_bytes);
 
             let start_offset = self.offset + 4;
@@ -424,8 +415,7 @@ impl ByteArrayDecoderPlain {
             if self.offset + 4 > buf.len() {
                 return Err(ParquetError::EOF("eof decoding byte array".into()));
             }
-            let len_bytes: [u8; 4] =
-                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
             let len = u32::from_le_bytes(len_bytes) as usize;
             skip += 1;
             self.offset = self.offset + 4 + len;
@@ -462,7 +452,7 @@ impl ByteArrayDecoderDeltaLength {
         })
     }
 
-    fn read<I: OffsetSizeTrait + ScalarValue>(
+    fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         len: usize,
@@ -529,7 +519,7 @@ impl ByteArrayDecoderDelta {
         })
     }
 
-    fn read<I: OffsetSizeTrait + ScalarValue>(
+    fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         len: usize,
@@ -564,7 +554,7 @@ impl ByteArrayDecoderDictionary {
         }
     }
 
-    fn read<I: OffsetSizeTrait + ScalarValue>(
+    fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         dict: &OffsetBuffer<I>,
@@ -576,15 +566,11 @@ impl ByteArrayDecoderDictionary {
         }
 
         self.decoder.read(len, |keys| {
-            output.extend_from_dictionary(
-                keys,
-                dict.offsets.as_slice(),
-                dict.values.as_slice(),
-            )
+            output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
         })
     }
 
-    fn skip<I: OffsetSizeTrait + ScalarValue>(
+    fn skip<I: OffsetSizeTrait>(
         &mut self,
         dict: &OffsetBuffer<I>,
         to_skip: usize,
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 0d216fa08327..a38122354145 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -27,10 +27,8 @@ use bytes::Bytes;
 
 use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::buffer::{
-    dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
-};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer};
+use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
@@ -123,7 +121,7 @@ pub fn make_byte_array_dictionary_reader(
 /// An [`ArrayReader`] for dictionary encoded variable length byte arrays
 ///
 /// Will attempt to preserve any dictionary encoding present in the parquet data
-struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
     def_levels_buffer: Option<Buffer>,
@@ -133,16 +131,13 @@ struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
 
 impl<K, V> ByteArrayDictionaryReader<K, V>
 where
-    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-    V: ScalarValue + OffsetSizeTrait,
+    K: FromBytes + Ord + ArrowNativeType,
+    V: OffsetSizeTrait,
 {
     fn new(
         pages: Box<dyn PageIterator>,
         data_type: ArrowType,
-        record_reader: GenericRecordReader<
-            DictionaryBuffer<K, V>,
-            DictionaryDecoder<K, V>,
-        >,
+        record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
     ) -> Self {
         Self {
             data_type,
@@ -156,8 +151,8 @@ where
 
 impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
 where
-    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-    V: ScalarValue + OffsetSizeTrait,
+    K: FromBytes + Ord + ArrowNativeType,
+    V: OffsetSizeTrait,
 {
     fn as_any(&self) -> &dyn Any {
         self
@@ -226,16 +221,15 @@ struct DictionaryDecoder<K, V> {
 
 impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
 where
-    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-    V: ScalarValue + OffsetSizeTrait,
+    K: FromBytes + Ord + ArrowNativeType,
+    V: OffsetSizeTrait,
 {
     type Slice = DictionaryBuffer<K, V>;
 
     fn new(col: &ColumnDescPtr) -> Self {
         let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
 
-        let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8)
-        {
+        let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
             (true, true) => ArrowType::LargeUtf8,
             (true, false) => ArrowType::LargeBinary,
             (false, true) => ArrowType::Utf8,
@@ -274,8 +268,7 @@ where
         let len = num_values as usize;
 
         let mut buffer = OffsetBuffer::<V>::default();
-        let mut decoder =
-            ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
+        let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
        decoder.read(&mut buffer, usize::MAX)?;
 
         let array = buffer.into_array(None, self.value_type.clone());
@@ -339,8 +332,8 @@ where
                     Some(keys) => {
                         // Happy path - can just copy keys
                         // Keys will be validated on conversion to arrow
-                        let keys_slice = keys.spare_capacity_mut(range.start + len);
-                        let len = decoder.get_batch(&mut keys_slice[range.start..])?;
+                        let keys_slice = keys.get_output_slice(len);
+                        let len = decoder.get_batch(keys_slice)?;
                         *max_remaining_values -= len;
                         Ok(len)
                     }
@@ -360,11 +353,7 @@ where
                         let dict_offsets = dict_buffers[0].typed_data::<V>();
                         let dict_values = dict_buffers[1].as_slice();
 
-                        values.extend_from_dictionary(
-                            &keys[..len],
-                            dict_offsets,
-                            dict_values,
-                        )?;
+                        values.extend_from_dictionary(&keys[..len], dict_offsets, dict_values)?;
                         *max_remaining_values -= len;
                         Ok(len)
                     }
@@ -375,9 +364,7 @@ where
 
     fn skip_values(&mut self, num_values: usize) -> Result<usize> {
         match self.decoder.as_mut().expect("decoder set") {
-            MaybeDictionaryDecoder::Fallback(decoder) => {
-                decoder.skip::<V>(num_values, None)
-            }
+            MaybeDictionaryDecoder::Fallback(decoder) => decoder.skip::<V>(num_values, None),
             MaybeDictionaryDecoder::Dict {
                 decoder,
                 max_remaining_values,
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index b846997d36b8..849aa37c561f 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -18,7 +18,7 @@
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
 use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{Encoding, Type};
@@ -162,11 +162,10 @@ impl ArrayReader for FixedLenByteArrayReader {
     fn consume_batch(&mut self) -> Result<ArrayRef> {
         let record_data = self.record_reader.consume_record_data();
 
-        let array_data =
-            ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
-                .len(self.record_reader.num_values())
-                .add_buffer(record_data)
-                .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+            .len(self.record_reader.num_values())
+            .add_buffer(record_data)
+            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
 
         let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });
 
@@ -197,19 +196,13 @@ impl ArrayReader for FixedLenByteArrayReader {
                 IntervalUnit::YearMonth => Arc::new(
                     binary
                         .iter()
-                        .map(|o| {
-                            o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))
-                        })
+                        .map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap())))
                         .collect::<IntervalYearMonthArray>(),
                 ) as ArrayRef,
                 IntervalUnit::DayTime => Arc::new(
                     binary
                         .iter()
-                        .map(|o| {
-                            o.map(|b| {
-                                i64::from_le_bytes(b[4..12].try_into().unwrap())
-                            })
-                        })
+                        .map(|o| o.map(|b| i64::from_le_bytes(b[4..12].try_into().unwrap())))
                         .collect::<IntervalDayTimeArray>(),
                 ) as ArrayRef,
                 IntervalUnit::MonthDayNano => {
@@ -247,7 +240,7 @@ impl ArrayReader for FixedLenByteArrayReader {
 }
 
 struct FixedLenByteArrayBuffer {
-    buffer: ScalarBuffer<u8>,
+    buffer: Vec<u8>,
     /// The length of each element in bytes
     byte_length: usize,
 }
@@ -263,14 +256,14 @@ impl BufferQueue for FixedLenByteArrayBuffer {
     type Slice = Self;
 
     fn consume(&mut self) -> Self::Output {
-        self.buffer.consume()
+        Buffer::from_vec(self.buffer.consume())
     }
 
-    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
         self
     }
 
-    fn set_len(&mut self, len: usize) {
+    fn truncate_buffer(&mut self, len: usize) {
         assert_eq!(self.buffer.len(), len * self.byte_length);
     }
 }
@@ -288,14 +281,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
             (read_offset + values_read) * self.byte_length
         );
         self.buffer
-            .resize((read_offset + levels_read) * self.byte_length);
-
-        let slice = self.buffer.as_slice_mut();
+            .resize((read_offset + levels_read) * self.byte_length, 0);
 
         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in
-            values_range.rev().zip(iter_set_bits_rev(valid_mask))
-        {
+        for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
             debug_assert!(level_pos >= value_pos);
             if level_pos <= value_pos {
                 break;
             }
@@ -305,7 +294,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
             let level_pos_bytes = level_pos * self.byte_length;
             let value_pos_bytes = value_pos * self.byte_length;
 
             for i in 0..self.byte_length {
-                slice[level_pos_bytes + i] = slice[value_pos_bytes + i]
+                self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i]
             }
         }
     }
 }
@@ -391,8 +380,7 @@ impl ColumnValueDecoder for ValueDecoder {
         let len = range.end - range.start;
         match self.decoder.as_mut().unwrap() {
             Decoder::Plain { offset, buf } => {
-                let to_read =
-                    (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
+                let to_read = (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
                 let end_offset = *offset + to_read * self.byte_length;
                 out.buffer
                     .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
@@ -485,15 +473,12 @@ mod tests {
             .build()
             .unwrap();
 
-        let written = RecordBatch::try_from_iter([(
-            "list",
-            Arc::new(ListArray::from(data)) as ArrayRef,
-        )])
-        .unwrap();
+        let written =
+            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
+                .unwrap();
 
         let mut buffer = Vec::with_capacity(1024);
-        let mut writer =
-            ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
+        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
         writer.write(&written).unwrap();
         writer.close().unwrap();
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index 4ad6c97e2f66..bb32fb307fda 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -16,14 +16,13 @@
 // under the License.
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::column::page::PageIterator;
 use crate::data_type::DataType;
 use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::ArrayRef;
-use arrow_buffer::Buffer;
+use arrow_buffer::{ArrowNativeType, Buffer};
 use arrow_schema::DataType as ArrowType;
 use std::any::Any;
 use std::sync::Arc;
@@ -33,7 +32,7 @@ use std::sync::Arc;
 pub struct NullArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: ArrowNativeType,
 {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
@@ -45,7 +44,7 @@ where
 impl<T> NullArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: ArrowNativeType,
 {
     /// Construct null array reader.
     pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
@@ -65,7 +64,7 @@ where
 impl<T> ArrayReader for NullArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: ArrowNativeType,
 {
     fn as_any(&self) -> &dyn Any {
         self
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index f833eccecb4c..507b6215cacb 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::Type as PhysicalType;
@@ -26,22 +25,55 @@ use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::Decimal256Array;
 use arrow_array::{
-    builder::{BooleanBufferBuilder, TimestampNanosecondBufferBuilder},
-    ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
-    Int64Array, TimestampNanosecondArray, UInt32Array, UInt64Array,
+    builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array,
+    Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array,
+    UInt64Array,
 };
-use arrow_buffer::{i256, Buffer};
+use arrow_buffer::{i256, BooleanBuffer, Buffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{DataType as ArrowType, TimeUnit};
 use std::any::Any;
 use std::sync::Arc;
 
+/// Provides conversion from `Vec<T>` to `Buffer`
+pub trait IntoBuffer {
+    fn into_buffer(self) -> Buffer;
+}
+
+macro_rules! native_buffer {
+    ($($t:ty),*) => {
+        $(impl IntoBuffer for Vec<$t> {
+            fn into_buffer(self) -> Buffer {
+                Buffer::from_vec(self)
+            }
+        })*
+    };
+}
+native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
+
+impl IntoBuffer for Vec<bool> {
+    fn into_buffer(self) -> Buffer {
+        BooleanBuffer::from_iter(self).into_inner()
+    }
+}
+
+impl IntoBuffer for Vec<Int96> {
+    fn into_buffer(self) -> Buffer {
+        let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
+        for v in self {
+            builder.append(v.to_nanos())
+        }
+        builder.finish()
+    }
+}
+
 /// Primitive array readers are leaves of array reader tree. They accept page iterator
 /// and read them into primitive arrays.
 pub struct PrimitiveArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: Copy + Default,
+    Vec<T::T>: IntoBuffer,
 {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
@@ -53,7 +85,8 @@ where
 impl<T> PrimitiveArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: Copy + Default,
+    Vec<T::T>: IntoBuffer,
 {
     /// Construct primitive array reader.
     pub fn new(
@@ -85,7 +118,8 @@ where
 impl<T> ArrayReader for PrimitiveArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: Copy + Default,
+    Vec<T::T>: IntoBuffer,
 {
     fn as_any(&self) -> &dyn Any {
         self
@@ -131,40 +165,14 @@ where
                 _ => unreachable!("INT96 must be timestamp nanosecond"),
             },
             PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                unreachable!(
-                    "PrimitiveArrayReaders don't support complex physical types"
-                );
+                unreachable!("PrimitiveArrayReaders don't support complex physical types");
             }
         };
 
         // Convert to arrays by using the Parquet physical type.
         // The physical types are then cast to Arrow types if necessary
-        let record_data = self.record_reader.consume_record_data();
-        let record_data = match T::get_physical_type() {
-            PhysicalType::BOOLEAN => {
-                let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
-
-                for e in record_data.as_slice() {
-                    boolean_buffer.append(*e > 0);
-                }
-                boolean_buffer.into()
-            }
-            PhysicalType::INT96 => {
-                // SAFETY - record_data is an aligned buffer of Int96
-                let (prefix, slice, suffix) =
-                    unsafe { record_data.as_slice().align_to::<Int96>() };
-                assert!(prefix.is_empty() && suffix.is_empty());
-
-                let mut builder = TimestampNanosecondBufferBuilder::new(slice.len());
-                for v in slice {
-                    builder.append(v.to_nanos())
-                }
-
-                builder.finish()
-            }
-            _ => record_data,
-        };
+        let record_data = self.record_reader.consume_record_data().into_buffer();
 
         let array_data = ArrayDataBuilder::new(arrow_data_type)
             .len(self.record_reader.num_values())
@@ -188,9 +196,7 @@ where
             PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
             PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
             PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                unreachable!(
-                    "PrimitiveArrayReaders don't support complex physical types"
-                );
+                unreachable!("PrimitiveArrayReaders don't support complex physical types");
             }
         };
 
@@ -409,12 +415,9 @@ mod tests {
         );
         let page_iterator = InMemoryPageIterator::new(page_lists);
 
-        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-            Box::new(page_iterator),
-            column_desc,
-            None,
-        )
-        .unwrap();
+        let mut array_reader =
+            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
+                .unwrap();
 
         // Read first 50 values, which are all from the first column chunk
         let array = array_reader.next_batch(50).unwrap();
@@ -618,12 +621,9 @@ mod tests {
 
         let page_iterator = InMemoryPageIterator::new(page_lists);
 
-        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-            Box::new(page_iterator),
-            column_desc,
-            None,
-        )
-        .unwrap();
+        let mut array_reader =
+            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
+                .unwrap();
 
         let mut accu_len: usize = 0;
 
@@ -697,12 +697,9 @@ mod tests {
         );
         let page_iterator = InMemoryPageIterator::new(page_lists);
 
-        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-            Box::new(page_iterator),
-            column_desc,
-            None,
-        )
-        .unwrap();
+        let mut array_reader =
+            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
+                .unwrap();
 
         // read data from the reader
         // the data type is decimal(8,2)
@@ -759,12 +756,9 @@ mod tests {
         );
         let page_iterator = InMemoryPageIterator::new(page_lists);
 
-        let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
-            Box::new(page_iterator),
-            column_desc,
-            None,
-        )
-        .unwrap();
+        let mut array_reader =
+            PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
+                .unwrap();
 
         // read data from the reader
         // the data type is decimal(18,4)
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 4208318122af..d0f63024edf0 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
 use crate::column::reader::decoder::ValuesBufferSlice;
 use crate::errors::{ParquetError, Result};
 use arrow_array::{make_array, Array, ArrayRef, OffsetSizeTrait};
@@ -27,17 +27,12 @@ use std::sync::Arc;
 
 /// An array of variable length byte arrays that are potentially dictionary encoded
 /// and can be converted into a corresponding [`ArrayRef`]
-pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
-    Dict {
-        keys: ScalarBuffer<K>,
-        values: ArrayRef,
-    },
-    Values {
-        values: OffsetBuffer<V>,
-    },
+pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
+    Dict { keys: Vec<K>, values: ArrayRef },
+    Values { values: OffsetBuffer<V> },
 }
 
-impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K, V> {
     fn default() -> Self {
         Self::Values {
             values: Default::default(),
@@ -45,9 +40,7 @@ impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K, V>
     }
 }
 
-impl<K: FromBytes + ScalarValue + Ord + ArrowNativeType, V: ScalarValue + OffsetSizeTrait>
-    DictionaryBuffer<K, V>
-{
+impl<K: FromBytes + Ord + ArrowNativeType, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
     #[allow(unused)]
     pub fn len(&self) -> usize {
         match self {
@@ -63,7 +56,7 @@ impl<K: FromBytes + ScalarValue + Ord + ArrowNativeType, V: ScalarValue + Offse
     /// # Panic
     ///
     /// Panics if the dictionary is too large for `K`
-    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut ScalarBuffer<K>> {
+    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
         assert!(K::from_usize(dictionary.len()).is_some());
 
         match self {
@@ -112,7 +105,7 @@ impl<K: FromBytes + ScalarValue + Ord + ArrowNativeType, V: ScalarValue + Offse
 
                 if values.is_empty() {
                     // If dictionary is empty, zero pad offsets
-                    spilled.offsets.resize(keys.len() + 1);
+                    spilled.offsets.resize(keys.len() + 1, V::default());
                 } else {
                     // Note: at this point null positions will have arbitrary dictionary keys
                     // and this will hydrate them to the corresponding byte array. This is
@@ -164,7 +157,7 @@ impl<K: FromBytes + ScalarValue + Ord + ArrowNativeType, V: ScalarValue + Offse
 
                 let builder = ArrayDataBuilder::new(data_type.clone())
                     .len(keys.len())
-                    .add_buffer(keys.into())
+                    .add_buffer(Buffer::from_vec(keys))
                     .add_child_data(values.into_data())
                     .null_bit_buffer(null_buffer);
 
@@ -192,13 +185,13 @@ impl<K: FromBytes + ScalarValue + Ord + ArrowNativeType, V: ScalarValue + Offse
     }
 }
 
-impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBufferSlice for DictionaryBuffer<K, V> {
     fn capacity(&self) -> usize {
         usize::MAX
     }
 }
 
-impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -208,7 +201,7 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer for Dictiona
     ) {
         match self {
             Self::Dict { keys, .. } => {
-                keys.resize(read_offset + levels_read);
+                keys.resize(read_offset + levels_read, K::default());
                 keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
             }
             Self::Values { values, .. } => {
@@ -218,7 +211,7 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer for Dictiona
     }
 }
 
-impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue for DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> BufferQueue for DictionaryBuffer<K, V> {
     type Output = Self;
 
     type Slice = Self;
@@ -234,14 +227,14 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue for Dictionar
         }
     }
 
-    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
         self
     }
 
-    fn set_len(&mut self, len: usize) {
+    fn truncate_buffer(&mut self, len: usize) {
         match self {
-            Self::Dict { keys, .. } => keys.set_len(len),
-            Self::Values { values } => values.set_len(len),
+            Self::Dict { keys, .. } => keys.truncate_buffer(len),
+            Self::Values { values } => values.truncate_buffer(len),
         }
     }
 }
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs
index 3f8f85494f02..459c94ed2803 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
 use crate::column::reader::decoder::ValuesBufferSlice;
 use crate::errors::{ParquetError, Result};
 use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
@@ -27,23 +27,23 @@ use arrow_schema::DataType as ArrowType;
 /// A buffer of variable-sized byte arrays that can be converted into
 /// a corresponding [`ArrayRef`]
 #[derive(Debug)]
-pub struct OffsetBuffer<I: ScalarValue> {
-    pub offsets: ScalarBuffer<I>,
-    pub values: ScalarBuffer<u8>,
+pub struct OffsetBuffer<I: OffsetSizeTrait> {
+    pub offsets: Vec<I>,
+    pub values: Vec<u8>,
 }
 
-impl<I: ScalarValue> Default for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
     fn default() -> Self {
-        let mut offsets = ScalarBuffer::new();
-        offsets.resize(1);
+        let mut offsets = Vec::new();
+        offsets.resize(1, I::default());
         Self {
             offsets,
-            values: ScalarBuffer::new(),
+            values: Vec::new(),
         }
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> OffsetBuffer<I> {
     /// Returns the number of byte arrays in this buffer
     pub fn len(&self) -> usize {
         self.offsets.len() - 1
@@ -128,8 +128,8 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
         let array_data_builder = ArrayDataBuilder::new(data_type)
             .len(self.len())
-            .add_buffer(self.offsets.into())
-            .add_buffer(self.values.into())
+            .add_buffer(Buffer::from_vec(self.offsets))
+            .add_buffer(Buffer::from_vec(self.values))
             .null_bit_buffer(null_buffer);
 
         let data = match cfg!(debug_assertions) {
@@ -141,7 +141,7 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> BufferQueue for OffsetBuffer<I> {
     type Output = Self;
 
     type Slice = Self;
@@ -149,16 +149,16 @@ impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
         std::mem::take(self)
     }
 
-    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
         self
     }
 
-    fn set_len(&mut self, len: usize) {
+    fn truncate_buffer(&mut self, len: usize) {
         assert_eq!(self.offsets.len(), len + 1);
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -167,9 +167,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
         valid_mask: &[u8],
     ) {
         assert_eq!(self.offsets.len(), read_offset + values_read + 1);
-        self.offsets.resize(read_offset + levels_read + 1);
+        self.offsets
+            .resize(read_offset + levels_read + 1, I::default());
 
-        let offsets = self.offsets.as_slice_mut();
+        let offsets = &mut self.offsets;
 
         let mut last_pos = read_offset + levels_read + 1;
         let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
@@ -207,7 +208,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> ValuesBufferSlice for OffsetBuffer<I> {
     fn capacity(&self) -> usize {
         usize::MAX
     }
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 35a322e6c723..3914710ff7b9 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -15,11 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::marker::PhantomData;
-
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::data_type::Int96;
-use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
 
 /// A buffer that supports writing new data to the end, and removing data from the front
 ///
@@ -37,12 +33,12 @@ pub trait BufferQueue: Sized {
     /// to append data to the end of this [`BufferQueue`]
     ///
     /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
-    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
-    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+    /// instead a subsequent call should be made to [`BufferQueue::truncate_buffer`]
+    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice;
 
     /// Sets the length of the [`BufferQueue`].
     ///
-    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    /// Intended to be used in combination with [`BufferQueue::get_output_slice`]
     ///
     /// # Panics
     ///
@@ -57,132 +53,27 @@ pub trait BufferQueue: Sized {
     /// track how much of this slice is actually written to by the caller. This is still
     /// safe as the slice is default-initialized.
     ///
-    fn set_len(&mut self, len: usize);
-}
-
-/// A marker trait for [scalar] types
-///
-/// This means that a `[Self::default()]` of length `len` can be safely created from a
-/// zero-initialized `[u8]` with length `len * std::mem::size_of::<Self>()` and
-/// alignment of `std::mem::size_of::<Self>()`
-///
-/// [scalar]: https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types
-///
-pub trait ScalarValue: Copy {}
-impl ScalarValue for bool {}
-impl ScalarValue for u8 {}
-impl ScalarValue for i8 {}
-impl ScalarValue for u16 {}
-impl ScalarValue for i16 {}
-impl ScalarValue for u32 {}
-impl ScalarValue for i32 {}
-impl ScalarValue for u64 {}
-impl ScalarValue for i64 {}
-impl ScalarValue for f32 {}
-impl ScalarValue for f64 {}
-impl ScalarValue for Int96 {}
-
-/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
-#[derive(Debug)]
-pub struct ScalarBuffer<T: ScalarValue> {
-    buffer: MutableBuffer,
-
-    /// Length in elements of size T
-    len: usize,
-
-    /// Placeholder to allow `T` as an invariant generic parameter
-    /// without making it !Send
-    _phantom: PhantomData<fn() -> T>,
-}
-
-impl<T: ScalarValue> Default for ScalarBuffer<T> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl<T: ScalarValue> ScalarBuffer<T> {
-    pub fn new() -> Self {
-        Self {
-            buffer: MutableBuffer::new(0),
-            len: 0,
-            _phantom: Default::default(),
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.len == 0
-    }
-
-    pub fn reserve(&mut self, additional: usize) {
-        self.buffer.reserve(additional * std::mem::size_of::<T>());
-    }
-
-    pub fn resize(&mut self, len: usize) {
-        self.buffer.resize(len * std::mem::size_of::<T>(), 0);
-        self.len = len;
-    }
-
-    #[inline]
-    pub fn as_slice(&self) -> &[T] {
-        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
-        assert!(prefix.is_empty() && suffix.is_empty());
-        buf
-    }
-
-    #[inline]
-    pub fn as_slice_mut(&mut self) -> &mut [T] {
-        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
-        assert!(prefix.is_empty() && suffix.is_empty());
-        buf
-    }
+    fn truncate_buffer(&mut self, len: usize);
 }
 
-impl<T: ScalarValue + ArrowNativeType> ScalarBuffer<T> {
-    pub fn push(&mut self, v: T) {
-        self.buffer.push(v);
-        self.len += 1;
-    }
-
-    pub fn extend_from_slice(&mut self, v: &[T]) {
-        self.buffer.extend_from_slice(v);
-        self.len += v.len();
-    }
-}
-
-impl<T: ScalarValue> From<ScalarBuffer<T>> for Buffer {
-    fn from(t: ScalarBuffer<T>) -> Self {
-        t.buffer.into()
-    }
-}
-
-impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
-    type Output = Buffer;
+impl<T: Copy + Default> BufferQueue for Vec<T> {
+    type Output = Self;
 
     type Slice = [T];
 
     fn consume(&mut self) -> Self::Output {
-        std::mem::take(self).into()
+        std::mem::take(self)
     }
 
-    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
-        self.buffer
-            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
-
-        let range = self.len..self.len + batch_size;
-        &mut self.as_slice_mut()[range]
+    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice {
+        let len = self.len();
+        self.resize(len + batch_size, T::default());
+        &mut self[len..]
     }
 
-    fn set_len(&mut self, len: usize) {
-        self.len = len;
-
-        let new_bytes = self.len * std::mem::size_of::<T>();
-        assert!(new_bytes <= self.buffer.len());
-        self.buffer.resize(new_bytes, 0);
+    fn truncate_buffer(&mut self, len: usize) {
+        assert!(len <= self.len());
+        self.truncate(len)
     }
 }
 
@@ -212,7 +103,7 @@ pub trait ValuesBuffer: BufferQueue {
     );
 }
 
-impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
+impl<T: Copy + Default> ValuesBuffer for Vec<T> {
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -220,8 +111,7 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
         levels_read: usize,
         valid_mask: &[u8],
     ) {
-        let slice = self.as_slice_mut();
-        assert!(slice.len() >= read_offset + levels_read);
+        assert!(self.len() >= read_offset + levels_read);
 
         let values_range = read_offset..read_offset + values_read;
         for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
@@ -229,7 +119,7 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
             if level_pos <= value_pos {
                 break;
             }
-            slice[level_pos] = slice[value_pos];
+            self[level_pos] = self[value_pos];
         }
     }
 }
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 9009c596c4bf..fa041f5fdb0a 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -30,12 +30,10 @@ use crate::column::reader::decoder::{
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 
-use super::buffer::ScalarBuffer;
-
 enum BufferInner {
     /// Compute levels and null mask
     Full {
-        levels: ScalarBuffer<i16>,
+        levels: Vec<i16>,
         nulls: BooleanBufferBuilder,
         max_level: i16,
     },
@@ -77,7 +75,7 @@ impl DefinitionLevelBuffer {
                 }
             }
             false => BufferInner::Full {
-                levels: ScalarBuffer::new(),
+                levels: Vec::new(),
                 nulls: BooleanBufferBuilder::new(0),
                 max_level: desc.max_def_level(),
             },
@@ -89,7 +87,7 @@ impl DefinitionLevelBuffer {
     /// Returns the built level data
     pub fn consume_levels(&mut self) -> Option<Buffer> {
         match &mut self.inner {
-            BufferInner::Full { levels, .. } => Some(std::mem::take(levels).into()),
+            BufferInner::Full { levels, .. } => Some(Buffer::from_vec(std::mem::take(levels))),
             BufferInner::Mask { .. } => None,
         }
     }
@@ -174,9 +172,9 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
                 assert_eq!(self.max_level, *max_level);
                 assert_eq!(range.start + writer.len, nulls.len());
 
-                levels.resize(range.end + writer.len);
+                levels.resize(range.end + writer.len, 0);
 
-                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let slice = &mut levels[writer.len..];
                 let levels_read = decoder.read_def_levels(slice, range.clone())?;
 
                 nulls.reserve(levels_read);
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index ea982341994e..49c69c87e302 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -18,7 +18,7 @@
 use arrow_buffer::Buffer;
 
 use crate::arrow::record_reader::{
-    buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
+    buffer::{BufferQueue, ValuesBuffer},
     definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
 };
 use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
@@ -37,8 +37,7 @@ pub(crate) mod buffer;
 mod definition_levels;
 
 /// A `RecordReader` is a stateful column reader that delimits semantic records.
-pub type RecordReader<T> =
-    GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
+pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
 
 pub(crate) type ColumnReader<CV> =
     GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
@@ -53,7 +52,7 @@ pub struct GenericRecordReader<V, CV> {
     values: V,
 
     def_levels: Option<DefinitionLevelBuffer>,
-    rep_levels: Option<ScalarBuffer<i16>>,
+    rep_levels: Option<Vec<i16>>,
     column_reader: Option<ColumnReader<CV>>,
     /// Number of buffered levels / null-padded values
     num_values: usize,
@@ -81,7 +80,7 @@ where
         let def_levels = (desc.max_def_level() > 0)
             .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
 
-        let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
+        let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
 
         Self {
             values: records,
@@ -174,7 +173,9 @@ where
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
-        self.rep_levels.as_mut().map(|x| x.consume())
+        self.rep_levels
+            .as_mut()
+            .map(|x| Buffer::from_vec(x.consume()))
     }
 
     /// Returns currently stored buffer data.
@@ -209,9 +210,9 @@ where
         let rep_levels = self
             .rep_levels
             .as_mut()
-            .map(|levels| levels.spare_capacity_mut(batch_size));
+            .map(|levels| levels.get_output_slice(batch_size));
         let def_levels = self.def_levels.as_mut();
-        let values = self.values.spare_capacity_mut(batch_size);
+        let values = self.values.get_output_slice(batch_size);
 
         let (records_read, values_read, levels_read) = self
             .column_reader
@@ -234,9 +235,9 @@ where
         self.num_records += records_read;
         self.num_values += levels_read;
 
-        self.values.set_len(self.num_values);
+        self.values.truncate_buffer(self.num_values);
         if let Some(ref mut buf) = self.rep_levels {
-            buf.set_len(self.num_values)
+            buf.truncate_buffer(self.num_values)
         };
         if let Some(ref mut buf) = self.def_levels {
             buf.set_len(self.num_values)
@@ -257,7 +258,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::buffer::Buffer;
-    use arrow_array::builder::{Int16BufferBuilder, Int32BufferBuilder};
+    use arrow_array::builder::Int16BufferBuilder;
 
     use crate::basic::Encoding;
     use crate::data_type::Int32Type;
@@ -334,10 +335,7 @@ mod tests {
             assert_eq!(7, record_reader.num_values());
         }
 
-        let mut bb = Int32BufferBuilder::new(7);
-        bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
-        let expected_buffer = bb.finish();
-        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(record_reader.consume_record_data(), &[4, 7, 6, 3, 2, 8, 9]);
         assert_eq!(None, record_reader.consume_def_levels());
         assert_eq!(None, record_reader.consume_bitmap());
     }
@@ -434,13 +432,12 @@ mod tests {
 
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
         let expected = &[0, 7, 0, 6, 3, 0, 8];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
             }
@@ -544,12 +541,11 @@ mod tests {
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
         let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
             }
@@ -713,10 +709,7 @@ mod tests {
             assert_eq!(0, record_reader.read_records(10).unwrap());
         }
 
-        let mut bb = Int32BufferBuilder::new(3);
-        bb.append_slice(&[6, 3, 2]);
-        let expected_buffer = bb.finish();
-        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(record_reader.consume_record_data(), &[6, 3, 2]);
         assert_eq!(None, record_reader.consume_def_levels());
         assert_eq!(None, record_reader.consume_bitmap());
     }
@@ -814,13 +807,12 @@ mod tests {
 
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
         let expected = &[0, 6, 3];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
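
Note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the reworked BufferQueue contract that this change introduces: get_output_slice hands out default-initialized spare capacity at the end of the buffer, and truncate_buffer then commits only the prefix the decoder actually wrote. The trait and its Vec<T> implementation mirror the new record_reader/buffer.rs above; the main() driver and its sample values are illustrative assumptions only.

pub trait BufferQueue: Sized {
    type Output: Sized;
    type Slice: ?Sized;

    /// Take the accumulated data, leaving the queue empty
    fn consume(&mut self) -> Self::Output;

    /// Return a default-initialized slice with room for `batch_size` more elements
    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice;

    /// Commit `len` elements; anything written past `len` is discarded
    fn truncate_buffer(&mut self, len: usize);
}

impl<T: Copy + Default> BufferQueue for Vec<T> {
    type Output = Self;
    type Slice = [T];

    fn consume(&mut self) -> Self::Output {
        std::mem::take(self)
    }

    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice {
        let len = self.len();
        // Default-initialize the spare capacity; safe, unlike the old
        // ScalarBuffer, which relied on unsafe align_to over raw bytes.
        self.resize(len + batch_size, T::default());
        &mut self[len..]
    }

    fn truncate_buffer(&mut self, len: usize) {
        assert!(len <= self.len());
        self.truncate(len)
    }
}

fn main() {
    let mut values: Vec<i32> = vec![1, 2];

    // Request room for a batch of up to 4 values...
    let out = values.get_output_slice(4);
    // ...but suppose the decoder only produced 3 of them.
    out[..3].copy_from_slice(&[10, 20, 30]);

    // Commit the written prefix: 2 pre-existing + 3 new values.
    values.truncate_buffer(2 + 3);
    assert_eq!(values.consume(), vec![1, 2, 10, 20, 30]);
}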