Skip to content

Commit

Permalink
fix: reenable rocksdb record store checkpointing with an empty implem…
Browse files Browse the repository at this point in the history
…entation

This flushes the in-memory log and releases memory.
  • Loading branch information
abcpro1 committed Oct 2, 2023
1 parent 496d4c6 commit 151c8c1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 9 deletions.
8 changes: 2 additions & 6 deletions dozer-core/src/epoch/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,12 @@ impl EpochManager {
let instant = SystemTime::now();
let action = if *should_commit {
let num_records = self.record_store().num_records();
if source_states.values().all(|table_states| {
table_states
.values()
.all(|&state| state != TableState::NonRestartable)
}) && (num_records - state.next_record_index_to_persist
if num_records - state.next_record_index_to_persist
>= self.options.max_num_records_before_persist
|| instant
.duration_since(state.last_persisted_epoch_decision_instant)
.unwrap_or(Duration::from_secs(0))
>= Duration::from_secs(self.options.max_interval_before_persist_in_seconds))
>= Duration::from_secs(self.options.max_interval_before_persist_in_seconds)
{
state.next_record_index_to_persist = num_records;
state.last_persisted_epoch_decision_instant = instant;
Expand Down
6 changes: 3 additions & 3 deletions dozer-recordstore/src/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ impl ProcessorRecordStore {
.ok_or(RecordStoreError::RocksdbRecordNotFound(*record_ref))
}

pub fn serialize_slice(&self, _start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
todo!("implement rocksdb record store checkpointing")
pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), RecordStoreError> {
Ok((vec![0], self.num_records() - start)) // TODO: implement rocksdb record store checkpointing
}

pub fn deserialize_and_extend(&self, _data: &[u8]) -> Result<(), RecordStoreError> {
todo!("implement rocksdb record store checkpointing")
Ok(()) // TODO: implement rocksdb record store checkpointing
}
}

0 comments on commit 151c8c1

Please sign in to comment.