From 42126c6a70422719e1588850214d20331e4a00db Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 24 Oct 2023 11:26:51 +0800 Subject: [PATCH] fix: cache physical expr in memtable iter --- src/mito2/src/memtable/time_series.rs | 65 +++++++++++++++++++++------ 1 file changed, 52 insertions(+), 13 deletions(-) diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 6ab6f984ad05..efa6bc9506d6 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -20,8 +20,11 @@ use std::sync::{Arc, RwLock}; use api::v1::OpType; use common_telemetry::debug; +use datafusion::physical_plan::PhysicalExpr; +use datafusion_common::ScalarValue; +use datafusion_expr::ColumnarValue; use datatypes::arrow; -use datatypes::arrow::array::ArrayRef; +use datatypes::arrow::array::{ArrayRef, BooleanArray}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; @@ -300,12 +303,16 @@ impl SeriesSet { let (primary_key_builders, primary_key_schema) = primary_key_builders(&self.region_metadata, 1); + let physical_exprs: Vec<_> = predicate + .and_then(|p| p.to_physical_exprs(&primary_key_schema).ok()) + .unwrap_or_default(); + Iter { metadata: self.region_metadata.clone(), series: self.series.clone(), projection, last_key: None, - predicate, + predicate: physical_exprs, pk_schema: primary_key_schema, primary_key_builders, codec: self.codec.clone(), @@ -341,7 +348,7 @@ struct Iter { series: Arc, projection: HashSet, last_key: Option>, - predicate: Option, + predicate: Vec>, pk_schema: arrow::datatypes::SchemaRef, primary_key_builders: Vec>, codec: Arc, @@ -362,18 +369,18 @@ impl Iterator for Iter { // TODO(hl): maybe yield more than one time series to amortize range overhead. for (primary_key, series) in range { let mut series = series.write().unwrap(); - if let Some(predicate) = &self.predicate { - if !prune_primary_key( + if !self.predicate.is_empty() + && !prune_primary_key( &self.codec, primary_key.as_slice(), &mut series, &mut self.primary_key_builders, self.pk_schema.clone(), - predicate, - ) { - // read next series - continue; - } + &self.predicate, + ) + { + // read next series + continue; } self.last_key = Some(primary_key.clone()); @@ -392,7 +399,7 @@ fn prune_primary_key( series: &mut Series, builders: &mut Vec>, pk_schema: arrow::datatypes::SchemaRef, - predicate: &Predicate, + predicate: &[Arc], ) -> bool { // no primary key, we simply return true. if pk_schema.fields().is_empty() { @@ -400,20 +407,52 @@ fn prune_primary_key( } if let Some(rb) = series.pk_cache.as_ref() { - let res = predicate.prune_primary_key(rb).unwrap_or(true); + let res = prune_inner(predicate, rb).unwrap_or(true); debug!("Prune primary key: {:?}, res: {:?}", rb, res); res } else { let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else { return true; }; - let res = predicate.prune_primary_key(&rb).unwrap_or(true); + let res = prune_inner(predicate, &rb).unwrap_or(true); debug!("Prune primary key: {:?}, res: {:?}", rb, res); series.update_pk_cache(rb); res } } +fn prune_inner(predicates: &[Arc], primary_key: &RecordBatch) -> Result { + for expr in predicates { + // evaluate every filter against primary key + let Ok(eva) = expr.evaluate(primary_key) else { + continue; + }; + let result = match eva { + ColumnarValue::Array(array) => { + let predicate_array = array.as_any().downcast_ref::().unwrap(); + predicate_array + .into_iter() + .map(|x| x.unwrap_or(true)) + .next() + .unwrap_or(true) + } + // result was a column + ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true), + _ => { + unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key); + } + }; + debug!( + "Evaluate primary key {:?} against filter: {:?}, result: {:?}", + primary_key, expr, result + ); + if !result { + return Ok(false); + } + } + Ok(true) +} + fn pk_to_record_batch( codec: &Arc, bytes: &[u8],