Skip to content

Commit

Permalink
Minor: Add ParquetExec::table_parquet_options accessor (apache#9909)
Browse files Browse the repository at this point in the history
alamb authored Apr 2, 2024
1 parent 544e49b commit e8ab555
Showing 2 changed files with 21 additions and 13 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
@@ -1362,6 +1362,8 @@ impl TableOptions {
}
}

/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagate to all columns.
32 changes: 19 additions & 13 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -89,9 +89,10 @@ pub struct ParquetExec {
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
/// Cached plan properties such as equivalence properties, ordering, partitioning, etc.
cache: PlanProperties,
/// Parquet Options
parquet_options: TableParquetOptions,
/// Options for reading Parquet files
table_parquet_options: TableParquetOptions,
}

impl ParquetExec {
@@ -100,7 +101,7 @@ impl ParquetExec {
base_config: FileScanConfig,
predicate: Option<Arc<dyn PhysicalExpr>>,
metadata_size_hint: Option<usize>,
parquet_options: TableParquetOptions,
table_parquet_options: TableParquetOptions,
) -> Self {
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -155,15 +156,20 @@ impl ParquetExec {
metadata_size_hint,
parquet_file_reader_factory: None,
cache,
parquet_options,
table_parquet_options,
}
}

/// Returns a reference to the base [`FileScanConfig`] that controls this
/// scan (such as which files to read)
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

/// Returns a reference to the [`TableParquetOptions`] stored on this
/// `ParquetExec`: the options passed to the parquet reader for this scan
pub fn table_parquet_options(&self) -> &TableParquetOptions {
&self.table_parquet_options
}

/// Optional predicate.
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
self.predicate.as_ref()
@@ -197,13 +203,13 @@ impl ParquetExec {
///
/// [`Expr`]: datafusion_expr::Expr
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
self.parquet_options.global.pushdown_filters = pushdown_filters;
self.table_parquet_options.global.pushdown_filters = pushdown_filters;
self
}

/// Return the value described in [`Self::with_pushdown_filters`]
fn pushdown_filters(&self) -> bool {
self.parquet_options.global.pushdown_filters
self.table_parquet_options.global.pushdown_filters
}

/// If true, the `RowFilter` made by `pushdown_filters` may try to
@@ -213,38 +219,38 @@ impl ParquetExec {
///
/// [`Expr`]: datafusion_expr::Expr
pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
self.parquet_options.global.reorder_filters = reorder_filters;
self.table_parquet_options.global.reorder_filters = reorder_filters;
self
}

/// Return the value described in [`Self::with_reorder_filters`]
fn reorder_filters(&self) -> bool {
self.parquet_options.global.reorder_filters
self.table_parquet_options.global.reorder_filters
}

/// If enabled, the reader will read the page index.
///
/// This is used to optimise filter pushdown via `RowSelector` and
/// `RowFilter` by eliminating unnecessary IO and decoding.
pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self {
self.parquet_options.global.enable_page_index = enable_page_index;
self.table_parquet_options.global.enable_page_index = enable_page_index;
self
}

/// Return the value described in [`Self::with_enable_page_index`]
fn enable_page_index(&self) -> bool {
self.parquet_options.global.enable_page_index
self.table_parquet_options.global.enable_page_index
}

/// If enabled, the reader will make use of the bloom filter when reading.
///
/// Sets the `bloom_filter_enabled` flag in the global Parquet options.
pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
self.parquet_options.global.bloom_filter_enabled = enable_bloom_filter;
self.table_parquet_options.global.bloom_filter_enabled = enable_bloom_filter;
self
}

/// Return the value described in [`Self::with_enable_bloom_filter`]
fn enable_bloom_filter(&self) -> bool {
self.parquet_options.global.bloom_filter_enabled
self.table_parquet_options.global.bloom_filter_enabled
}

fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning {

0 comments on commit e8ab555

Please sign in to comment.