Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce memory consumption #2117

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions dozer-core/src/epoch/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,12 @@ impl EpochManager {
let instant = SystemTime::now();
let action = if *should_commit {
let num_records = self.record_store().num_records();
if source_states.values().all(|table_states| {
table_states
.values()
.all(|&state| state != TableState::NonRestartable)
}) && (num_records - state.next_record_index_to_persist
if num_records - state.next_record_index_to_persist
>= self.options.max_num_records_before_persist
|| instant
.duration_since(state.last_persisted_epoch_decision_instant)
.unwrap_or(Duration::from_secs(0))
>= Duration::from_secs(self.options.max_interval_before_persist_in_seconds))
>= Duration::from_secs(self.options.max_interval_before_persist_in_seconds)
{
state.next_record_index_to_persist = num_records;
state.last_persisted_epoch_decision_instant = instant;
Expand Down
86 changes: 53 additions & 33 deletions dozer-recordstore/src/in_memory/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{Arc, Weak},
};

Expand All @@ -26,8 +26,9 @@ pub struct ProcessorRecordStore {

#[derive(Debug, Default)]
struct ProcessorRecordStoreInner {
records: Vec<Weak<RecordRefInner>>,
records: BTreeMap<usize, Weak<RecordRefInner>>,
record_pointer_to_index: HashMap<usize, usize>,
idx: usize,
}

impl ProcessorRecordStore {
Expand All @@ -38,18 +39,22 @@ impl ProcessorRecordStore {
}

pub fn num_records(&self) -> usize {
self.inner.read().records.len()
self.inner.read().idx
}

pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
let records = &self.inner.read().records;
let slice = records[start..]
.iter()
.map(|record| record.upgrade().map(RecordRef))
self.inner.write().vacuum();

let inner = self.inner.read();
let slice = inner
.records
.range(start..)
// We just removed all the
.filter_map(|(&id, weak)| weak.upgrade().map(|record| (id, RecordRef(record))))
.collect::<Vec<_>>();
let data =
bincode::serialize(&slice).map_err(|e| RecordStoreError::SerializationError {
typ: "[Option<RecordRef>]",
typ: "[(usize, RecordRef)]",
reason: Box::new(e),
})?;
Ok((data, slice.len()))
Expand All @@ -65,13 +70,30 @@ impl ProcessorRecordStore {
}
}

impl ProcessorRecordStoreInner {
fn vacuum(&mut self) {
let ptr_to_idx = &mut self.record_pointer_to_index;
let records = &mut self.records;

records.retain(|_, record_ref| {
if record_ref.strong_count() == 0 {
ptr_to_idx.remove(&(record_ref.as_ptr() as usize));
false
} else {
true
}
});
}
}

impl StoreRecord for ProcessorRecordStore {
fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> {
let mut inner = self.inner.write();

let index = inner.records.len();
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
inner.records.push(Arc::downgrade(&record.0));
inner.idx += 1;
let idx = inner.idx;
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx);
inner.records.insert(idx, Arc::downgrade(&record.0));

Ok(())
}
Expand All @@ -87,35 +109,33 @@ pub struct ProcessorRecordStoreDeserializer {

#[derive(Debug)]
struct ProcessorRecordStoreDeserializerInner {
records: Vec<Option<RecordRef>>,
records: BTreeMap<usize, RecordRef>,
record_pointer_to_index: HashMap<usize, usize>,
idx: usize,
}

impl ProcessorRecordStoreDeserializer {
pub fn new() -> Result<Self, RecordStoreError> {
Ok(Self {
inner: RwLock::new(ProcessorRecordStoreDeserializerInner {
records: vec![],
records: BTreeMap::new(),
record_pointer_to_index: HashMap::new(),
idx: 0,
}),
})
}

pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> {
let slice: Vec<Option<RecordRef>> =
let slice: Vec<(usize, RecordRef)> =
bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError {
typ: "[Option<RecordRef>]",
typ: "[(usize, RecordRef)]",
reason: Box::new(e),
})?;

let mut inner = self.inner.write();

let mut index = inner.records.len();
for record in &slice {
if let Some(record) = record {
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
}
index += 1;
for (idx, record) in &slice {
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, *idx);
}

inner.records.extend(slice);
Expand All @@ -128,26 +148,25 @@ impl ProcessorRecordStoreDeserializer {
.inner
.read()
.records
.get(index as usize)
.ok_or(RecordStoreError::InMemoryRecordNotFound(index))?
.as_ref()
.get(&(index as usize))
.ok_or(RecordStoreError::InMemoryRecordNotFound(index))?
.clone())
}

pub fn into_record_store(self) -> ProcessorRecordStore {
let inner = self.inner.into_inner();
let max_idx = inner
.records
.last_key_value()
.map(|(idx, _)| *idx)
.unwrap_or(0);
ProcessorRecordStore {
inner: RwLock::new(ProcessorRecordStoreInner {
idx: max_idx,
records: inner
.records
.into_iter()
.map(|record| {
record.map_or_else(
|| Arc::downgrade(&RecordRef::new(vec![]).0),
|record| Arc::downgrade(&record.0),
)
})
.map(|(idx, record)| (idx, Arc::downgrade(&record.0)))
.collect(),
record_pointer_to_index: inner.record_pointer_to_index,
}),
Expand All @@ -159,9 +178,10 @@ impl StoreRecord for ProcessorRecordStoreDeserializer {
fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> {
let mut inner = self.inner.write();

let index = inner.records.len();
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
inner.records.push(Some(record.clone()));
inner.idx += 1;
let idx = inner.idx;
insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx);
inner.records.insert(idx, record.clone());

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions dozer-recordstore/src/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ impl ProcessorRecordStore {
.ok_or(RecordStoreError::RocksdbRecordNotFound(*record_ref))
}

pub fn serialize_slice(&self, _start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
todo!("implement rocksdb record store checkpointing")
pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
Ok((vec![], self.num_records() - start)) // TODO: implement rocksdb record store checkpointing
}

pub fn deserialize_and_extend(&self, _data: &[u8]) -> Result<(), RecordStoreError> {
todo!("implement rocksdb record store checkpointing")
Ok(()) // TODO: implement rocksdb record store checkpointing
}
}