From 151c8c1f57eed01c76a1a487054e60a35b656062 Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Mon, 2 Oct 2023 05:48:03 +0000 Subject: [PATCH 1/4] fix: reenable rocksdb record store checkpointing with an empty implementation This flushes the in-memory log and releases memory. --- dozer-core/src/epoch/manager.rs | 8 ++------ dozer-recordstore/src/rocksdb.rs | 6 +++--- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/dozer-core/src/epoch/manager.rs b/dozer-core/src/epoch/manager.rs index 9c488c440f..e24d6200b9 100644 --- a/dozer-core/src/epoch/manager.rs +++ b/dozer-core/src/epoch/manager.rs @@ -192,16 +192,12 @@ impl EpochManager { let instant = SystemTime::now(); let action = if *should_commit { let num_records = self.record_store().num_records(); - if source_states.values().all(|table_states| { - table_states - .values() - .all(|&state| state != TableState::NonRestartable) - }) && (num_records - state.next_record_index_to_persist + if num_records - state.next_record_index_to_persist >= self.options.max_num_records_before_persist || instant .duration_since(state.last_persisted_epoch_decision_instant) .unwrap_or(Duration::from_secs(0)) - >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds)) + >= Duration::from_secs(self.options.max_interval_before_persist_in_seconds) { state.next_record_index_to_persist = num_records; state.last_persisted_epoch_decision_instant = instant; diff --git a/dozer-recordstore/src/rocksdb.rs b/dozer-recordstore/src/rocksdb.rs index f54f28d86f..3597b7c227 100644 --- a/dozer-recordstore/src/rocksdb.rs +++ b/dozer-recordstore/src/rocksdb.rs @@ -45,11 +45,11 @@ impl ProcessorRecordStore { .ok_or(RecordStoreError::RocksdbRecordNotFound(*record_ref)) } - pub fn serialize_slice(&self, _start: usize) -> Result<(Vec, usize), RecordStoreError> { - todo!("implement rocksdb record store checkpointing") + pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> { + Ok((vec![0], self.num_records() - start)) // TODO: implement rocksdb record store checkpointing } pub fn deserialize_and_extend(&self, _data: &[u8]) -> Result<(), RecordStoreError> { - todo!("implement rocksdb record store checkpointing") + Ok(()) // TODO: implement rocksdb record store checkpointing } } From 3b318ea114f1cd48d48ecae4a561cc465d795022 Mon Sep 17 00:00:00 2001 From: Jesse Bakker Date: Mon, 2 Oct 2023 17:38:53 +0200 Subject: [PATCH 2/4] Vacuum record store on serialization --- dozer-recordstore/src/in_memory/store.rs | 82 +++++++++++++++--------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/dozer-recordstore/src/in_memory/store.rs b/dozer-recordstore/src/in_memory/store.rs index 0dc334b5c7..169e3ed7ed 100644 --- a/dozer-recordstore/src/in_memory/store.rs +++ b/dozer-recordstore/src/in_memory/store.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{Arc, Weak}, }; @@ -26,8 +26,9 @@ pub struct ProcessorRecordStore { #[derive(Debug, Default)] struct ProcessorRecordStoreInner { - records: Vec>, + records: BTreeMap>, record_pointer_to_index: HashMap, + idx: usize, } impl ProcessorRecordStore { @@ -42,14 +43,18 @@ impl ProcessorRecordStore { } pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> { - let records = &self.inner.read().records; - let slice = records[start..] - .iter() - .map(|record| record.upgrade().map(RecordRef)) + self.inner.write().vacuum(); + + let inner = self.inner.read(); + let slice = inner + .records + .range(start..) + // We just removed all the + .map(|(id, record)| (id, RecordRef(record.upgrade().unwrap()))) .collect::>(); let data = bincode::serialize(&slice).map_err(|e| RecordStoreError::SerializationError { - typ: "[Option]", + typ: "[RecordRef]", reason: Box::new(e), })?; Ok((data, slice.len())) @@ -65,13 +70,30 @@ impl ProcessorRecordStore { } } +impl ProcessorRecordStoreInner { + fn vacuum(&mut self) { + let ptr_to_idx = &mut self.record_pointer_to_index; + let records = &mut self.records; + + records.retain(|_, record_ref| { + if record_ref.strong_count() == 0 { + ptr_to_idx.remove(&(record_ref.as_ptr() as usize)); + false + } else { + true + } + }); + } +} + impl StoreRecord for ProcessorRecordStore { fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> { let mut inner = self.inner.write(); - let index = inner.records.len(); - insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index); - inner.records.push(Arc::downgrade(&record.0)); + inner.idx += 1; + let idx = inner.idx; + insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx); + inner.records.insert(idx, Arc::downgrade(&record.0)); Ok(()) } @@ -87,22 +109,24 @@ pub struct ProcessorRecordStoreDeserializer { #[derive(Debug)] struct ProcessorRecordStoreDeserializerInner { - records: Vec>, + records: BTreeMap, record_pointer_to_index: HashMap, + idx: usize, } impl ProcessorRecordStoreDeserializer { pub fn new() -> Result { Ok(Self { inner: RwLock::new(ProcessorRecordStoreDeserializerInner { - records: vec![], + records: BTreeMap::new(), record_pointer_to_index: HashMap::new(), + idx: 0, }), }) } pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> { - let slice: Vec> = + let slice: Vec<(usize, RecordRef)> = bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError { typ: "[Option]", reason: Box::new(e), @@ -110,12 +134,8 @@ impl ProcessorRecordStoreDeserializer { let mut inner = self.inner.write(); - let mut index = inner.records.len(); - for record in &slice { - if let Some(record) = record { - insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index); - } - index += 1; + for (idx, record) in &slice { + insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, *idx); } inner.records.extend(slice); @@ -128,26 +148,25 @@ impl ProcessorRecordStoreDeserializer { .inner .read() .records - .get(index as usize) - .ok_or(RecordStoreError::InMemoryRecordNotFound(index))? - .as_ref() + .get(&(index as usize)) .ok_or(RecordStoreError::InMemoryRecordNotFound(index))? .clone()) } pub fn into_record_store(self) -> ProcessorRecordStore { let inner = self.inner.into_inner(); + let max_idx = inner + .records + .last_key_value() + .map(|(idx, _)| *idx) + .unwrap_or(0); ProcessorRecordStore { inner: RwLock::new(ProcessorRecordStoreInner { + idx: max_idx, records: inner .records .into_iter() - .map(|record| { - record.map_or_else( - || Arc::downgrade(&RecordRef::new(vec![]).0), - |record| Arc::downgrade(&record.0), - ) - }) + .map(|(idx, record)| (idx, Arc::downgrade(&record.0))) .collect(), record_pointer_to_index: inner.record_pointer_to_index, }), @@ -159,9 +178,10 @@ impl StoreRecord for ProcessorRecordStoreDeserializer { fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError> { let mut inner = self.inner.write(); - let index = inner.records.len(); - insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, index); - inner.records.push(Some(record.clone())); + inner.idx += 1; + let idx = inner.idx; + insert_record_pointer_to_index(&mut inner.record_pointer_to_index, record, idx); + inner.records.insert(idx, record.clone()); Ok(()) } From 934cd5a78e98d0a603034fe65a89cd5e1ecafc95 Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Tue, 3 Oct 2023 03:13:36 +0000 Subject: [PATCH 3/4] Small fixes Weak can become empty between `vacuum()` and `upgrade()`. --- dozer-recordstore/src/in_memory/store.rs | 6 +++--- dozer-recordstore/src/rocksdb.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dozer-recordstore/src/in_memory/store.rs b/dozer-recordstore/src/in_memory/store.rs index 169e3ed7ed..fe17519fae 100644 --- a/dozer-recordstore/src/in_memory/store.rs +++ b/dozer-recordstore/src/in_memory/store.rs @@ -50,11 +50,11 @@ impl ProcessorRecordStore { .records .range(start..) // We just removed all the - .map(|(id, record)| (id, RecordRef(record.upgrade().unwrap()))) + .filter_map(|(&id, weak)| weak.upgrade().map(|record| (id, RecordRef(record)))) .collect::>(); let data = bincode::serialize(&slice).map_err(|e| RecordStoreError::SerializationError { - typ: "[RecordRef]", + typ: "[(usize, RecordRef)]", reason: Box::new(e), })?; Ok((data, slice.len())) @@ -128,7 +128,7 @@ impl ProcessorRecordStoreDeserializer { pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> { let slice: Vec<(usize, RecordRef)> = bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError { - typ: "[Option]", + typ: "[(usize, RecordRef)]", reason: Box::new(e), })?; diff --git a/dozer-recordstore/src/rocksdb.rs b/dozer-recordstore/src/rocksdb.rs index 3597b7c227..98f41230c0 100644 --- a/dozer-recordstore/src/rocksdb.rs +++ b/dozer-recordstore/src/rocksdb.rs @@ -46,7 +46,7 @@ impl ProcessorRecordStore { } pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> { - Ok((vec![0], self.num_records() - start)) // TODO: implement rocksdb record store checkpointing + Ok((vec![], self.num_records() - start)) // TODO: implement rocksdb record store checkpointing } pub fn deserialize_and_extend(&self, _data: &[u8]) -> Result<(), RecordStoreError> { From baae32b9eccc05aae45f5faa6eb51a9d6f89a0ea Mon Sep 17 00:00:00 2001 From: Jesse Bakker Date: Tue, 3 Oct 2023 09:29:10 +0200 Subject: [PATCH 4/4] fix: make record store num_records strictly increasing --- dozer-recordstore/src/in_memory/store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dozer-recordstore/src/in_memory/store.rs b/dozer-recordstore/src/in_memory/store.rs index fe17519fae..bf4faafd3c 100644 --- a/dozer-recordstore/src/in_memory/store.rs +++ b/dozer-recordstore/src/in_memory/store.rs @@ -39,7 +39,7 @@ impl ProcessorRecordStore { } pub fn num_records(&self) -> usize { - self.inner.read().records.len() + self.inner.read().idx } pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> {