diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs
index 17c213dfcfa4..29dde1e8b40e 100644
--- a/src/mito2/src/engine/prune_test.rs
+++ b/src/mito2/src/engine/prune_test.rs
@@ -213,3 +213,56 @@ async fn test_prune_memtable() {
 +-------+---------+---------------------+";
     assert_eq!(expected, batches.pretty_print().unwrap());
 }
+
+#[tokio::test]
+async fn test_prune_memtable_complex_expr() {
+    let mut env = TestEnv::new();
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+
+    let column_schemas = rows_schema(&request);
+
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    // 0 ~ 10 in memtable
+    put_rows(
+        &engine,
+        region_id,
+        Rows {
+            schema: column_schemas.clone(),
+            rows: build_rows(0, 10),
+        },
+    )
+    .await;
+
+    // ts filter will be ignored when pruning time series in memtable.
+    let filters = vec![time_range_expr(4, 7), Expr::from(col("tag_0").lt(lit("6")))];
+
+    let stream = engine
+        .handle_query(
+            region_id,
+            ScanRequest {
+                filters,
+                ..Default::default()
+            },
+        )
+        .await
+        .unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
+| 3     | 3.0     | 1970-01-01T00:00:03 |
+| 4     | 4.0     | 1970-01-01T00:00:04 |
+| 5     | 5.0     | 1970-01-01T00:00:05 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index bae772db9df1..f1f53315012a 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
 use std::sync::{Arc, RwLock};
 
 use api::v1::OpType;
-use common_telemetry::debug;
+use common_telemetry::{debug, error};
 use common_time::Timestamp;
 use datafusion::physical_plan::PhysicalExpr;
 use datafusion_common::ScalarValue;
@@ -400,7 +400,7 @@ fn prune_primary_key(
     codec: &Arc<McmpRowCodec>,
     pk: &[u8],
     series: &mut Series,
-    builders: &mut Vec<Box<dyn MutableVector>>,
+    builders: &mut [Box<dyn MutableVector>],
     pk_schema: arrow::datatypes::SchemaRef,
     predicate: &[Arc<dyn PhysicalExpr>],
 ) -> bool {
@@ -411,11 +411,18 @@ fn prune_primary_key(
 
     if let Some(rb) = series.pk_cache.as_ref() {
         let res = prune_inner(predicate, rb).unwrap_or(true);
-        debug!("Prune primary key: {:?}, res: {:?}", rb, res);
+        debug!(
+            "Prune primary key: {:?}, predicate: {:?}, res: {:?}",
+            rb, predicate, res
+        );
         res
     } else {
-        let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else {
-            return true;
+        let rb = match pk_to_record_batch(codec, pk, builders, pk_schema) {
+            Ok(rb) => rb,
+            Err(e) => {
+                error!(e; "Failed to build record batch from primary keys");
+                return true;
+            }
         };
         let res = prune_inner(predicate, &rb).unwrap_or(true);
         debug!("Prune primary key: {:?}, res: {:?}", rb, res);
@@ -459,11 +466,10 @@ fn prune_inner(predicates: &[Arc<dyn PhysicalExpr>], primary_key: &RecordBatch)
 fn pk_to_record_batch(
     codec: &Arc<McmpRowCodec>,
     bytes: &[u8],
-    builders: &mut Vec<Box<dyn MutableVector>>,
+    builders: &mut [Box<dyn MutableVector>],
     pk_schema: arrow::datatypes::SchemaRef,
 ) -> Result<RecordBatch> {
     let pk_values = codec.decode(bytes).unwrap();
-    assert_eq!(builders.len(), pk_values.len());
 
     let arrays = builders
         .iter_mut()
diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs
index 4dcdb1ce0303..c47b3dded219 100644
--- a/src/table/src/predicate.rs
+++ b/src/table/src/predicate.rs
@@ -38,7 +38,7 @@ use crate::predicate::stats::RowGroupPruningStatistics;
 mod stats;
 
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct Predicate {
     /// logical exprs
     exprs: Vec<Expr>,
 }
@@ -67,13 +67,14 @@ impl Predicate {
         // registering variables.
         let execution_props = &ExecutionProps::new();
 
-        self.exprs
+        Ok(self
+            .exprs
             .iter()
-            .map(|expr| {
+            .filter_map(|expr| {
                 create_physical_expr(expr.df_expr(), df_schema.as_ref(), schema, execution_props)
+                    .ok()
             })
-            .collect::<DfResult<_>>()
-            .context(error::DatafusionSnafu)
+            .collect::<Vec<_>>())
     }
 
     /// Builds an empty predicate from given schema.
@@ -749,4 +750,21 @@ mod tests {
             .or(datafusion_expr::Expr::Column(Column::from_name("cnt")).lt(20.lit()));
         assert_prune(40, vec![e.into()], vec![true, true, false, true]).await;
     }
+
+    #[tokio::test]
+    async fn test_to_physical_expr() {
+        let predicate = Predicate::new(vec![
+            Expr::from(col("host").eq(lit("host_a"))),
+            Expr::from(col("ts").gt(lit(ScalarValue::TimestampMicrosecond(Some(123), None)))),
+        ]);
+
+        let schema = Arc::new(arrow::datatypes::Schema::new(vec![Field::new(
+            "host",
+            arrow::datatypes::DataType::Utf8,
+            false,
+        )]));
+
+        let predicates = predicate.to_physical_exprs(&schema).unwrap();
+        assert!(!predicates.is_empty());
+    }
 }