diff --git a/dozer-core/src/epoch/manager.rs b/dozer-core/src/epoch/manager.rs index 9c488c440f..e24d6200b9 100644 --- a/dozer-core/src/epoch/manager.rs +++ b/dozer-core/src/epoch/manager.rs @@ -192,16 +192,12 @@ impl EpochManager { let instant = SystemTime::now(); let action = if *should_commit { let num_records = self.record_store().num_records(); - if source_states.values().all(|table_states| { - table_states - .values() - .all(|&state| state != TableState::NonRestartable) - }) && (num_records - state.next_record_index_to_persist + if num_records - state.next_record_index_to_persist >= self.options.max_num_records_before_persist || instant .duration_since(state.last_persisted_epoch_decision_instant) .unwrap_or(Duration::from_secs(0)) - >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds)) + >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds) { state.next_record_index_to_persist = num_records; state.last_persisted_epoch_decision_instant = instant; diff --git a/dozer-recordstore/src/in_memory/store.rs b/dozer-recordstore/src/in_memory/store.rs index 0dc334b5c7..bf4faafd3c 100644 --- a/dozer-recordstore/src/in_memory/store.rs +++ b/dozer-recordstore/src/in_memory/store.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{Arc, Weak}, }; @@ -26,8 +26,9 @@ pub struct ProcessorRecordStore { #[derive(Debug, Default)] struct ProcessorRecordStoreInner { - records: Vec>, + records: BTreeMap>, record_pointer_to_index: HashMap, + idx: usize, } impl ProcessorRecordStore { @@ -38,18 +39,22 @@ impl ProcessorRecordStore { } pub fn num_records(&self) -> usize { - self.inner.read().records.len() + self.inner.read().idx } pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> { - let records = &self.inner.read().records; - let slice = records[start..] - .iter() - .map(|record| record.upgrade().map(RecordRef)) + self.inner.write().vacuum(); + + let inner = self.inner.read(); + let slice = inner + .records + .range(start..) + // We just removed all the + .filter_map(|(&id, weak)| weak.upgrade().map(|record| (id, RecordRef(record)))) .collect::>(); let data = bincode::serialize(&slice).map_err(|e| RecordStoreError::SerializationError { - typ: "[Option]", + typ: "[(usize, RecordRef)]", reason: Box::new(e), })?; Ok((data, slice.len())) @@ -65,13 +70,30 @@ impl ProcessorRecordStore { } } +impl ProcessorRecordStoreInner { + fn vacuum(&mut self) { + let ptr_to_idx = &mut self.record_pointer_to_index; + let records = &mut self.records; + + records.retain(|_, record_ref| { + if record_ref.strong_count() == 0 { + ptr_to_idx.remove(&(record_ref.as_ptr() as usize)); + false + } else { + true + } + }); + } +} + impl StoreRecord for ProcessorRecordStore { fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> { let mut inner = self.inner.write(); - let index = inner.records.len(); - insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index); - inner.records.push(Arc::downgrade(&record.0)); + inner.idx += 1; + let idx = inner.idx; + insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx); + inner.records.insert(idx, Arc::downgrade(&record.0)); Ok(()) } @@ -87,35 +109,33 @@ pub struct ProcessorRecordStoreDeserializer { #[derive(Debug)] struct ProcessorRecordStoreDeserializerInner { - records: Vec>, + records: BTreeMap, record_pointer_to_index: HashMap, + idx: usize, } impl ProcessorRecordStoreDeserializer { pub fn new() -> Result { Ok(Self { inner: RwLock::new(ProcessorRecordStoreDeserializerInner { - records: vec![], + records: BTreeMap::new(), record_pointer_to_index: HashMap::new(), + idx: 0, }), }) } pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> { - let slice: Vec> = + let slice: Vec<(usize, RecordRef)> = bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError { - typ: "[Option]", + typ: "[(usize, RecordRef)]", reason: Box::new(e), })?; let mut inner = self.inner.write(); - let mut index = inner.records.len(); - for record in &slice { - if let Some(record) = record { - insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index); - } - index += 1; + for (idx, record) in &slice { + insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, *idx); } inner.records.extend(slice); @@ -128,26 +148,25 @@ impl ProcessorRecordStoreDeserializer { .inner .read() .records - .get(index as usize) - .ok_or(RecordStoreError::InMemoryRecordNotFound(index))? - .as_ref() + .get(&(index as usize)) .ok_or(RecordStoreError::InMemoryRecordNotFound(index))? .clone()) } pub fn into_record_store(self) -> ProcessorRecordStore { let inner = self.inner.into_inner(); + let max_idx = inner + .records + .last_key_value() + .map(|(idx, _)| *idx) + .unwrap_or(0); ProcessorRecordStore { inner: RwLock::new(ProcessorRecordStoreInner { + idx: max_idx, records: inner .records .into_iter() - .map(|record| { - record.map_or_else( - || Arc::downgrade(&RecordRef::new(vec![]).0), - |record| Arc::downgrade(&record.0), - ) - }) + .map(|(idx, record)| (idx, Arc::downgrade(&record.0))) .collect(), record_pointer_to_index: inner.record_pointer_to_index, }), @@ -159,9 +178,10 @@ impl StoreRecord for ProcessorRecordStoreDeserializer { fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> { let mut inner = self.inner.write(); - let index = inner.records.len(); - insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index); - inner.records.push(Some(record.clone())); + inner.idx += 1; + let idx = inner.idx; + insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx); + inner.records.insert(idx, record.clone()); Ok(()) } diff --git a/dozer-recordstore/src/rocksdb.rs b/dozer-recordstore/src/rocksdb.rs index f54f28d86f..98f41230c0 100644 --- a/dozer-recordstore/src/rocksdb.rs +++ b/dozer-recordstore/src/rocksdb.rs @@ -45,11 +45,11 @@ impl ProcessorRecordStore { .ok_or(RecordStoreError::RocksdbRecordNotFound(*record_ref)) } - pub fn serialize_slice(&self, _start: usize) -> Result<(Vec, usize), RecordStoreError> { - todo!("implement rocksdb record store checkpointing") + pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> { + Ok((vec![], self.num_records() - start)) // TODO: implement rocksdb record store checkpointing } pub fn deserialize_and_extend(&self, _data: &[u8]) -> Result<(), RecordStoreError> { - todo!("implement rocksdb record store checkpointing") + Ok(()) // TODO: implement rocksdb record store checkpointing } }