Avoid Allocating When Decoding Thrift #5777

Closed · wants to merge 1 commit
54 changes: 54 additions & 0 deletions parquet/postprocess.sh
@@ -0,0 +1,54 @@
#!/usr/bin/env bash

set -e

echo "Removing TProcessor"

sed -i "/use thrift::server::TProcessor;/d" $1

echo "Replacing TSerializable"

sed -i "s/impl TSerializable for/impl<'de> crate::thrift::TSerializable<'de> for/g" $1

echo "Rewriting write_to_out_protocol"

sed -i "s/fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol)/fn write_to_out_protocol<T: TOutputProtocol>(\&self, o_prot: \&mut T)/g" $1

echo "Rewriting read_from_in_protocol"

sed -i "s/fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol)/fn read_from_in_protocol<T: TInputProtocol>(i_prot: \&mut T)/g" $1

echo "Rewriting return value expectations"

sed -i "s/Ok(ret.expect(\"return value should have been constructed\"))/ret.ok_or_else(|| thrift::Error::Protocol(ProtocolError::new(ProtocolErrorKind::InvalidData, \"return value should have been constructed\")))/g" $1

echo "Rewriting TInputProtocol"

sed -i "s/T: TInputProtocol/T: crate::thrift::TInputProtocolRef<'de>/g" $1

echo "Rewriting read_string()"

sed -i "s/read_string()/read_str()/g" $1
sed -i "s/<String>/<std::borrow::Cow<'de, str>>/g" $1
sed -i "s/: String/: std::borrow::Cow<'de, str>/g" $1

echo "Rewriting read_bytes()"

sed -i "s/read_bytes()/read_buf()/g" $1
sed -i "s/<Vec<u8>>/<std::borrow::Cow<'de, [u8]>>/g" $1
sed -i "s/: Vec<u8>/: std::borrow::Cow<'de, [u8]>/g" $1



for d in ColumnChunk Statistics DataPageHeader DataPageHeaderV2 ColumnMetaData ColumnIndex FileMetaData FileCryptoMetaData AesGcmV1 EncryptionWithColumnKey PageHeader RowGroup AesGcmCtrV1 EncryptionAlgorithm KeyValue ColumnCryptoMetaData SchemaElement; do
echo "Rewriting $d with lifetime"

sed -i "s/enum $d {/enum $d<'de> {/g" $1
sed -i "s/struct $d {/struct $d<'de> {/g" $1
sed -i "s/for $d {/for $d<'de> {/g" $1
sed -i "s/impl $d {/impl<'de> $d<'de> {/g" $1
sed -i "s/<$d>/<$d<'de>>/g" $1
sed -i "s/: $d/: $d<'de>/g" $1
sed -i "s/($d)/($d<'de>)/g" $1
sed -i "s/-> $d /-> $d<'de> /g" $1
done
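
For context on what the rewrites above achieve: they roughly turn the generated owned structs into borrowing ones. A minimal sketch of the before/after shape, assuming the stock `thrift --gen rs` output for a struct with string fields (type names here are illustrative, not the crate's actual items):

```rust
use std::borrow::Cow;

// Before postprocessing (stock generated code): every string/byte field owns its data,
// so decoding a footer allocates for each field.
pub struct KeyValueOwned {
    pub key: String,
    pub value: Option<String>,
}

// After postprocessing: string/byte fields borrow from the decoded buffer via Cow,
// and the struct carries the 'de lifetime threaded in by the sed rewrites above.
pub struct KeyValueBorrowed<'de> {
    pub key: Cow<'de, str>,
    pub value: Option<Cow<'de, str>>,
}
```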
15 changes: 3 additions & 12 deletions parquet/regen.sh
@@ -21,19 +21,10 @@ REVISION=46cc3a0647d301bb9579ca8dd2cc356caf2a72d2

SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)"

docker run -v $SOURCE_DIR:/thrift -it archlinux /bin/bash -c "\
docker run -v $SOURCE_DIR:/thrift --env REVISION=$REVISION -it archlinux /bin/bash -c "\
pacman -Sy --noconfirm wget thrift && \
wget https://raw.githubusercontent.com/apache/parquet-format/$REVISION/src/main/thrift/parquet.thrift -O /tmp/parquet.thrift && \
thrift --gen rs /tmp/parquet.thrift && \
echo 'Removing TProcessor' && \
sed -i '/use thrift::server::TProcessor;/d' parquet.rs && \
echo 'Replacing TSerializable' && \
sed -i 's/impl TSerializable for/impl crate::thrift::TSerializable for/g' parquet.rs && \
echo 'Rewriting write_to_out_protocol' && \
sed -i 's/fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol)/fn write_to_out_protocol<T: TOutputProtocol>(\&self, o_prot: \&mut T)/g' parquet.rs && \
echo 'Rewriting read_from_in_protocol' && \
sed -i 's/fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol)/fn read_from_in_protocol<T: TInputProtocol>(i_prot: \&mut T)/g' parquet.rs && \
echo 'Rewriting return value expectations' && \
sed -i 's/Ok(ret.expect(\"return value should have been constructed\"))/ret.ok_or_else(|| thrift::Error::Protocol(ProtocolError::new(ProtocolErrorKind::InvalidData, \"return value should have been constructed\")))/g' parquet.rs && \
bash /thrift/postprocess.sh parquet.rs
mv parquet.rs /thrift/src/format.rs
"
"
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -2027,7 +2027,7 @@ mod tests {
schema: TypePtr,
field: Option<Field>,
opts: &TestOptions,
) -> Result<crate::format::FileMetaData> {
) -> Result<crate::format::FileMetaData<'static>> {
let mut writer_props = opts.writer_props();
if let Some(field) = field {
let arrow_schema = Schema::new(vec![field]);
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -294,13 +294,13 @@ impl<W: Write + Send> ArrowWriter<W> {
/// Unlike [`Self::close`] this does not consume self
///
/// Attempting to write after calling finish will result in an error
pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
pub fn finish(&mut self) -> Result<crate::format::FileMetaData<'static>> {
self.flush()?;
self.writer.finish()
}

/// Close and finalize the underlying Parquet writer
pub fn close(mut self) -> Result<crate::format::FileMetaData> {
pub fn close(mut self) -> Result<crate::format::FileMetaData<'static>> {
self.finish()
}
}
@@ -3054,7 +3054,7 @@ mod tests {
{
assert!(!key_value_metadata
.iter()
.any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
.any(|kv| kv.key.as_ref() == ARROW_SCHEMA_META_KEY));
}
}

4 changes: 2 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
@@ -166,14 +166,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
///
/// This method allows to append metadata after [`RecordBatch`]es are written.
pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue<'static>) {
self.sync_writer.append_key_value_metadata(kv_metadata);
}

/// Close and finalize the writer.
///
/// All the data in the inner buffer will be force flushed.
pub async fn close(mut self) -> Result<FileMetaData> {
pub async fn close(mut self) -> Result<FileMetaData<'static>> {
let metadata = self.sync_writer.finish()?;

// Force to flush the remaining data.
13 changes: 7 additions & 6 deletions parquet/src/arrow/schema/mod.rs
@@ -198,8 +198,8 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
let encoded = encode_arrow_schema(schema);

let schema_kv = KeyValue {
key: super::ARROW_SCHEMA_META_KEY.to_string(),
value: Some(encoded),
key: super::ARROW_SCHEMA_META_KEY.into(),
value: Some(encoded.into()),
};

let meta = props
@@ -210,7 +210,7 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
let schema_meta = meta
.iter()
.enumerate()
.find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
.find(|(_, kv)| kv.key.as_ref() == super::ARROW_SCHEMA_META_KEY);
match schema_meta {
Some((i, _)) => {
meta.remove(i);
@@ -245,7 +245,7 @@ fn parse_key_value_metadata(
.filter_map(|kv| {
kv.value
.as_ref()
.map(|value| (kv.key.clone(), value.clone()))
.map(|value| (kv.key.to_string(), value.to_string()))
})
.collect();

@@ -579,6 +579,7 @@ fn field_id(field: &Field) -> Option<i32> {
mod tests {
use super::*;

use std::borrow::Cow;
use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
@@ -1581,8 +1582,8 @@ mod tests {
let parquet_group_type = parse_message_type(message_type).unwrap();

let key_value_metadata = vec![
KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
KeyValue::new("baz".to_owned(), None),
KeyValue::new("foo".into(), Cow::Borrowed("bar")),
KeyValue::new("baz".into(), None),
];

let mut expected_metadata: HashMap<String, String> = HashMap::new();
2 changes: 1 addition & 1 deletion parquet/src/column/page.rs
@@ -274,7 +274,7 @@ pub struct PageMetadata {
pub is_dict: bool,
}

impl TryFrom<&PageHeader> for PageMetadata {
impl<'a> TryFrom<&PageHeader<'a>> for PageMetadata {
type Error = ParquetError;

fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
28 changes: 14 additions & 14 deletions parquet/src/column/writer/mod.rs
@@ -172,7 +172,7 @@ pub struct ColumnCloseResult {
/// Optional bloom filter for this column
pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
pub column_index: Option<ColumnIndex<'static>>,
/// Optional offset index, identifying page locations
pub offset_index: Option<OffsetIndex>,
}
@@ -2563,8 +2563,8 @@ mod tests {
// first page is [1,2,3,4]
// second page is [-5,2,4,8]
// note that we don't increment here, as this is a non BinaryArray type.
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
assert_eq!(stats.max_bytes(), column_index.max_values.get(1).unwrap());
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_ref());
assert_eq!(stats.max_bytes(), column_index.max_values[1].as_ref());
} else {
panic!("expecting Statistics::Int32");
}
@@ -2619,14 +2619,14 @@ mod tests {
let column_index_max_value = &column_index.max_values[0];

// Column index stats are truncated, while the column chunk's aren't.
assert_ne!(stats.min_bytes(), column_index_min_value.as_slice());
assert_ne!(stats.max_bytes(), column_index_max_value.as_slice());
assert_ne!(stats.min_bytes(), column_index_min_value.as_ref());
assert_ne!(stats.max_bytes(), column_index_max_value.as_ref());

assert_eq!(
column_index_min_value.len(),
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
);
assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
assert_eq!(column_index_min_value.as_ref(), &[97_u8; 64]);
assert_eq!(
column_index_max_value.len(),
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
@@ -2682,14 +2682,14 @@ mod tests {
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::FixedLenByteArray(_stats) = stats {
let column_index_min_value = &column_index.min_values[0];
let column_index_max_value = &column_index.max_values[0];
let column_index_min_value = column_index.min_values[0].as_ref();
let column_index_max_value = column_index.max_values[0].as_ref();

assert_eq!(column_index_min_value.len(), 1);
assert_eq!(column_index_max_value.len(), 1);

assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
assert_eq!("B".as_bytes(), column_index_min_value.as_ref());
assert_eq!("C".as_bytes(), column_index_max_value.as_ref());

assert_ne!(column_index_min_value, stats.min_bytes());
assert_ne!(column_index_max_value, stats.max_bytes());
@@ -2719,8 +2719,8 @@ mod tests {
// stats should still be written
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index_min_bytes = column_index.min_values[0].as_slice();
let column_index_max_bytes = column_index.max_values[0].as_slice();
let column_index_min_bytes = column_index.min_values[0].as_ref();
let column_index_max_bytes = column_index.max_values[0].as_ref();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);

@@ -2759,8 +2759,8 @@ mod tests {
// stats should still be written
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index_min_bytes = column_index.min_values[0].as_slice();
let column_index_max_bytes = column_index.max_values[0].as_slice();
let column_index_min_bytes = column_index.min_values[0].as_ref();
let column_index_max_bytes = column_index.max_values[0].as_ref();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);

7 changes: 7 additions & 0 deletions parquet/src/data_type.rs
@@ -19,6 +19,7 @@
//! representations.
use bytes::Bytes;
use half::f16;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::fmt;
use std::mem;
@@ -210,6 +211,12 @@ impl<'a> From<&'a [u8]> for ByteArray {
}
}

impl<'a> From<Cow<'a, [u8]>> for ByteArray {
Inline comment from the PR author:
It is worth highlighting that this conversion does perform an allocation, which means that when decoding into the Rust representations of statistics, etc., we still allocate. However, we now only do this for ByteArray types, whereas previously all columns would incur associated allocations, and in theory the reader could perform projection pushdown at this point (a short sketch follows this file's diff).

fn from(value: Cow<'a, [u8]>) -> Self {
value.to_vec().into()
}
}

impl<'a> From<&'a str> for ByteArray {
fn from(s: &'a str) -> ByteArray {
let mut v = Vec::new();
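
To make the author's note above concrete: converting a `Cow<'_, [u8]>` into a `ByteArray` copies the bytes whether the cow is borrowed or owned, so byte-array statistics still allocate. A standalone sketch using a stand-in `ByteArray` (not the crate's actual type):

```rust
use std::borrow::Cow;

// Stand-in for the crate's ByteArray, just to show where the copy happens.
struct ByteArray(Vec<u8>);

impl<'a> From<Cow<'a, [u8]>> for ByteArray {
    fn from(value: Cow<'a, [u8]>) -> Self {
        // `to_vec` copies the bytes for both Cow::Borrowed and Cow::Owned,
        // so this conversion always allocates.
        ByteArray(value.to_vec())
    }
}

fn main() {
    let page = b"min-value".to_vec();
    // Decoding borrows from the page buffer...
    let borrowed: Cow<'_, [u8]> = Cow::Borrowed(page.as_slice());
    // ...but materialising a ByteArray (e.g. for Statistics) copies it again.
    let _stats_min: ByteArray = borrowed.into();
}
```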
14 changes: 12 additions & 2 deletions parquet/src/file/footer.rs
@@ -78,12 +78,22 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
}
let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr);
let kv_metadata = t_file_metadata.key_value_metadata.map(|x| {
Inline comment from the PR author:
This formulation is a little obtuse; we'd likely want to do something to make it better (see the sketch after this file's diff).

x.into_iter()
.map(|x| {
KeyValue::new(
x.key.into_owned().into(),
x.value.map(|x| x.into_owned().into()),
)
})
.collect()
});

let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
t_file_metadata.key_value_metadata,
t_file_metadata.created_by.map(|x| x.to_string()),
kv_metadata,
schema_descr,
column_orders,
);
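
As flagged in the comment above, the nested closures are hard to follow. One possible tidy-up, sketched under the assumption that the generated `KeyValue<'de>` keeps the `Cow` fields and the `new` constructor used elsewhere in this PR, is to pull the owned conversion into a helper:

```rust
// Hypothetical helper, not part of this PR: convert a borrowed KeyValue into an owned one.
fn key_value_to_owned(kv: crate::format::KeyValue<'_>) -> crate::format::KeyValue<'static> {
    crate::format::KeyValue::new(
        kv.key.into_owned().into(),
        kv.value.map(|v| v.into_owned().into()),
    )
}
```

The decode path would then read as `t_file_metadata.key_value_metadata.map(|kvs| kvs.into_iter().map(key_value_to_owned).collect())`.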
23 changes: 13 additions & 10 deletions parquet/src/file/metadata.rs
@@ -167,7 +167,7 @@ impl ParquetMetaData {
}
}

pub type KeyValue = crate::format::KeyValue;
pub type KeyValue = crate::format::KeyValue<'static>;

/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
@@ -376,7 +376,7 @@ impl RowGroupMetaData {
}

/// Method to convert to Thrift.
pub fn to_thrift(&self) -> RowGroup {
pub fn to_thrift(&self) -> RowGroup<'static> {
RowGroup {
columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
total_byte_size: self.total_byte_size,
@@ -647,7 +647,7 @@ impl ColumnChunkMetaData {
.map(Encoding::try_from)
.collect::<Result<_>>()?;
let compression = Compression::try_from(col_metadata.codec)?;
let file_path = cc.file_path;
let file_path = cc.file_path.map(|x| x.to_string());
let file_offset = cc.file_offset;
let num_values = col_metadata.num_values;
let total_compressed_size = col_metadata.total_compressed_size;
Expand Down Expand Up @@ -697,11 +697,11 @@ impl ColumnChunkMetaData {
}

/// Method to convert to Thrift.
pub fn to_thrift(&self) -> ColumnChunk {
pub fn to_thrift(&self) -> ColumnChunk<'static> {
let column_metadata = self.to_column_metadata_thrift();

ColumnChunk {
file_path: self.file_path().map(|s| s.to_owned()),
file_path: self.file_path().map(|s| s.to_string().into()),
file_offset: self.file_offset,
meta_data: Some(column_metadata),
offset_index_offset: self.offset_index_offset,
@@ -714,11 +714,14 @@
}

/// Method to convert to Thrift `ColumnMetaData`
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData<'static> {
let path_parts = self.column_path().parts();
let path_in_schema = path_parts.iter().map(|x| x.clone().into()).collect();

ColumnMetaData {
path_in_schema,
type_: self.column_type().into(),
encodings: self.encodings().iter().map(|&v| v.into()).collect(),
path_in_schema: self.column_path().as_ref().to_vec(),
codec: self.compression.into(),
num_values: self.num_values,
total_uncompressed_size: self.total_uncompressed_size,
@@ -941,11 +944,11 @@ impl ColumnIndexBuilder {
}

/// Build and get the thrift metadata of column index
pub fn build_to_thrift(self) -> ColumnIndex {
pub fn build_to_thrift(self) -> ColumnIndex<'static> {
ColumnIndex::new(
self.null_pages,
self.min_values,
self.max_values,
self.min_values.into_iter().map(|x| x.into()).collect(),
self.max_values.into_iter().map(|x| x.into()).collect(),
self.boundary_order,
self.null_counts,
)
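
A brief note on the `'static` lifetimes introduced above: the writer-side `to_thrift` conversions hand the generated structs owned data, and `String`/`Vec<u8>` convert into `Cow::Owned`, which satisfies any lifetime, including `'static`. A small self-contained sketch of that conversion, independent of the crate:

```rust
use std::borrow::Cow;

fn main() {
    // Owned data converts into Cow::Owned, so the resulting Cow can be 'static.
    let path_part: String = "col_a".to_string();
    let key: Cow<'static, str> = path_part.into();

    let min_value: Vec<u8> = vec![0, 1, 2];
    let min: Cow<'static, [u8]> = min_value.into();

    assert_eq!(key, "col_a");
    assert_eq!(min.as_ref(), &[0, 1, 2]);
}
```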
4 changes: 2 additions & 2 deletions parquet/src/file/page_index/index.rs
@@ -137,8 +137,8 @@ impl<T: ParquetValueType> NativeIndex<T> {
let (min, max) = if is_null {
(None, None)
} else {
let min = min.as_slice();
let max = max.as_slice();
let min = min.as_ref();
let max = max.as_ref();
(Some(from_le_slice::<T>(min)), Some(from_le_slice::<T>(max)))
};
Ok(PageIndex {