Vacuum record store on serialization
Jesse-Bakker authored and abcpro1 committed Oct 2, 2023
1 parent 151c8c1 commit 3b318ea
Showing 1 changed file with 51 additions and 31 deletions.
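
In short, the in-memory record store used to keep weak references in a `Vec`, so entries whose records had been dropped lingered and were serialized as `None`. With this change the store keys entries by a monotonically increasing index in a `BTreeMap`, and `serialize_slice` first vacuums entries whose strong count has reached zero, so only live records are written. The standalone sketch below illustrates the same vacuum-on-serialize pattern with simplified, hypothetical types (`SketchStore`, a plain `Vec<String>` payload); it is not the dozer-recordstore API.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Weak};

/// Hypothetical record payload; the real store holds `RecordRefInner`.
type Record = Vec<String>;

#[derive(Default)]
struct SketchStore {
    records: BTreeMap<usize, Weak<Record>>,
    idx: usize,
}

impl SketchStore {
    /// Insert under a monotonically increasing key, mirroring `store_record`.
    fn store(&mut self, record: &Arc<Record>) -> usize {
        self.idx += 1;
        self.records.insert(self.idx, Arc::downgrade(record));
        self.idx
    }

    /// Drop entries whose strong count reached zero, mirroring `vacuum`.
    fn vacuum(&mut self) {
        self.records.retain(|_, weak| weak.strong_count() > 0);
    }

    /// Vacuum first, then collect everything at or after `start`;
    /// in this single-threaded sketch every remaining `Weak` can still be upgraded.
    fn serialize_from(&mut self, start: usize) -> Vec<(usize, Arc<Record>)> {
        self.vacuum();
        self.records
            .range(start..)
            .map(|(id, weak)| (*id, weak.upgrade().expect("vacuumed")))
            .collect()
    }
}

fn main() {
    let mut store = SketchStore::default();
    let kept = Arc::new(vec!["kept".to_string()]);
    let dropped = Arc::new(vec!["dropped".to_string()]);
    let kept_id = store.store(&kept);
    store.store(&dropped);
    drop(dropped); // strong count goes to zero; vacuum will remove this entry

    let out = store.serialize_from(0);
    assert_eq!(out.len(), 1);
    assert_eq!(out[0].0, kept_id);
    println!("serialized {} live record(s)", out.len());
}
```

Keying by a monotonic index rather than by `Vec` position appears to be what keeps existing ids valid after a vacuum removes earlier entries, so `range(start..)` can keep using `start` as a stable lower bound.
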
82 changes: 51 additions & 31 deletions dozer-recordstore/src/in_memory/store.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{BTreeMap, HashMap},
     sync::{Arc, Weak},
 };

@@ -26,8 +26,9 @@ pub struct ProcessorRecordStore {
 
 #[derive(Debug, Default)]
 struct ProcessorRecordStoreInner {
-    records: Vec<Weak<RecordRefInner>>,
+    records: BTreeMap<usize, Weak<RecordRefInner>>,
     record_pointer_to_index: HashMap<usize, usize>,
+    idx: usize,
 }
 
 impl ProcessorRecordStore {
@@ -42,14 +43,18 @@ impl ProcessorRecordStore {
     }
 
     pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
-        let records = &self.inner.read().records;
-        let slice = records[start..]
-            .iter()
-            .map(|record| record.upgrade().map(RecordRef))
+        self.inner.write().vacuum();
+
+        let inner = self.inner.read();
+        let slice = inner
+            .records
+            .range(start..)
+            // We just removed all the dangling weak references, so `upgrade` cannot fail here
+            .map(|(id, record)| (id, RecordRef(record.upgrade().unwrap())))
             .collect::<Vec<_>>();
         let data =
             bincode::serialize(&slice).map_err(|e| RecordStoreError::SerializationError {
-                typ: "[Option<RecordRef>]",
+                typ: "[RecordRef]",
                 reason: Box::new(e),
             })?;
         Ok((data, slice.len()))
@@ -65,13 +70,30 @@ impl ProcessorRecordStore {
     }
 }
 
+impl ProcessorRecordStoreInner {
+    fn vacuum(&mut self) {
+        let ptr_to_idx = &mut self.record_pointer_to_index;
+        let records = &mut self.records;
+
+        records.retain(|_, record_ref| {
+            if record_ref.strong_count() == 0 {
+                ptr_to_idx.remove(&(record_ref.as_ptr() as usize));
+                false
+            } else {
+                true
+            }
+        });
+    }
+}
+
 impl StoreRecord for ProcessorRecordStore {
     fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> {
         let mut inner = self.inner.write();
 
-        let index = inner.records.len();
-        insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
-        inner.records.push(Arc::downgrade(&record.0));
+        inner.idx += 1;
+        let idx = inner.idx;
+        insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx);
+        inner.records.insert(idx, Arc::downgrade(&record.0));
 
         Ok(())
     }
@@ -87,35 +109,33 @@ pub struct ProcessorRecordStoreDeserializer {
 
 #[derive(Debug)]
 struct ProcessorRecordStoreDeserializerInner {
-    records: Vec<Option<RecordRef>>,
+    records: BTreeMap<usize, RecordRef>,
     record_pointer_to_index: HashMap<usize, usize>,
+    idx: usize,
 }
 
 impl ProcessorRecordStoreDeserializer {
     pub fn new() -> Result<Self, RecordStoreError> {
         Ok(Self {
             inner: RwLock::new(ProcessorRecordStoreDeserializerInner {
-                records: vec![],
+                records: BTreeMap::new(),
                 record_pointer_to_index: HashMap::new(),
+                idx: 0,
             }),
         })
     }
 
     pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> {
-        let slice: Vec<Option<RecordRef>> =
+        let slice: Vec<(usize, RecordRef)> =
             bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError {
                 typ: "[Option<RecordRef>]",
                 reason: Box::new(e),
             })?;
 
         let mut inner = self.inner.write();
 
-        let mut index = inner.records.len();
-        for record in &slice {
-            if let Some(record) = record {
-                insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
-            }
-            index += 1;
+        for (idx, record) in &slice {
+            insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, *idx);
         }
 
         inner.records.extend(slice);
@@ -128,26 +148,25 @@ impl ProcessorRecordStoreDeserializer {
             .inner
             .read()
             .records
-            .get(index as usize)
-            .ok_or(RecordStoreError::InMemoryRecordNotFound(index))?
-            .as_ref()
+            .get(&(index as usize))
             .ok_or(RecordStoreError::InMemoryRecordNotFound(index))?
             .clone())
     }
 
     pub fn into_record_store(self) -> ProcessorRecordStore {
         let inner = self.inner.into_inner();
+        let max_idx = inner
+            .records
+            .last_key_value()
+            .map(|(idx, _)| *idx)
+            .unwrap_or(0);
         ProcessorRecordStore {
             inner: RwLock::new(ProcessorRecordStoreInner {
+                idx: max_idx,
                 records: inner
                     .records
                     .into_iter()
-                    .map(|record| {
-                        record.map_or_else(
-                            || Arc::downgrade(&RecordRef::new(vec![]).0),
-                            |record| Arc::downgrade(&record.0),
-                        )
-                    })
+                    .map(|(idx, record)| (idx, Arc::downgrade(&record.0)))
                     .collect(),
                 record_pointer_to_index: inner.record_pointer_to_index,
             }),
@@ -159,9 +178,10 @@ impl StoreRecord for ProcessorRecordStoreDeserializer {
     fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> {
         let mut inner = self.inner.write();
 
-        let index = inner.records.len();
-        insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index);
-        inner.records.push(Some(record.clone()));
+        inner.idx += 1;
+        let idx = inner.idx;
+        insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx);
+        inner.records.insert(idx, record.clone());
 
         Ok(())
     }

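The `upgrade().unwrap()` in `serialize_slice` relies on a standard `std::sync::Weak` property, which is exactly what `vacuum` filters on: once the last strong `Arc` is dropped, `strong_count()` returns 0 and `upgrade()` returns `None`. A minimal illustration with plain `std` types (nothing dozer-specific):

```rust
use std::sync::Arc;

fn main() {
    let strong = Arc::new(42_u32);
    let weak = Arc::downgrade(&strong);

    assert_eq!(weak.strong_count(), 1); // one live Arc
    assert!(weak.upgrade().is_some());  // the record is still reachable

    drop(strong);                       // last strong reference goes away

    assert_eq!(weak.strong_count(), 0); // the condition `vacuum` checks for removal
    assert!(weak.upgrade().is_none());  // why dead entries must be dropped before unwrapping
}
```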