fix: revert memtable pk rb cache to rwlock (#2565)
* fix: revert memtable pk rb cache to rwlock

* feat: refine
v0y4g3r authored Oct 10, 2023
1 parent 7f75190 commit c9c2b3c
Showing 1 changed file with 44 additions and 94 deletions.
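The revert below drops the lock-free `ArcSwapOption<RecordBatch>` cache that lived on each `PrimaryKey`, goes back to plain `Vec<u8>` map keys, and keeps the decoded primary-key `RecordBatch` as a `pk_cache` field on `Series`, so it is read and updated under the same `RwLock` that already guards the series data. A minimal sketch of that pattern follows; it is not the mito2 code itself — `PkBatch`, the `rows` field, `prune_or_decode`, and `main` are simplified stand-ins for the real `RecordBatch`, `Series` contents, and pruning logic in `time_series.rs`.

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

// Stand-in for the decoded primary-key RecordBatch (simplified for the sketch).
type PkBatch = Vec<String>;

// Simplified Series: the primary-key cache lives behind the same RwLock as the data.
struct Series {
    pk_cache: Option<PkBatch>,
    rows: Vec<i64>,
}

impl Series {
    fn new() -> Self {
        Self { pk_cache: None, rows: Vec::new() }
    }

    fn update_pk_cache(&mut self, batch: PkBatch) {
        self.pk_cache = Some(batch);
    }
}

// Keys are plain byte vectors again; no per-key ArcSwapOption wrapper.
type SeriesMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

// Reuse the cached batch if this series already decoded its key; otherwise
// decode (a trivial stand-in here) and cache it under the held write lock.
fn prune_or_decode(series: &mut Series, pk: &[u8]) -> PkBatch {
    if let Some(cached) = series.pk_cache.as_ref() {
        return cached.clone();
    }
    let decoded = vec![format!("decoded key of {} bytes", pk.len())];
    series.update_pk_cache(decoded.clone());
    decoded
}

fn main() {
    let map: SeriesMap = RwLock::new(BTreeMap::new());
    map.write()
        .unwrap()
        .insert(b"pk-0".to_vec(), Arc::new(RwLock::new(Series::new())));

    let guard = map.read().unwrap();
    let series = guard.get(b"pk-0".as_slice()).unwrap();
    // Take the write lock once per series, as the iterator in the diff does.
    let mut series = series.write().unwrap();
    let batch = prune_or_decode(&mut series, b"pk-0");
    println!("{:?} rows={}", batch, series.rows.len());
}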
138 changes: 44 additions & 94 deletions src/mito2/src/memtable/time_series.rs
@@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

use api::v1::OpType;
use arc_swap::ArcSwapOption;
use common_telemetry::debug;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
@@ -183,7 +182,7 @@ impl Memtable for TimeSeriesMemtable {
actual: kv.num_primary_keys()
}
);
let primary_key_encoded = PrimaryKey::new(self.row_codec.encode(kv.primary_keys())?);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();

allocated += fields.len() * std::mem::size_of::<ValueRef>();
@@ -209,7 +208,7 @@
fn iter(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
filters: Option<Predicate>,
) -> BoxedBatchIterator {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
@@ -220,7 +219,7 @@
.collect()
};

Box::new(self.series_set.iter_series(projection, predicate))
Box::new(self.series_set.iter_series(projection, filters))
}

fn is_empty(&self) -> bool {
@@ -258,63 +257,7 @@
}
}

struct PrimaryKey {
bytes: Vec<u8>,
record_batch: ArcSwapOption<RecordBatch>,
}

impl Clone for PrimaryKey {
fn clone(&self) -> Self {
Self {
bytes: self.bytes.clone(),
record_batch: Default::default(),
}
}
}

impl PrimaryKey {
fn new(bytes: Vec<u8>) -> Self {
Self {
bytes,
record_batch: ArcSwapOption::empty(),
}
}

fn get_or_update_record_batch_with<F: FnMut() -> Result<RecordBatch>>(
&self,
mut f: F,
) -> Result<Arc<RecordBatch>> {
if let Some(rb) = self.record_batch.load_full() {
return Ok(rb);
}

let batch = Arc::new(f()?);
self.record_batch.store(Some(batch.clone()));
Ok(batch)
}
}

impl Eq for PrimaryKey {}

impl PartialEq<Self> for PrimaryKey {
fn eq(&self, other: &Self) -> bool {
self.bytes.eq(&other.bytes)
}
}

impl PartialOrd<Self> for PrimaryKey {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for PrimaryKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.bytes.cmp(&other.bytes)
}
}

type SeriesRwLockMap = RwLock<BTreeMap<PrimaryKey, Arc<RwLock<Series>>>>;
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

struct SeriesSet {
region_metadata: RegionMetadataRef,
@@ -335,15 +278,15 @@ impl SeriesSet {
impl SeriesSet {
/// Returns the series for given primary key, or create a new series if not already exist,
/// along with the allocated memory footprint for primary keys.
fn get_or_add_series(&self, primary_key: PrimaryKey) -> (Arc<RwLock<Series>>, usize) {
fn get_or_add_series(&self, primary_key: Vec<u8>) -> (Arc<RwLock<Series>>, usize) {
if let Some(series) = self.series.read().unwrap().get(&primary_key) {
return (series.clone(), 0);
};
let s = Arc::new(RwLock::new(Series::new(&self.region_metadata)));
let mut indices = self.series.write().unwrap();
match indices.entry(primary_key) {
Entry::Vacant(v) => {
let key_len = v.key().bytes.len();
let key_len = v.key().len();
v.insert(s.clone());
(s, key_len)
}
@@ -397,7 +340,7 @@ struct Iter {
metadata: RegionMetadataRef,
series: Arc<SeriesRwLockMap>,
projection: HashSet<ColumnId>,
last_key: Option<PrimaryKey>,
last_key: Option<Vec<u8>>,
predicate: Option<Predicate>,
pk_schema: arrow::datatypes::SchemaRef,
primary_key_builders: Vec<Box<dyn MutableVector>>,
@@ -410,28 +353,31 @@ impl Iterator for Iter {
fn next(&mut self) -> Option<Self::Item> {
let map = self.series.read().unwrap();
let range = match &self.last_key {
None => map.range::<PrimaryKey, _>(..),
None => map.range::<Vec<u8>, _>(..),
Some(last_key) => {
map.range::<PrimaryKey, _>((Bound::Excluded(last_key), Bound::Unbounded))
map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
}
};

// TODO(hl): maybe yield more than one time series to amortize range overhead.
for (primary_key, series) in range {
let mut series = series.write().unwrap();
if let Some(predicate) = &self.predicate {
if !prune_primary_key(
&self.codec,
primary_key,
primary_key.as_slice(),
&mut series,
&mut self.primary_key_builders,
&self.pk_schema,
self.pk_schema.clone(),
predicate,
) {
// read next series
continue;
}
}
self.last_key = Some(primary_key.clone());
let values = series.write().unwrap().compact(&self.metadata);

let values = series.compact(&self.metadata);
return Some(
values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)),
);
@@ -442,35 +388,37 @@

fn prune_primary_key(
codec: &Arc<McmpRowCodec>,
pk: &PrimaryKey,
pk: &[u8],
series: &mut Series,
builders: &mut Vec<Box<dyn MutableVector>>,
pk_schema: &arrow::datatypes::SchemaRef,
pk_schema: arrow::datatypes::SchemaRef,
predicate: &Predicate,
) -> bool {
// no primary key, we simply return true.
if pk_schema.fields().is_empty() {
return true;
}

let Ok(pk_record_batch) = pk.get_or_update_record_batch_with(move || {
pk_to_record_batch(codec, &pk.bytes, builders, pk_schema)
}) else {
return true;
};

let result = predicate.prune_primary_key(&pk_record_batch);
debug!(
"Prune primary key: {:?}, res: {:?}",
pk_record_batch, result
);
result.unwrap_or(true)
if let Some(rb) = series.pk_cache.as_ref() {
let res = predicate.prune_primary_key(rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
res
} else {
let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else {
return true;
};
let res = predicate.prune_primary_key(&rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
series.update_pk_cache(rb);
res
}
}

fn pk_to_record_batch(
codec: &Arc<McmpRowCodec>,
bytes: &[u8],
builders: &mut Vec<Box<dyn MutableVector>>,
pk_schema: &arrow::datatypes::SchemaRef,
pk_schema: arrow::datatypes::SchemaRef,
) -> Result<RecordBatch> {
let pk_values = codec.decode(bytes).unwrap();
assert_eq!(builders.len(), pk_values.len());
@@ -484,18 +432,20 @@
})
.collect();

RecordBatch::try_new(pk_schema.clone(), arrays).context(NewRecordBatchSnafu)
RecordBatch::try_new(pk_schema, arrays).context(NewRecordBatchSnafu)
}

/// A `Series` holds a list of field values of some given primary key.
struct Series {
pk_cache: Option<RecordBatch>,
active: ValueBuilder,
frozen: Vec<Values>,
}

impl Series {
fn new(region_metadata: &RegionMetadataRef) -> Self {
Self {
pk_cache: None,
active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
frozen: vec![],
}
@@ -506,6 +456,10 @@ impl Series {
self.active.push(ts, sequence, op_type as u8, values);
}

fn update_pk_cache(&mut self, pk_batch: RecordBatch) {
self.pk_cache = Some(pk_batch);
}

/// Freezes the active part and push it to `frozen`.
fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
if self.active.len() != 0 {
@@ -624,12 +578,12 @@ impl Values {
/// keeps only the latest row for the same timestamp.
pub fn to_batch(
&self,
primary_key: &PrimaryKey,
primary_key: &[u8],
metadata: &RegionMetadataRef,
projection: &HashSet<ColumnId>,
) -> Result<Batch> {
let builder = BatchBuilder::with_required_columns(
primary_key.bytes.clone(),
primary_key.to_vec(),
self.timestamp.clone(),
self.sequence.clone(),
self.op_type.clone(),
@@ -862,11 +816,7 @@ mod tests {
};

let batch = values
.to_batch(
&PrimaryKey::new(b"test".to_vec()),
&schema,
&[0, 1, 2, 3, 4].into_iter().collect(),
)
.to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect())
.unwrap();
check_value(
&batch,
@@ -968,7 +918,7 @@ mod tests {
for j in i * 100..(i + 1) * 100 {
let pk = j % pk_num;
let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
let (series, _) = set.get_or_add_series(PrimaryKey::new(primary_key));
let (series, _) = set.get_or_add_series(primary_key);
let mut guard = series.write().unwrap();
guard.push(
ts_value_ref(j as i64),
@@ -991,7 +941,7 @@

for i in 0..pk_num {
let pk = format!("pk-{}", i).as_bytes().to_vec();
let (series, _) = set.get_or_add_series(PrimaryKey::new(pk));
let (series, _) = set.get_or_add_series(pk);
let mut guard = series.write().unwrap();
let values = guard.compact(&schema).unwrap();
timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));