Skip to content

Commit

Permalink
fix: revert memtable pk rb cache to rwlock
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Oct 10, 2023
1 parent 7f75190 commit dea6618
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

use api::v1::OpType;
use arc_swap::ArcSwapOption;
use common_telemetry::debug;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
Expand Down Expand Up @@ -260,7 +259,7 @@ impl Memtable for TimeSeriesMemtable {

struct PrimaryKey {
bytes: Vec<u8>,
record_batch: ArcSwapOption<RecordBatch>,
record_batch: RwLock<Option<Arc<RecordBatch>>>,
}

impl Clone for PrimaryKey {
Expand All @@ -276,21 +275,25 @@ impl PrimaryKey {
fn new(bytes: Vec<u8>) -> Self {
Self {
bytes,
record_batch: ArcSwapOption::empty(),
record_batch: RwLock::new(None),
}
}

fn get_or_update_record_batch_with<F: FnMut() -> Result<RecordBatch>>(
&self,
mut f: F,
) -> Result<Arc<RecordBatch>> {
if let Some(rb) = self.record_batch.load_full() {
return Ok(rb);
if let Some(rb) = self.record_batch.read().unwrap().as_ref() {
return Ok(rb.clone());
}

let batch = Arc::new(f()?);
self.record_batch.store(Some(batch.clone()));
Ok(batch)
Ok(self
.record_batch
.write()
.unwrap()
.get_or_insert(batch)
.clone())
}
}

Expand Down

0 comments on commit dea6618

Please sign in to comment.