diff --git a/Cargo.lock b/Cargo.lock index 8456bab511e4..da7ff9461400 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5500,6 +5500,7 @@ dependencies = [ "anymap", "api", "aquamarine", + "arc-swap", "async-channel", "async-compat", "async-stream", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 55795df28bbe..851b0f02a893 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -12,6 +12,7 @@ test = ["common-test-util"] anymap = "1.0.0-beta.2" api.workspace = true aquamarine.workspace = true +arc-swap = "1.6" async-channel = "1.9" async-compat = "0.2" async-stream.workspace = true diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index cc1bf0544d01..1417bf352c10 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -307,7 +307,7 @@ impl RegionFlushTask { } let file_id = FileId::random(); - let iter = mem.iter(None, &[]); + let iter = mem.iter(None, None); let source = Source::Iter(iter); let mut writer = self .access_layer diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index bb82fb60d85d..0ced4f547282 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -23,11 +23,11 @@ use std::fmt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use common_query::logical_plan::Expr; use common_time::Timestamp; use metrics::{decrement_gauge, increment_gauge}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; +use table::predicate::Predicate; use crate::error::Result; use crate::flush::WriteBufferManagerRef; @@ -73,7 +73,11 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// Scans the memtable. /// `projection` selects columns to read, `None` means reading all columns. /// `filters` are the predicates to be pushed down to memtable. - fn iter(&self, projection: Option<&[ColumnId]>, filters: &[Expr]) -> BoxedBatchIterator; + fn iter( + &self, + projection: Option<&[ColumnId]>, + predicate: Option, + ) -> BoxedBatchIterator; /// Returns true if the memtable is empty. fn is_empty(&self) -> bool; diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index b5ab52447234..9ea4e04ec36f 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -19,9 +19,11 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use api::v1::OpType; -use common_query::logical_plan::Expr; +use arc_swap::ArcSwapOption; +use common_telemetry::debug; use datatypes::arrow; use datatypes::arrow::array::ArrayRef; +use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; use datatypes::value::ValueRef; @@ -31,8 +33,12 @@ use datatypes::vectors::{ use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; +use table::predicate::Predicate; -use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; +use crate::error::{ + ComputeArrowSnafu, ConvertVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu, + Result, +}; use crate::flush::WriteBufferManagerRef; use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, @@ -76,7 +82,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder { pub struct TimeSeriesMemtable { id: MemtableId, region_metadata: RegionMetadataRef, - row_codec: McmpRowCodec, + row_codec: Arc, series_set: SeriesSet, alloc_tracker: AllocTracker, max_timestamp: AtomicI64, @@ -89,13 +95,13 @@ impl TimeSeriesMemtable { id: MemtableId, write_buffer_manager: Option, ) -> Self { - let row_codec = McmpRowCodec::new( + let row_codec = Arc::new(McmpRowCodec::new( region_metadata .primary_key_columns() .map(|c| SortField::new(c.column_schema.data_type.clone())) .collect(), - ); - let series_set = SeriesSet::new(region_metadata.clone()); + )); + let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone()); Self { id, region_metadata, @@ -177,7 +183,7 @@ impl Memtable for TimeSeriesMemtable { actual: kv.num_primary_keys() } ); - let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?; + let primary_key_encoded = PrimaryKey::new(self.row_codec.encode(kv.primary_keys())?); let fields = kv.fields().collect::>(); allocated += fields.len() * std::mem::size_of::(); @@ -200,7 +206,11 @@ impl Memtable for TimeSeriesMemtable { Ok(()) } - fn iter(&self, projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { + fn iter( + &self, + projection: Option<&[ColumnId]>, + predicate: Option, + ) -> BoxedBatchIterator { let projection = if let Some(projection) = projection { projection.iter().copied().collect() } else { @@ -210,7 +220,7 @@ impl Memtable for TimeSeriesMemtable { .collect() }; - Box::new(self.series_set.iter_series(projection)) + Box::new(self.series_set.iter_series(projection, predicate)) } fn is_empty(&self) -> bool { @@ -248,18 +258,76 @@ impl Memtable for TimeSeriesMemtable { } } -type SeriesRwLockMap = RwLock, Arc>>>; +struct PrimaryKey { + bytes: Vec, + record_batch: ArcSwapOption, +} + +impl Clone for PrimaryKey { + fn clone(&self) -> Self { + Self { + bytes: self.bytes.clone(), + record_batch: Default::default(), + } + } +} + +impl PrimaryKey { + fn new(bytes: Vec) -> Self { + Self { + bytes, + record_batch: ArcSwapOption::empty(), + } + } + + fn get_or_update_record_batch_with Result>( + &self, + mut f: F, + ) -> Result> { + if let Some(rb) = self.record_batch.load_full() { + return Ok(rb); + } + + let batch = Arc::new(f()?); + self.record_batch.store(Some(batch.clone())); + Ok(batch) + } +} + +impl Eq for PrimaryKey {} + +impl PartialEq for PrimaryKey { + fn eq(&self, other: &Self) -> bool { + self.bytes.eq(&other.bytes) + } +} + +impl PartialOrd for PrimaryKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PrimaryKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.bytes.cmp(&other.bytes) + } +} + +type SeriesRwLockMap = RwLock>>>; struct SeriesSet { region_metadata: RegionMetadataRef, series: Arc, + codec: Arc, } impl SeriesSet { - fn new(region_metadata: RegionMetadataRef) -> Self { + fn new(region_metadata: RegionMetadataRef, codec: Arc) -> Self { Self { region_metadata, series: Default::default(), + codec, } } } @@ -267,7 +335,7 @@ impl SeriesSet { impl SeriesSet { /// Returns the series for given primary key, or create a new series if not already exist, /// along with the allocated memory footprint for primary keys. - fn get_or_add_series(&self, primary_key: Vec) -> (Arc>, usize) { + fn get_or_add_series(&self, primary_key: PrimaryKey) -> (Arc>, usize) { if let Some(series) = self.series.read().unwrap().get(&primary_key) { return (series.clone(), 0); }; @@ -275,7 +343,7 @@ impl SeriesSet { let mut indices = self.series.write().unwrap(); match indices.entry(primary_key) { Entry::Vacant(v) => { - let key_len = v.key().len(); + let key_len = v.key().bytes.len(); v.insert(s.clone()); (s, key_len) } @@ -285,21 +353,55 @@ impl SeriesSet { } /// Iterates all series in [SeriesSet]. - fn iter_series(&self, projection: HashSet) -> Iter { + fn iter_series(&self, projection: HashSet, predicate: Option) -> Iter { + let (primary_key_builders, primary_key_schema) = + primary_key_builders(&self.region_metadata, 1); + Iter { metadata: self.region_metadata.clone(), series: self.series.clone(), projection, last_key: None, + predicate, + pk_schema: primary_key_schema, + primary_key_builders, + codec: self.codec.clone(), } } } +/// Creates primary key array builders and arrow's schema for primary keys of given region schema. +fn primary_key_builders( + region_metadata: &RegionMetadataRef, + num_pk_rows: usize, +) -> (Vec>, arrow::datatypes::SchemaRef) { + let (builders, fields): (_, Vec<_>) = region_metadata + .primary_key_columns() + .map(|pk| { + ( + pk.column_schema + .data_type + .create_mutable_vector(num_pk_rows), + arrow::datatypes::Field::new( + pk.column_schema.name.clone(), + pk.column_schema.data_type.as_arrow_type(), + pk.column_schema.is_nullable(), + ), + ) + }) + .unzip(); + (builders, Arc::new(arrow::datatypes::Schema::new(fields))) +} + struct Iter { metadata: RegionMetadataRef, series: Arc, projection: HashSet, - last_key: Option>, + last_key: Option, + predicate: Option, + pk_schema: arrow::datatypes::SchemaRef, + primary_key_builders: Vec>, + codec: Arc, } impl Iterator for Iter { @@ -307,21 +409,82 @@ impl Iterator for Iter { fn next(&mut self) -> Option { let map = self.series.read().unwrap(); - let mut range = match &self.last_key { - None => map.range::, _>(..), + let range = match &self.last_key { + None => map.range::(..), Some(last_key) => { - map.range::, _>((Bound::Excluded(last_key), Bound::Unbounded)) + map.range::((Bound::Excluded(last_key), Bound::Unbounded)) } }; - if let Some((primary_key, series)) = range.next() { + // TODO(hl): maybe yield more than one time series to amortize range overhead. + for (primary_key, series) in range { + if let Some(predicate) = &self.predicate { + if !prune_primary_key( + &self.codec, + primary_key, + &mut self.primary_key_builders, + &self.pk_schema, + predicate, + ) { + // read next series + continue; + } + } self.last_key = Some(primary_key.clone()); let values = series.write().unwrap().compact(&self.metadata); - Some(values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection))) - } else { - None + return Some( + values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)), + ); } + None + } +} + +fn prune_primary_key( + codec: &Arc, + pk: &PrimaryKey, + builders: &mut Vec>, + pk_schema: &arrow::datatypes::SchemaRef, + predicate: &Predicate, +) -> bool { + // no primary key, we simply return true. + if pk_schema.fields().is_empty() { + return true; } + + let Ok(pk_record_batch) = pk.get_or_update_record_batch_with(move || { + pk_to_record_batch(codec, &pk.bytes, builders, pk_schema) + }) else { + return true; + }; + + let result = predicate.prune_primary_key(&pk_record_batch); + debug!( + "Prune primary key: {:?}, res: {:?}", + pk_record_batch, result + ); + result.unwrap_or(true) +} + +fn pk_to_record_batch( + codec: &Arc, + bytes: &[u8], + builders: &mut Vec>, + pk_schema: &arrow::datatypes::SchemaRef, +) -> Result { + let pk_values = codec.decode(bytes).unwrap(); + assert_eq!(builders.len(), pk_values.len()); + + let arrays = builders + .iter_mut() + .zip(pk_values.iter()) + .map(|(builder, pk_value)| { + builder.push_value_ref(pk_value.as_value_ref()); + builder.to_vector().to_arrow_array() + }) + .collect(); + + RecordBatch::try_new(pk_schema.clone(), arrays).context(NewRecordBatchSnafu) } /// A `Series` holds a list of field values of some given primary key. @@ -461,12 +624,12 @@ impl Values { /// keeps only the latest row for the same timestamp. pub fn to_batch( &self, - primary_key: &[u8], + primary_key: &PrimaryKey, metadata: &RegionMetadataRef, projection: &HashSet, ) -> Result { let builder = BatchBuilder::with_required_columns( - primary_key.to_vec(), + primary_key.bytes.clone(), self.timestamp.clone(), self.sequence.clone(), self.op_type.clone(), @@ -699,7 +862,11 @@ mod tests { }; let batch = values - .to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect()) + .to_batch( + &PrimaryKey::new(b"test".to_vec()), + &schema, + &[0, 1, 2, 3, 4].into_iter().collect(), + ) .unwrap(); check_value( &batch, @@ -784,7 +951,13 @@ mod tests { #[test] fn test_series_set_concurrency() { let schema = schema_for_test(); - let set = Arc::new(SeriesSet::new(schema.clone())); + let row_codec = Arc::new(McmpRowCodec::new( + schema + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + )); + let set = Arc::new(SeriesSet::new(schema.clone(), row_codec)); let concurrency = 32; let pk_num = concurrency * 2; @@ -795,7 +968,7 @@ mod tests { for j in i * 100..(i + 1) * 100 { let pk = j % pk_num; let primary_key = format!("pk-{}", pk).as_bytes().to_vec(); - let (series, _) = set.get_or_add_series(primary_key); + let (series, _) = set.get_or_add_series(PrimaryKey::new(primary_key)); let mut guard = series.write().unwrap(); guard.push( ts_value_ref(j as i64), @@ -818,7 +991,7 @@ mod tests { for i in 0..pk_num { let pk = format!("pk-{}", i).as_bytes().to_vec(); - let (series, _) = set.get_or_add_series(pk); + let (series, _) = set.get_or_add_series(PrimaryKey::new(pk)); let mut guard = series.write().unwrap(); let values = guard.compact(&schema).unwrap(); timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64)); @@ -866,7 +1039,7 @@ mod tests { .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) .collect::>(); - let iter = memtable.iter(None, &[]); + let iter = memtable.iter(None, None); let read = iter .flat_map(|batch| { batch @@ -892,7 +1065,7 @@ mod tests { let memtable = TimeSeriesMemtable::new(schema, 42, None); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(Some(&[3]), &[]); + let iter = memtable.iter(Some(&[3]), None); let mut v0_all = vec![]; diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index abe95b8db2f6..805e8d8df9e5 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -135,8 +135,7 @@ impl SeqScan { // Scans all memtables and SSTs. Builds a merge reader to merge results. let mut builder = MergeReaderBuilder::new(); for mem in &self.memtables { - // TODO(hl): pass filters once memtable supports filter pushdown. - let iter = mem.iter(Some(self.mapper.column_ids()), &[]); + let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone()); builder.push_batch_iter(iter); } for file in &self.files { diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 4b6c4142e753..22dca01156e7 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -17,9 +17,9 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; -use common_query::logical_plan::Expr; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; +use table::predicate::Predicate; use crate::error::Result; use crate::memtable::{ @@ -50,7 +50,11 @@ impl Memtable for EmptyMemtable { Ok(()) } - fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { + fn iter( + &self, + _projection: Option<&[ColumnId]>, + _filters: Option, + ) -> BoxedBatchIterator { Box::new(std::iter::empty()) } diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index e4d5c24b0c6a..cfb3b066a8d6 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -15,17 +15,19 @@ use std::sync::Arc; use common_query::logical_plan::{DfExpr, Expr}; -use common_telemetry::{error, warn}; +use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::parquet::file::metadata::RowGroupMetaData; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use datafusion_common::ToDFSchema; +use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::expr::InList; -use datafusion_expr::{Between, BinaryExpr, Operator}; +use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator}; use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; +use datatypes::arrow::array::BooleanArray; use datatypes::schema::SchemaRef; use datatypes::value::scalar_value_to_timestamp; use snafu::ResultExt; @@ -119,6 +121,39 @@ impl Predicate { res } + /// Prunes primary keys + pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result { + for expr in &self.exprs { + // evaluate every filter against primary key + let Ok(eva) = expr.evaluate(primary_key) else { + continue; + }; + let result = match eva { + ColumnarValue::Array(array) => { + let predicate_array = array.as_any().downcast_ref::().unwrap(); + predicate_array + .into_iter() + .map(|x| x.unwrap_or(true)) + .next() + .unwrap_or(true) + } + // result was a column + ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true), + _ => { + unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key); + } + }; + debug!( + "Evaluate primary key {:?} against filter: {:?}, result: {:?}", + primary_key, expr, result + ); + if !result { + return Ok(false); + } + } + Ok(true) + } + /// Evaluates the predicate against the `stats`. /// Returns a vector of boolean values, among which `false` means the row group can be skipped. pub fn prune_with_stats(&self, stats: &S) -> Vec {