Support lifetimes in thrift format
tustvold committed May 16, 2024
1 parent cd39b8c commit 5ddbd83
Showing 20 changed files with 472 additions and 381 deletions.
54 changes: 54 additions & 0 deletions parquet/postprocess.sh
@@ -0,0 +1,54 @@
#!/usr/bin/env bash

set -e

echo "Removing TProcessor"

sed -i "/use thrift::server::TProcessor;/d" $1

echo "Replacing TSerializable"

sed -i "s/impl TSerializable for/impl<'de> crate::thrift::TSerializable<'de> for/g" $1

echo "Rewriting write_to_out_protocol"

sed -i "s/fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol)/fn write_to_out_protocol<T: TOutputProtocol>(\&self, o_prot: \&mut T)/g" $1

echo "Rewriting read_from_in_protocol"

sed -i "s/fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol)/fn read_from_in_protocol<T: TInputProtocol>(i_prot: \&mut T)/g" $1

echo "Rewriting return value expectations"

sed -i "s/Ok(ret.expect(\"return value should have been constructed\"))/ret.ok_or_else(|| thrift::Error::Protocol(ProtocolError::new(ProtocolErrorKind::InvalidData, \"return value should have been constructed\")))/g" $1

echo "Rewriting TInputProtocol"

sed -i "s/T: TInputProtocol/T: crate::thrift::TInputProtocolRef<'de>/g" $1

echo "Rewriting read_string()"

sed -i "s/read_string()/read_str()/g" $1
sed -i "s/<String>/<std::borrow::Cow<'de, str>>/g" $1
sed -i "s/: String/: std::borrow::Cow<'de, str>/g" $1

echo "Rewriting read_bytes()"

sed -i "s/read_bytes()/read_buf()/g" $1
sed -i "s/<Vec<u8>>/<std::borrow::Cow<'de, [u8]>>/g" $1
sed -i "s/: Vec<u8>/: std::borrow::Cow<'de, [u8]>/g" $1



for d in ColumnChunk Statistics DataPageHeader DataPageHeaderV2 ColumnMetaData ColumnIndex FileMetaData FileCryptoMetaData AesGcmV1 EncryptionWithColumnKey PageHeader RowGroup AesGcmCtrV1 EncryptionAlgorithm KeyValue ColumnCryptoMetaData SchemaElement; do
echo "Rewriting $d with lifetime"

sed -i "s/enum $d {/enum $d<'de> {/g" $1
sed -i "s/struct $d {/struct $d<'de> {/g" $1
sed -i "s/for $d {/for $d<'de> {/g" $1
sed -i "s/impl $d {/impl<'de> $d<'de> {/g" $1
sed -i "s/<$d>/<$d<'de>>/g" $1
sed -i "s/: $d/: $d<'de>/g" $1
sed -i "s/($d)/($d<'de>)/g" $1
sed -i "s/-> $d /-> $d<'de> /g" $1
done
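
Taken together, these sed passes convert the owned, trait-object-based code emitted by `thrift --gen rs` into borrowing, generic code. A minimal before/after sketch of one generated struct (the `KeyValue` field shapes match parquet.thrift; the exact emitted code is assumed for illustration):

```rust
use std::borrow::Cow;

// Before postprocessing, `thrift --gen rs` emits owned fields, roughly:
//
//     pub struct KeyValue {
//         pub key: String,
//         pub value: Option<String>,
//     }
//     impl TSerializable for KeyValue { ... }
//
// After the passes above, the struct borrows from the decode buffer:
pub struct KeyValue<'de> {
    pub key: Cow<'de, str>,           // was String
    pub value: Option<Cow<'de, str>>, // was Option<String>
}
// ...and the impl line becomes:
//     impl<'de> crate::thrift::TSerializable<'de> for KeyValue<'de> { ... }
```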
15 changes: 3 additions & 12 deletions parquet/regen.sh
@@ -21,19 +21,10 @@ REVISION=46cc3a0647d301bb9579ca8dd2cc356caf2a72d2

SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)"

docker run -v $SOURCE_DIR:/thrift -it archlinux /bin/bash -c "\
docker run -v $SOURCE_DIR:/thrift --env REVISION=$REVISION -it archlinux /bin/bash -c "\
pacman -Sy --noconfirm wget thrift && \
wget https://raw.githubusercontent.com/apache/parquet-format/$REVISION/src/main/thrift/parquet.thrift -O /tmp/parquet.thrift && \
thrift --gen rs /tmp/parquet.thrift && \
echo 'Removing TProcessor' && \
sed -i '/use thrift::server::TProcessor;/d' parquet.rs && \
echo 'Replacing TSerializable' && \
sed -i 's/impl TSerializable for/impl crate::thrift::TSerializable for/g' parquet.rs && \
echo 'Rewriting write_to_out_protocol' && \
sed -i 's/fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol)/fn write_to_out_protocol<T: TOutputProtocol>(\&self, o_prot: \&mut T)/g' parquet.rs && \
echo 'Rewriting read_from_in_protocol' && \
sed -i 's/fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol)/fn read_from_in_protocol<T: TInputProtocol>(i_prot: \&mut T)/g' parquet.rs && \
echo 'Rewriting return value expectations' && \
sed -i 's/Ok(ret.expect(\"return value should have been constructed\"))/ret.ok_or_else(|| thrift::Error::Protocol(ProtocolError::new(ProtocolErrorKind::InvalidData, \"return value should have been constructed\")))/g' parquet.rs && \
bash /thrift/postprocess.sh parquet.rs
mv parquet.rs /thrift/src/format.rs
"
"
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -2027,7 +2027,7 @@ mod tests {
schema: TypePtr,
field: Option<Field>,
opts: &TestOptions,
) -> Result<crate::format::FileMetaData> {
) -> Result<crate::format::FileMetaData<'static>> {
let mut writer_props = opts.writer_props();
if let Some(field) = field {
let arrow_schema = Schema::new(vec![field]);
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -294,13 +294,13 @@ impl<W: Write + Send> ArrowWriter<W> {
/// Unlike [`Self::close`] this does not consume self
///
/// Attempting to write after calling finish will result in an error
pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
pub fn finish(&mut self) -> Result<crate::format::FileMetaData<'static>> {
self.flush()?;
self.writer.finish()
}

/// Close and finalize the underlying Parquet writer
pub fn close(mut self) -> Result<crate::format::FileMetaData> {
pub fn close(mut self) -> Result<crate::format::FileMetaData<'static>> {
self.finish()
}
}
@@ -3054,7 +3054,7 @@
{
assert!(!key_value_metadata
.iter()
.any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
.any(|kv| kv.key.as_ref() == ARROW_SCHEMA_META_KEY));
}
}

4 changes: 2 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
@@ -166,14 +166,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
///
/// This method allows to append metadata after [`RecordBatch`]es are written.
pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue<'static>) {
self.sync_writer.append_key_value_metadata(kv_metadata);
}

/// Close and finalize the writer.
///
/// All the data in the inner buffer will be force flushed.
pub async fn close(mut self) -> Result<FileMetaData> {
pub async fn close(mut self) -> Result<FileMetaData<'static>> {
let metadata = self.sync_writer.finish()?;

// Force to flush the remaining data.
13 changes: 7 additions & 6 deletions parquet/src/arrow/schema/mod.rs
@@ -198,8 +198,8 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
let encoded = encode_arrow_schema(schema);

let schema_kv = KeyValue {
key: super::ARROW_SCHEMA_META_KEY.to_string(),
value: Some(encoded),
key: super::ARROW_SCHEMA_META_KEY.into(),
value: Some(encoded.into()),
};

let meta = props
@@ -210,7 +210,7 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
let schema_meta = meta
.iter()
.enumerate()
.find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
.find(|(_, kv)| kv.key.as_ref() == super::ARROW_SCHEMA_META_KEY);
match schema_meta {
Some((i, _)) => {
meta.remove(i);
@@ -245,7 +245,7 @@ fn parse_key_value_metadata(
.filter_map(|kv| {
kv.value
.as_ref()
.map(|value| (kv.key.clone(), value.clone()))
.map(|value| (kv.key.to_string(), value.to_string()))
})
.collect();

@@ -579,6 +579,7 @@ fn field_id(field: &Field) -> Option<i32> {
mod tests {
use super::*;

use std::borrow::Cow;
use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
@@ -1581,8 +1582,8 @@ mod tests {
let parquet_group_type = parse_message_type(message_type).unwrap();

let key_value_metadata = vec![
KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
KeyValue::new("baz".to_owned(), None),
KeyValue::new("foo".into(), Cow::Borrowed("bar")),
KeyValue::new("baz".into(), None),
];

let mut expected_metadata: HashMap<String, String> = HashMap::new();
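
The `.into()` calls above lean on the standard `Cow` conversions: a `&'static str` becomes `Cow::Borrowed` at no cost, while a runtime `String` becomes `Cow::Owned`. A small self-contained sketch, using a stand-in for the generated struct:

```rust
use std::borrow::Cow;

// Stand-in for the generated KeyValue<'de>; field shapes assumed as above.
struct KeyValue<'de> {
    key: Cow<'de, str>,
    value: Option<Cow<'de, str>>,
}

fn main() {
    let encoded = String::from("...encoded arrow schema...");
    let kv = KeyValue {
        key: "ARROW:schema".into(),  // &'static str -> Cow::Borrowed
        value: Some(encoded.into()), // String -> Cow::Owned
    };
    // Lookups now compare through AsRef<str> rather than String::as_str.
    assert_eq!(kv.key.as_ref(), "ARROW:schema");
    assert!(kv.value.is_some());
}
```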
2 changes: 1 addition & 1 deletion parquet/src/column/page.rs
@@ -274,7 +274,7 @@ pub struct PageMetadata {
pub is_dict: bool,
}

impl TryFrom<&PageHeader> for PageMetadata {
impl<'a> TryFrom<&PageHeader<'a>> for PageMetadata {
type Error = ParquetError;

fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
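
`PageMetadata` itself gains no lifetime parameter because the conversion only copies plain values out of the borrowed header; nothing borrowed escapes. A minimal sketch of the pattern (struct and field names here are stand-ins, not the real definitions):

```rust
use std::convert::TryFrom;

struct HeaderSketch<'a> {
    num_values: i32, // plain value, copied out
    raw: &'a [u8],   // borrowed part, never carried over
}

struct MetadataSketch {
    num_values: i32, // owned copy, so no lifetime parameter needed
}

impl<'a> TryFrom<&HeaderSketch<'a>> for MetadataSketch {
    type Error = &'static str;

    fn try_from(value: &HeaderSketch<'a>) -> Result<Self, Self::Error> {
        Ok(MetadataSketch { num_values: value.num_values })
    }
}
```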
28 changes: 14 additions & 14 deletions parquet/src/column/writer/mod.rs
@@ -172,7 +172,7 @@ pub struct ColumnCloseResult {
/// Optional bloom filter for this column
pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
pub column_index: Option<ColumnIndex<'static>>,
/// Optional offset index, identifying page locations
pub offset_index: Option<OffsetIndex>,
}
@@ -2563,8 +2563,8 @@ mod tests {
// first page is [1,2,3,4]
// second page is [-5,2,4,8]
// note that we don't increment here, as this is a non BinaryArray type.
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
assert_eq!(stats.max_bytes(), column_index.max_values.get(1).unwrap());
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_ref());
assert_eq!(stats.max_bytes(), column_index.max_values[1].as_ref());
} else {
panic!("expecting Statistics::Int32");
}
@@ -2619,14 +2619,14 @@ mod tests {
let column_index_max_value = &column_index.max_values[0];

// Column index stats are truncated, while the column chunk's aren't.
assert_ne!(stats.min_bytes(), column_index_min_value.as_slice());
assert_ne!(stats.max_bytes(), column_index_max_value.as_slice());
assert_ne!(stats.min_bytes(), column_index_min_value.as_ref());
assert_ne!(stats.max_bytes(), column_index_max_value.as_ref());

assert_eq!(
column_index_min_value.len(),
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
);
assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
assert_eq!(column_index_min_value.as_ref(), &[97_u8; 64]);
assert_eq!(
column_index_max_value.len(),
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
@@ -2682,14 +2682,14 @@ mod tests {
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::FixedLenByteArray(_stats) = stats {
let column_index_min_value = &column_index.min_values[0];
let column_index_max_value = &column_index.max_values[0];
let column_index_min_value = column_index.min_values[0].as_ref();
let column_index_max_value = column_index.max_values[0].as_ref();

assert_eq!(column_index_min_value.len(), 1);
assert_eq!(column_index_max_value.len(), 1);

assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
assert_eq!("B".as_bytes(), column_index_min_value.as_ref());
assert_eq!("C".as_bytes(), column_index_max_value.as_ref());

assert_ne!(column_index_min_value, stats.min_bytes());
assert_ne!(column_index_max_value, stats.max_bytes());
@@ -2719,8 +2719,8 @@ mod tests {
// stats should still be written
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index_min_bytes = column_index.min_values[0].as_slice();
let column_index_max_bytes = column_index.max_values[0].as_slice();
let column_index_min_bytes = column_index.min_values[0].as_ref();
let column_index_max_bytes = column_index.max_values[0].as_ref();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);

@@ -2759,8 +2759,8 @@ mod tests {
// stats should still be written
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index_min_bytes = column_index.min_values[0].as_slice();
let column_index_max_bytes = column_index.max_values[0].as_slice();
let column_index_min_bytes = column_index.min_values[0].as_ref();
let column_index_max_bytes = column_index.max_values[0].as_ref();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);

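
The repeated `.as_slice()` → `.as_ref()` edits in these tests follow from `min_values`/`max_values` changing from `Vec<Vec<u8>>` to a `Cow`-based representation: `Cow<'_, [u8]>` has no `as_slice()`, but both variants expose the bytes through `AsRef<[u8]>`. A quick sketch:

```rust
use std::borrow::Cow;

fn main() {
    // AsRef<[u8]> works uniformly for both Cow variants.
    let owned: Cow<'static, [u8]> = Cow::Owned(vec![1u8, 2, 3]);
    let borrowed: Cow<'_, [u8]> = Cow::Borrowed(&[1u8, 2, 3]);
    let a: &[u8] = owned.as_ref();
    let b: &[u8] = borrowed.as_ref();
    assert_eq!(a, b);
}
```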
7 changes: 7 additions & 0 deletions parquet/src/data_type.rs
@@ -19,6 +19,7 @@
//! representations.
use bytes::Bytes;
use half::f16;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::fmt;
use std::mem;
@@ -210,6 +211,12 @@ impl<'a> From<&'a [u8]> for ByteArray {
}
}

impl<'a> From<Cow<'a, [u8]>> for ByteArray {
fn from(value: Cow<'a, [u8]>) -> Self {
value.to_vec().into()
}
}

impl<'a> From<&'a str> for ByteArray {
fn from(s: &'a str) -> ByteArray {
let mut v = Vec::new();
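
The new impl above goes through `to_vec()`, which copies the bytes even when the `Cow` already owns them. A possible refinement (an assumption, not part of this commit) is `Cow::into_owned`, which moves the existing `Vec<u8>` out of an `Owned` variant and only copies for `Borrowed`:

```rust
use std::borrow::Cow;

// Minimal stand-ins so the sketch compiles on its own.
struct ByteArray(Vec<u8>);

impl From<Vec<u8>> for ByteArray {
    fn from(v: Vec<u8>) -> Self {
        ByteArray(v)
    }
}

impl<'a> From<Cow<'a, [u8]>> for ByteArray {
    fn from(value: Cow<'a, [u8]>) -> Self {
        // A move for Cow::Owned, a copy only for Cow::Borrowed.
        value.into_owned().into()
    }
}
```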
4 changes: 2 additions & 2 deletions parquet/src/file/footer.rs
@@ -82,8 +82,8 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
t_file_metadata.key_value_metadata,
t_file_metadata.created_by.map(|x| x.to_string()),
None, //t_file_metadata.key_value_metadata, // TODO
schema_descr,
column_orders,
);
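
The `TODO` exists because `t_file_metadata.key_value_metadata` now borrows from the footer buffer, while the surrounding metadata types want the owned (`'static`) form. One way it might be resolved, sketched here as an assumption rather than anything in this commit, is a deep conversion to `Cow::Owned`:

```rust
use std::borrow::Cow;

// Stand-in for the generated struct, shaped as sketched earlier.
struct KeyValue<'de> {
    key: Cow<'de, str>,
    value: Option<Cow<'de, str>>,
}

// Hypothetical helper: deep-convert borrowed metadata to 'static.
fn into_owned_kvs(kvs: Vec<KeyValue<'_>>) -> Vec<KeyValue<'static>> {
    kvs.into_iter()
        .map(|kv| KeyValue {
            key: Cow::Owned(kv.key.into_owned()),
            value: kv.value.map(|v| Cow::Owned(v.into_owned())),
        })
        .collect()
}
```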
23 changes: 13 additions & 10 deletions parquet/src/file/metadata.rs
@@ -167,7 +167,7 @@ impl ParquetMetaData {
}
}

pub type KeyValue = crate::format::KeyValue;
pub type KeyValue = crate::format::KeyValue<'static>;

/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
@@ -376,7 +376,7 @@ impl RowGroupMetaData {
}

/// Method to convert to Thrift.
pub fn to_thrift(&self) -> RowGroup {
pub fn to_thrift(&self) -> RowGroup<'static> {
RowGroup {
columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
total_byte_size: self.total_byte_size,
@@ -647,7 +647,7 @@ impl ColumnChunkMetaData {
.map(Encoding::try_from)
.collect::<Result<_>>()?;
let compression = Compression::try_from(col_metadata.codec)?;
let file_path = cc.file_path;
let file_path = cc.file_path.map(|x| x.to_string());
let file_offset = cc.file_offset;
let num_values = col_metadata.num_values;
let total_compressed_size = col_metadata.total_compressed_size;
@@ -697,11 +697,11 @@ impl ColumnChunkMetaData {
}

/// Method to convert to Thrift.
pub fn to_thrift(&self) -> ColumnChunk {
pub fn to_thrift(&self) -> ColumnChunk<'static> {
let column_metadata = self.to_column_metadata_thrift();

ColumnChunk {
file_path: self.file_path().map(|s| s.to_owned()),
file_path: self.file_path().map(|s| s.to_string().into()),
file_offset: self.file_offset,
meta_data: Some(column_metadata),
offset_index_offset: self.offset_index_offset,
@@ -714,11 +714,14 @@ impl ColumnChunkMetaData {
}

/// Method to convert to Thrift `ColumnMetaData`
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData<'static> {
let path_parts = self.column_path().parts();
let path_in_schema = path_parts.iter().map(|x| x.clone().into()).collect();

ColumnMetaData {
path_in_schema,
type_: self.column_type().into(),
encodings: self.encodings().iter().map(|&v| v.into()).collect(),
path_in_schema: self.column_path().as_ref().to_vec(),
codec: self.compression.into(),
num_values: self.num_values,
total_uncompressed_size: self.total_uncompressed_size,
@@ -941,11 +944,11 @@ impl ColumnIndexBuilder {
}

/// Build and get the thrift metadata of column index
pub fn build_to_thrift(self) -> ColumnIndex {
pub fn build_to_thrift(self) -> ColumnIndex<'static> {
ColumnIndex::new(
self.null_pages,
self.min_values,
self.max_values,
self.min_values.into_iter().map(|x| x.into()).collect(),
self.max_values.into_iter().map(|x| x.into()).collect(),
self.boundary_order,
self.null_counts,
)
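
`to_thrift`, `to_column_metadata_thrift`, and `build_to_thrift` can promise `'static` because every field they emit is converted from data the metadata structs already own, and an owned `Cow` satisfies any lifetime bound. A minimal sketch of the conversions involved:

```rust
use std::borrow::Cow;

fn main() {
    // String -> Cow<'static, str>: .into() yields Cow::Owned, which
    // carries no borrow and therefore meets the 'static bound.
    let part: String = "nested.leaf".to_string();
    let cow: Cow<'static, str> = part.into();
    assert!(matches!(cow, Cow::Owned(_)));

    // Likewise Vec<u8> -> Cow<'static, [u8]>, as used for the column
    // index min_values / max_values.
    let min: Vec<u8> = vec![0x42];
    let cow_bytes: Cow<'static, [u8]> = min.into();
    assert_eq!(cow_bytes.as_ref(), [0x42u8].as_slice());
}
```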
4 changes: 2 additions & 2 deletions parquet/src/file/page_index/index.rs
@@ -137,8 +137,8 @@ impl<T: ParquetValueType> NativeIndex<T> {
let (min, max) = if is_null {
(None, None)
} else {
let min = min.as_slice();
let max = max.as_slice();
let min = min.as_ref();
let max = max.as_ref();
(Some(from_le_slice::<T>(min)), Some(from_le_slice::<T>(max)))
};
Ok(PageIndex {