Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Dec 16, 2024
1 parent 9d17990 commit 29d55eb
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 79 deletions.
22 changes: 19 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::collections::VecDeque;
use std::sync::Arc;

use num::ToPrimitive;
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
Expand All @@ -42,7 +42,7 @@ mod filter;
mod selection;
pub mod statistics;

use crate::encryption::ciphers::FileDecryptionProperties;
use crate::encryption::ciphers::{CryptoContext, FileDecryptionProperties};

/// Builder for constructing parquet readers into arrow.
///
Expand Down Expand Up @@ -695,7 +695,18 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();

let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
// let aad_file_unique = file_decryptor?.aad_file_unique();
// let aad_prefix = file_decryptor?.aad_prefix();
//
// let file_decryptor = FileDecryptor::new(file_decryptor, aad_file_unique.clone(), aad_prefix.clone());

let crypto_context = CryptoContext::new(
meta.dictionary_page_offset().is_some(), rg_idx.to_i16()?, self.column_idx.to_i16()?, file_decryptor.clone(), file_decryptor);
let crypto_context = Arc::new(crypto_context);

let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
// let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
Some(ret.map(|x| Box::new(x) as _))
}
}
Expand Down Expand Up @@ -1728,6 +1739,11 @@ mod tests {
});

// todo: decrypting data
let decryption_properties = Some(
ciphers::FileDecryptionProperties::builder()
.with_footer_key(key_code.to_vec())
.build(),
);
let record_reader =
ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
self.metadata.column(i),
self.row_count,
page_locations,
None,
)?);

Ok(Box::new(ColumnChunkIterator {
Expand Down
84 changes: 62 additions & 22 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Encryption implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
use std::sync::Arc;
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
use ring::rand::{SecureRandom, SystemRandom};
use crate::errors::{ParquetError, Result};
Expand Down Expand Up @@ -172,8 +173,12 @@ pub fn create_footer_aad(file_aad: &[u8]) -> Result<Vec<u8>> {
create_module_aad(file_aad, ModuleType::Footer, -1, -1, -1)
}

fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i32,
column_ordinal: i32, page_ordinal: i32) -> Result<Vec<u8>> {
pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
create_module_aad(file_aad, module_type, row_group_ordinal, column_ordinal, page_ordinal)
}

fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16,
column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {

let module_buf = [module_type as u8];

Expand All @@ -187,15 +192,15 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
if row_group_ordinal < 0 {
return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal));
}
if row_group_ordinal > u16::MAX as i32 {
if row_group_ordinal > i16::MAX {
return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}",
u16::MAX, row_group_ordinal));
}

if column_ordinal < 0 {
return Err(general_err!("Wrong column ordinal: {}", column_ordinal));
}
if column_ordinal > u16::MAX as i32 {
if column_ordinal > i16::MAX {
return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}",
u16::MAX, column_ordinal));
}
Expand All @@ -205,25 +210,25 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
let mut aad = Vec::with_capacity(file_aad.len() + 5);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((row_group_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
return Ok(aad)
}

if page_ordinal < 0 {
return Err(general_err!("Wrong column ordinal: {}", page_ordinal));
return Err(general_err!("Wrong page ordinal: {}", page_ordinal));
}
if page_ordinal > u16::MAX as i32 {
if page_ordinal > i32::MAX {
return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}",
u16::MAX, page_ordinal));
}

let mut aad = Vec::with_capacity(file_aad.len() + 7);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice(row_group_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice(column_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice(page_ordinal.to_le_bytes().as_ref());
Ok(aad)
}

Expand Down Expand Up @@ -266,7 +271,9 @@ impl DecryptionPropertiesBuilder {
pub struct FileDecryptor {
decryption_properties: FileDecryptionProperties,
// todo decr: change to BlockDecryptor
footer_decryptor: RingGcmBlockDecryptor
footer_decryptor: RingGcmBlockDecryptor,
aad_file_unique: Vec<u8>,
aad_prefix: Vec<u8>,
}

impl PartialEq for FileDecryptor {
Expand All @@ -276,30 +283,63 @@ impl PartialEq for FileDecryptor {
}

impl FileDecryptor {
pub(crate) fn new(decryption_properties: &FileDecryptionProperties) -> Self {
pub(crate) fn new(decryption_properties: &FileDecryptionProperties, aad_file_unique: Vec<u8>, aad_prefix: Vec<u8>) -> Self {
Self {
// todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()),
decryption_properties: decryption_properties.clone()
decryption_properties: decryption_properties.clone(),
aad_file_unique,
aad_prefix,
}
}

// todo decr: change to BlockDecryptor
pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor {
self.footer_decryptor
}

pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties {
&self.decryption_properties
}

pub(crate) fn footer_decryptor(&self) -> RingGcmBlockDecryptor {
self.footer_decryptor.clone()
}

pub(crate) fn aad_file_unique(&self) -> &Vec<u8> {
&self.aad_file_unique
}

pub(crate) fn aad_prefix(&self) -> &Vec<u8> {
&self.aad_prefix
}
}

#[derive(Debug, Clone)]
pub struct CryptoContext {
row_group_ordinal: i32,
column_ordinal: i32,
metadata_decryptor: FileDecryptor,
data_decryptor: FileDecryptor,
file_decryption_properties: FileDecryptionProperties,
aad: Vec<u8>,
pub(crate) start_decrypt_with_dictionary_page: bool,
pub(crate) row_group_ordinal: i16,
pub(crate) column_ordinal: i16,
pub(crate) data_decryptor: Arc<FileDecryptor>,
pub(crate) metadata_decryptor: Arc<FileDecryptor>,

}

impl CryptoContext {
pub fn data_decryptor(self) -> FileDecryptor { self.data_decryptor }
pub fn file_decryption_properties(&self) -> &FileDecryptionProperties { &self.file_decryption_properties }
pub fn new(start_decrypt_with_dictionary_page: bool, row_group_ordinal: i16,
column_ordinal: i16, data_decryptor: Arc<FileDecryptor>,
metadata_decryptor: Arc<FileDecryptor>) -> Self {
Self {
start_decrypt_with_dictionary_page,
row_group_ordinal,
column_ordinal,
data_decryptor,
metadata_decryptor,
}
}
pub fn start_decrypt_with_dictionary_page(&self) -> &bool { &self.start_decrypt_with_dictionary_page }
pub fn row_group_ordinal(&self) -> &i16 { &self.row_group_ordinal }
pub fn column_ordinal(&self) -> &i16 { &self.column_ordinal }
pub fn data_decryptor(&self) -> Arc<FileDecryptor> { self.data_decryptor.clone()}
pub fn metadata_decryptor(&self) -> Arc<FileDecryptor> { self.metadata_decryptor.clone() }
}
7 changes: 7 additions & 0 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ impl ParquetMetaData {
&self.file_metadata
}

/// Returns file decryptor as reference.
pub fn file_decryptor(&self) -> &Option<FileDecryptor> {
&self.file_decryptor
}



/// Returns number of row groups in this file.
pub fn num_row_groups(&self) -> usize {
self.row_groups.len()
Expand Down
16 changes: 11 additions & 5 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,19 @@ impl ParquetMetaDataReader {
// todo decr: get key_metadata

// remaining buffer contains encrypted FileMetaData
file_decryptor = Some(FileDecryptor::new(file_decryption_properties));
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();

// todo decr: get aad_prefix
// todo decr: set both aad_prefix and aad_file_unique in file_decryptor
let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
let aad_prefix : Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad_file_unique.clone(), aad_prefix.clone()));
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
// file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad, aad_prefix));

decrypted_fmd_buf =
decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());
decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref());
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
}

Expand Down Expand Up @@ -694,7 +700,7 @@ impl ParquetMetaDataReader {
schema_descr,
column_orders,
);
Ok(ParquetMetaData::new(file_metadata, row_groups, file_decryptor))
Ok(ParquetMetaData::new(file_metadata, row_groups, Some(file_decryptor.unwrap())))
}

/// Parses column orders from Thrift definition.
Expand Down
Loading

0 comments on commit 29d55eb

Please sign in to comment.