From b06ffceaab2b59edc098d86f75b2a5125a8352ee Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 26 Jul 2024 10:37:30 -0700 Subject: [PATCH] Add support for level histograms added in PARQUET-2261 to `ParquetMetaData` (#6105) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041) * bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` Signed-off-by: Bugen Zhao * fix example tests Signed-off-by: Bugen Zhao --------- Signed-off-by: Bugen Zhao * Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily accidentally copies data (#6043) * deprecate auto copy, ask explicit reference * update comments * make cargo doc happy * Make display of interval types more pretty (#6006) * improve display for interval. * update test in pretty, and fix display problem. * tmp * fix tests in arrow-cast. * fix tests in pretty. * fix style. * Update snafu (#5930) * Update Parquet thrift generated structures (#6045) * update to latest thrift (as of 11 Jul 2024) from parquet-format * pass None for optional size statistics * escape HTML tags * don't need to escape brackets in arrays * Revert "Revert "Write Bloom filters between row groups instead of the end (#…" (#5933) This reverts commit 22e0b4432c9838f2536284015271d3de9a165135. * Revert "Update snafu (#5930)" (#6069) This reverts commit 756b1fb26d1702f36f446faf9bb40a4869c3e840. * Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075) * Update pyo3 requirement from 0.21.1 to 0.22.1 Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version. - [Release notes](https://github.com/pyo3/pyo3/releases) - [Changelog](https://github.com/PyO3/pyo3/blob/main/CHANGELOG.md) - [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.1) --- updated-dependencies: - dependency-name: pyo3 dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] * refactor: remove deprecated `FromPyArrow::from_pyarrow` "GIL Refs" are being phased out. * chore: update `pyo3` in integration tests --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * remove repeated codes to make the codes more concise. (#6080) * Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068) * update to latest thrift (as of 11 Jul 2024) from parquet-format * pass None for optional size statistics * escape HTML tags * don't need to escape brackets in arrays * add support for unencoded_byte_array_data_bytes * add comments * change sig of ColumnMetrics::update_variable_length_bytes() * rename ParquetOffsetIndex to OffsetSizeIndex * rename some functions * suggestion from review Co-authored-by: Andrew Lamb * add Default trait to ColumnMetrics as suggested in review * rename OffsetSizeIndex to OffsetIndexMetaData --------- Co-authored-by: Andrew Lamb * deprecate read_page_locations * add level histograms to metadata * add to_thrift() to OffsetIndexMetaData * Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085) Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version. - [Release notes](https://github.com/pyo3/pyo3/releases) - [Changelog](https://github.com/PyO3/pyo3/blob/v0.22.2/CHANGELOG.md) - [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.2) --- updated-dependencies: - dependency-name: pyo3 dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095) * deprecate read_page_locations * add to_thrift() to OffsetIndexMetaData * move valid test into ColumnIndexBuilder::append_histograms * move update_histogram() inside ColumnMetrics * Update parquet/src/column/writer/mod.rs Co-authored-by: Ed Seidl * Implement LevelHistograms as a struct * formatting * fix error in docs --------- Signed-off-by: Bugen Zhao Signed-off-by: dependabot[bot] Co-authored-by: Bugen Zhao Co-authored-by: Xiangpeng Hao Co-authored-by: kamille Co-authored-by: Jesse Co-authored-by: Andrew Lamb Co-authored-by: Marco Neumann Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- parquet/src/column/writer/mod.rs | 137 +++++++++++++++-- parquet/src/file/metadata/memory.rs | 2 + parquet/src/file/metadata/mod.rs | 217 +++++++++++++++++++++++++-- parquet/src/file/page_index/index.rs | 87 +++++++++-- parquet/src/file/writer.rs | 144 +++++++++++++++++- 5 files changed, 550 insertions(+), 37 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 2c0c957d87d3..54d8fd3cc13e 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::*; use crate::encodings::levels::LevelEncoder; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder}; +use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder}; use crate::file::properties::EnabledStatistics; use crate::file::statistics::{Statistics, ValueStatistics}; use crate::file::{ @@ -189,6 +189,54 @@ struct PageMetrics { num_buffered_values: u32, num_buffered_rows: u32, num_page_nulls: u64, + repetition_level_histogram: Option, + 
definition_level_histogram: Option, +} + +impl PageMetrics { + fn new() -> Self { + Default::default() + } + + /// Initialize the repetition level histogram + fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { + self.repetition_level_histogram = LevelHistogram::try_new(max_level); + self + } + + /// Initialize the definition level histogram + fn with_definition_level_histogram(mut self, max_level: i16) -> Self { + self.definition_level_histogram = LevelHistogram::try_new(max_level); + self + } + + /// Resets the state of this `PageMetrics` to the initial state. + /// If histograms have been initialized their contents will be reset to zero. + fn new_page(&mut self) { + self.num_buffered_values = 0; + self.num_buffered_rows = 0; + self.num_page_nulls = 0; + self.repetition_level_histogram + .as_mut() + .map(LevelHistogram::reset); + self.definition_level_histogram + .as_mut() + .map(LevelHistogram::reset); + } + + /// Updates histogram values using provided repetition levels + fn update_repetition_level_histogram(&mut self, levels: &[i16]) { + if let Some(ref mut rep_hist) = self.repetition_level_histogram { + rep_hist.update_from_levels(levels); + } + } + + /// Updates histogram values using provided definition levels + fn update_definition_level_histogram(&mut self, levels: &[i16]) { + if let Some(ref mut def_hist) = self.definition_level_histogram { + def_hist.update_from_levels(levels); + } + } } // Metrics per column writer @@ -206,6 +254,8 @@ struct ColumnMetrics { num_column_nulls: u64, column_distinct_count: Option, variable_length_bytes: Option, + repetition_level_histogram: Option, + definition_level_histogram: Option, } impl ColumnMetrics { @@ -213,6 +263,41 @@ impl ColumnMetrics { Default::default() } + /// Initialize the repetition level histogram + fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { + self.repetition_level_histogram = LevelHistogram::try_new(max_level); + self + } + + /// Initialize the definition 
level histogram + fn with_definition_level_histogram(mut self, max_level: i16) -> Self { + self.definition_level_histogram = LevelHistogram::try_new(max_level); + self + } + + /// Sum `page_histogram` into `chunk_histogram` + fn update_histogram( + chunk_histogram: &mut Option, + page_histogram: &Option, + ) { + if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) { + chunk_hist.add(page_hist); + } + } + + /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if + /// page histograms are not initialized. + fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) { + ColumnMetrics::::update_histogram( + &mut self.definition_level_histogram, + &page_metrics.definition_level_histogram, + ); + ColumnMetrics::::update_histogram( + &mut self.repetition_level_histogram, + &page_metrics.repetition_level_histogram, + ); + } + /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes fn update_variable_length_bytes(&mut self, variable_length_bytes: Option) { if let Some(var_bytes) = variable_length_bytes { @@ -275,6 +360,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // Used for level information encodings.insert(Encoding::RLE); + let mut page_metrics = PageMetrics::new(); + let mut column_metrics = ColumnMetrics::::new(); + + // Initialize level histograms if collecting page or chunk statistics + if statistics_enabled != EnabledStatistics::None { + page_metrics = page_metrics + .with_repetition_level_histogram(descr.max_rep_level()) + .with_definition_level_histogram(descr.max_def_level()); + column_metrics = column_metrics + .with_repetition_level_histogram(descr.max_rep_level()) + .with_definition_level_histogram(descr.max_def_level()) + } + // Disable column_index_builder if not collecting page statistics. 
let mut column_index_builder = ColumnIndexBuilder::new(); if statistics_enabled != EnabledStatistics::Page { @@ -292,12 +390,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { def_levels_sink: vec![], rep_levels_sink: vec![], data_pages: VecDeque::new(), - page_metrics: PageMetrics { - num_buffered_values: 0, - num_buffered_rows: 0, - num_page_nulls: 0, - }, - column_metrics: ColumnMetrics::::new(), + page_metrics, + column_metrics, column_index_builder, offset_index_builder: OffsetIndexBuilder::new(), encodings, @@ -547,6 +641,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } + // Update histogram + self.page_metrics.update_definition_level_histogram(levels); + self.def_levels_sink.extend_from_slice(levels); values_to_write } else { @@ -575,6 +672,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.page_metrics.num_buffered_rows += (level == 0) as u32 } + // Update histogram + self.page_metrics.update_repetition_level_histogram(levels); + self.rep_levels_sink.extend_from_slice(levels); } else { // Each value is exactly one row. 
@@ -718,7 +818,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } } - // update the offset index + + // Append page histograms to the `ColumnIndex` histograms + self.column_index_builder.append_histograms( + &self.page_metrics.repetition_level_histogram, + &self.page_metrics.definition_level_histogram, + ); + + // Update the offset index self.offset_index_builder .append_row_count(self.page_metrics.num_buffered_rows as i64); @@ -804,7 +911,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { values_data.variable_length_bytes, ); - // Update variable_length_bytes in column_metrics + // Update histograms and variable_length_bytes in column_metrics + self.column_metrics + .update_from_page_metrics(&self.page_metrics); self.column_metrics .update_variable_length_bytes(values_data.variable_length_bytes); @@ -911,7 +1020,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // Reset state. self.rep_levels_sink.clear(); self.def_levels_sink.clear(); - self.page_metrics = PageMetrics::default(); + self.page_metrics.new_page(); Ok(()) } @@ -1019,7 +1128,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { builder = builder .set_statistics(statistics) - .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes); + .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes) + .set_repetition_level_histogram( + self.column_metrics.repetition_level_histogram.take(), + ) + .set_definition_level_histogram( + self.column_metrics.definition_level_histogram.take(), + ); } let metadata = builder.build()?; diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs index 0b6d1f0d1a24..bb822b4ccbe7 100644 --- a/parquet/src/file/metadata/memory.rs +++ b/parquet/src/file/metadata/memory.rs @@ -99,6 +99,8 @@ impl HeapSize for ColumnChunkMetaData { + self.statistics.heap_size() + self.encoding_stats.heap_size() + self.unencoded_byte_array_data_bytes.heap_size() + + 
self.repetition_level_histogram.heap_size() + + self.definition_level_histogram.heap_size() } } diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 52206e66a590..cd3555de828c 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -557,6 +557,114 @@ pub struct ColumnChunkMetaData { column_index_offset: Option, column_index_length: Option, unencoded_byte_array_data_bytes: Option, + repetition_level_histogram: Option, + definition_level_histogram: Option, +} + +/// Histograms for repetition and definition levels. +/// +/// Each histogram is a vector of length `max_level + 1`. The value at index `i` is the number of +/// values at level `i`. +/// +/// For example, `vec[0]` is the number of values with level 0, `vec[1]` is the +/// number of values with level 1, and so on. +/// +#[derive(Debug, Clone, PartialEq)] +pub struct LevelHistogram { + inner: Vec, +} + +impl LevelHistogram { + /// Creates a new level histogram data. + /// + /// Length will be `max_level + 1`. + /// + /// Returns `None` when `max_level == 0` (because histograms are not necessary in this case) + pub fn try_new(max_level: i16) -> Option { + if max_level > 0 { + Some(Self { + inner: vec![0; max_level as usize + 1], + }) + } else { + None + } + } + /// Returns a reference to the histogram's values. + pub fn values(&self) -> &[i64] { + &self.inner + } + + /// Return the inner vector, consuming self + pub fn into_inner(self) -> Vec { + self.inner + } + + /// Returns the histogram value at the given index. + /// + /// The value of `i` is the number of values with level `i`. For example, + /// `get(1)` returns the number of values with level 1. + /// + /// Returns `None` if the index is out of bounds. 
+ pub fn get(&self, index: usize) -> Option { + self.inner.get(index).copied() + } + + /// Adds the values from the other histogram to this histogram + /// + /// # Panics + /// If the histograms have different lengths + pub fn add(&mut self, other: &Self) { + assert_eq!(self.len(), other.len()); + for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) { + *dst += src; + } + } + + /// return the length of the histogram + pub fn len(&self) -> usize { + self.inner.len() + } + + /// returns if the histogram is empty + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Sets the values of all histogram levels to 0. + pub fn reset(&mut self) { + for value in self.inner.iter_mut() { + *value = 0; + } + } + + /// Updates histogram values using the provided levels + /// + /// # Panics + /// if any of the levels is greater than the maximum level of the histogram ( + /// the argument supplied to [`Self::try_new`]) + pub fn update_from_levels(&mut self, levels: &[i16]) { + for &level in levels { + self.inner[level as usize] += 1; + } + } +} + +impl From> for LevelHistogram { + fn from(inner: Vec) -> Self { + Self { inner } + } +} + +impl From for Vec { + fn from(value: LevelHistogram) -> Self { + value.into_inner() + } +} + +impl HeapSize for LevelHistogram { + fn heap_size(&self) -> usize { + self.inner.heap_size() + } } /// Represents common operations for a column chunk. @@ -717,6 +825,24 @@ impl ColumnChunkMetaData { self.unencoded_byte_array_data_bytes } + /// Returns the repetition level histogram. + /// + /// The returned value `vec[i]` is how many values are at repetition level `i`. For example, + /// `vec[0]` indicates how many rows the page contains. + /// This field may not be set by older writers. + pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> { + self.repetition_level_histogram.as_ref() + } + + /// Returns the definition level histogram. 
+ /// + /// The returned value `vec[i]` is how many values are at definition level `i`. For example, + /// `vec[max_definition_level]` indicates how many non-null values are present in the page. + /// This field may not be set by older writers. + pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> { + self.definition_level_histogram.as_ref() + } + /// Method to convert from Thrift. pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result { if cc.meta_data.is_none() { @@ -754,13 +880,23 @@ impl ColumnChunkMetaData { let offset_index_length = cc.offset_index_length; let column_index_offset = cc.column_index_offset; let column_index_length = cc.column_index_length; - let unencoded_byte_array_data_bytes = if let Some(size_stats) = col_metadata.size_statistics - { - size_stats.unencoded_byte_array_data_bytes + let ( + unencoded_byte_array_data_bytes, + repetition_level_histogram, + definition_level_histogram, + ) = if let Some(size_stats) = col_metadata.size_statistics { + ( + size_stats.unencoded_byte_array_data_bytes, + size_stats.repetition_level_histogram, + size_stats.definition_level_histogram, + ) } else { - None + (None, None, None) }; + let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from); + let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from); + let result = ColumnChunkMetaData { column_descr, encodings, @@ -782,6 +918,8 @@ impl ColumnChunkMetaData { column_index_offset, column_index_length, unencoded_byte_array_data_bytes, + repetition_level_histogram, + definition_level_histogram, }; Ok(result) } @@ -805,11 +943,24 @@ impl ColumnChunkMetaData { /// Method to convert to Thrift `ColumnMetaData` pub fn to_column_metadata_thrift(&self) -> ColumnMetaData { - let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() { + let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() + || self.repetition_level_histogram.is_some() + || 
self.definition_level_histogram.is_some() + { + let repetition_level_histogram = self + .repetition_level_histogram + .as_ref() + .map(|hist| hist.clone().into_inner()); + + let definition_level_histogram = self + .definition_level_histogram + .as_ref() + .map(|hist| hist.clone().into_inner()); + Some(SizeStatistics { unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes, - repetition_level_histogram: None, - definition_level_histogram: None, + repetition_level_histogram, + definition_level_histogram, }) } else { None @@ -871,6 +1022,8 @@ impl ColumnChunkMetaDataBuilder { column_index_offset: None, column_index_length: None, unencoded_byte_array_data_bytes: None, + repetition_level_histogram: None, + definition_level_histogram: None, }) } @@ -988,6 +1141,18 @@ impl ColumnChunkMetaDataBuilder { self } + /// Sets optional repetition level histogram + pub fn set_repetition_level_histogram(mut self, value: Option) -> Self { + self.0.repetition_level_histogram = value; + self + } + + /// Sets optional definition level histogram + pub fn set_definition_level_histogram(mut self, value: Option) -> Self { + self.0.definition_level_histogram = value; + self + } + /// Builds column chunk metadata. pub fn build(self) -> Result { Ok(self.0) @@ -1003,6 +1168,10 @@ pub struct ColumnIndexBuilder { max_values: Vec>, null_counts: Vec, boundary_order: BoundaryOrder, + /// contains the concatenation of the histograms of all pages + repetition_level_histograms: Option>, + /// contains the concatenation of the histograms of all pages + definition_level_histograms: Option>, /// Is the information in the builder valid? 
/// /// Set to `false` if any entry in the page doesn't have statistics for @@ -1027,6 +1196,8 @@ impl ColumnIndexBuilder { max_values: Vec::new(), null_counts: Vec::new(), boundary_order: BoundaryOrder::UNORDERED, + repetition_level_histograms: None, + definition_level_histograms: None, valid: true, } } @@ -1045,6 +1216,28 @@ impl ColumnIndexBuilder { self.null_counts.push(null_count); } + /// Append the given page-level histograms to the [`ColumnIndex`] histograms. + /// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state. + pub fn append_histograms( + &mut self, + repetition_level_histogram: &Option, + definition_level_histogram: &Option, + ) { + if !self.valid { + return; + } + if let Some(ref rep_lvl_hist) = repetition_level_histogram { + let hist = self.repetition_level_histograms.get_or_insert(Vec::new()); + hist.reserve(rep_lvl_hist.len()); + hist.extend(rep_lvl_hist.values()); + } + if let Some(ref def_lvl_hist) = definition_level_histogram { + let hist = self.definition_level_histograms.get_or_insert(Vec::new()); + hist.reserve(def_lvl_hist.len()); + hist.extend(def_lvl_hist.values()); + } + } + pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) { self.boundary_order = boundary_order; } @@ -1069,8 +1262,8 @@ impl ColumnIndexBuilder { self.max_values, self.boundary_order, self.null_counts, - None, - None, + self.repetition_level_histograms, + self.definition_level_histograms, ) } } @@ -1286,6 +1479,8 @@ mod tests { .set_column_index_offset(Some(8000)) .set_column_index_length(Some(25)) .set_unencoded_byte_array_data_bytes(Some(2000)) + .set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100]))) + .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200]))) .build() .unwrap(); @@ -1397,7 +1592,7 @@ mod tests { let row_group_meta_with_stats = vec![row_group_meta_with_stats]; let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats); - let base_expected_size = 2088; 
+ let base_expected_size = 2280; assert_eq!(parquet_meta.memory_size(), base_expected_size); @@ -1425,7 +1620,7 @@ mod tests { ]]), ); - let bigger_expected_size = 2400; + let bigger_expected_size = 2784; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); assert_eq!(parquet_meta.memory_size(), bigger_expected_size); diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index 7eba949042f1..68412572b5f2 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -36,6 +36,17 @@ pub struct PageIndex { pub max: Option, /// Null values in the page pub null_count: Option, + /// Repetition level histogram for the page + /// + /// `repetition_level_histogram[i]` is a count of how many values are at repetition level `i`. + /// For example, `repetition_level_histogram[0]` indicates how many rows the page contains. + pub repetition_level_histogram: Option>, + /// Definition level histogram for the page + /// + /// `definition_level_histogram[i]` is a count of how many values are at definition level `i`. + /// For example, `definition_level_histogram[max_definition_level]` indicates how many + /// non-null values are present in the page. + pub definition_level_histogram: Option>, } impl PageIndex { @@ -48,6 +59,12 @@ impl PageIndex { pub fn null_count(&self) -> Option { self.null_count } + pub fn repetition_level_histogram(&self) -> Option<&Vec> { + self.repetition_level_histogram.as_ref() + } + pub fn definition_level_histogram(&self) -> Option<&Vec> { + self.definition_level_histogram.as_ref() + } } impl PageIndex @@ -149,26 +166,57 @@ impl NativeIndex { .map(|x| x.into_iter().map(Some).collect::>()) .unwrap_or_else(|| vec![None; len]); + // histograms are a 1D array encoding a 2D num_pages X num_levels matrix. + let to_page_histograms = |opt_hist: Option>| { + if let Some(hist) = opt_hist { + // TODO: should we assert (hist.len() % len) == 0? 
+ let num_levels = hist.len() / len; + let mut res = Vec::with_capacity(len); + for i in 0..len { + let page_idx = i * num_levels; + let page_hist = hist[page_idx..page_idx + num_levels].to_vec(); + res.push(Some(page_hist)); + } + res + } else { + vec![None; len] + } + }; + + let rep_hists: Vec>> = + to_page_histograms(index.repetition_level_histograms); + let def_hists: Vec>> = + to_page_histograms(index.definition_level_histograms); + let indexes = index .min_values .iter() .zip(index.max_values.into_iter()) .zip(index.null_pages.into_iter()) .zip(null_counts.into_iter()) - .map(|(((min, max), is_null), null_count)| { - let (min, max) = if is_null { - (None, None) - } else { - let min = min.as_slice(); - let max = max.as_slice(); - (Some(from_le_slice::(min)), Some(from_le_slice::(max))) - }; - Ok(PageIndex { - min, - max, - null_count, - }) - }) + .zip(rep_hists.into_iter()) + .zip(def_hists.into_iter()) + .map( + |( + ((((min, max), is_null), null_count), repetition_level_histogram), + definition_level_histogram, + )| { + let (min, max) = if is_null { + (None, None) + } else { + let min = min.as_slice(); + let max = max.as_slice(); + (Some(from_le_slice::(min)), Some(from_le_slice::(max))) + }; + Ok(PageIndex { + min, + max, + null_count, + repetition_level_histogram, + definition_level_histogram, + }) + }, + ) .collect::, ParquetError>>()?; Ok(Self { @@ -188,6 +236,8 @@ mod tests { min: Some(-123), max: Some(234), null_count: Some(0), + repetition_level_histogram: Some(vec![1, 2]), + definition_level_histogram: Some(vec![1, 2, 3]), }; assert_eq!(page_index.min().unwrap(), &-123); @@ -195,6 +245,11 @@ mod tests { assert_eq!(page_index.min_bytes().unwrap(), (-123).as_bytes()); assert_eq!(page_index.max_bytes().unwrap(), 234.as_bytes()); assert_eq!(page_index.null_count().unwrap(), 0); + assert_eq!(page_index.repetition_level_histogram(), Some(&vec![1, 2])); + assert_eq!( + page_index.definition_level_histogram(), + Some(&vec![1, 2, 3]) + ); } #[test] @@ -203,6 
+258,8 @@ mod tests { min: None, max: None, null_count: None, + repetition_level_histogram: None, + definition_level_histogram: None, }; assert_eq!(page_index.min(), None); @@ -210,5 +267,7 @@ mod tests { assert_eq!(page_index.min_bytes(), None); assert_eq!(page_index.max_bytes(), None); assert_eq!(page_index.null_count(), None); + assert_eq!(page_index.repetition_level_histogram(), None); + assert_eq!(page_index.definition_level_histogram(), None); } } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index c44a7e6697f0..f2e8f74a378c 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -663,6 +663,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_dictionary_page_offset(src_dictionary_offset.map(map_offset)) .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes()); + if let Some(rep_hist) = metadata.repetition_level_histogram() { + builder = builder.set_repetition_level_histogram(Some(rep_hist.clone())) + } + if let Some(def_hist) = metadata.definition_level_histogram() { + builder = builder.set_definition_level_histogram(Some(def_hist.clone())) + } if let Some(statistics) = metadata.statistics() { builder = builder.set_statistics(statistics.clone()) } @@ -1889,6 +1895,12 @@ mod tests { assert_eq!(file_metadata.row_groups[0].columns.len(), 1); assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + let check_def_hist = |def_hist: &[i64]| { + assert_eq!(def_hist.len(), 2); + assert_eq!(def_hist[0], 3); + assert_eq!(def_hist[1], 7); + }; + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); let meta_data = file_metadata.row_groups[0].columns[0] .meta_data @@ -1898,12 +1910,13 @@ mod tests { let size_stats = meta_data.size_statistics.as_ref().unwrap(); assert!(size_stats.repetition_level_histogram.is_none()); - assert!(size_stats.definition_level_histogram.is_none()); + assert!(size_stats.definition_level_histogram.is_some()); 
assert!(size_stats.unencoded_byte_array_data_bytes.is_some()); assert_eq!( unenc_size, size_stats.unencoded_byte_array_data_bytes.unwrap() ); + check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap()); // check that the read metadata is also correct let options = ReadOptionsBuilder::new().with_page_index().build(); @@ -1915,12 +1928,31 @@ mod tests { let rowgroup = reader.get_row_group(0).unwrap(); assert_eq!(rowgroup.num_columns(), 1); let column = rowgroup.metadata().column(0); + assert!(column.definition_level_histogram().is_some()); + assert!(column.repetition_level_histogram().is_none()); assert!(column.unencoded_byte_array_data_bytes().is_some()); + check_def_hist(column.definition_level_histogram().unwrap().values()); assert_eq!( unenc_size, column.unencoded_byte_array_data_bytes().unwrap() ); + // check histogram in column index as well + assert!(reader.metadata().column_index().is_some()); + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); + assert_eq!(column_index[0].len(), 1); + let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] { + assert_eq!(index.indexes.len(), 1); + &index.indexes[0] + } else { + unreachable!() + }; + + assert!(col_idx.repetition_level_histogram().is_none()); + assert!(col_idx.definition_level_histogram().is_some()); + check_def_hist(col_idx.definition_level_histogram().unwrap()); + assert!(reader.metadata().offset_index().is_some()); let offset_index = reader.metadata().offset_index().unwrap(); assert_eq!(offset_index.len(), 1); @@ -1933,4 +1965,114 @@ mod tests { assert_eq!(page_sizes.len(), 1); assert_eq!(page_sizes[0], unenc_size); } + + #[test] + fn test_size_statistics_with_repetition_and_nulls() { + let message_type = " + message test_schema { + OPTIONAL group i32_list (LIST) { + REPEATED group list { + OPTIONAL INT32 element; + } + } + } + "; + // column is: + // row 0: [1, 2] + // row 1: NULL + // row 2: [4, NULL] + // row 3: [] + // row 4: [7, 
8, 9, 10] + let schema = Arc::new(parse_message_type(message_type).unwrap()); + let data = [1, 2, 4, 7, 8, 9, 10]; + let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3]; + let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1]; + let file = tempfile::tempfile().unwrap(); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(), + ); + let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); + let mut row_group_writer = writer.next_row_group().unwrap(); + + let mut col_writer = row_group_writer.next_column().unwrap().unwrap(); + col_writer + .typed::() + .write_batch(&data, Some(&def_levels), Some(&rep_levels)) + .unwrap(); + col_writer.close().unwrap(); + row_group_writer.close().unwrap(); + let file_metadata = writer.close().unwrap(); + + assert_eq!(file_metadata.row_groups.len(), 1); + assert_eq!(file_metadata.row_groups[0].columns.len(), 1); + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + + let check_def_hist = |def_hist: &[i64]| { + assert_eq!(def_hist.len(), 4); + assert_eq!(def_hist[0], 1); + assert_eq!(def_hist[1], 1); + assert_eq!(def_hist[2], 1); + assert_eq!(def_hist[3], 7); + }; + + let check_rep_hist = |rep_hist: &[i64]| { + assert_eq!(rep_hist.len(), 2); + assert_eq!(rep_hist[0], 5); + assert_eq!(rep_hist[1], 5); + }; + + // check that histograms are set properly in the write and read metadata + // also check that unencoded_byte_array_data_bytes is not set + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + let meta_data = file_metadata.row_groups[0].columns[0] + .meta_data + .as_ref() + .unwrap(); + assert!(meta_data.size_statistics.is_some()); + let size_stats = meta_data.size_statistics.as_ref().unwrap(); + assert!(size_stats.repetition_level_histogram.is_some()); + assert!(size_stats.definition_level_histogram.is_some()); + assert!(size_stats.unencoded_byte_array_data_bytes.is_none()); + 
check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap()); + check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap()); + + // check that the read metadata is also correct + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(file, options).unwrap(); + + let rfile_metadata = reader.metadata().file_metadata(); + assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows); + assert_eq!(reader.num_row_groups(), 1); + let rowgroup = reader.get_row_group(0).unwrap(); + assert_eq!(rowgroup.num_columns(), 1); + let column = rowgroup.metadata().column(0); + assert!(column.definition_level_histogram().is_some()); + assert!(column.repetition_level_histogram().is_some()); + assert!(column.unencoded_byte_array_data_bytes().is_none()); + check_def_hist(column.definition_level_histogram().unwrap().values()); + check_rep_hist(column.repetition_level_histogram().unwrap().values()); + + // check histogram in column index as well + assert!(reader.metadata().column_index().is_some()); + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); + assert_eq!(column_index[0].len(), 1); + let col_idx = if let Index::INT32(index) = &column_index[0][0] { + assert_eq!(index.indexes.len(), 1); + &index.indexes[0] + } else { + unreachable!() + }; + + check_def_hist(col_idx.definition_level_histogram().unwrap()); + check_rep_hist(col_idx.repetition_level_histogram().unwrap()); + + assert!(reader.metadata().offset_index().is_some()); + let offset_index = reader.metadata().offset_index().unwrap(); + assert_eq!(offset_index.len(), 1); + assert_eq!(offset_index[0].len(), 1); + assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none()); + } }