From c00cb27f43051d890543a0c85fca78a5d9a8e94f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 26 Jun 2024 14:14:57 -0400 Subject: [PATCH] Add ParquetMetadata::memory_size size estimation --- parquet/src/data_type.rs | 18 ++ parquet/src/file/metadata/memory.rs | 213 ++++++++++++++++++ .../src/file/{metadata.rs => metadata/mod.rs} | 90 +++++++- parquet/src/schema/types.rs | 36 +++ 4 files changed, 356 insertions(+), 1 deletion(-) create mode 100644 parquet/src/file/metadata/memory.rs rename parquet/src/file/{metadata.rs => metadata/mod.rs} (92%) diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 5e1d53badba2..8316c3048a92 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -654,6 +654,13 @@ pub(crate) mod private { /// Return the value as an mutable Any to allow for downcasts without transmutation fn as_mut_any(&mut self) -> &mut dyn std::any::Any; + + /// Returns the number of bytes of memory this instance uses on the heap. + /// + /// Defaults to none (0) + fn heap_size(&self) -> usize { + 0 + } } impl ParquetValueType for bool { @@ -968,6 +975,13 @@ pub(crate) mod private { fn as_mut_any(&mut self) -> &mut dyn std::any::Any { self } + + fn heap_size(&self) -> usize { + // note: this is an estimate, not exact, so just return the size + // of the actual data used, don't try to handle the fact that it may + // be shared. + self.data.as_ref().map(|data| data.len()).unwrap_or(0) + } } impl ParquetValueType for super::FixedLenByteArray { @@ -1054,6 +1068,10 @@ pub(crate) mod private { fn as_mut_any(&mut self) -> &mut dyn std::any::Any { self } + + fn heap_size(&self) -> usize { + self.0.heap_size() + } } } diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs new file mode 100644 index 000000000000..477d3b8499d0 --- /dev/null +++ b/parquet/src/file/metadata/memory.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory calculations for [`ParquetMetadata::memory_size`] +//! +//! [`ParquetMetadata::memory_size`]: crate::file::metadata::ParquetMetaData::memory_size +use crate::basic::{ColumnOrder, Compression, Encoding, PageType}; +use crate::data_type::private::ParquetValueType; +use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData}; +use crate::file::page_encoding_stats::PageEncodingStats; +use crate::file::page_index::index::{Index, NativeIndex, PageIndex}; +use crate::file::statistics::{Statistics, ValueStatistics}; +use crate::format::{BoundaryOrder, PageLocation, SortingColumn}; +use std::sync::Arc; + +/// Trait for calculating the size of various containers +pub(crate) trait HeapSize { + /// Return the size of any bytes allocated on the heap by this object, + /// including heap memory in those structures + /// + /// Note that the size of the type itself is not included in the result -- + /// instead, that size is added by the caller (e.g. container). + fn heap_size(&self) -> usize; +} + +impl HeapSize for Vec { + fn heap_size(&self) -> usize { + let item_size = std::mem::size_of::(); + // account for the contents of the Vec + (self.capacity() * item_size) + + // add any heap allocations by contents + self.iter().map(|t| t.heap_size()).sum::() + } +} + +impl HeapSize for Arc { + fn heap_size(&self) -> usize { + self.as_ref().heap_size() + } +} + +impl HeapSize for Option { + fn heap_size(&self) -> usize { + self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0) + } +} + +impl HeapSize for String { + fn heap_size(&self) -> usize { + self.capacity() + } +} + +impl HeapSize for FileMetaData { + fn heap_size(&self) -> usize { + self.created_by.heap_size() + + self.key_value_metadata.heap_size() + + self.schema_descr.heap_size() + + self.column_orders.heap_size() + } +} + +impl HeapSize for KeyValue { + fn heap_size(&self) -> usize { + self.key.heap_size() + self.value.heap_size() + } +} + +impl HeapSize for RowGroupMetaData { + fn heap_size(&self) -> usize { + // don't count schema_descr here because it is already + // counted in FileMetaData + self.columns.heap_size() + self.sorting_columns.heap_size() + } +} + +impl HeapSize for ColumnChunkMetaData { + fn heap_size(&self) -> usize { + // don't count column_descr here because it is already counted in + // FileMetaData + self.encodings.heap_size() + + self.file_path.heap_size() + + self.compression.heap_size() + + self.statistics.heap_size() + + self.encoding_stats.heap_size() + } +} + +impl HeapSize for Encoding { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for PageEncodingStats { + fn heap_size(&self) -> usize { + self.page_type.heap_size() + self.encoding.heap_size() + } +} + +impl HeapSize for SortingColumn { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl HeapSize for Compression { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for PageType { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl HeapSize for Statistics { + fn heap_size(&self) -> usize { + match self { + Statistics::Boolean(value_statistics) => value_statistics.heap_size(), + Statistics::Int32(value_statistics) => value_statistics.heap_size(), + Statistics::Int64(value_statistics) => value_statistics.heap_size(), + Statistics::Int96(value_statistics) => value_statistics.heap_size(), + Statistics::Float(value_statistics) => value_statistics.heap_size(), + Statistics::Double(value_statistics) => value_statistics.heap_size(), + Statistics::ByteArray(value_statistics) => value_statistics.heap_size(), + Statistics::FixedLenByteArray(value_statistics) => value_statistics.heap_size(), + } + } +} + +impl HeapSize for Index { + fn heap_size(&self) -> usize { + match self { + Index::NONE => 0, + Index::BOOLEAN(native_index) => native_index.heap_size(), + Index::INT32(native_index) => native_index.heap_size(), + Index::INT64(native_index) => native_index.heap_size(), + Index::INT96(native_index) => native_index.heap_size(), + Index::FLOAT(native_index) => native_index.heap_size(), + Index::DOUBLE(native_index) => native_index.heap_size(), + Index::BYTE_ARRAY(native_index) => native_index.heap_size(), + Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index.heap_size(), + } + } +} + +impl HeapSize for NativeIndex { + fn heap_size(&self) -> usize { + let Self { + indexes, + boundary_order, + } = self; + indexes.heap_size() + boundary_order.heap_size() + } +} + +impl HeapSize for PageIndex { + fn heap_size(&self) -> usize { + self.min.heap_size() + self.max.heap_size() + self.null_count.heap_size() + } +} + +impl HeapSize for ValueStatistics { + fn heap_size(&self) -> usize { + self.min().heap_size() + self.max().heap_size() + } +} + +// Note this impl gets most primitive types like bool, i32, etc +impl HeapSize for T { + fn heap_size(&self) -> usize { + self.heap_size() + } +} + +impl HeapSize for usize { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for BoundaryOrder { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for PageLocation { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for ColumnOrder { + fn heap_size(&self) -> usize { + 0 // no heap allocations in ColumnOrder + } +} diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata/mod.rs similarity index 92% rename from parquet/src/file/metadata.rs rename to parquet/src/file/metadata/mod.rs index fb8f798fd3ac..40922d52bfd4 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata/mod.rs @@ -29,6 +29,8 @@ //! * [`ColumnChunkMetaData`]: Metadata for each column chunk (primitive leaf) //! within a Row Group including encoding and compression information, //! number of values, statistics, etc. +mod memory; + use std::ops::Range; use std::sync::Arc; @@ -39,6 +41,7 @@ use crate::format::{ use crate::basic::{ColumnOrder, Compression, Encoding, Type}; use crate::errors::{ParquetError, Result}; +pub(crate) use crate::file::metadata::memory::HeapSize; use crate::file::page_encoding_stats::{self, PageEncodingStats}; use crate::file::page_index::index::Index; use crate::file::statistics::{self, Statistics}; @@ -176,6 +179,28 @@ impl ParquetMetaData { self.offset_index.as_ref() } + /// Estimate of the bytes allocated to store `ParquetMetadata` + /// + /// # Notes: + /// + /// 1. Includes size of self + /// + /// 2. Includes heap memory for sub fields such as [`FileMetaData`] and + /// [`RowGroupMetaData`]. + /// + /// 3. Includes memory from shared pointers (e.g. [`SchemaDescPtr`]). This + /// means `memory_size` will over estimate the memory size if such pointers + /// are shared. + /// + /// 4. Does not include any allocator overheads + pub fn memory_size(&self) -> usize { + std::mem::size_of::() + + self.file_metadata.heap_size() + + self.row_groups.heap_size() + + self.column_index.heap_size() + + self.offset_index.heap_size() + } + /// Override the column index #[cfg(feature = "arrow")] pub(crate) fn set_column_index(&mut self, index: Option) { @@ -1034,7 +1059,8 @@ impl OffsetIndexBuilder { #[cfg(test)] mod tests { use super::*; - use crate::basic::PageType; + use crate::basic::{PageType, SortOrder}; + use crate::file::page_index::index::NativeIndex; #[test] fn test_row_group_metadata_thrift_conversion() { @@ -1227,6 +1253,68 @@ mod tests { assert_eq!(compressed_size_res, compressed_size_exp); } + #[test] + fn test_memory_size() { + let schema_descr = get_test_schema_descr(); + + let columns = schema_descr + .columns() + .iter() + .map(|column_descr| ColumnChunkMetaData::builder(column_descr.clone()).build()) + .collect::>>() + .unwrap(); + let row_group_meta = RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(1000) + .set_column_metadata(columns) + .build() + .unwrap(); + let row_group_meta = vec![row_group_meta]; + + let version = 2; + let num_rows = 1000; + let created_by = Some(String::from("test harness")); + let key_value_metadata = Some(vec![KeyValue::new( + String::from("Foo"), + Some(String::from("bar")), + )]); + let column_orders = Some(vec![ + ColumnOrder::UNDEFINED, + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED), + ]); + let file_metadata = FileMetaData::new( + version, + num_rows, + created_by, + key_value_metadata, + schema_descr, + column_orders, + ); + let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone()); + let base_expected_size = 1320; + assert_eq!(parquet_meta.memory_size(), base_expected_size); + + let mut column_index = ColumnIndexBuilder::new(); + column_index.append(false, vec![1u8], vec![2u8, 3u8], 4); + let column_index = column_index.build_to_thrift(); + let native_index = NativeIndex::::try_new(column_index).unwrap(); + + // Now, add in OffsetIndex + let parquet_meta = ParquetMetaData::new_with_page_index( + file_metadata, + row_group_meta, + Some(vec![vec![Index::BOOLEAN(native_index)]]), + Some(vec![vec![ + vec![PageLocation::new(1, 2, 3)], + vec![PageLocation::new(1, 2, 3)], + ]]), + ); + + let bigger_expected_size = 2304; + // more set fields means more memory usage + assert!(bigger_expected_size > base_expected_size); + assert_eq!(parquet_meta.memory_size(), bigger_expected_size); + } + /// Returns sample schema descriptor so we can create column metadata. fn get_test_schema_descr() -> SchemaDescPtr { let schema = SchemaType::group_type_builder("schema") diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 13cc016df02b..0eb08f557dce 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, sync::Arc}; +use crate::file::metadata::HeapSize; use crate::format::SchemaElement; use crate::basic::{ @@ -57,6 +58,15 @@ pub enum Type { }, } +impl HeapSize for Type { + fn heap_size(&self) -> usize { + match self { + Type::PrimitiveType { basic_info, .. } => basic_info.heap_size(), + Type::GroupType { basic_info, fields } => basic_info.heap_size() + fields.heap_size(), + } + } +} + impl Type { /// Creates primitive type builder with provided field name and physical type. pub fn primitive_type_builder(name: &str, physical_type: PhysicalType) -> PrimitiveTypeBuilder { @@ -625,6 +635,13 @@ pub struct BasicTypeInfo { id: Option, } +impl HeapSize for BasicTypeInfo { + fn heap_size(&self) -> usize { + // no heap allocations in any other subfield + self.name.heap_size() + } +} + impl BasicTypeInfo { /// Returns field name. pub fn name(&self) -> &str { @@ -693,6 +710,12 @@ pub struct ColumnPath { parts: Vec, } +impl HeapSize for ColumnPath { + fn heap_size(&self) -> usize { + self.parts.heap_size() + } +} + impl ColumnPath { /// Creates new column path from vector of field names. pub fn new(parts: Vec) -> Self { @@ -781,6 +804,12 @@ pub struct ColumnDescriptor { path: ColumnPath, } +impl HeapSize for ColumnDescriptor { + fn heap_size(&self) -> usize { + self.primitive_type.heap_size() + self.path.heap_size() + } +} + impl ColumnDescriptor { /// Creates new descriptor for leaf-level column. pub fn new( @@ -925,6 +954,13 @@ impl fmt::Debug for SchemaDescriptor { } } +// Need to implement HeapSize in this module as the fields are private +impl HeapSize for SchemaDescriptor { + fn heap_size(&self) -> usize { + self.schema.heap_size() + self.leaves.heap_size() + self.leaf_to_base.heap_size() + } +} + impl SchemaDescriptor { /// Creates new schema descriptor from Parquet schema. pub fn new(tp: TypePtr) -> Self {