Use correct page ordinal and module type in AADs
adamreeve committed Dec 18, 2024
1 parent b8fdff1 · commit 7f27a97
Showing 4 changed files with 95 additions and 33 deletions.
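For context, the per-module AAD (additional authenticated data) that this commit corrects is built as sketched below, mirroring `create_module_aad` in parquet/src/encryption/ciphers.rs; the file AAD bytes and the ordinal values are illustrative:

// A data page header AAD: the file AAD followed by the module type byte and
// the row group, column and page ordinals as little-endian i16 values.
let file_aad: &[u8] = b"example-file-aad";
let mut aad = Vec::with_capacity(file_aad.len() + 7);
aad.extend_from_slice(file_aad);
aad.push(ModuleType::DataPageHeader as u8);   // module type
aad.extend_from_slice(&0i16.to_le_bytes());   // row group ordinal
aad.extend_from_slice(&2i16.to_le_bytes());   // column ordinal
aad.extend_from_slice(&0i16.to_le_bytes());   // page ordinal (data page modules only)
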
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -698,7 +698,7 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());

let crypto_context = CryptoContext::new(
meta.dictionary_page_offset().is_some(), rg_idx as i16, self.column_idx as i16, file_decryptor.clone(), file_decryptor);
rg_idx as i16, self.column_idx as i16, file_decryptor.clone(), file_decryptor);
let crypto_context = Arc::new(crypto_context);

let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
8 changes: 8 additions & 0 deletions parquet/src/column/page.rs
@@ -90,6 +90,14 @@ impl Page {
}
}

pub fn is_data_page(&self) -> bool {
matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
}

pub fn is_dictionary_page(&self) -> bool {
matches!(self, Page::DictionaryPage { .. })
}

/// Returns internal byte buffer reference for this page.
pub fn buffer(&self) -> &Bytes {
match self {
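A rough usage sketch of these new helpers, matching how the page reader changes later in this commit drive the per-chunk page ordinal (the `page`, `page_ordinal` and `require_dictionary` bindings come from the serialized_reader.rs changes below):

// Only data pages advance the AAD page ordinal; the (at most one) dictionary
// page instead clears the flag saying a dictionary page is still expected.
if page.is_data_page() {
    page_ordinal += 1;
} else if page.is_dictionary_page() {
    require_dictionary = false;
}
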
58 changes: 36 additions & 22 deletions parquet/src/encryption/ciphers.rs
@@ -35,7 +35,6 @@ const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff;
const NONCE_LEN: usize = 12;
const TAG_LEN: usize = 16;
const SIZE_LEN: usize = 4;
const NON_PAGE_ORDINAL: i32 = -1;

struct CounterNonce {
start: u128,
@@ -171,15 +170,15 @@ pub(crate) enum ModuleType {
}

pub fn create_footer_aad(file_aad: &[u8]) -> Result<Vec<u8>> {
create_module_aad(file_aad, ModuleType::Footer, -1, -1, NON_PAGE_ORDINAL)
create_module_aad(file_aad, ModuleType::Footer, -1, -1, None)
}

pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: Option<i16>) -> Result<Vec<u8>> {
create_module_aad(file_aad, module_type, row_group_ordinal, column_ordinal, page_ordinal)
}

pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16,
column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
column_ordinal: i16, page_ordinal: Option<i16>) -> Result<Vec<u8>> {

let module_buf = [module_type as u8];

@@ -217,20 +216,19 @@ pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ord
return Ok(aad)
}

let page_ordinal = page_ordinal.ok_or_else(|| general_err!(
"Page ordinal must be set for data pages"))?;

if page_ordinal < 0 {
return Err(general_err!("Wrong page ordinal: {}", page_ordinal));
}
if page_ordinal > i16::MAX as i32 {
return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}",
i16::MAX, page_ordinal));
}

let mut aad = Vec::with_capacity(file_aad.len() + 7);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice(row_group_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice(column_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice((page_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice(page_ordinal.to_le_bytes().as_ref());
Ok(aad)
}

@@ -315,41 +313,57 @@ impl FileDecryptor {
pub(crate) fn aad_prefix(&self) -> &Vec<u8> {
&self.aad_prefix
}

pub fn update_aad(&mut self, aad: Vec<u8>, row_group_ordinal: i16, column_ordinal: i16, module_type: ModuleType) {
// todo decr: update aad
debug_assert!(!self.aad_file_unique().is_empty(), "AAD is empty");

let aad = create_module_aad(self.aad_file_unique(), module_type, row_group_ordinal, column_ordinal, NON_PAGE_ORDINAL).unwrap();
self.aad_file_unique = aad;
}
}

#[derive(Debug, Clone)]
pub struct CryptoContext {
pub(crate) start_decrypt_with_dictionary_page: bool,
pub(crate) row_group_ordinal: i16,
pub(crate) column_ordinal: i16,
pub(crate) page_ordinal: Option<i16>,
pub(crate) dictionary_page: bool,
pub(crate) data_decryptor: Arc<FileDecryptor>,
pub(crate) metadata_decryptor: Arc<FileDecryptor>,

}

impl CryptoContext {
pub fn new(start_decrypt_with_dictionary_page: bool, row_group_ordinal: i16,
pub fn new(row_group_ordinal: i16,
column_ordinal: i16, data_decryptor: Arc<FileDecryptor>,
metadata_decryptor: Arc<FileDecryptor>) -> Self {
Self {
start_decrypt_with_dictionary_page,
row_group_ordinal,
column_ordinal,
page_ordinal: None,
dictionary_page: false,
data_decryptor,
metadata_decryptor,
}
}
pub fn start_decrypt_with_dictionary_page(&self) -> &bool { &self.start_decrypt_with_dictionary_page }

pub fn with_page_ordinal(&self, page_ordinal: i16) -> Self {
Self {
row_group_ordinal: self.row_group_ordinal,
column_ordinal: self.column_ordinal,
page_ordinal: Some(page_ordinal),
dictionary_page: false,
data_decryptor: self.data_decryptor.clone(),
metadata_decryptor: self.metadata_decryptor.clone(),
}
}

pub fn for_dictionary_page(&self) -> Self {
Self {
row_group_ordinal: self.row_group_ordinal,
column_ordinal: self.column_ordinal,
page_ordinal: self.page_ordinal,
dictionary_page: true,
data_decryptor: self.data_decryptor.clone(),
metadata_decryptor: self.metadata_decryptor.clone(),
}
}

pub fn row_group_ordinal(&self) -> &i16 { &self.row_group_ordinal }
pub fn column_ordinal(&self) -> &i16 { &self.column_ordinal }
pub fn page_ordinal(&self) -> &Option<i16> { &self.page_ordinal }
pub fn data_decryptor(&self) -> Arc<FileDecryptor> { self.data_decryptor.clone()}
pub fn metadata_decryptor(&self) -> Arc<FileDecryptor> { self.metadata_decryptor.clone() }
}
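As a rough, crate-internal sketch of the reworked API (the file AAD, the ordinals 0 and 2, and the `base_context` binding are illustrative, and the calls are assumed to run inside a function returning parquet's Result):

// Data page headers and bodies carry an explicit page ordinal in their AAD...
let file_aad: &[u8] = b"example-file-aad";
let header_aad = create_page_aad(file_aad, ModuleType::DataPageHeader, 0, 2, Some(0))?;
let page_aad = create_page_aad(file_aad, ModuleType::DataPage, 0, 2, Some(0))?;
// ...while dictionary page modules pass None, as their AAD has no page ordinal.
let dict_aad = create_page_aad(file_aad, ModuleType::DictionaryPageHeader, 0, 2, None)?;

// Per-page contexts are derived from a column chunk's base CryptoContext.
let dict_ctx = base_context.for_dictionary_page();   // dictionary_page = true
let page_ctx = base_context.with_page_ordinal(0);    // page_ordinal = Some(0)
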
60 changes: 50 additions & 10 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use num::ToPrimitive;
use thrift::protocol::{TCompactInputProtocol, TInputProtocol};
use zstd::zstd_safe::WriteBuf;
use crate::data_type::AsBytes;
@@ -347,13 +348,17 @@ pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<Ar
let file_decryptor = decryptor.footer_decryptor();
let aad_file_unique = decryptor.aad_file_unique();

// todo: page ordinal and page type (ModuleType)
let module_type = if crypto_context.dictionary_page {
ModuleType::DictionaryPageHeader
} else {
ModuleType::DataPageHeader
};
let aad = create_page_aad(
aad_file_unique.as_slice(),
ModuleType::DataPageHeader,
module_type,
crypto_context.row_group_ordinal,
crypto_context.column_ordinal,
0,
crypto_context.page_ordinal,
)?;

let mut len_bytes = [0; 4];
@@ -435,13 +440,17 @@ pub(crate) fn decode_page(
let decryptor = crypto_context.data_decryptor();
let file_decryptor = decryptor.footer_decryptor();

// todo: page ordinal
let module_type = if crypto_context.dictionary_page {
ModuleType::DictionaryPage
} else {
ModuleType::DataPage
};
let aad = create_page_aad(
decryptor.aad_file_unique().as_slice(),
ModuleType::DataPage,
module_type,
crypto_context.row_group_ordinal,
crypto_context.column_ordinal,
0,
crypto_context.page_ordinal,
)?;
let decrypted = file_decryptor.decrypt(&buffer.as_ref(), &aad)?;
Bytes::from(decrypted)
@@ -539,6 +548,12 @@ enum SerializedPageReaderState {

// If the next page header has already been "peeked", we will cache it and its length here
next_page_header: Option<Box<PageHeader>>,

/// The index of the data page within this column chunk
page_ordinal: usize,

/// Whether the next page is expected to be a dictionary page
require_dictionary: bool,
},
Pages {
/// Remaining page locations
@@ -613,6 +628,8 @@ impl<R: ChunkReader> SerializedPageReader<R> {
offset: start as usize,
remaining_bytes: len as usize,
next_page_header: None,
page_ordinal: 0,
require_dictionary: meta.dictionary_page_offset().is_some(),
},
};
if crypto_context.is_some() {
@@ -650,6 +667,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
offset,
remaining_bytes: remaining,
next_page_header,
page_ordinal,
require_dictionary,
} => {
if *remaining == 0 {
return Ok(None);
@@ -659,7 +678,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
let header = if let Some(header) = next_page_header.take() {
*header
} else {
let (header_len, header) = read_page_header_len(&mut read, self.crypto_context.clone())?;
let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?;
let (header_len, header) = read_page_header_len(&mut read, crypto_context)?;
*offset += header_len;
*remaining -= header_len;
header
@@ -683,13 +703,20 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
));
}

decode_page(
let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?;
let page = decode_page(
header,
Bytes::from(buffer),
self.physical_type,
self.decompressor.as_mut(),
self.crypto_context.clone(),
)?
crypto_context,
)?;
if page.is_data_page() {
*page_ordinal += 1;
} else if page.is_dictionary_page() {
*require_dictionary = false;
}
page
}
SerializedPageReaderState::Pages {
page_locations,
@@ -733,6 +760,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
offset,
remaining_bytes,
next_page_header,
..
} => {
loop {
if *remaining_bytes == 0 {
@@ -797,6 +825,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
offset,
remaining_bytes,
next_page_header,
..
} => {
if let Some(buffered_header) = next_page_header.take() {
// The next page header has already been peeked, so just advance the offset
@@ -827,6 +856,17 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}
}

fn page_crypto_context(crypto_context: &Option<Arc<CryptoContext>>, page_ordinal: usize, dictionary_page: bool) -> Result<Option<Arc<CryptoContext>>> {
let page_ordinal = page_ordinal
.to_i16()
.ok_or_else(|| general_err!(
"Page ordinal {} is greater than the maximum allowed in encrypted Parquet files ({})",
page_ordinal, i16::MAX))?;

Ok(crypto_context.as_ref().map(
|c| Arc::new(if dictionary_page { c.for_dictionary_page() } else { c.with_page_ordinal(page_ordinal) })))
}

#[cfg(test)]
mod tests {
use bytes::Buf;
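Because the AAD encodes the page ordinal as an i16, the new `page_crypto_context` helper converts the reader's usize counter with `num::ToPrimitive::to_i16`, which yields None instead of wrapping once a chunk exceeds i16::MAX pages. A standalone sketch of that behaviour (the `checked_page_ordinal` wrapper is illustrative):

use num::ToPrimitive;

fn checked_page_ordinal(page_ordinal: usize) -> Option<i16> {
    // None on overflow; page_crypto_context turns that into a general error.
    page_ordinal.to_i16()
}

assert_eq!(checked_page_ordinal(0), Some(0));
assert_eq!(checked_page_ordinal(40_000), None);   // exceeds i16::MAX (32767)
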
