diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 9ea4e04ec36f..6ab6f984ad05 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use api::v1::OpType; -use arc_swap::ArcSwapOption; use common_telemetry::debug; use datatypes::arrow; use datatypes::arrow::array::ArrayRef; @@ -183,7 +182,7 @@ impl Memtable for TimeSeriesMemtable { actual: kv.num_primary_keys() } ); - let primary_key_encoded = PrimaryKey::new(self.row_codec.encode(kv.primary_keys())?); + let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?; let fields = kv.fields().collect::>(); allocated += fields.len() * std::mem::size_of::(); @@ -209,7 +208,7 @@ impl Memtable for TimeSeriesMemtable { fn iter( &self, projection: Option<&[ColumnId]>, - predicate: Option, + filters: Option, ) -> BoxedBatchIterator { let projection = if let Some(projection) = projection { projection.iter().copied().collect() @@ -220,7 +219,7 @@ impl Memtable for TimeSeriesMemtable { .collect() }; - Box::new(self.series_set.iter_series(projection, predicate)) + Box::new(self.series_set.iter_series(projection, filters)) } fn is_empty(&self) -> bool { @@ -258,63 +257,7 @@ impl Memtable for TimeSeriesMemtable { } } -struct PrimaryKey { - bytes: Vec, - record_batch: ArcSwapOption, -} - -impl Clone for PrimaryKey { - fn clone(&self) -> Self { - Self { - bytes: self.bytes.clone(), - record_batch: Default::default(), - } - } -} - -impl PrimaryKey { - fn new(bytes: Vec) -> Self { - Self { - bytes, - record_batch: ArcSwapOption::empty(), - } - } - - fn get_or_update_record_batch_with Result>( - &self, - mut f: F, - ) -> Result> { - if let Some(rb) = self.record_batch.load_full() { - return Ok(rb); - } - - let batch = Arc::new(f()?); - self.record_batch.store(Some(batch.clone())); - Ok(batch) - } -} - -impl Eq for PrimaryKey {} - -impl PartialEq for PrimaryKey { - fn eq(&self, other: &Self) -> bool { - self.bytes.eq(&other.bytes) - } -} - -impl PartialOrd for PrimaryKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for PrimaryKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.bytes.cmp(&other.bytes) - } -} - -type SeriesRwLockMap = RwLock>>>; +type SeriesRwLockMap = RwLock, Arc>>>; struct SeriesSet { region_metadata: RegionMetadataRef, @@ -335,7 +278,7 @@ impl SeriesSet { impl SeriesSet { /// Returns the series for given primary key, or create a new series if not already exist, /// along with the allocated memory footprint for primary keys. - fn get_or_add_series(&self, primary_key: PrimaryKey) -> (Arc>, usize) { + fn get_or_add_series(&self, primary_key: Vec) -> (Arc>, usize) { if let Some(series) = self.series.read().unwrap().get(&primary_key) { return (series.clone(), 0); }; @@ -343,7 +286,7 @@ impl SeriesSet { let mut indices = self.series.write().unwrap(); match indices.entry(primary_key) { Entry::Vacant(v) => { - let key_len = v.key().bytes.len(); + let key_len = v.key().len(); v.insert(s.clone()); (s, key_len) } @@ -397,7 +340,7 @@ struct Iter { metadata: RegionMetadataRef, series: Arc, projection: HashSet, - last_key: Option, + last_key: Option>, predicate: Option, pk_schema: arrow::datatypes::SchemaRef, primary_key_builders: Vec>, @@ -410,20 +353,22 @@ impl Iterator for Iter { fn next(&mut self) -> Option { let map = self.series.read().unwrap(); let range = match &self.last_key { - None => map.range::(..), + None => map.range::, _>(..), Some(last_key) => { - map.range::((Bound::Excluded(last_key), Bound::Unbounded)) + map.range::, _>((Bound::Excluded(last_key), Bound::Unbounded)) } }; // TODO(hl): maybe yield more than one time series to amortize range overhead. for (primary_key, series) in range { + let mut series = series.write().unwrap(); if let Some(predicate) = &self.predicate { if !prune_primary_key( &self.codec, - primary_key, + primary_key.as_slice(), + &mut series, &mut self.primary_key_builders, - &self.pk_schema, + self.pk_schema.clone(), predicate, ) { // read next series @@ -431,7 +376,8 @@ impl Iterator for Iter { } } self.last_key = Some(primary_key.clone()); - let values = series.write().unwrap().compact(&self.metadata); + + let values = series.compact(&self.metadata); return Some( values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)), ); @@ -442,9 +388,10 @@ impl Iterator for Iter { fn prune_primary_key( codec: &Arc, - pk: &PrimaryKey, + pk: &[u8], + series: &mut Series, builders: &mut Vec>, - pk_schema: &arrow::datatypes::SchemaRef, + pk_schema: arrow::datatypes::SchemaRef, predicate: &Predicate, ) -> bool { // no primary key, we simply return true. @@ -452,25 +399,26 @@ fn prune_primary_key( return true; } - let Ok(pk_record_batch) = pk.get_or_update_record_batch_with(move || { - pk_to_record_batch(codec, &pk.bytes, builders, pk_schema) - }) else { - return true; - }; - - let result = predicate.prune_primary_key(&pk_record_batch); - debug!( - "Prune primary key: {:?}, res: {:?}", - pk_record_batch, result - ); - result.unwrap_or(true) + if let Some(rb) = series.pk_cache.as_ref() { + let res = predicate.prune_primary_key(rb).unwrap_or(true); + debug!("Prune primary key: {:?}, res: {:?}", rb, res); + res + } else { + let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else { + return true; + }; + let res = predicate.prune_primary_key(&rb).unwrap_or(true); + debug!("Prune primary key: {:?}, res: {:?}", rb, res); + series.update_pk_cache(rb); + res + } } fn pk_to_record_batch( codec: &Arc, bytes: &[u8], builders: &mut Vec>, - pk_schema: &arrow::datatypes::SchemaRef, + pk_schema: arrow::datatypes::SchemaRef, ) -> Result { let pk_values = codec.decode(bytes).unwrap(); assert_eq!(builders.len(), pk_values.len()); @@ -484,11 +432,12 @@ fn pk_to_record_batch( }) .collect(); - RecordBatch::try_new(pk_schema.clone(), arrays).context(NewRecordBatchSnafu) + RecordBatch::try_new(pk_schema, arrays).context(NewRecordBatchSnafu) } /// A `Series` holds a list of field values of some given primary key. struct Series { + pk_cache: Option, active: ValueBuilder, frozen: Vec, } @@ -496,6 +445,7 @@ struct Series { impl Series { fn new(region_metadata: &RegionMetadataRef) -> Self { Self { + pk_cache: None, active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY), frozen: vec![], } @@ -506,6 +456,10 @@ impl Series { self.active.push(ts, sequence, op_type as u8, values); } + fn update_pk_cache(&mut self, pk_batch: RecordBatch) { + self.pk_cache = Some(pk_batch); + } + /// Freezes the active part and push it to `frozen`. fn freeze(&mut self, region_metadata: &RegionMetadataRef) { if self.active.len() != 0 { @@ -624,12 +578,12 @@ impl Values { /// keeps only the latest row for the same timestamp. pub fn to_batch( &self, - primary_key: &PrimaryKey, + primary_key: &[u8], metadata: &RegionMetadataRef, projection: &HashSet, ) -> Result { let builder = BatchBuilder::with_required_columns( - primary_key.bytes.clone(), + primary_key.to_vec(), self.timestamp.clone(), self.sequence.clone(), self.op_type.clone(), @@ -862,11 +816,7 @@ mod tests { }; let batch = values - .to_batch( - &PrimaryKey::new(b"test".to_vec()), - &schema, - &[0, 1, 2, 3, 4].into_iter().collect(), - ) + .to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect()) .unwrap(); check_value( &batch, @@ -968,7 +918,7 @@ mod tests { for j in i * 100..(i + 1) * 100 { let pk = j % pk_num; let primary_key = format!("pk-{}", pk).as_bytes().to_vec(); - let (series, _) = set.get_or_add_series(PrimaryKey::new(primary_key)); + let (series, _) = set.get_or_add_series(primary_key); let mut guard = series.write().unwrap(); guard.push( ts_value_ref(j as i64), @@ -991,7 +941,7 @@ mod tests { for i in 0..pk_num { let pk = format!("pk-{}", i).as_bytes().to_vec(); - let (series, _) = set.get_or_add_series(PrimaryKey::new(pk)); + let (series, _) = set.get_or_add_series(pk); let mut guard = series.write().unwrap(); let values = guard.compact(&schema).unwrap(); timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));