Skip to content

Commit

Permalink
start non-uniform decryption
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Dec 19, 2024
1 parent 42695b4 commit 2334eb3
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 7 deletions.
81 changes: 81 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,87 @@ mod tests {
assert!(col.value(2).is_nan());
}

/// Reads `encrypt_columns_plaintext_footer.parquet.encrypted` with two column
/// keys and no footer key, checks the file and row-group metadata, then
/// decodes every record batch and compares the total row count with the footer.
#[test]
fn test_non_uniform_encryption_plaintext_footer() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!(
        "{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"
    ))
    .unwrap();

    // Only column keys are configured here; no footer key is supplied.
    let properties = Some(
        ciphers::FileDecryptionProperties::builder()
            .with_column_key(b"kc1".to_vec(), b"1234567890123450".to_vec())
            .with_column_key(b"kc2".to_vec(), b"1234567890123451".to_vec())
            .build(),
    );
    // `Option<&T>` is `Copy`, so the same borrowed view serves both readers.
    let properties_ref = properties.as_ref();

    let reader_metadata =
        ArrowReaderMetadata::load(&file, Default::default(), properties_ref).unwrap();
    let file_metadata = reader_metadata.metadata.file_metadata();

    assert_eq!(file_metadata.num_rows(), 50);
    assert_eq!(file_metadata.schema_descr().num_columns(), 8);
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-cpp-arrow version 14.0.0-SNAPSHOT"
    );

    // Every row group is expected to carry the same shape.
    for row_group in reader_metadata.metadata.row_groups() {
        assert_eq!(row_group.num_columns(), 8);
        assert_eq!(row_group.num_rows(), 50);
        assert_eq!(row_group.total_byte_size(), 3816);
    }

    let batches =
        ParquetRecordBatchReader::try_new_with_decryption(file, 128, properties_ref).unwrap();

    // Decode every batch; any read error fails the test via `unwrap`.
    let decoded_rows: usize = batches.map(|batch| batch.unwrap().num_rows()).sum();

    assert_eq!(decoded_rows, file_metadata.num_rows() as usize);
}

/// Loads the metadata of a file with encrypted columns *and* an encrypted
/// footer, supplying the footer key plus both column keys.
///
/// The plaintext-footer variant of the data file is covered by
/// `test_non_uniform_encryption_plaintext_footer`; this test must point at the
/// encrypted-footer file, otherwise the footer key is never exercised.
#[test]
fn test_non_uniform_encryption() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
    let file = File::open(path).unwrap();

    let footer_key = "0123456789012345".as_bytes(); // 128bit/16
    let column_1_key = "1234567890123450".as_bytes();
    let column_2_key = "1234567890123451".as_bytes();

    let decryption_properties = Some(
        ciphers::FileDecryptionProperties::builder()
            .with_footer_key(footer_key.to_vec())
            .with_column_key("kc1".as_bytes().to_vec(), column_1_key.to_vec())
            .with_column_key("kc2".as_bytes().to_vec(), column_2_key.to_vec())
            .build(),
    );

    // Only checks that the footer can be loaded for now; the binding is
    // underscore-prefixed until the assertions below are enabled.
    let _metadata =
        ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref())
            .unwrap();

    // TODO: enable once non-uniform decryption is complete (expected values
    // still need to be confirmed against the data file):
    // let file_metadata = _metadata.metadata.file_metadata();
    //
    // assert_eq!(file_metadata.num_rows(), 50);
    // assert_eq!(file_metadata.schema_descr().num_columns(), 8);
    // assert_eq!(
    //     file_metadata.created_by().unwrap(),
    //     "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
    // );
}

#[test]
fn test_uniform_encryption() {
let testdata = arrow::util::test_util::parquet_test_data();
Expand Down
27 changes: 23 additions & 4 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Encryption implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
use std::collections::HashMap;
use std::sync::Arc;
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
use ring::rand::{SecureRandom, SystemRandom};
Expand Down Expand Up @@ -227,29 +228,34 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal

/// Keys supplied by the caller for decrypting an encrypted Parquet file.
///
/// Instances are created through [`FileDecryptionProperties::builder`].
#[derive(Debug, Clone, PartialEq)]
pub struct FileDecryptionProperties {
    /// Footer key, if one was supplied to the builder.
    footer_key: Option<Vec<u8>>,
    /// Column keys, stored under the byte-string identifier they were
    /// registered with; `None` when no column key was supplied.
    column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
}

impl FileDecryptionProperties {
    /// Returns a builder that starts from
    /// [`DecryptionPropertiesBuilder::with_defaults`].
    pub fn builder() -> DecryptionPropertiesBuilder {
        DecryptionPropertiesBuilder::with_defaults()
    }

    /// Returns `true` when a footer key has been configured.
    pub fn has_footer_key(&self) -> bool {
        self.footer_key.is_some()
    }
}

/// Builder for [`FileDecryptionProperties`].
pub struct DecryptionPropertiesBuilder {
    /// Footer key to hand over to the built properties, if any.
    footer_key: Option<Vec<u8>>,
    /// Column keys collected so far; `None` until the first one is added.
    column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
}

impl DecryptionPropertiesBuilder {
/// Creates a builder with neither a footer key nor any column keys set.
pub fn with_defaults() -> Self {
    Self {
        footer_key: None,
        column_keys: None,
    }
}

/// Consumes the builder and moves the collected keys into a
/// [`FileDecryptionProperties`].
pub fn build(self) -> FileDecryptionProperties {
    FileDecryptionProperties {
        footer_key: self.footer_key,
        column_keys: self.column_keys,
    }
}

Expand All @@ -258,6 +264,14 @@ impl DecryptionPropertiesBuilder {
self.footer_key = Some(value);
self
}

/// Registers `value` as the decryption key stored under `key`.
///
/// The key map is created on first use. Registering the same `key` again
/// replaces the previously stored value.
pub fn with_column_key(mut self, key: Vec<u8>, value: Vec<u8>) -> Self {
    self.column_keys
        .get_or_insert_with(HashMap::new)
        .insert(key, value);
    self
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -291,6 +305,11 @@ impl FileDecryptor {
self.footer_decryptor
}

/// Builds a block decryptor from the key registered under `column_key`.
///
/// Note that `column_key` is the lookup identifier in the configured key
/// map, not the key material itself.
///
/// # Panics
///
/// Panics if no column keys were configured, or if nothing is registered
/// under `column_key`.
pub(crate) fn get_column_decryptor(&self, column_key: &[u8]) -> RingGcmBlockDecryptor {
    let configured_keys = self.decryption_properties.column_keys.as_ref().unwrap();
    let key_bytes = configured_keys.get(column_key).unwrap();
    RingGcmBlockDecryptor::new(key_bytes)
}

/// Returns a reference to the decryption properties held by this decryptor.
pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties {
&self.decryption_properties
}
Expand Down
9 changes: 6 additions & 3 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,10 +642,12 @@ impl ParquetMetaDataReader {
file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<ParquetMetaData> {
let mut prot = TCompactSliceInputProtocol::new(buf);

let mut file_decryptor = None;
let decrypted_fmd_buf;
if let Some(file_decryption_properties) = file_decryption_properties {

if file_decryption_properties.is_some()
&& file_decryption_properties.unwrap().has_footer_key()
{
let t_file_crypto_metadata: TFileCryptoMetaData =
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
Expand All @@ -667,7 +669,7 @@ impl ParquetMetaDataReader {
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

file_decryptor = Some(FileDecryptor::new(
file_decryption_properties,
file_decryption_properties.unwrap(),
aad_file_unique.clone(),
aad_prefix.clone(),
));
Expand All @@ -684,6 +686,7 @@ impl ParquetMetaDataReader {
let mut row_groups = Vec::new();
// TODO: row group filtering
for rg in t_file_metadata.row_groups {
// rg.
row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
}
let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ pub(crate) fn read_page_header<T: Read>(
) -> Result<PageHeader> {
if let Some(crypto_context) = crypto_context {
let decryptor = &crypto_context.data_decryptor();
// todo: get column decryptor
// let file_decryptor = decryptor.get_column_decryptor(crypto_context.column_ordinal);
let file_decryptor = decryptor.footer_decryptor();
let aad_file_unique = decryptor.aad_file_unique();

Expand Down

0 comments on commit 2334eb3

Please sign in to comment.