From 5851b4511af89ed1989691ae0add67ff5ce7245a Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Wed, 4 Oct 2023 19:33:35 +0000 Subject: [PATCH 1/2] fix: disable checkpointing Persisting pipeline processors to disk currently uses a lot of disk space, especially when checkpointing frequency is high; for example when `max_num_records_before_persist` is set to a low value. --- dozer-core/src/checkpoint/mod.rs | 7 ++++++- dozer-core/src/errors.rs | 4 ++++ dozer-core/src/executor/processor_node.rs | 1 + 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs index aff19563fd..cce42a0018 100644 --- a/dozer-core/src/checkpoint/mod.rs +++ b/dozer-core/src/checkpoint/mod.rs @@ -301,13 +301,18 @@ async fn read_record_store_slices( let record_store = ProcessorRecordStoreDeserializer::new(record_store)?; let record_store_prefix = record_store_prefix(factory_prefix); - let mut last_checkpoint: Option = None; + let last_checkpoint: Option = None; let mut continuation_token = None; loop { let objects = storage .list_objects(record_store_prefix.to_string(), continuation_token) .await?; + if objects.objects.last().is_some() { + return Err(ExecutionError::CannotRestart); + } + + #[cfg(FIXME_CHECKPOINTING)] if let Some(object) = objects.objects.last() { let object_name = AsRef::::as_ref(&object.key) .strip_prefix(&record_store_prefix) diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs index b709c4c791..f3e82fc67d 100644 --- a/dozer-core/src/errors.rs +++ b/dozer-core/src/errors.rs @@ -50,6 +50,10 @@ pub enum ExecutionError { source_name: NodeHandle, table_name: String, }, + #[error( + "Dozer cannot restart. You have to clean data from previous runs by running `dozer clean`" + )] + CannotRestart, #[error("Failed to create checkpoint: {0}")] FailedToCreateCheckpoint(BoxedError), } diff --git a/dozer-core/src/executor/processor_node.rs b/dozer-core/src/executor/processor_node.rs index c8ce1597e0..81cee7d39f 100644 --- a/dozer-core/src/executor/processor_node.rs +++ b/dozer-core/src/executor/processor_node.rs @@ -115,6 +115,7 @@ impl ReceiverLoop for ProcessorNode { self.error_manager.report(e); } + #[cfg(FIXME_CHECKPOINTING)] if let Some(checkpoint_writer) = &epoch.common_info.checkpoint_writer { let object = checkpoint_writer.create_processor_object(&self.node_handle)?; self.processor From 5f3ccf06a061de8966f116b120f829df16625468 Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Wed, 4 Oct 2023 19:37:50 +0000 Subject: [PATCH 2/2] fix: only commit when there is something to commit --- dozer-core/src/forwarder.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dozer-core/src/forwarder.rs b/dozer-core/src/forwarder.rs index 56bbe11a8a..18b86bf4d4 100644 --- a/dozer-core/src/forwarder.rs +++ b/dozer-core/src/forwarder.rs @@ -182,7 +182,8 @@ impl SourceChannelManager { fn should_participate_in_commit(&self) -> bool { self.num_uncommitted_ops >= self.commit_sz - || self + || self.num_uncommitted_ops > 0 + && self .last_commit_instant .elapsed() .unwrap_or(self.max_duration_between_commits) // In case of system time drift, we just commit