diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d2b8b596c646..9ef142cec7d4 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -698,7 +698,7 @@ impl Iterator for ReaderPageIterator { let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap()); let crypto_context = CryptoContext::new( - meta.dictionary_page_offset().is_some(), rg_idx as i16, self.column_idx as i16, file_decryptor.clone(), file_decryptor); + rg_idx as i16, self.column_idx as i16, file_decryptor.clone(), file_decryptor); let crypto_context = Arc::new(crypto_context); let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context)); diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 5c866318e185..931241e4259f 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -90,6 +90,14 @@ impl Page { } } + pub fn is_data_page(&self) -> bool { + matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. }) + } + + pub fn is_dictionary_page(&self) -> bool { + matches!(self, Page::DictionaryPage { .. }) + } + /// Returns internal byte buffer reference for this page. pub fn buffer(&self) -> &Bytes { match self { diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index 5379357326b9..f795da547933 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -35,7 +35,6 @@ const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff; const NONCE_LEN: usize = 12; const TAG_LEN: usize = 16; const SIZE_LEN: usize = 4; -const NON_PAGE_ORDINAL: i32 = -1; struct CounterNonce { start: u128, @@ -171,15 +170,15 @@ pub(crate) enum ModuleType { } pub fn create_footer_aad(file_aad: &[u8]) -> Result> { - create_module_aad(file_aad, ModuleType::Footer, -1, -1, NON_PAGE_ORDINAL) + create_module_aad(file_aad, ModuleType::Footer, -1, -1, None) } -pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: i32) -> Result> { +pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: Option) -> Result> { create_module_aad(file_aad, module_type, row_group_ordinal, column_ordinal, page_ordinal) } pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, - column_ordinal: i16, page_ordinal: i32) -> Result> { + column_ordinal: i16, page_ordinal: Option) -> Result> { let module_buf = [module_type as u8]; @@ -217,20 +216,19 @@ pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ord return Ok(aad) } + let page_ordinal = page_ordinal.ok_or_else(|| general_err!( + "Page ordinal must be set for data pages"))?; + if page_ordinal < 0 { return Err(general_err!("Wrong page ordinal: {}", page_ordinal)); } - if page_ordinal > i16::MAX as i32 { - return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}", - i16::MAX, page_ordinal)); - } let mut aad = Vec::with_capacity(file_aad.len() + 7); aad.extend_from_slice(file_aad); aad.extend_from_slice(module_buf.as_ref()); aad.extend_from_slice(row_group_ordinal.to_le_bytes().as_ref()); aad.extend_from_slice(column_ordinal.to_le_bytes().as_ref()); - aad.extend_from_slice((page_ordinal as i16).to_le_bytes().as_ref()); + aad.extend_from_slice(page_ordinal.to_le_bytes().as_ref()); Ok(aad) } @@ -315,41 +313,57 @@ impl FileDecryptor { pub(crate) fn aad_prefix(&self) -> &Vec { &self.aad_prefix } - - pub fn update_aad(&mut self, aad: Vec, row_group_ordinal: i16, column_ordinal: i16, module_type: ModuleType) { - // todo decr: update aad - debug_assert!(!self.aad_file_unique().is_empty(), "AAD is empty"); - - let aad = create_module_aad(self.aad_file_unique(), module_type, row_group_ordinal, column_ordinal, NON_PAGE_ORDINAL).unwrap(); - self.aad_file_unique = aad; - } } #[derive(Debug, Clone)] pub struct CryptoContext { - pub(crate) start_decrypt_with_dictionary_page: bool, pub(crate) row_group_ordinal: i16, pub(crate) column_ordinal: i16, + pub(crate) page_ordinal: Option, + pub(crate) dictionary_page: bool, pub(crate) data_decryptor: Arc, pub(crate) metadata_decryptor: Arc, - } impl CryptoContext { - pub fn new(start_decrypt_with_dictionary_page: bool, row_group_ordinal: i16, + pub fn new(row_group_ordinal: i16, column_ordinal: i16, data_decryptor: Arc, metadata_decryptor: Arc) -> Self { Self { - start_decrypt_with_dictionary_page, row_group_ordinal, column_ordinal, + page_ordinal: None, + dictionary_page: false, data_decryptor, metadata_decryptor, } } - pub fn start_decrypt_with_dictionary_page(&self) -> &bool { &self.start_decrypt_with_dictionary_page } + + pub fn with_page_ordinal(&self, page_ordinal: i16) -> Self { + Self { + row_group_ordinal: self.row_group_ordinal, + column_ordinal: self.column_ordinal, + page_ordinal: Some(page_ordinal), + dictionary_page: false, + data_decryptor: self.data_decryptor.clone(), + metadata_decryptor: self.metadata_decryptor.clone(), + } + } + + pub fn for_dictionary_page(&self) -> Self { + Self { + row_group_ordinal: self.row_group_ordinal, + column_ordinal: self.column_ordinal, + page_ordinal: self.page_ordinal, + dictionary_page: true, + data_decryptor: self.data_decryptor.clone(), + metadata_decryptor: self.metadata_decryptor.clone(), + } + } + pub fn row_group_ordinal(&self) -> &i16 { &self.row_group_ordinal } pub fn column_ordinal(&self) -> &i16 { &self.column_ordinal } + pub fn page_ordinal(&self) -> &Option { &self.page_ordinal } pub fn data_decryptor(&self) -> Arc { self.data_decryptor.clone()} pub fn metadata_decryptor(&self) -> Arc { self.metadata_decryptor.clone() } } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index f2c2907db979..427f05c512e0 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -39,6 +39,7 @@ use crate::record::Row; use crate::schema::types::Type as SchemaType; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; use bytes::Bytes; +use num::ToPrimitive; use thrift::protocol::{TCompactInputProtocol, TInputProtocol}; use zstd::zstd_safe::WriteBuf; use crate::data_type::AsBytes; @@ -347,13 +348,17 @@ pub(crate) fn read_page_header(input: &mut T, crypto_context: Option>, + + /// The index of the data page within this column chunk + page_ordinal: usize, + + /// Whether the next page is expected to be a dictionary page + require_dictionary: bool, }, Pages { /// Remaining page locations @@ -613,6 +628,8 @@ impl SerializedPageReader { offset: start as usize, remaining_bytes: len as usize, next_page_header: None, + page_ordinal: 0, + require_dictionary: meta.dictionary_page_offset().is_some(), }, }; if crypto_context.is_some() { @@ -650,6 +667,8 @@ impl PageReader for SerializedPageReader { offset, remaining_bytes: remaining, next_page_header, + page_ordinal, + require_dictionary, } => { if *remaining == 0 { return Ok(None); @@ -659,7 +678,8 @@ impl PageReader for SerializedPageReader { let header = if let Some(header) = next_page_header.take() { *header } else { - let (header_len, header) = read_page_header_len(&mut read, self.crypto_context.clone())?; + let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?; + let (header_len, header) = read_page_header_len(&mut read, crypto_context)?; *offset += header_len; *remaining -= header_len; header @@ -683,13 +703,20 @@ impl PageReader for SerializedPageReader { )); } - decode_page( + let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?; + let page = decode_page( header, Bytes::from(buffer), self.physical_type, self.decompressor.as_mut(), - self.crypto_context.clone(), - )? + crypto_context, + )?; + if page.is_data_page() { + *page_ordinal += 1; + } else if page.is_dictionary_page() { + *require_dictionary = false; + } + page } SerializedPageReaderState::Pages { page_locations, @@ -733,6 +760,7 @@ impl PageReader for SerializedPageReader { offset, remaining_bytes, next_page_header, + .. } => { loop { if *remaining_bytes == 0 { @@ -797,6 +825,7 @@ impl PageReader for SerializedPageReader { offset, remaining_bytes, next_page_header, + .. } => { if let Some(buffered_header) = next_page_header.take() { // The next page header has already been peeked, so just advance the offset @@ -827,6 +856,17 @@ impl PageReader for SerializedPageReader { } } +fn page_crypto_context(crypto_context: &Option>, page_ordinal: usize, dictionary_page: bool) -> Result>> { + let page_ordinal = page_ordinal + .to_i16() + .ok_or_else(|| general_err!( + "Page ordinal {} is greater than the maximum allowed in encrypted Parquet files ({})", + page_ordinal, i16::MAX))?; + + Ok(crypto_context.as_ref().map( + |c| Arc::new(if dictionary_page { c.for_dictionary_page() } else { c.with_page_ordinal(page_ordinal) }))) +} + #[cfg(test)] mod tests { use bytes::Buf;