Skip to content

Commit

Permalink
Err on try_from_le_slice (#6295)
Browse files Browse the repository at this point in the history
* Err on try_from_le_slice, fix #3577

* format and changes

* small cleanup

* fix clippy

* add bad metadata test

* run test only if feature is enabled

* add MRE test

* fmt
  • Loading branch information
samuelcolvin authored Aug 26, 2024
1 parent 855666d commit ee2f75a
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 32 deletions.
9 changes: 4 additions & 5 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::{cmp, mem::size_of};
use bytes::Bytes;

use crate::errors::{ParquetError, Result};
use crate::util::bit_util::from_le_slice;
use crate::util::bit_util::{self, BitReader, BitWriter, FromBytes};

/// Rle/Bit-Packing Hybrid Encoding
Expand Down Expand Up @@ -356,13 +355,13 @@ impl RleDecoder {
}

let value = if self.rle_left > 0 {
let rle_value = from_le_slice(
let rle_value = T::try_from_le_slice(
&self
.current_value
.as_mut()
.expect("current_value should be Some")
.to_ne_bytes(),
);
)?;
self.rle_left -= 1;
rle_value
} else {
Expand All @@ -388,9 +387,9 @@ impl RleDecoder {
let num_values =
cmp::min(buffer.len() - values_read, self.rle_left as usize);
for i in 0..num_values {
let repeated_value = from_le_slice(
let repeated_value = T::try_from_le_slice(
&self.current_value.as_mut().unwrap().to_ne_bytes(),
);
)?;
buffer[values_read + i] = repeated_value;
}
self.rle_left -= num_values as u32;
Expand Down
35 changes: 30 additions & 5 deletions parquet/src/file/page_index/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96};
use crate::errors::ParquetError;
use crate::file::metadata::LevelHistogram;
use crate::format::{BoundaryOrder, ColumnIndex};
use crate::util::bit_util::from_le_slice;
use std::fmt::Debug;

/// Typed statistics for one data page
Expand Down Expand Up @@ -192,7 +191,7 @@ impl<T: ParquetValueType> NativeIndex<T> {
let indexes = index
.min_values
.iter()
.zip(index.max_values.into_iter())
.zip(index.max_values.iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.zip(rep_hists.into_iter())
Expand All @@ -205,9 +204,10 @@ impl<T: ParquetValueType> NativeIndex<T> {
let (min, max) = if is_null {
(None, None)
} else {
let min = min.as_slice();
let max = max.as_slice();
(Some(from_le_slice::<T>(min)), Some(from_le_slice::<T>(max)))
(
Some(T::try_from_le_slice(min)?),
Some(T::try_from_le_slice(max)?),
)
};
Ok(PageIndex {
min,
Expand Down Expand Up @@ -321,4 +321,29 @@ mod tests {
assert_eq!(page_index.repetition_level_histogram(), None);
assert_eq!(page_index.definition_level_histogram(), None);
}

#[test]
fn test_invalid_column_index() {
let column_index = ColumnIndex {
null_pages: vec![true, false],
min_values: vec![
vec![],
vec![], // this shouldn't be empty as null_pages[1] is false
],
max_values: vec![
vec![],
vec![], // this shouldn't be empty as null_pages[1] is false
],
null_counts: None,
repetition_level_histograms: None,
definition_level_histograms: None,
boundary_order: BoundaryOrder::UNORDERED,
};

let err = NativeIndex::<i32>::try_new(column_index).unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: error converting value, expected 4 bytes got 0"
);
}
}
5 changes: 2 additions & 3 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,6 @@ mod tests {
use crate::file::writer::SerializedFileWriter;
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
use crate::util::bit_util::from_le_slice;
use crate::util::test_common::file_util::{get_test_file, get_test_path};

use super::*;
Expand Down Expand Up @@ -1537,8 +1536,8 @@ mod tests {
assert_eq!(row_group_index.indexes.len(), page_size);
assert_eq!(row_group_index.boundary_order, boundary_order);
row_group_index.indexes.iter().all(|x| {
x.min.as_ref().unwrap() >= &from_le_slice::<T>(min_max.0)
&& x.max.as_ref().unwrap() <= &from_le_slice::<T>(min_max.1)
x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
&& x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
});
}

Expand Down
18 changes: 11 additions & 7 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::basic::Type;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
use crate::util::bit_util::from_le_slice;
use crate::util::bit_util::FromBytes;

pub(crate) mod private {
use super::*;
Expand Down Expand Up @@ -186,14 +186,18 @@ pub fn from_thrift(
// INT96 statistics may not be correct, because comparison is signed
// byte-wise, not actual timestamps. It is recommended to ignore
// min/max statistics for INT96 columns.
let min = min.map(|data| {
let min = if let Some(data) = min {
assert_eq!(data.len(), 12);
from_le_slice::<Int96>(&data)
});
let max = max.map(|data| {
Some(Int96::try_from_le_slice(&data)?)
} else {
None
};
let max = if let Some(data) = max {
assert_eq!(data.len(), 12);
from_le_slice::<Int96>(&data)
});
Some(Int96::try_from_le_slice(&data)?)
} else {
None
};
Statistics::int96(min, max, distinct_count, null_count, old_format)
}
Type::FLOAT => Statistics::float(
Expand Down
25 changes: 13 additions & 12 deletions parquet/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96};
use crate::errors::{ParquetError, Result};
use crate::util::bit_pack::{unpack16, unpack32, unpack64, unpack8};

#[inline]
pub fn from_le_slice<T: FromBytes>(bs: &[u8]) -> T {
// TODO: propagate the error (#3577)
T::try_from_le_slice(bs).unwrap()
}

#[inline]
fn array_from_slice<const N: usize>(bs: &[u8]) -> Result<[u8; N]> {
// Need to slice as may be called with zero-padded values
Expand Down Expand Up @@ -91,15 +85,22 @@ unsafe impl FromBytes for Int96 {
type Buffer = [u8; 12];

fn try_from_le_slice(b: &[u8]) -> Result<Self> {
Ok(Self::from_le_bytes(array_from_slice(b)?))
let bs: [u8; 12] = array_from_slice(b)?;
let mut i = Int96::new();
i.set_data(
u32::try_from_le_slice(&bs[0..4])?,
u32::try_from_le_slice(&bs[4..8])?,
u32::try_from_le_slice(&bs[8..12])?,
);
Ok(i)
}

fn from_le_bytes(bs: Self::Buffer) -> Self {
let mut i = Int96::new();
i.set_data(
from_le_slice(&bs[0..4]),
from_le_slice(&bs[4..8]),
from_le_slice(&bs[8..12]),
u32::try_from_le_slice(&bs[0..4]).unwrap(),
u32::try_from_le_slice(&bs[4..8]).unwrap(),
u32::try_from_le_slice(&bs[8..12]).unwrap(),
);
i
}
Expand Down Expand Up @@ -438,7 +439,7 @@ impl BitReader {
}

// TODO: better to avoid copying here
Some(from_le_slice(v.as_bytes()))
T::try_from_le_slice(v.as_bytes()).ok()
}

/// Read multiple values from their packed representation where each element is represented
Expand Down Expand Up @@ -1026,7 +1027,7 @@ mod tests {
.collect();

// Generic values used to check against actual values read from `get_batch`.
let expected_values: Vec<T> = values.iter().map(|v| from_le_slice(v.as_bytes())).collect();
let expected_values: Vec<T> = values.iter().map(|v| T::try_from_le_slice(v.as_bytes()).unwrap()).collect();

(0..total).for_each(|i| writer.put_value(values[i], num_bits));

Expand Down
25 changes: 25 additions & 0 deletions parquet/tests/arrow_reader/bad_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,28 @@ fn read_file(name: &str) -> Result<usize, ParquetError> {
}
Ok(num_rows)
}

#[cfg(feature = "async")]
#[tokio::test]
async fn bad_metadata_err() {
use bytes::Bytes;
use parquet::arrow::async_reader::MetadataLoader;

let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));

let metadata_length = metadata_buffer.len();

let mut reader = std::io::Cursor::new(&metadata_buffer);
let mut loader = MetadataLoader::load(&mut reader, metadata_length, None)
.await
.unwrap();
loader.load_page_index(false, false).await.unwrap();
loader.load_page_index(false, true).await.unwrap();

let err = loader.load_page_index(true, false).await.unwrap_err();

assert_eq!(
err.to_string(),
"Parquet error: error converting value, expected 4 bytes got 0"
);
}
Binary file added parquet/tests/arrow_reader/bad_raw_metadata.bin
Binary file not shown.

0 comments on commit ee2f75a

Please sign in to comment.