Skip to content

Commit

Permalink
fix: disable checkpointing
Browse files Browse the repository at this point in the history
Persisting pipeline processors to disk currently uses a lot of disk space, especially when the checkpointing frequency is high, for example when `max_num_records_before_persist` is set to a low value.
  • Loading branch information
abcpro1 committed Oct 4, 2023
1 parent c0d1a7b commit 9269db4
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
7 changes: 6 additions & 1 deletion dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,18 @@ async fn read_record_store_slices(
let record_store = ProcessorRecordStoreDeserializer::new(record_store)?;
let record_store_prefix = record_store_prefix(factory_prefix);

let mut last_checkpoint: Option<Checkpoint> = None;
let last_checkpoint: Option<Checkpoint> = None;
let mut continuation_token = None;
loop {
let objects = storage
.list_objects(record_store_prefix.to_string(), continuation_token)
.await?;

if let Some(_) = objects.objects.last() {
return Err(ExecutionError::CannotRestart);
}

#[cfg(FIXME_CHECKPOINTING)]
if let Some(object) = objects.objects.last() {
let object_name = AsRef::<Utf8Path>::as_ref(&object.key)
.strip_prefix(&record_store_prefix)
Expand Down
2 changes: 2 additions & 0 deletions dozer-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub enum ExecutionError {
source_name: NodeHandle,
table_name: String,
},
#[error("Dozer cannot restart. You have to clean data from previous runs by running `dozer clean`")]
CannotRestart,
#[error("Failed to create checkpoint: {0}")]
FailedToCreateCheckpoint(BoxedError),
}
Expand Down
1 change: 1 addition & 0 deletions dozer-core/src/executor/processor_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl ReceiverLoop for ProcessorNode {
self.error_manager.report(e);
}

#[cfg(FIXME_CHECKPOINTING)]
if let Some(checkpoint_writer) = &epoch.common_info.checkpoint_writer {
let object = checkpoint_writer.create_processor_object(&self.node_handle)?;
self.processor
Expand Down

0 comments on commit 9269db4

Please sign in to comment.