fix: predicate shall use real schema to create physical exprs (#2642)
* fix: prune predicate should use real schema to create physical exprs

* refactor: remove redundant results

* fix: unit tests

* test: add more sqlness cases

* test: add more sqlness cases

* fix: sqlness orderby

* chore: update log

* fix: cache physical expr in memtable iter

---------

Co-authored-by: Yingwen <[email protected]>
v0y4g3r and evenyag authored Oct 24, 2023
1 parent 1fc42a6 commit 97897aa
Showing 10 changed files with 273 additions and 85 deletions.
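
Note: the heart of this change is where filter expressions get planned. Predicate::try_new used to build expressions up front against the table's logical schema, but pruning evaluates them against batches of other shapes: a primary-key-only RecordBatch in the memtable, or the SST file's arrow schema for row-group statistics. The commit therefore keeps the predicate logical (Predicate::new) and plans it per call site against the real schema. Below is a minimal sketch of that pattern, not the repository's code; the helper name mirrors the to_physical_exprs calls visible in the diffs, and create_physical_expr's exact signature varies across DataFusion versions (newer releases drop the input_schema argument).

use std::sync::Arc;

use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::Expr;
use datatypes::arrow::datatypes::SchemaRef;

/// Plans logical filters against `real_schema`, the arrow schema of the record
/// batches the expressions will actually run on, rather than the table schema.
fn to_physical_exprs(
    filters: &[Expr],
    real_schema: &SchemaRef,
) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
    let df_schema = DFSchema::try_from(real_schema.as_ref().clone())?;
    let props = ExecutionProps::new();
    filters
        .iter()
        .map(|expr| create_physical_expr(expr, &df_schema, real_schema, &props))
        .collect()
}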
40 changes: 38 additions & 2 deletions src/mito2/src/engine/prune_test.rs
@@ -17,7 +17,7 @@ use common_query::logical_plan::DfExpr;
use common_query::prelude::Expr;
use common_recordbatch::RecordBatches;
use datafusion_common::ScalarValue;
use datafusion_expr::lit;
use datafusion_expr::{col, lit};
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
@@ -46,7 +46,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 10),
rows: build_rows(0, 15),
},
)
.await;
@@ -76,6 +76,16 @@ async fn test_read_parquet_stats() {
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| 0     | 0.0     | 1970-01-01T00:00:00 |
| 1     | 1.0     | 1970-01-01T00:00:01 |
| 10    | 10.0    | 1970-01-01T00:00:10 |
| 11    | 11.0    | 1970-01-01T00:00:11 |
| 12    | 12.0    | 1970-01-01T00:00:12 |
| 13    | 13.0    | 1970-01-01T00:00:13 |
| 14    | 14.0    | 1970-01-01T00:00:14 |
| 2     | 2.0     | 1970-01-01T00:00:02 |
| 3     | 3.0     | 1970-01-01T00:00:03 |
| 4     | 4.0     | 1970-01-01T00:00:04 |
| 5     | 5.0     | 1970-01-01T00:00:05 |
| 6     | 6.0     | 1970-01-01T00:00:06 |
| 7     | 7.0     | 1970-01-01T00:00:07 |
@@ -84,7 +94,11 @@ async fn test_read_parquet_stats() {
+-------+---------+---------------------+",
)
.await;
}

#[tokio::test]
async fn test_prune_tag() {
// prune result: only row group 1&2
check_prune_row_groups(
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
"\
Expand All @@ -100,3 +114,25 @@ async fn test_read_parquet_stats() {
)
.await;
}

#[tokio::test]
async fn test_prune_tag_and_field() {
common_telemetry::init_default_ut_logging();
// prune result: only row group 1
check_prune_row_groups(
col("tag_0")
.gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
.and(col("field_0").lt(lit(8.0))),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| 5     | 5.0     | 1970-01-01T00:00:05 |
| 6     | 6.0     | 1970-01-01T00:00:06 |
| 7     | 7.0     | 1970-01-01T00:00:07 |
| 8     | 8.0     | 1970-01-01T00:00:08 |
| 9     | 9.0     | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
}
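
Note on the expected tables above: tag_0 is a Utf8 column, so rows order and compare lexicographically, and pruning works at row-group granularity. That is why test_prune_tag_and_field still returns rows 8 and 9 despite field_0 < 8.0: pruning only skips whole row groups, and exact filtering happens later in the query engine. Here is a tiny standalone check (editor's illustration, not part of the commit) of the string comparisons driving the expectations.

fn main() {
    assert!("10" < "2"); // lexicographic, not numeric, order
    assert!("10" < "4" && "14" < "4"); // rows "10"..="14" fail tag_0 > '4'
    assert!("5" > "4" && "9" > "4"); // rows "5"..="9" pass it
}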
65 changes: 52 additions & 13 deletions src/mito2/src/memtable/time_series.rs
@@ -20,8 +20,11 @@ use std::sync::{Arc, RwLock};

use api::v1::OpType;
use common_telemetry::debug;
use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
@@ -300,12 +303,16 @@ impl SeriesSet {
let (primary_key_builders, primary_key_schema) =
primary_key_builders(&self.region_metadata, 1);

let physical_exprs: Vec<_> = predicate
.and_then(|p| p.to_physical_exprs(&primary_key_schema).ok())
.unwrap_or_default();

Iter {
metadata: self.region_metadata.clone(),
series: self.series.clone(),
projection,
last_key: None,
predicate,
predicate: physical_exprs,
pk_schema: primary_key_schema,
primary_key_builders,
codec: self.codec.clone(),
@@ -341,7 +348,7 @@ struct Iter {
series: Arc<SeriesRwLockMap>,
projection: HashSet<ColumnId>,
last_key: Option<Vec<u8>>,
predicate: Option<Predicate>,
predicate: Vec<Arc<dyn PhysicalExpr>>,
pk_schema: arrow::datatypes::SchemaRef,
primary_key_builders: Vec<Box<dyn MutableVector>>,
codec: Arc<McmpRowCodec>,
@@ -362,18 +369,18 @@
// TODO(hl): maybe yield more than one time series to amortize range overhead.
for (primary_key, series) in range {
let mut series = series.write().unwrap();
if let Some(predicate) = &self.predicate {
if !prune_primary_key(
if !self.predicate.is_empty()
&& !prune_primary_key(
&self.codec,
primary_key.as_slice(),
&mut series,
&mut self.primary_key_builders,
self.pk_schema.clone(),
predicate,
) {
// read next series
continue;
}
&self.predicate,
)
{
// read next series
continue;
}
self.last_key = Some(primary_key.clone());

@@ -392,28 +399,60 @@ fn prune_primary_key(
series: &mut Series,
builders: &mut Vec<Box<dyn MutableVector>>,
pk_schema: arrow::datatypes::SchemaRef,
predicate: &Predicate,
predicate: &[Arc<dyn PhysicalExpr>],
) -> bool {
// no primary key, we simply return true.
if pk_schema.fields().is_empty() {
return true;
}

if let Some(rb) = series.pk_cache.as_ref() {
let res = predicate.prune_primary_key(rb).unwrap_or(true);
let res = prune_inner(predicate, rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
res
} else {
let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else {
return true;
};
let res = predicate.prune_primary_key(&rb).unwrap_or(true);
let res = prune_inner(predicate, &rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
series.update_pk_cache(rb);
res
}
}

fn prune_inner(predicates: &[Arc<dyn PhysicalExpr>], primary_key: &RecordBatch) -> Result<bool> {
for expr in predicates {
// evaluate every filter against primary key
let Ok(eva) = expr.evaluate(primary_key) else {
continue;
};
let result = match eva {
ColumnarValue::Array(array) => {
let predicate_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
predicate_array
.into_iter()
.map(|x| x.unwrap_or(true))
.next()
.unwrap_or(true)
}
// result was a scalar value
ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
_ => {
unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key);
}
};
debug!(
"Evaluate primary key {:?} against filter: {:?}, result: {:?}",
primary_key, expr, result
);
if !result {
return Ok(false);
}
}
Ok(true)
}

fn pk_to_record_batch(
codec: &Arc<McmpRowCodec>,
bytes: &[u8],
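
The iterator change above also caches the planned expressions (predicate: Vec<Arc<dyn PhysicalExpr>>) so they are built once per scan rather than re-planned per series. For readers, a self-contained sketch of what prune_inner implements; the names are the editor's, and, as in the real code, filters that error or evaluate to NULL conservatively keep the series.

use std::sync::Arc;

use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;

/// Returns true when the series identified by `pk_batch` (a one-row batch decoded
/// from the primary key) may still match some filter, i.e. it cannot be pruned.
fn keep_series(exprs: &[Arc<dyn PhysicalExpr>], pk_batch: &RecordBatch) -> bool {
    exprs.iter().all(|expr| match expr.evaluate(pk_batch) {
        // Boolean array result: take the first row's verdict, NULL means "keep".
        Ok(ColumnarValue::Array(array)) => array
            .as_any()
            .downcast_ref::<BooleanArray>()
            .and_then(|b| b.iter().next().flatten())
            .unwrap_or(true),
        // A scalar boolean covers the whole one-row batch.
        Ok(ColumnarValue::Scalar(ScalarValue::Boolean(v))) => v.unwrap_or(true),
        // Evaluation failures or unexpected types must not prune.
        _ => true,
    })
}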
9 changes: 2 additions & 7 deletions src/mito2/src/read/scan_region.rs
@@ -17,13 +17,12 @@
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::{BuildPredicateSnafu, Result};
use crate::error::Result;
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::region::version::VersionRef;
@@ -173,11 +172,7 @@ impl ScanRegion {
total_ssts
);

let predicate = Predicate::try_new(
self.request.filters.clone(),
self.version.metadata.schema.clone(),
)
.context(BuildPredicateSnafu)?;
let predicate = Predicate::new(self.request.filters.clone());
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,
3 changes: 2 additions & 1 deletion src/mito2/src/sst/parquet/reader.rs
@@ -188,8 +188,9 @@ impl ParquetReaderBuilder {
&read_format,
column_ids,
);

let pruned_row_groups = predicate
.prune_with_stats(&stats)
.prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
6 changes: 1 addition & 5 deletions src/storage/src/chunk.rs
@@ -245,11 +245,7 @@ impl ChunkReaderBuilder {
reader_builder = reader_builder.push_batch_iter(iter);
}

let predicate = Predicate::try_new(
self.filters.clone(),
self.schema.store_schema().schema().clone(),
)
.context(error::BuildPredicateSnafu)?;
let predicate = Predicate::new(self.filters.clone());

let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,
21 changes: 8 additions & 13 deletions src/storage/src/sst/parquet.rs
@@ -277,7 +277,10 @@ impl ParquetReader {

let pruned_row_groups = self
.predicate
.prune_row_groups(builder.metadata().row_groups())
.prune_row_groups(
builder.metadata().row_groups(),
store_schema.schema().clone(),
)
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
@@ -549,12 +552,11 @@ mod tests {
let operator = create_object_store(dir.path().to_str().unwrap());

let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let user_schema = projected_schema.projected_user_schema().clone();
let reader = ParquetReader::new(
sst_file_handle,
operator,
projected_schema,
Predicate::empty(user_schema),
Predicate::empty(),
TimestampRange::min_to_max(),
);

@@ -636,12 +638,11 @@ mod tests {
let operator = create_object_store(dir.path().to_str().unwrap());

let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let user_schema = projected_schema.projected_user_schema().clone();
let reader = ParquetReader::new(
file_handle,
operator,
projected_schema,
Predicate::empty(user_schema),
Predicate::empty(),
TimestampRange::min_to_max(),
);

@@ -665,14 +666,8 @@
range: TimestampRange,
expect: Vec<i64>,
) {
let store_schema = schema.schema_to_read().clone();
let reader = ParquetReader::new(
file_handle,
object_store,
schema,
Predicate::empty(store_schema.schema().clone()),
range,
);
let reader =
ParquetReader::new(file_handle, object_store, schema, Predicate::empty(), range);
let mut stream = reader.chunk_stream().await.unwrap();
let result = stream.next_batch().await;

16 changes: 13 additions & 3 deletions src/storage/src/sst/pruning.rs
@@ -29,9 +29,11 @@ use datatypes::prelude::ConcreteDataType;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;
use snafu::ResultExt;
use table::predicate::Predicate;

use crate::error;
use crate::error::BuildPredicateSnafu;
use crate::schema::StoreSchema;

/// Builds row filters according to predicates.
@@ -80,7 +82,11 @@ pub(crate) fn build_row_filter(
Box::new(PlainTimestampRowFilter::new(time_range, ts_col_projection)) as _
};
let mut predicates = vec![time_range_row_filter];
if let Ok(datafusion_filters) = predicate_to_row_filter(predicate, projection_mask) {
if let Ok(datafusion_filters) = predicate_to_row_filter(
predicate,
projection_mask,
store_schema.schema().arrow_schema(),
) {
predicates.extend(datafusion_filters);
}
let filter = RowFilter::new(predicates);
@@ -90,9 +96,13 @@
fn predicate_to_row_filter(
predicate: &Predicate,
projection_mask: ProjectionMask,
schema: &arrow::datatypes::SchemaRef,
) -> error::Result<Vec<Box<dyn ArrowPredicate>>> {
let mut datafusion_predicates = Vec::with_capacity(predicate.exprs().len());
for expr in predicate.exprs() {
let physical_exprs = predicate
.to_physical_exprs(schema)
.context(BuildPredicateSnafu)?;
let mut datafusion_predicates = Vec::with_capacity(physical_exprs.len());
for expr in &physical_exprs {
datafusion_predicates.push(Box::new(DatafusionArrowPredicate {
projection_mask: projection_mask.clone(),
physical_expr: expr.clone(),
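
For context on the RowFilter built above: parquet's reader drives ArrowPredicate implementations, and DatafusionArrowPredicate (constructed here from the planned physical expressions) evaluates one expression per decoded batch. A hedged sketch of that adapter shape follows; the struct and its field names are assumptions, not the repository's code.

use std::sync::Arc;

use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ArrowPredicate;
use parquet::arrow::ProjectionMask;

/// Adapts a planned physical expression into a parquet row filter: the reader
/// hands each decoded batch (projected by `projection`) to `evaluate` and keeps
/// only the rows for which the returned array is true.
struct PhysicalExprPredicate {
    projection: ProjectionMask,
    expr: Arc<dyn PhysicalExpr>,
}

impl ArrowPredicate for PhysicalExprPredicate {
    fn projection(&self) -> &ProjectionMask {
        &self.projection
    }

    fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError> {
        match self.expr.evaluate(&batch) {
            Ok(ColumnarValue::Array(array)) => array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .cloned()
                .ok_or_else(|| ArrowError::CastError("filter is not boolean".to_string())),
            // A scalar verdict applies to every row of the batch.
            Ok(ColumnarValue::Scalar(ScalarValue::Boolean(v))) => {
                Ok(BooleanArray::from(vec![v.unwrap_or(true); batch.num_rows()]))
            }
            // Keep all rows when the filter cannot be evaluated on this batch.
            _ => Ok(BooleanArray::from(vec![true; batch.num_rows()])),
        }
    }
}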