fix: reduce memory consumption (#2117)
* fix: reenable rocksdb record store checkpointing with an empty implementation

This flushes the in-memory log and releases memory.

* Vacuum record store on serialization

* Small fixes

A `Weak` reference can become empty between `vacuum()` and `upgrade()`, so `upgrade()` must still be treated as fallible.

* fix: make record store num_records strictly increasing

---------

Co-authored-by: Jesse Bakker <[email protected]>
abcpro1 and Jesse-Bakker authored Oct 3, 2023
1 parent 496d4c6 commit 26caa0a
Showing 3 changed files with 58 additions and 42 deletions.
dozer-core/src/epoch/manager.rs (+2, -6)
@@ -192,16 +192,12 @@ impl EpochManager {
         let instant = SystemTime::now();
         let action = if *should_commit {
             let num_records = self.record_store().num_records();
-            if source_states.values().all(|table_states| {
-                table_states
-                    .values()
-                    .all(|&state| state != TableState::NonRestartable)
-            }) && (num_records - state.next_record_index_to_persist
+            if num_records - state.next_record_index_to_persist
                 >= self.options.max_num_records_before_persist
                 || instant
                     .duration_since(state.last_persisted_epoch_decision_instant)
                     .unwrap_or(Duration::from_secs(0))
-                    >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds))
+                    >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds)
             {
                 state.next_record_index_to_persist = num_records;
                 state.last_persisted_epoch_decision_instant = instant;
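With the `source_states` restartability check removed, the persist trigger is a plain two-clause condition: enough new records, or enough elapsed time. The subtraction stays underflow-free only because `num_records()` is made strictly increasing in this same commit and `next_record_index_to_persist` is only ever assigned a previous `num_records()` value. A minimal standalone sketch of the decision (the `PersistState` struct and `should_persist` helper are hypothetical, and `Instant::elapsed` stands in for the original `SystemTime` bookkeeping):

```rust
use std::time::{Duration, Instant};

// Hypothetical persist-cursor state, mirroring the fields used in the diff.
struct PersistState {
    next_record_index_to_persist: usize,
    last_persisted_epoch_decision_instant: Instant,
}

// Persist when enough new records accumulated OR enough time has passed.
// `num_records` must be monotonically increasing, or the subtraction of
// two `usize` values could underflow and panic.
fn should_persist(
    state: &PersistState,
    num_records: usize,
    max_num_records_before_persist: usize,
    max_interval_before_persist: Duration,
) -> bool {
    num_records - state.next_record_index_to_persist >= max_num_records_before_persist
        || state.last_persisted_epoch_decision_instant.elapsed() >= max_interval_before_persist
}

fn main() {
    let state = PersistState {
        next_record_index_to_persist: 0,
        last_persisted_epoch_decision_instant: Instant::now(),
    };
    // 1000 unpersisted records against a threshold of 500: persist now.
    assert!(should_persist(&state, 1000, 500, Duration::from_secs(60)));
}
```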
dozer-recordstore/src/in_memory/store.rs (+53, -33)
@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{BTreeMap, HashMap},
     sync::{Arc, Weak},
 };

@@ -26,8 +26,9 @@ pub struct ProcessorRecordStore {

 #[derive(Debug, Default)]
 struct ProcessorRecordStoreInner {
-    records: Vec<Weak<RecordRefInner>>,
+    records: BTreeMap<usize, Weak<RecordRefInner>>,
     record_pointer_to_index: HashMap<usize, usize>,
+    idx: usize,
 }
 
 impl ProcessorRecordStore {
@@ -38,18 +39,22 @@ impl ProcessorRecordStore {
     }
 
     pub fn num_records(&self) -> usize {
-        self.inner.read().records.len()
+        self.inner.read().idx
     }
 
     pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
-        let records = &self.inner.read().records;
-        let slice = records[start..]
-            .iter()
-            .map(|record| record.upgrade().map(RecordRef))
+        self.inner.write().vacuum();
+
+        let inner = self.inner.read();
+        let slice = inner
+            .records
+            .range(start..)
+            // We just removed all the dangling entries, but a `Weak` can still become empty between `vacuum()` and `upgrade()`.
+            .filter_map(|(&id, weak)| weak.upgrade().map(|record| (id, RecordRef(record))))
             .collect::<Vec<_>>();
         let data =
             bincode::serialize(&slice).map_err(|e| RecordStoreError::SerializationError {
-                typ: "[Option<RecordRef>]",
+                typ: "[(usize, RecordRef)]",
                 reason: Box::new(e),
             })?;
         Ok((data, slice.len()))
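The `filter_map` here is what the "Small fixes" bullet refers to: even though `vacuum()` has just dropped every entry whose strong count was zero, a record can lose its last strong reference between `vacuum()` and `upgrade()`, so the upgrade must remain fallible. A self-contained illustration of the `Weak` behavior involved (standard library only; `String` stands in for the record payload):

```rust
use std::sync::{Arc, Weak};

fn main() {
    let record = Arc::new(String::from("payload"));
    let weak: Weak<String> = Arc::downgrade(&record);

    // At "vacuum time" the strong count is non-zero, so the entry is kept.
    assert_eq!(weak.strong_count(), 1);

    // ...but the last strong reference can go away before `upgrade()` runs
    // (in the record store this can happen concurrently on another thread).
    drop(record);

    // `upgrade()` now yields `None`; `filter_map` simply skips such entries.
    assert!(weak.upgrade().is_none());
}
```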
@@ -65,13 +70,30 @@ impl ProcessorRecordStore {
     }
 }
 
+impl ProcessorRecordStoreInner {
+    fn vacuum(&mut self) {
+        let ptr_to_idx = &mut self.record_pointer_to_index;
+        let records = &mut self.records;
+
+        records.retain(|_, record_ref| {
+            if record_ref.strong_count() == 0 {
+                ptr_to_idx.remove(&(record_ref.as_ptr() as usize));
+                false
+            } else {
+                true
+            }
+        });
+    }
+}
+
 impl StoreRecord for ProcessorRecordStore {
     fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> {
         let mut inner = self.inner.write();
 
-        let index = inner.records.len();
-        insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
-        inner.records.push(Arc::downgrade(&record.0));
+        inner.idx += 1;
+        let idx = inner.idx;
+        insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx);
+        inner.records.insert(idx, Arc::downgrade(&record.0));
 
         Ok(())
     }
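The switch from `Vec<Weak<...>>` to `BTreeMap<usize, Weak<...>>` is what makes `vacuum()` workable: `BTreeMap::retain` drops dead entries without shifting the keys of the survivors, and the separate `idx` counter lets `num_records()` keep growing while the map itself shrinks. A toy version of the same structure (`ToyStore` and its methods are illustrative, not the crate's API):

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Weak};

struct ToyStore {
    records: BTreeMap<usize, Weak<String>>,
    idx: usize, // strictly increasing; never decremented by vacuum
}

impl ToyStore {
    fn store(&mut self, record: &Arc<String>) -> usize {
        self.idx += 1;
        self.records.insert(self.idx, Arc::downgrade(record));
        self.idx
    }

    // Drop entries whose records are gone; surviving keys are untouched.
    fn vacuum(&mut self) {
        self.records.retain(|_, weak| weak.strong_count() > 0);
    }
}

fn main() {
    let mut store = ToyStore { records: BTreeMap::new(), idx: 0 };
    let a = Arc::new(String::from("a"));
    let b = Arc::new(String::from("b"));
    let idx_a = store.store(&a);
    let idx_b = store.store(&b);

    drop(a);
    store.vacuum();

    // `a`'s entry is gone, `b` keeps its original key, and the high-water
    // mark backing `num_records()` did not move backwards.
    assert!(store.records.get(&idx_a).is_none());
    assert!(store.records.get(&idx_b).is_some());
    assert_eq!(store.idx, 2);
}
```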
@@ -87,35 +109,33 @@ pub struct ProcessorRecordStoreDeserializer {

 #[derive(Debug)]
 struct ProcessorRecordStoreDeserializerInner {
-    records: Vec<Option<RecordRef>>,
+    records: BTreeMap<usize, RecordRef>,
     record_pointer_to_index: HashMap<usize, usize>,
+    idx: usize,
 }
 
 impl ProcessorRecordStoreDeserializer {
     pub fn new() -> Result<Self, RecordStoreError> {
         Ok(Self {
             inner: RwLock::new(ProcessorRecordStoreDeserializerInner {
-                records: vec![],
+                records: BTreeMap::new(),
                 record_pointer_to_index: HashMap::new(),
+                idx: 0,
             }),
         })
     }
 
     pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> {
-        let slice: Vec<Option<RecordRef>> =
+        let slice: Vec<(usize, RecordRef)> =
             bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError {
-                typ: "[Option<RecordRef>]",
+                typ: "[(usize, RecordRef)]",
                 reason: Box::new(e),
             })?;
 
         let mut inner = self.inner.write();
 
-        let mut index = inner.records.len();
-        for record in &slice {
-            if let Some(record) = record {
-                insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
-            }
-            index += 1;
+        for (idx, record) in &slice {
+            insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, *idx);
         }
 
         inner.records.extend(slice);
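The checkpoint format changes to match: instead of a dense `Vec<Option<RecordRef>>` with `None` placeholders for dropped records, a slice now serializes as sparse `(index, record)` pairs, so vacuumed records cost no space at all. A round-trip sketch of that encoding, assuming the `bincode` 1.x crate and using `String` as a stand-in for `RecordRef`:

```rust
use std::collections::BTreeMap;

// Stand-in for the real `RecordRef` payload.
type Record = String;

fn main() -> Result<(), Box<bincode::ErrorKind>> {
    // Sparse pairs: indices 1 and 3 survive, index 2 was vacuumed away.
    let slice: Vec<(usize, Record)> = vec![(1, "a".into()), (3, "c".into())];

    let data = bincode::serialize(&slice)?;
    let restored: Vec<(usize, Record)> = bincode::deserialize(&data)?;

    // Extending a BTreeMap with the pairs restores each record under its
    // original index, which is what `deserialize_and_extend` relies on.
    let map: BTreeMap<usize, Record> = restored.into_iter().collect();
    assert_eq!(map.get(&3).map(String::as_str), Some("c"));
    Ok(())
}
```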
@@ -128,26 +148,25 @@ impl ProcessorRecordStoreDeserializer {
             .inner
             .read()
             .records
-            .get(index as usize)
-            .ok_or(RecordStoreError::InMemoryRecordNotFound(index))?
-            .as_ref()
+            .get(&(index as usize))
             .ok_or(RecordStoreError::InMemoryRecordNotFound(index))?
             .clone())
     }
 
     pub fn into_record_store(self) -> ProcessorRecordStore {
         let inner = self.inner.into_inner();
+        let max_idx = inner
+            .records
+            .last_key_value()
+            .map(|(idx, _)| *idx)
+            .unwrap_or(0);
         ProcessorRecordStore {
             inner: RwLock::new(ProcessorRecordStoreInner {
+                idx: max_idx,
                 records: inner
                     .records
                     .into_iter()
-                    .map(|record| {
-                        record.map_or_else(
-                            || Arc::downgrade(&RecordRef::new(vec![]).0),
-                            |record| Arc::downgrade(&record.0),
-                        )
-                    })
+                    .map(|(idx, record)| (idx, Arc::downgrade(&record.0)))
                     .collect(),
                 record_pointer_to_index: inner.record_pointer_to_index,
             }),
@@ -159,9 +178,10 @@ impl StoreRecord for ProcessorRecordStoreDeserializer {
fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> {
let mut inner = self.inner.write();

let index = inner.records.len();
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
inner.records.push(Some(record.clone()));
inner.idx += 1;
let idx = inner.idx;
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx);
inner.records.insert(idx, record.clone());

Ok(())
}
dozer-recordstore/src/rocksdb.rs (+3, -3)
@@ -45,11 +45,11 @@ impl ProcessorRecordStore {
         .ok_or(RecordStoreError::RocksdbRecordNotFound(*record_ref))
     }
 
-    pub fn serialize_slice(&self, _start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
-        todo!("implement rocksdb record store checkpointing")
+    pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
+        Ok((vec![], self.num_records() - start)) // TODO: implement rocksdb record store checkpointing
     }
 
     pub fn deserialize_and_extend(&self, _data: &[u8]) -> Result<(), RecordStoreError> {
-        todo!("implement rocksdb record store checkpointing")
+        Ok(()) // TODO: implement rocksdb record store checkpointing
     }
 }
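Replacing the `todo!()`s with a no-op is presumably the headline fix: the `usize` returned by `serialize_slice` reads as the number of records the caller may treat as checkpointed, so reporting everything past `start` as handled lets the epoch manager advance its persist cursor and release the in-memory log, even though the RocksDB store (already durable on its own) writes no checkpoint bytes yet. A hypothetical caller loop making that contract concrete:

```rust
// Hypothetical stand-in for the no-op `serialize_slice`: no payload bytes,
// but everything past `start` is reported as checkpointed.
fn serialize_slice(num_records: usize, start: usize) -> (Vec<u8>, usize) {
    (vec![], num_records - start)
}

fn main() {
    let mut next_record_index_to_persist = 0;
    for num_records in [10, 25, 40] {
        let (data, count) = serialize_slice(num_records, next_record_index_to_persist);
        assert!(data.is_empty());
        // The cursor still advances, so the log up to this point can be
        // flushed and its memory released.
        next_record_index_to_persist += count;
    }
    assert_eq!(next_record_index_to_persist, 40);
}
```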
