Skip to content

Commit

Permalink
fix: remove predicates once they have been added to timestamp filters…
Browse files Browse the repository at this point in the history
… to avoid duplicate filtering
  • Loading branch information
v0y4g3r committed May 17, 2024
1 parent 253110d commit def4ede
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 17 deletions.
19 changes: 10 additions & 9 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};

async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
async fn check_prune_row_groups(exprs: Vec<DfExpr>, expected: &str) {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

Expand Down Expand Up @@ -56,7 +56,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
.handle_query(
region_id,
ScanRequest {
filters: vec![Expr::from(expr)],
filters: exprs.into_iter().map(Expr::from).collect(),
..Default::default()
},
)
Expand All @@ -71,7 +71,9 @@ async fn test_read_parquet_stats() {
common_telemetry::init_default_ut_logging();

check_prune_row_groups(
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))),
vec![
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None)))
],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
Expand All @@ -95,7 +97,7 @@ async fn test_read_parquet_stats() {
async fn test_prune_tag() {
// prune result: only row group 1&2
check_prune_row_groups(
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
Expand All @@ -115,18 +117,17 @@ async fn test_prune_tag_and_field() {
common_telemetry::init_default_ut_logging();
// prune result: only row group 1
check_prune_row_groups(
col("tag_0")
.gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
.and(col("field_0").lt(lit(8.0))),
vec![
col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
col("field_0").lt(lit(8.0)),
],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
Expand Down
9 changes: 7 additions & 2 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::Instant;

use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use store_api::storage::ScanRequest;
use table::predicate::{build_time_range_predicate, Predicate};
Expand Down Expand Up @@ -269,6 +269,7 @@ impl ScanRegion {
);

let index_applier = self.build_index_applier();
info!("Request filters: {:?}", self.request.filters);
let predicate = Predicate::new(self.request.filters.clone());
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Expand Down Expand Up @@ -299,7 +300,11 @@ impl ScanRegion {
.as_timestamp()
.expect("Time index must have timestamp-compatible type")
.unit();
build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
build_time_range_predicate(
&time_index.column_schema.name,
unit,
&mut self.request.filters,
)
}

/// Use the latest schema to build the index applier.
Expand Down
4 changes: 2 additions & 2 deletions src/query/src/tests/time_range_filter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ struct TimeRangeTester {
impl TimeRangeTester {
async fn check(&self, sql: &str, expect: TimestampRange) {
let _ = exec_selection(self.engine.clone(), sql).await;
let filters = self.get_filters();
let mut filters = self.get_filters();

let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &filters);
let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters);
assert_eq!(expect, range);
}

Expand Down
11 changes: 7 additions & 4 deletions src/table/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,19 @@ impl Predicate {
pub fn build_time_range_predicate<'a>(
ts_col_name: &'a str,
ts_col_unit: TimeUnit,
filters: &'a [Expr],
filters: &'a mut Vec<Expr>,
) -> TimestampRange {
let mut res = TimestampRange::min_to_max();

for expr in filters {
let mut filters_remain = vec![];
for expr in std::mem::take(filters) {
if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, expr.df_expr())
{
res = res.and(&range);
} else {
filters_remain.push(expr);
}
}
*filters = filters_remain;
res
}

Expand Down Expand Up @@ -394,7 +397,7 @@ mod tests {
fn check_build_predicate(expr: DfExpr, expect: TimestampRange) {
assert_eq!(
expect,
build_time_range_predicate("ts", TimeUnit::Millisecond, &[Expr::from(expr)])
build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![Expr::from(expr)])
);
}

Expand Down
2 changes: 2 additions & 0 deletions src/table/src/table/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::{Arc, Mutex};

use common_query::logical_plan::Expr;
use common_recordbatch::OrderOption;
use common_telemetry::info;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::datasource::{TableProvider, TableType as DfTableType};
use datafusion::error::Result as DfResult;
Expand Down Expand Up @@ -85,6 +86,7 @@ impl TableProvider for DfTableProviderAdapter {
limit: Option<usize>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Into::into).collect();
info!("Filters: {:?}", filters);
let request = {
let mut request = self.scan_req.lock().unwrap();
request.filters = filters;
Expand Down

0 comments on commit def4ede

Please sign in to comment.