fix: predicate shall use real schema to create physical exprs (#2642)
* fix: prune predicate should use real schema to create physical exprs

* refactor: remove redundant results

* fix: unit tests

* test: add more sqlness cases

* test: add more sqlness cases

* fix: sqlness orderby

* chore: update log

---------

Co-authored-by: Yingwen <[email protected]>
v0y4g3r and evenyag authored Oct 24, 2023
1 parent 1fc42a6 commit 694d126
Showing 9 changed files with 221 additions and 72 deletions.
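The gist of the fix: `Predicate` used to convert its logical filter expressions into physical expressions once, at construction time, against whatever schema the caller passed in. Pruning, however, runs against concrete SST files, and it is the file's own schema that the row groups and statistics are encoded with. After this commit the predicate stores the logical exprs and lowers them lazily, against the schema of the data actually being pruned. A minimal before/after sketch — `filters`, `row_groups`, `table_schema`, and `sst_schema` are hypothetical in-scope values, not verbatim code from this commit:

```rust
// Sketch only -- assumes the `table::predicate` API introduced by this commit,
// with `filters: Vec<Expr>`, `row_groups: &[RowGroupMetaData]` and
// `sst_schema: SchemaRef` already in scope.

// Before: physical exprs were baked in at construction time.
// let predicate = Predicate::try_new(filters, table_schema)?;

// After: construction is infallible; the real schema is supplied at prune time.
let predicate = Predicate::new(filters);
let keep: Vec<bool> = predicate.prune_row_groups(row_groups, sst_schema);
```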
40 changes: 38 additions & 2 deletions src/mito2/src/engine/prune_test.rs
@@ -17,7 +17,7 @@ use common_query::logical_plan::DfExpr;
 use common_query::prelude::Expr;
 use common_recordbatch::RecordBatches;
 use datafusion_common::ScalarValue;
-use datafusion_expr::lit;
+use datafusion_expr::{col, lit};
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
@@ -46,7 +46,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
         region_id,
         Rows {
             schema: column_schemas.clone(),
-            rows: build_rows(0, 10),
+            rows: build_rows(0, 15),
         },
     )
     .await;
@@ -76,6 +76,16 @@ async fn test_read_parquet_stats() {
 +-------+---------+---------------------+
 | tag_0 | field_0 | ts                  |
 +-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 10    | 10.0    | 1970-01-01T00:00:10 |
+| 11    | 11.0    | 1970-01-01T00:00:11 |
+| 12    | 12.0    | 1970-01-01T00:00:12 |
+| 13    | 13.0    | 1970-01-01T00:00:13 |
+| 14    | 14.0    | 1970-01-01T00:00:14 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
+| 3     | 3.0     | 1970-01-01T00:00:03 |
+| 4     | 4.0     | 1970-01-01T00:00:04 |
 | 5     | 5.0     | 1970-01-01T00:00:05 |
 | 6     | 6.0     | 1970-01-01T00:00:06 |
 | 7     | 7.0     | 1970-01-01T00:00:07 |
@@ -84,7 +94,11 @@ async fn test_read_parquet_stats() {
 +-------+---------+---------------------+",
     )
     .await;
+}
+
+#[tokio::test]
+async fn test_prune_tag() {
     // prune result: only row group 1&2
     check_prune_row_groups(
         datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
         "\
Expand All @@ -100,3 +114,25 @@ async fn test_read_parquet_stats() {
)
.await;
}

#[tokio::test]
async fn test_prune_tag_and_field() {
common_telemetry::init_default_ut_logging();
// prune result: only row group 1
check_prune_row_groups(
col("tag_0")
.gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
.and(col("field_0").lt(lit(8.0))),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
}
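Worth noting in `test_prune_tag_and_field`: the filter includes `field_0 < 8.0`, yet rows 8 and 9 still appear in the expected output. Row-group pruning is coarse — a group is skipped only when its statistics prove no row can match — so rows 8 and 9 come back because they evidently sit in the same row group as 5–7. The filter itself is ordinary DataFusion expression building; a standalone sketch of the combined predicate:

```rust
use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit};

// tag_0 is a string column, so `> "4"` compares lexicographically:
// "5".."9" match, but so would a hypothetical "40".
let filter = col("tag_0")
    .gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
    .and(col("field_0").lt(lit(8.0)));
```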
9 changes: 2 additions & 7 deletions src/mito2/src/read/scan_region.rs
@@ -17,13 +17,12 @@
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::debug;
 use common_time::range::TimestampRange;
-use snafu::ResultExt;
 use store_api::storage::ScanRequest;
 use table::predicate::{Predicate, TimeRangePredicateBuilder};
 
 use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheManagerRef;
-use crate::error::{BuildPredicateSnafu, Result};
+use crate::error::Result;
 use crate::read::projection::ProjectionMapper;
 use crate::read::seq_scan::SeqScan;
 use crate::region::version::VersionRef;
@@ -173,11 +172,7 @@ impl ScanRegion {
             total_ssts
         );
 
-        let predicate = Predicate::try_new(
-            self.request.filters.clone(),
-            self.version.metadata.schema.clone(),
-        )
-        .context(BuildPredicateSnafu)?;
+        let predicate = Predicate::new(self.request.filters.clone());
         let mapper = match &self.request.projection {
             Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
             None => ProjectionMapper::all(&self.version.metadata)?,
3 changes: 2 additions & 1 deletion src/mito2/src/sst/parquet/reader.rs
Expand Up @@ -188,8 +188,9 @@ impl ParquetReaderBuilder {
&read_format,
column_ids,
);

let pruned_row_groups = predicate
.prune_with_stats(&stats)
.prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
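Read together, the updated call site looks like the sketch below. The schema handed to `prune_with_stats` comes from the file's own region metadata (`read_format.metadata()`), i.e. the schema the row-group statistics were written under — which is the point of the whole commit. This is reassembled from the diff fragments above, not a verbatim excerpt:

```rust
// Reassembled sketch of the pruning step inside ParquetReaderBuilder.
let pruned_row_groups: Vec<usize> = predicate
    .prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
    .into_iter()
    .enumerate()
    // keep only indices of row groups the predicate could not rule out
    .filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
    .collect();
```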
6 changes: 1 addition & 5 deletions src/storage/src/chunk.rs
Expand Up @@ -245,11 +245,7 @@ impl ChunkReaderBuilder {
reader_builder = reader_builder.push_batch_iter(iter);
}

let predicate = Predicate::try_new(
self.filters.clone(),
self.schema.store_schema().schema().clone(),
)
.context(error::BuildPredicateSnafu)?;
let predicate = Predicate::new(self.filters.clone());

let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,
21 changes: 8 additions & 13 deletions src/storage/src/sst/parquet.rs
Expand Up @@ -277,7 +277,10 @@ impl ParquetReader {

let pruned_row_groups = self
.predicate
.prune_row_groups(builder.metadata().row_groups())
.prune_row_groups(
builder.metadata().row_groups(),
store_schema.schema().clone(),
)
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
@@ -549,12 +552,11 @@ mod tests {
         let operator = create_object_store(dir.path().to_str().unwrap());
 
         let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
-        let user_schema = projected_schema.projected_user_schema().clone();
         let reader = ParquetReader::new(
             sst_file_handle,
             operator,
             projected_schema,
-            Predicate::empty(user_schema),
+            Predicate::empty(),
             TimestampRange::min_to_max(),
         );
 
@@ -636,12 +638,11 @@ mod tests {
         let operator = create_object_store(dir.path().to_str().unwrap());
 
         let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
-        let user_schema = projected_schema.projected_user_schema().clone();
         let reader = ParquetReader::new(
             file_handle,
             operator,
             projected_schema,
-            Predicate::empty(user_schema),
+            Predicate::empty(),
             TimestampRange::min_to_max(),
         );
 
@@ -665,14 +666,8 @@
     range: TimestampRange,
     expect: Vec<i64>,
 ) {
-    let store_schema = schema.schema_to_read().clone();
-    let reader = ParquetReader::new(
-        file_handle,
-        object_store,
-        schema,
-        Predicate::empty(store_schema.schema().clone()),
-        range,
-    );
+    let reader =
+        ParquetReader::new(file_handle, object_store, schema, Predicate::empty(), range);
     let mut stream = reader.chunk_stream().await.unwrap();
     let result = stream.next_batch().await;
 
16 changes: 13 additions & 3 deletions src/storage/src/sst/pruning.rs
@@ -29,9 +29,11 @@ use datatypes::prelude::ConcreteDataType;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::arrow::ProjectionMask;
 use parquet::schema::types::SchemaDescriptor;
+use snafu::ResultExt;
 use table::predicate::Predicate;
 
 use crate::error;
+use crate::error::BuildPredicateSnafu;
 use crate::schema::StoreSchema;
 
 /// Builds row filters according to predicates.
@@ -80,7 +82,11 @@ pub(crate) fn build_row_filter(
         Box::new(PlainTimestampRowFilter::new(time_range, ts_col_projection)) as _
     };
     let mut predicates = vec![time_range_row_filter];
-    if let Ok(datafusion_filters) = predicate_to_row_filter(predicate, projection_mask) {
+    if let Ok(datafusion_filters) = predicate_to_row_filter(
+        predicate,
+        projection_mask,
+        store_schema.schema().arrow_schema(),
+    ) {
         predicates.extend(datafusion_filters);
     }
     let filter = RowFilter::new(predicates);
@@ -90,9 +96,13 @@
 fn predicate_to_row_filter(
     predicate: &Predicate,
     projection_mask: ProjectionMask,
+    schema: &arrow::datatypes::SchemaRef,
 ) -> error::Result<Vec<Box<dyn ArrowPredicate>>> {
-    let mut datafusion_predicates = Vec::with_capacity(predicate.exprs().len());
-    for expr in predicate.exprs() {
+    let physical_exprs = predicate
+        .to_physical_exprs(schema)
+        .context(BuildPredicateSnafu)?;
+    let mut datafusion_predicates = Vec::with_capacity(physical_exprs.len());
+    for expr in &physical_exprs {
         datafusion_predicates.push(Box::new(DatafusionArrowPredicate {
             projection_mask: projection_mask.clone(),
             physical_expr: expr.clone(),
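Note the error-handling split here: `predicate_to_row_filter` now propagates failures with `?`, but its caller `build_row_filter` wraps the call in `if let Ok(...)`, so a predicate that cannot be compiled against this particular file's schema degrades the scan to the mandatory time-range filter instead of failing it. A condensed sketch of the assembly, using the names from the diff (not the full function body):

```rust
// Condensed from build_row_filter above.
let mut predicates: Vec<Box<dyn ArrowPredicate>> = vec![time_range_row_filter];
if let Ok(filters) = predicate_to_row_filter(
    predicate,
    projection_mask,
    store_schema.schema().arrow_schema(),
) {
    // Best-effort: on failure we simply keep the time-range filter alone.
    predicates.extend(filters);
}
let row_filter = RowFilter::new(predicates);
```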
92 changes: 51 additions & 41 deletions src/table/src/predicate.rs
Expand Up @@ -27,6 +27,7 @@ use datafusion_expr::expr::InList;
use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datatypes::arrow;
use datatypes::arrow::array::BooleanArray;
use datatypes::schema::SchemaRef;
use datatypes::value::scalar_value_to_timestamp;
@@ -39,19 +40,24 @@
 
 #[derive(Clone)]
 pub struct Predicate {
-    /// The schema of the table that the expressions being applied.
-    schema: SchemaRef,
-    /// Physical expressions of this predicate.
-    exprs: Vec<Arc<dyn PhysicalExpr>>,
+    /// logical exprs
+    exprs: Vec<Expr>,
 }
 
 impl Predicate {
-    /// Creates a new `Predicate` by converting logical exprs to physical exprs that can be
-    /// evaluated against record batches.
-    /// Returns error when failed to convert exprs.
-    pub fn try_new(exprs: Vec<Expr>, schema: SchemaRef) -> error::Result<Self> {
-        let arrow_schema = schema.arrow_schema();
-        let df_schema = arrow_schema
+    pub fn new(exprs: Vec<Expr>) -> Self {
+        Self { exprs }
+    }
+
+    /// Builds physical exprs according to provided schema.
+    pub fn to_physical_exprs(
+        &self,
+        schema: &arrow::datatypes::SchemaRef,
+    ) -> error::Result<Vec<Arc<dyn PhysicalExpr>>> {
+        let df_schema = schema
             .clone()
             .to_dfschema_ref()
             .context(error::DatafusionSnafu)?;
@@ -61,47 +67,38 @@ impl Predicate {
         // registering variables.
         let execution_props = &ExecutionProps::new();
 
-        let physical_exprs = exprs
+        self.exprs
             .iter()
             .map(|expr| {
-                create_physical_expr(
-                    expr.df_expr(),
-                    df_schema.as_ref(),
-                    arrow_schema.as_ref(),
-                    execution_props,
-                )
+                create_physical_expr(expr.df_expr(), df_schema.as_ref(), schema, execution_props)
             })
             .collect::<Result<_, _>>()
-            .context(error::DatafusionSnafu)?;
-
-        Ok(Self {
-            schema,
-            exprs: physical_exprs,
-        })
-    }
-
-    #[inline]
-    pub fn exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
-        &self.exprs
+            .context(error::DatafusionSnafu)
     }
 
     /// Builds an empty predicate from given schema.
-    pub fn empty(schema: SchemaRef) -> Self {
-        Self {
-            schema,
-            exprs: vec![],
-        }
+    pub fn empty() -> Self {
+        Self { exprs: vec![] }
     }
 
     /// Evaluates the predicate against row group metadata.
     /// Returns a vector of boolean values, among which `false` means the row group can be skipped.
-    pub fn prune_row_groups(&self, row_groups: &[RowGroupMetaData]) -> Vec<bool> {
+    pub fn prune_row_groups(
+        &self,
+        row_groups: &[RowGroupMetaData],
+        schema: SchemaRef,
+    ) -> Vec<bool> {
         let mut res = vec![true; row_groups.len()];
-        let arrow_schema = self.schema.arrow_schema();
-        for expr in &self.exprs {
+
+        let Ok(physical_exprs) = self.to_physical_exprs(schema.arrow_schema()) else {
+            return res;
+        };
+
+        let arrow_schema = schema.arrow_schema();
+        for expr in &physical_exprs {
             match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
                 Ok(p) => {
-                    let stat = RowGroupPruningStatistics::new(row_groups, &self.schema);
+                    let stat = RowGroupPruningStatistics::new(row_groups, &schema);
                     match p.prune(&stat) {
                         Ok(r) => {
                             for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
@@ -123,7 +120,9 @@
 
     /// Prunes primary keys
     pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result<bool> {
-        for expr in &self.exprs {
+        let pk_schema = primary_key.schema();
+        let physical_exprs = self.to_physical_exprs(&pk_schema)?;
+        for expr in &physical_exprs {
             // evaluate every filter against primary key
             let Ok(eva) = expr.evaluate(primary_key) else {
                 continue;
@@ -156,11 +155,22 @@
 
     /// Evaluates the predicate against the `stats`.
     /// Returns a vector of boolean values, among which `false` means the row group can be skipped.
-    pub fn prune_with_stats<S: PruningStatistics>(&self, stats: &S) -> Vec<bool> {
+    pub fn prune_with_stats<S: PruningStatistics>(
+        &self,
+        stats: &S,
+        schema: &arrow::datatypes::SchemaRef,
+    ) -> Vec<bool> {
         let mut res = vec![true; stats.num_containers()];
-        let arrow_schema = self.schema.arrow_schema();
-        for expr in &self.exprs {
-            match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
+        let physical_exprs = match self.to_physical_exprs(schema) {
+            Ok(expr) => expr,
+            Err(e) => {
+                warn!(e; "Failed to build physical expr from predicates: {:?}", &self.exprs);
+                return res;
+            }
+        };
+
+        for expr in &physical_exprs {
+            match PruningPredicate::try_new(expr.clone(), schema.clone()) {
                 Ok(p) => match p.prune(stats) {
                     Ok(r) => {
                         for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
@@ -641,7 +651,7 @@ mod tests {
         let dir = create_temp_dir("prune_parquet");
         let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await;
        let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap());
-        let arrow_predicate = Predicate::try_new(filters, schema.clone()).unwrap();
+        let arrow_predicate = Predicate::new(filters);
         let builder = ParquetRecordBatchStreamBuilder::new(
             tokio::fs::OpenOptions::new()
                 .read(true)
@@ -653,7 +663,7 @@
             .unwrap();
         let metadata = builder.metadata().clone();
         let row_groups = metadata.row_groups();
-        let res = arrow_predicate.prune_row_groups(row_groups);
+        let res = arrow_predicate.prune_row_groups(row_groups, schema);
         assert_eq!(expect, res);
     }

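For reference, what `to_physical_exprs` does is standard DataFusion plumbing: build a `DFSchema` from the arrow schema, then lower each logical expr with `create_physical_expr`. A self-contained sketch against plain DataFusion — assuming the DataFusion release this codebase pins (whose `create_physical_expr` takes both the `DFSchema` and the arrow `Schema`, as seen in the diff) and the `datatypes::arrow` re-export visible above:

```rust
use std::sync::Arc;

use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::{col, lit};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datatypes::arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<()> {
    // The "real schema": here, the layout an SST file actually stores.
    let schema = Arc::new(Schema::new(vec![
        Field::new("tag_0", DataType::Utf8, true),
        Field::new("field_0", DataType::Float64, true),
    ]));
    let df_schema = schema.clone().to_dfschema_ref()?;

    // Lower `field_0 < 8.0` against that schema; a column lookup failing
    // here is exactly the class of bug this commit fixes.
    let logical = col("field_0").lt(lit(8.0));
    let _physical = create_physical_expr(
        &logical,
        df_schema.as_ref(),
        schema.as_ref(),
        &ExecutionProps::new(),
    )?;
    Ok(())
}
```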