Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Verify 32-bit CRC checksum when decoding pages #6290

Merged
merged 16 commits into from
Sep 28, 2024
4 changes: 4 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"]
zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]

[[example]]
name = "read_parquet"
Expand Down Expand Up @@ -215,3 +218,4 @@ harness = false

[lib]
bench = false

xmakro marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 2 additions & 1 deletion parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your
- `zstd` (default) - support for parquet using `zstd` compression
- `snap` (default) - support for parquet using `snappy` compression
- `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin)
- `crc` - verifies checksums when reading data pages
xmakro marked this conversation as resolved.
Show resolved Hide resolved
- `experimental` - Experimental APIs which may change, even between minor releases

## Parquet Feature Status
Expand All @@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your

## License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
Licensed under the Apache License, Version 2.0: <http://www.apache.org/licenses/LICENSE-2.0>.
xmakro marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 11 additions & 0 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,17 @@ pub(crate) fn decode_page(
physical_type: Type,
decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
// Verify the 32-bit CRC checksum of the page
#[cfg(feature = "crc")]
if let Some(expected_crc) = page_header.crc {
let crc = crc32fast::hash(&buffer);
if crc != expected_crc as u32 {
return Err(ParquetError::General(
"Page CRC checksum mismatch".to_string(),
));
xmakro marked this conversation as resolved.
Show resolved Hide resolved
}
}

// When processing data page v2, depending on enabled compression for the
// page, we should account for uncompressed data ('offset') of
// repetition and definition levels.
Expand Down
55 changes: 55 additions & 0 deletions parquet/tests/arrow_reader/checksum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::path::PathBuf;
xmakro marked this conversation as resolved.
Show resolved Hide resolved

use arrow::util::test_util::parquet_test_data;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;

#[test]
fn test_datapage_v1_corrupt_checksum() {
let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet");
assert_eq!(errors, [
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
Ok(()),
Ok(()),
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string())
]);
}

#[test]
fn test_datapage_v1_uncompressed_checksum() {
let errors = read_file_batch_errors("datapage_v1-uncompressed-checksum.parquet");
assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
}

#[test]
fn test_datapage_v1_snappy_compressed_checksum() {
let errors = read_file_batch_errors("datapage_v1-snappy-compressed-checksum.parquet");
assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
}


#[test]
fn test_plain_dict_uncompressed_checksum() {
let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet");
assert_eq!(errors, [Ok(())]);
}
#[test]
fn test_rle_dict_snappy_checksum() {
let errors = read_file_batch_errors("rle-dict-snappy-checksum.parquet");
assert_eq!(errors, [Ok(())]);
}

/// Reads a file and returns a vector with one element per record batch.
/// The record batch data is replaced with () and errors are stringified.
fn read_file_batch_errors(name: &str) -> Vec<Result<(), String>> {
let path = PathBuf::from(parquet_test_data()).join(name);
println!("Reading file: {:?}", path);
let file = std::fs::File::open(&path).unwrap();
let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap();
reader
.map(|x| match x {
Ok(_) => Ok(()),
Err(e) => Err(e.to_string()),
})
.collect()
}
2 changes: 2 additions & 0 deletions parquet/tests/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use std::sync::Arc;
use tempfile::NamedTempFile;

mod bad_data;
#[cfg(feature = "crc")]
mod checksum;
mod statistics;

// returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values
Expand Down
Loading