diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 1ee3a464f8c..c1fb02ef161 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -19,7 +19,7 @@
 
 use std::collections::VecDeque;
 use std::sync::Arc;
-
+use num::ToPrimitive;
 use arrow_array::cast::AsArray;
 use arrow_array::Array;
 use arrow_array::{RecordBatch, RecordBatchReader};
@@ -42,7 +42,7 @@
 mod filter;
 mod selection;
 pub mod statistics;
-use crate::encryption::ciphers::FileDecryptionProperties;
+use crate::encryption::ciphers::{CryptoContext, FileDecryptionProperties};
 
 /// Builder for constructing parquet readers into arrow.
 ///
@@ -695,7 +695,18 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
         let total_rows = rg.num_rows() as usize;
         let reader = self.reader.clone();
 
-        let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
+        let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
+        // let aad_file_unique = file_decryptor?.aad_file_unique();
+        // let aad_prefix = file_decryptor?.aad_prefix();
+        //
+        // let file_decryptor = FileDecryptor::new(file_decryptor, aad_file_unique.clone(), aad_prefix.clone());
+
+        let crypto_context = CryptoContext::new(
+            meta.dictionary_page_offset().is_some(), rg_idx.to_i16()?, self.column_idx.to_i16()?, file_decryptor.clone(), file_decryptor);
+        let crypto_context = Arc::new(crypto_context);
+
+        let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
+        // let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
         Some(ret.map(|x| Box::new(x) as _))
     }
 }
@@ -1728,6 +1739,11 @@ mod tests {
         });
 
         // todo: decrypting data
+        let decryption_properties = Some(
+            ciphers::FileDecryptionProperties::builder()
+                .with_footer_key(key_code.to_vec())
+                .build(),
+        );
         let record_reader =
             ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
                 .unwrap();
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index c10443418ae..550b53bad6b 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -842,6 +842,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
                     self.metadata.column(i),
                     self.row_count,
                     page_locations,
+                    None,
                 )?);
 
                 Ok(Box::new(ColumnChunkIterator {
diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs
index b4b7f47df5b..89515fe0e00 100644
--- a/parquet/src/encryption/ciphers.rs
+++ b/parquet/src/encryption/ciphers.rs
@@ -18,6 +18,7 @@
 //! Encryption implementation specific to Parquet, as described
 //! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
 
+use std::sync::Arc;
 use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
 use ring::rand::{SecureRandom, SystemRandom};
 use crate::errors::{ParquetError, Result};
@@ -172,8 +173,12 @@ pub fn create_footer_aad(file_aad: &[u8]) -> Result<Vec<u8>> {
     create_module_aad(file_aad, ModuleType::Footer, -1, -1, -1)
 }
 
-fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i32,
-                     column_ordinal: i32, page_ordinal: i32) -> Result<Vec<u8>> {
+pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
+    create_module_aad(file_aad, module_type, row_group_ordinal, column_ordinal, page_ordinal)
+}
+
+fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16,
+                     column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
 
     let module_buf = [module_type as u8];
 
@@ -187,7 +192,7 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
     if row_group_ordinal < 0 {
         return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal));
     }
-    if row_group_ordinal > u16::MAX as i32 {
+    if row_group_ordinal > i16::MAX {
         return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}",
             u16::MAX, row_group_ordinal));
     }
@@ -195,7 +200,7 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
     if column_ordinal < 0 {
         return Err(general_err!("Wrong column ordinal: {}", column_ordinal));
     }
-    if column_ordinal > u16::MAX as i32 {
+    if column_ordinal > i16::MAX {
         return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}",
             u16::MAX, column_ordinal));
     }
@@ -205,15 +210,15 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
         let mut aad = Vec::with_capacity(file_aad.len() + 5);
         aad.extend_from_slice(file_aad);
         aad.extend_from_slice(module_buf.as_ref());
-        aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
-        aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
+        aad.extend_from_slice((row_group_ordinal as i16).to_le_bytes().as_ref());
+        aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
         return Ok(aad)
     }
 
     if page_ordinal < 0 {
-        return Err(general_err!("Wrong column ordinal: {}", page_ordinal));
+        return Err(general_err!("Wrong page ordinal: {}", page_ordinal));
     }
-    if page_ordinal > u16::MAX as i32 {
+    if page_ordinal > i32::MAX {
         return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}",
             u16::MAX, page_ordinal));
     }
@@ -221,9 +226,9 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
     let mut aad = Vec::with_capacity(file_aad.len() + 7);
     aad.extend_from_slice(file_aad);
     aad.extend_from_slice(module_buf.as_ref());
-    aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
-    aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
-    aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref());
+    aad.extend_from_slice(row_group_ordinal.to_le_bytes().as_ref());
+    aad.extend_from_slice(column_ordinal.to_le_bytes().as_ref());
+    aad.extend_from_slice(page_ordinal.to_le_bytes().as_ref());
 
     Ok(aad)
 }
@@ -266,7 +271,9 @@ impl DecryptionPropertiesBuilder {
 pub struct FileDecryptor {
     decryption_properties: FileDecryptionProperties,
     // todo decr: change to BlockDecryptor
-    footer_decryptor: RingGcmBlockDecryptor
+    footer_decryptor: RingGcmBlockDecryptor,
+    aad_file_unique: Vec<u8>,
+    aad_prefix: Vec<u8>,
 }
 
 impl PartialEq for FileDecryptor {
@@ -276,11 +283,13 @@ impl PartialEq for FileDecryptor {
 }
 
 impl FileDecryptor {
-    pub(crate) fn new(decryption_properties: &FileDecryptionProperties) -> Self {
+    pub(crate) fn new(decryption_properties: &FileDecryptionProperties, aad_file_unique: Vec<u8>, aad_prefix: Vec<u8>) -> Self {
         Self {
             // todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
             footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()),
-            decryption_properties: decryption_properties.clone()
+            decryption_properties: decryption_properties.clone(),
+            aad_file_unique,
+            aad_prefix,
         }
     }
 
@@ -288,18 +297,49 @@ impl FileDecryptor {
     pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor {
         self.footer_decryptor
     }
+
+    pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties {
+        &self.decryption_properties
+    }
+
+    pub(crate) fn footer_decryptor(&self) -> RingGcmBlockDecryptor {
+        self.footer_decryptor.clone()
+    }
+
+    pub(crate) fn aad_file_unique(&self) -> &Vec<u8> {
+        &self.aad_file_unique
+    }
+
+    pub(crate) fn aad_prefix(&self) -> &Vec<u8> {
+        &self.aad_prefix
+    }
 }
 
+#[derive(Debug, Clone)]
 pub struct CryptoContext {
-    row_group_ordinal: i32,
-    column_ordinal: i32,
-    metadata_decryptor: FileDecryptor,
-    data_decryptor: FileDecryptor,
-    file_decryption_properties: FileDecryptionProperties,
-    aad: Vec<u8>,
+    pub(crate) start_decrypt_with_dictionary_page: bool,
+    pub(crate) row_group_ordinal: i16,
+    pub(crate) column_ordinal: i16,
+    pub(crate) data_decryptor: Arc<FileDecryptor>,
+    pub(crate) metadata_decryptor: Arc<FileDecryptor>,
+
 }
 
 impl CryptoContext {
-    pub fn data_decryptor(self) -> FileDecryptor { self.data_decryptor }
-    pub fn file_decryption_properties(&self) -> &FileDecryptionProperties { &self.file_decryption_properties }
+    pub fn new(start_decrypt_with_dictionary_page: bool, row_group_ordinal: i16,
+               column_ordinal: i16, data_decryptor: Arc<FileDecryptor>,
+               metadata_decryptor: Arc<FileDecryptor>) -> Self {
+        Self {
+            start_decrypt_with_dictionary_page,
+            row_group_ordinal,
+            column_ordinal,
+            data_decryptor,
+            metadata_decryptor,
+        }
+    }
+    pub fn start_decrypt_with_dictionary_page(&self) -> &bool { &self.start_decrypt_with_dictionary_page }
+    pub fn row_group_ordinal(&self) -> &i16 { &self.row_group_ordinal }
+    pub fn column_ordinal(&self) -> &i16 { &self.column_ordinal }
+    pub fn data_decryptor(&self) -> Arc<FileDecryptor> { self.data_decryptor.clone()}
+    pub fn metadata_decryptor(&self) -> Arc<FileDecryptor> { self.metadata_decryptor.clone() }
 }
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index b762548eb20..95b18be9e7c 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -218,6 +218,13 @@ impl ParquetMetaData {
         &self.file_metadata
     }
 
+    /// Returns file decryptor as reference.
+    pub fn file_decryptor(&self) -> &Option<FileDecryptor> {
+        &self.file_decryptor
+    }
+
+
+
     /// Returns number of row groups in this file.
     pub fn num_row_groups(&self) -> usize {
         self.row_groups.len()
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index a8686e695ad..61c0e0e4806 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -659,13 +659,19 @@ impl ParquetMetaDataReader {
                 // todo decr: get key_metadata
 
                 // remaining buffer contains encrypted FileMetaData
-                file_decryptor = Some(FileDecryptor::new(file_decryption_properties));
-                let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
+
                 // todo decr: get aad_prefix
                 // todo decr: set both aad_prefix and aad_file_unique in file_decryptor
-                let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
+                let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
+                let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
+                let aad_prefix : Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();
+
+                file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad_file_unique.clone(), aad_prefix.clone()));
+                let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
+                // file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad, aad_prefix));
+
                 decrypted_fmd_buf =
-                    decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());
+                    decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref());
                 prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
             }
 
@@ -694,7 +700,7 @@ impl ParquetMetaDataReader {
             schema_descr,
             column_orders,
         );
-        Ok(ParquetMetaData::new(file_metadata, row_groups, file_decryptor))
+        Ok(ParquetMetaData::new(file_metadata, row_groups, Some(file_decryptor.unwrap())))
     }
 
     /// Parses column orders from Thrift definition.
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 57411264773..e87e9a80232 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -38,9 +38,11 @@
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
-use bytes::{Buf, Bytes};
-use thrift::protocol::TCompactInputProtocol;
-use crate::encryption::ciphers::{create_footer_aad, BlockDecryptor, CryptoContext, FileDecryptionProperties, FileDecryptor, RingGcmBlockDecryptor};
+use bytes::Bytes;
+use thrift::protocol::{TCompactInputProtocol, TInputProtocol};
+use zstd::zstd_safe::WriteBuf;
+use crate::data_type::AsBytes;
+use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, FileDecryptionProperties, ModuleType};
 
 impl TryFrom<File> for SerializedFileReader<File> {
@@ -339,37 +341,40 @@ impl<R: ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R
 
 }
 
 /// Reads a [`PageHeader`] from the provided [`Read`]
-pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<&CryptoContext>) -> Result<PageHeader> {
-    let buf = &mut [];
-    let size = input.read(buf)?;
-
-    // todo: decrypt buffer
-    let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
-    let t_file_crypto_metadata: TFileCryptoMetaData =
-        TFileCryptoMetaData::read_from_in_protocol(&mut prot)
-            .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
-
-    let file_decryption_properties = crypto_context.unwrap().file_decryption_properties();
-    let file_decryptor = FileDecryptor::new(file_decryption_properties);
-
-    // let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
-    let algo = t_file_crypto_metadata.encryption_algorithm;
-    let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo {
-        a
-    } else {
-        unreachable!()
-    }; // todo decr: add support for GCMCTRV1
-    let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
-    let buf2 = file_decryptor.get_footer_decryptor().decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());
-
-    let mut prot = TCompactInputProtocol::new(buf2.reader());
+pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<PageHeader> {
+
+    if let Some(crypto_context) = crypto_context {
+        let mut buf = [0; 16 * 1024];
+        let size = input.read(&mut buf)?;
+
+        let decryptor = &crypto_context.data_decryptor();
+        let file_decryptor = decryptor.footer_decryptor();
+        let aad_file_unique = decryptor.aad_file_unique();
+        // let aad_prefix = decryptor.aad_prefix();
+
+        let aad = create_page_aad(
+            aad_file_unique.as_slice(),
+            ModuleType::DictionaryPageHeader,
+            crypto_context.row_group_ordinal,
+            crypto_context.column_ordinal,
+            0,
+        )?;
+
+        // todo: This currently fails, possibly due to wrongly generated AAD
+        let buf = file_decryptor.decrypt(buf[4..].as_slice(), aad.as_ref());
+        todo!("Decrypted page header!");
+        let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
+        let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
+        return Ok(page_header)
+    }
+    let mut prot = TCompactInputProtocol::new(input);
     let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
     Ok(page_header)
 }
 
 /// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
-fn read_page_header_len<T: Read>(input: &mut T, crypto_context: Option<&CryptoContext>) -> Result<(usize, PageHeader)> {
+fn read_page_header_len<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<(usize, PageHeader)> {
     /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
     struct TrackedRead<R> {
         inner: R,
@@ -538,7 +543,7 @@ pub struct SerializedPageReader<R: ChunkReader> {
     state: SerializedPageReaderState,
 
     /// Crypto context
-    crypto_context: Option<&'static CryptoContext>,
+    crypto_context: Option<Arc<CryptoContext>>,
 }
 
 impl<R: ChunkReader> SerializedPageReader<R> {
@@ -548,9 +553,10 @@ impl<R: ChunkReader> SerializedPageReader<R> {
         meta: &ColumnChunkMetaData,
         total_rows: usize,
         page_locations: Option<Vec<PageLocation>>,
+        crypto_context: Option<Arc<CryptoContext>>,
     ) -> Result<Self> {
         let props = Arc::new(ReaderProperties::builder().build());
-        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props, None)
+        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props, crypto_context)
     }
 
     /// Creates a new serialized page with custom options.
@@ -560,7 +566,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
         total_rows: usize,
         page_locations: Option<Vec<PageLocation>>,
         props: ReaderPropertiesPtr,
-        crypto_context: Option<&'static CryptoContext>,
+        crypto_context: Option<Arc<CryptoContext>>,
     ) -> Result<Self> {
         let decompressor = create_codec(meta.compression(), props.codec_options())?;
         let (start, len) = meta.byte_range();
@@ -648,7 +654,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                     let header = if let Some(header) = next_page_header.take() {
                         *header
                     } else {
-                        let (header_len, header) = read_page_header_len(&mut read, self.crypto_context)?;
+                        let (header_len, header) = read_page_header_len(&mut read, self.crypto_context.clone())?;
                         *offset += header_len;
                         *remaining -= header_len;
                         header
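Note on the AAD layout behind the `create_module_aad` changes above: per the Parquet Modular Encryption spec (and parquet-mr's `AesCipher`), a module AAD is the file AAD (`aad_prefix` concatenated with `aad_file_unique`) followed by a one-byte module type and little-endian two-byte row group and column ordinals; a page ordinal is appended only for data page and data page header modules, so dictionary page header AADs omit it. A minimal standalone sketch of that layout, with illustrative names rather than this crate's API:

```rust
// Standalone illustration of the module-AAD layout from the Parquet Modular
// Encryption spec; not the parquet crate's API.

#[derive(Clone, Copy, PartialEq)]
#[repr(u8)]
enum ModuleType {
    Footer = 0,
    ColumnMetaData = 1,
    DataPage = 2,
    DictionaryPage = 3,
    DataPageHeader = 4,
    DictionaryPageHeader = 5,
}

/// file_aad = aad_prefix || aad_file_unique. The module AAD appends a one-byte
/// module type, little-endian i16 row-group and column ordinals, and a page
/// ordinal only for DataPage / DataPageHeader modules.
fn module_aad(
    file_aad: &[u8],
    module_type: ModuleType,
    row_group_ordinal: i16,
    column_ordinal: i16,
    page_ordinal: i16,
) -> Vec<u8> {
    let mut aad = Vec::with_capacity(file_aad.len() + 7);
    aad.extend_from_slice(file_aad);
    aad.push(module_type as u8);
    if module_type == ModuleType::Footer {
        return aad; // the footer AAD carries the module type only
    }
    aad.extend_from_slice(&row_group_ordinal.to_le_bytes());
    aad.extend_from_slice(&column_ordinal.to_le_bytes());
    if matches!(module_type, ModuleType::DataPage | ModuleType::DataPageHeader) {
        aad.extend_from_slice(&page_ordinal.to_le_bytes());
    }
    aad
}

fn main() {
    // AAD for the first data page header of row group 0, column 2.
    let file_aad = b"prefixunique";
    let aad = module_aad(file_aad, ModuleType::DataPageHeader, 0, 2, 0);
    assert_eq!(aad.len(), file_aad.len() + 1 + 2 + 2 + 2);

    // Dictionary page headers omit the page ordinal per the spec.
    let dict_aad = module_aad(file_aad, ModuleType::DictionaryPageHeader, 0, 2, 0);
    assert_eq!(dict_aad.len(), file_aad.len() + 1 + 2 + 2);
}
```

Since `read_page_header` above builds a `ModuleType::DictionaryPageHeader` AAD with a page ordinal of 0, that extra ordinal may be one reason for the `// todo: This currently fails, possibly due to wrongly generated AAD` note.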
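For the read path these changes wire up, a minimal usage sketch mirroring the test in `arrow_reader/mod.rs`: build `FileDecryptionProperties` with the footer key and pass them to `ParquetRecordBatchReader::try_new_with_decryption`. The file path, the key bytes, and the assumption that `parquet::encryption::ciphers` is publicly reachable are placeholders; the draft API may differ.

```rust
// Sketch only: mirrors the test above. The path, the key bytes and the public
// visibility of `parquet::encryption::ciphers` are assumed for illustration.
use std::error::Error;
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::encryption::ciphers::FileDecryptionProperties;

fn main() -> Result<(), Box<dyn Error>> {
    // AES-128 footer key; in the test above this is `key_code.to_vec()`.
    let footer_key = b"0123456789012345".to_vec();

    let decryption_properties = Some(
        FileDecryptionProperties::builder()
            .with_footer_key(footer_key)
            .build(),
    );

    let file = File::open("encrypted.parquet")?;
    let reader = ParquetRecordBatchReader::try_new_with_decryption(
        file,
        128, // batch size, as in the test
        decryption_properties.as_ref(),
    )?;

    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```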