feat: update parquet row filter to handle type coercion
jeffreyssmith2nd committed May 29, 2024
1 parent 235b044 commit 484b2fd
Showing 3 changed files with 61 additions and 7 deletions.
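Context for the change: the pushed-down row filter used to evaluate predicates against batches still carrying the file's physical types, so a file column typed Timestamp(Nanosecond, None) compared against a table-typed value of Timestamp(Nanosecond, Some("UTC")) failed inside Arrow's comparison kernel (the exact error the old test below asserted). The sketch below reproduces the failure and the cast-based fix; it is illustrative standalone code against arrow-rs, not code from this commit.

```rust
use arrow::array::TimestampNanosecondArray;
use arrow::compute::cast;
use arrow::compute::kernels::cmp::eq;
use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // File data: timestamps without a timezone.
    let file_col = TimestampNanosecondArray::from(vec![1, 2, 3]);
    // The table schema declares the same instants as UTC.
    let table_col = TimestampNanosecondArray::from(vec![1, 2, 3]).with_timezone("UTC");

    // Comparing mismatched timestamp types errors with
    // "Invalid comparison operation: Timestamp(Nanosecond, None) ==
    // Timestamp(Nanosecond, Some(\"UTC\"))", the failure the filter used to hit.
    assert!(eq(&file_col, &table_col).is_err());

    // Casting the file column to the table's type first (what the row
    // filter now does through SchemaMapper::map_partial_batch) succeeds.
    let coerced = cast(
        &file_col,
        &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
    )
    .unwrap();
    assert!(eq(&coerced, &table_col).is_ok());
}
```

The commit threads a SchemaMapper into the filter so this cast happens per batch, and only for the columns the predicate actually reads.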
@@ -621,6 +621,7 @@ impl FileOpener for ParquetOpener {
builder.metadata(),
reorder_predicates,
&file_metrics,
+Arc::clone(&schema_mapping),
);

match row_filter {
29 changes: 23 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -26,6 +26,7 @@ use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;

+use crate::datasource::schema_adapter::SchemaMapper;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
@@ -79,6 +80,8 @@ pub(crate) struct DatafusionArrowPredicate {
rows_filtered: metrics::Count,
/// how long was spent evaluating this predicate
time: metrics::Time,
+/// used to perform type coercion while filtering rows
+schema_mapping: Arc<dyn SchemaMapper>,
}

impl DatafusionArrowPredicate {
@@ -88,6 +91,7 @@ impl DatafusionArrowPredicate {
metadata: &ParquetMetaData,
rows_filtered: metrics::Count,
time: metrics::Time,
+schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Self> {
let schema = Arc::new(schema.project(&candidate.projection)?);
let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
@@ -109,6 +113,7 @@
),
rows_filtered,
time,
+schema_mapping,
})
}
}
@@ -124,6 +129,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
false => batch.project(&self.projection)?,
};

+let batch = self.schema_mapping.map_partial_batch(batch)?;
+
// scoped timer updates on drop
let mut timer = self.time.timer();
match self
@@ -324,6 +331,7 @@ pub fn build_row_filter(
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
+schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Option<RowFilter>> {
let rows_filtered = &file_metrics.pushdown_rows_filtered;
let time = &file_metrics.pushdown_eval_time;
@@ -361,6 +369,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
+Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -373,6 +382,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
+Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -388,6 +398,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
+Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -408,6 +419,9 @@ mod test {
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;

+use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
+use crate::datasource::schema_adapter::SchemaAdapterFactory;
+
use datafusion_common::ToDFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{cast, col, lit, Expr};
@@ -510,12 +524,19 @@ mod test {
.expect("building candidate")
.expect("candidate expected");

+let schema_adapter = DefaultSchemaAdapterFactory{}.create(Arc::new(table_schema));
+let (schema_mapping, _) = schema_adapter
+.map_schema(&file_schema)
+.expect("creating schema mapping");
+
+
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&file_schema,
metadata,
Count::new(),
Time::new(),
+schema_mapping,
)
.expect("creating filter predicate");

@@ -528,7 +549,7 @@
);
// We need a matching schema to create a record batch
let batch_schema = Schema::new(vec![Field::new(
"timestamp",
"timestamp_col",
DataType::Timestamp(Nanosecond, None),
false,
)]);
@@ -539,11 +560,7 @@

let filtered = row_filter.evaluate(record_batch);

-let message = String::from("Error evaluating filter predicate: ArrowError(InvalidArgumentError(\"Invalid comparison operation: Timestamp(Nanosecond, None) == Timestamp(Nanosecond, Some(\\\"UTC\\\"))\"), None)");
-assert!(matches!(filtered, Err(ArrowError::ComputeError(msg)) if message == msg));
-
-// This currently fails (and should replace the above assert once passing)
-// assert!(matches!(filtered, Ok(_)));
+assert!(matches!(filtered, Ok(_)));
}

#[test]
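Taken together, the row_filter.rs changes mean every candidate predicate now coerces its projected batch to the table schema before evaluation. Below is a simplified analogue of the new evaluate path, a sketch assuming DataFusion's public PhysicalExpr and schema_adapter APIs; evaluate_filter is an illustrative name, not the actual DatafusionArrowPredicate method:

```rust
use std::sync::Arc;

use arrow::array::BooleanArray;
use arrow::record_batch::RecordBatch;
use datafusion::common::cast::as_boolean_array;
use datafusion::datasource::schema_adapter::SchemaMapper;
use datafusion::error::Result;
use datafusion::physical_plan::PhysicalExpr;

/// Simplified analogue of DatafusionArrowPredicate::evaluate after this
/// commit: coerce the (already projected) batch to table types, then run
/// the predicate against the coerced columns.
fn evaluate_filter(
    expr: &Arc<dyn PhysicalExpr>,
    mapping: &Arc<dyn SchemaMapper>,
    batch: RecordBatch,
) -> Result<BooleanArray> {
    // Cast file-typed columns to the table's types first, so comparisons
    // like Timestamp(_, None) vs Timestamp(_, Some("UTC")) no longer error.
    let batch = mapping.map_partial_batch(batch)?;
    let value = expr.evaluate(&batch)?;
    let array = value.into_array(batch.num_rows())?;
    Ok(as_boolean_array(&array)?.clone())
}
```

Since the mapping sits behind an Arc, build_row_filter can hand the same SchemaMapper to every candidate predicate with a cheap reference-count bump, which is what the repeated Arc::clone(&schema_mapping) lines above do.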
38 changes: 37 additions & 1 deletion datafusion/core/src/datasource/schema_adapter.rs
@@ -75,9 +75,16 @@ pub trait SchemaAdapter: Send + Sync {

/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
-pub trait SchemaMapper: Send + Sync {
+pub trait SchemaMapper: Debug + Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

+/// Adapts a `RecordBatch` that does not have all the columns (as defined in the schema).
+/// This method is slower than `map_batch` and should only be used when explicitly needed.
+fn map_partial_batch(
+&self,
+batch: RecordBatch,
+) -> datafusion_common::Result<RecordBatch>;
}

#[derive(Clone, Debug, Default)]
@@ -185,6 +192,31 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}

+fn map_partial_batch(
+&self,
+batch: RecordBatch,
+) -> datafusion_common::Result<RecordBatch> {
+let batch_cols = batch.columns().to_vec();
+let schema = batch.schema();
+
+let mut cols = vec![];
+let mut fields = vec![];
+for (i, f) in schema.fields().iter().enumerate() {
+let table_field = self.table_schema.field_with_name(f.name());
+if let Ok(tf) = table_field {
+cols.push(cast(&batch_cols[i], tf.data_type())?);
+fields.push(tf.clone());
+}
+}
+
+// Necessary to handle empty batches
+let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+let schema = Arc::new(Schema::new(fields));
+let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+Ok(record_batch)
+}
}

#[cfg(test)]
@@ -339,5 +371,9 @@ mod tests {

Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}

+fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+self.map_batch(batch)
+}
}
}
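A short usage sketch of the new trait method, assuming the DefaultSchemaAdapterFactory API exercised in the test above (schemas and column names here are illustrative). Unlike map_batch, which adapts a batch fully to the table schema, map_partial_batch only casts the columns actually present in the batch and leaves absent table columns out:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};

fn main() -> datafusion::error::Result<()> {
    // Table schema: `a` widened to Int64, plus a column `b` the file never wrote.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    // File schema: only `a`, with a narrower type.
    let file_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);

    let adapter = DefaultSchemaAdapterFactory {}.create(Arc::clone(&table_schema));
    let (mapping, _projection) = adapter.map_schema(&file_schema)?;

    let batch = RecordBatch::try_new(
        Arc::new(file_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // `a` is cast Int32 -> Int64; the absent `b` is simply not emitted.
    let mapped = mapping.map_partial_batch(batch)?;
    assert_eq!(mapped.num_columns(), 1);
    assert_eq!(mapped.schema().field(0).data_type(), &DataType::Int64);
    Ok(())
}
```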
