Remove ScalarBuffer from parquet (#1849) (#5177) (#5178)
tustvold authored Dec 8, 2023
1 parent a43e82c commit 2a213bc
Showing 10 changed files with 200 additions and 375 deletions.
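
The heart of this change is a trait-bound simplification: the parquet crate's internal `ScalarValue` marker trait is removed, and the byte-array readers and decoders bound directly on arrow's `OffsetSizeTrait` (the `i32`/`i64` offset width behind `Utf8`/`LargeUtf8` and `Binary`/`LargeBinary`) and `ArrowNativeType` (dictionary keys). A minimal sketch of the resulting shape, using a hypothetical, reduced `OffsetBuffer` rather than the crate's real type:

```rust
use arrow_array::OffsetSizeTrait;

/// Hypothetical stand-in for the crate's OffsetBuffer: a growable
/// (offsets, values) pair generic over the arrow offset width.
/// `I` is i32 for Utf8/Binary and i64 for LargeUtf8/LargeBinary.
struct OffsetBuffer<I: OffsetSizeTrait> {
    offsets: Vec<I>,
    values: Vec<u8>,
}

impl<I: OffsetSizeTrait> OffsetBuffer<I> {
    fn new() -> Self {
        // An offsets buffer always starts with 0.
        Self { offsets: vec![I::usize_as(0)], values: Vec::new() }
    }

    /// Append one variable-length value, recording its end offset.
    /// Returns None if the running length overflows the offset type.
    fn try_push(&mut self, data: &[u8]) -> Option<()> {
        self.values.extend_from_slice(data);
        self.offsets.push(I::from_usize(self.values.len())?);
        Some(())
    }
}

fn main() {
    let mut buf = OffsetBuffer::<i32>::new();
    buf.try_push(b"hello").unwrap();
    buf.try_push(b"parquet").unwrap();
    assert_eq!(buf.offsets, vec![0, 5, 12]);
}
```

Because `OffsetSizeTrait` already supplies everything these readers need, the extra `ScalarValue` bound carried no information and is dropped wholesale from the signatures below.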
66 changes: 26 additions & 40 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -19,7 +19,6 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -77,22 +76,19 @@ pub fn make_byte_array_reader(
}

/// An [`ArrayReader`] for variable length byte arrays
-struct ByteArrayReader<I: ScalarValue> {
+struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
rep_levels_buffer: Option<Buffer>,
record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
}

-impl<I: ScalarValue> ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ByteArrayReader<I> {
fn new(
pages: Box<dyn PageIterator>,
data_type: ArrowType,
-record_reader: GenericRecordReader<
-OffsetBuffer<I>,
-ByteArrayColumnValueDecoder<I>,
->,
+record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
) -> Self {
Self {
data_type,
@@ -104,7 +100,7 @@ impl<I: ScalarValue> ByteArrayReader<I> {
}
}

-impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
fn as_any(&self) -> &dyn Any {
self
}
@@ -167,15 +163,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
}

/// A [`ColumnValueDecoder`] for variable length byte arrays
-struct ByteArrayColumnValueDecoder<I: ScalarValue> {
+struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
dict: Option<OffsetBuffer<I>>,
decoder: Option<ByteArrayDecoder>,
validate_utf8: bool,
}

-impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
-for ByteArrayColumnValueDecoder<I>
-{
+impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
type Slice = OffsetBuffer<I>;

fn new(desc: &ColumnDescPtr) -> Self {
@@ -275,17 +269,15 @@ impl ByteArrayDecoder {
num_values,
validate_utf8,
)),
-Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
-ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new(
-data, num_levels, num_values,
-))
-}
+Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary(
+ByteArrayDecoderDictionary::new(data, num_levels, num_values),
+),
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
),
-Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray(
-ByteArrayDecoderDelta::new(data, validate_utf8)?,
-),
+Encoding::DELTA_BYTE_ARRAY => {
+ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data, validate_utf8)?)
+}
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
@@ -298,7 +290,7 @@ impl ByteArrayDecoder {
}

/// Read up to `len` values to `out` with the optional dictionary
-pub fn read<I: OffsetSizeTrait + ScalarValue>(
+pub fn read<I: OffsetSizeTrait>(
&mut self,
out: &mut OffsetBuffer<I>,
len: usize,
@@ -307,8 +299,8 @@
match self {
ByteArrayDecoder::Plain(d) => d.read(out, len),
ByteArrayDecoder::Dictionary(d) => {
-let dict = dict
-.ok_or_else(|| general_err!("missing dictionary page for column"))?;
+let dict =
+dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.read(out, dict, len)
}
@@ -318,16 +310,16 @@
}

/// Skip `len` values
-pub fn skip<I: OffsetSizeTrait + ScalarValue>(
+pub fn skip<I: OffsetSizeTrait>(
&mut self,
len: usize,
dict: Option<&OffsetBuffer<I>>,
) -> Result<usize> {
match self {
ByteArrayDecoder::Plain(d) => d.skip(len),
ByteArrayDecoder::Dictionary(d) => {
-let dict = dict
-.ok_or_else(|| general_err!("missing dictionary page for column"))?;
+let dict =
+dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.skip(dict, len)
}
@@ -363,7 +355,7 @@ impl ByteArrayDecoderPlain {
}
}

-pub fn read<I: OffsetSizeTrait + ScalarValue>(
+pub fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -392,8 +384,7 @@
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}
-let len_bytes: [u8; 4] =
-buf[self.offset..self.offset + 4].try_into().unwrap();
+let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes);

let start_offset = self.offset + 4;
@@ -424,8 +415,7 @@
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}
-let len_bytes: [u8; 4] =
-buf[self.offset..self.offset + 4].try_into().unwrap();
+let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes) as usize;
skip += 1;
self.offset = self.offset + 4 + len;
@@ -462,7 +452,7 @@ impl ByteArrayDecoderDeltaLength {
})
}

-fn read<I: OffsetSizeTrait + ScalarValue>(
+fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -529,7 +519,7 @@ impl ByteArrayDecoderDelta {
})
}

-fn read<I: OffsetSizeTrait + ScalarValue>(
+fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -564,7 +554,7 @@ impl ByteArrayDecoderDictionary {
}
}

-fn read<I: OffsetSizeTrait + ScalarValue>(
+fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
dict: &OffsetBuffer<I>,
@@ -576,15 +566,11 @@
}

self.decoder.read(len, |keys| {
-output.extend_from_dictionary(
-keys,
-dict.offsets.as_slice(),
-dict.values.as_slice(),
-)
+output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
})
}

-fn skip<I: OffsetSizeTrait + ScalarValue>(
+fn skip<I: OffsetSizeTrait>(
&mut self,
dict: &OffsetBuffer<I>,
to_skip: usize,
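
The dictionary branches above funnel into `extend_from_dictionary`, which expands decoded dictionary keys back into materialized byte values. A hedged sketch of that operation, assuming (as the calls above suggest) the dictionary is held as an offsets slice plus a flat values buffer; the free function below is a simplified stand-in, not the crate's actual signature:

```rust
/// Simplified stand-in for extend_from_dictionary: each key `k` selects the
/// byte range dict_values[dict_offsets[k]..dict_offsets[k + 1]] and appends
/// it to the output buffer.
fn extend_from_dictionary(
    out: &mut Vec<u8>,
    keys: &[u32],
    dict_offsets: &[usize],
    dict_values: &[u8],
) -> Result<(), String> {
    for &k in keys {
        let k = k as usize;
        // A valid key needs both a start and an end offset.
        let (start, end) = match (dict_offsets.get(k), dict_offsets.get(k + 1)) {
            (Some(&s), Some(&e)) => (s, e),
            _ => return Err(format!("dictionary key {k} out of bounds")),
        };
        out.extend_from_slice(&dict_values[start..end]);
    }
    Ok(())
}
```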
45 changes: 16 additions & 29 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -27,10 +27,8 @@ use bytes::Bytes;

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::buffer::{
-dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
-};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer};
+use crate::arrow::record_reader::buffer::BufferQueue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -123,7 +121,7 @@ pub fn make_byte_array_dictionary_reader(
/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
///
/// Will attempt to preserve any dictionary encoding present in the parquet data
-struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
@@ -133,16 +131,13 @@ struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {

impl<K, V> ByteArrayDictionaryReader<K, V>
where
-K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-V: ScalarValue + OffsetSizeTrait,
+K: FromBytes + Ord + ArrowNativeType,
+V: OffsetSizeTrait,
{
fn new(
pages: Box<dyn PageIterator>,
data_type: ArrowType,
-record_reader: GenericRecordReader<
-DictionaryBuffer<K, V>,
-DictionaryDecoder<K, V>,
->,
+record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
) -> Self {
Self {
data_type,
@@ -156,8 +151,8 @@ where

impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
where
-K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-V: ScalarValue + OffsetSizeTrait,
+K: FromBytes + Ord + ArrowNativeType,
+V: OffsetSizeTrait,
{
fn as_any(&self) -> &dyn Any {
self
@@ -226,16 +221,15 @@ struct DictionaryDecoder<K, V> {

impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
where
-K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-V: ScalarValue + OffsetSizeTrait,
+K: FromBytes + Ord + ArrowNativeType,
+V: OffsetSizeTrait,
{
type Slice = DictionaryBuffer<K, V>;

fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;

-let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8)
-{
+let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
(true, true) => ArrowType::LargeUtf8,
(true, false) => ArrowType::LargeBinary,
(false, true) => ArrowType::Utf8,
@@ -274,8 +268,7 @@ where

let len = num_values as usize;
let mut buffer = OffsetBuffer::<V>::default();
-let mut decoder =
-ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
+let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
decoder.read(&mut buffer, usize::MAX)?;

let array = buffer.into_array(None, self.value_type.clone());
@@ -339,8 +332,8 @@ where
Some(keys) => {
// Happy path - can just copy keys
// Keys will be validated on conversion to arrow
-let keys_slice = keys.spare_capacity_mut(range.start + len);
-let len = decoder.get_batch(&mut keys_slice[range.start..])?;
+let keys_slice = keys.get_output_slice(len);
+let len = decoder.get_batch(keys_slice)?;
*max_remaining_values -= len;
Ok(len)
}
@@ -360,11 +353,7 @@ where
let dict_offsets = dict_buffers[0].typed_data::<V>();
let dict_values = dict_buffers[1].as_slice();

-values.extend_from_dictionary(
-&keys[..len],
-dict_offsets,
-dict_values,
-)?;
+values.extend_from_dictionary(&keys[..len], dict_offsets, dict_values)?;
*max_remaining_values -= len;
Ok(len)
}
@@ -375,9 +364,7 @@ where

fn skip_values(&mut self, num_values: usize) -> Result<usize> {
match self.decoder.as_mut().expect("decoder set") {
-MaybeDictionaryDecoder::Fallback(decoder) => {
-decoder.skip::<V>(num_values, None)
-}
+MaybeDictionaryDecoder::Fallback(decoder) => decoder.skip::<V>(num_values, None),
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values,
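
One substantive change rides along with the bound cleanup in the hunk above: the happy path for copying keys switches from `spare_capacity_mut` plus manual range slicing to `get_output_slice(len)`, which hands the decoder a ready-to-write slice of exactly `len` elements. A minimal sketch of that pattern, where `KeyQueue` and its method are illustrative names modeled on the diff, not the crate's real `BufferQueue` API:

```rust
/// Illustrative queue of decoded dictionary keys.
struct KeyQueue {
    data: Vec<i32>,
}

impl KeyQueue {
    /// Grow the buffer by `len` zeroed elements and return the new tail
    /// as a mutable slice for a decoder to fill.
    fn get_output_slice(&mut self, len: usize) -> &mut [i32] {
        let start = self.data.len();
        self.data.resize(start + len, 0);
        &mut self.data[start..]
    }
}

fn main() {
    let mut queue = KeyQueue { data: vec![1, 2] };
    let out = queue.get_output_slice(3);
    out.copy_from_slice(&[7, 8, 9]); // the decoder writes keys here
    assert_eq!(queue.data, vec![1, 2, 7, 8, 9]);
}
```

Sizing the slice to exactly the requested length removes the off-by-range bookkeeping the old `range.start` arithmetic needed.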
