From 9269db4b567cb46c2446f4595641cd8f2f0c77f7 Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Wed, 4 Oct 2023 19:33:35 +0000 Subject: [PATCH] fix: disable checkpointing Persisting pipeline processors to disk currently uses a lot of disk space, especially when checkpointing frequency is high; for example when `max_num_records_before_persist` is set to a low value. --- dozer-core/src/checkpoint/mod.rs | 7 ++++++- dozer-core/src/errors.rs | 2 ++ dozer-core/src/executor/processor_node.rs | 1 + 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs index aff19563fd..ca99bc6983 100644 --- a/dozer-core/src/checkpoint/mod.rs +++ b/dozer-core/src/checkpoint/mod.rs @@ -301,13 +301,18 @@ async fn read_record_store_slices( let record_store = ProcessorRecordStoreDeserializer::new(record_store)?; let record_store_prefix = record_store_prefix(factory_prefix); - let mut last_checkpoint: Option = None; + let last_checkpoint: Option = None; let mut continuation_token = None; loop { let objects = storage .list_objects(record_store_prefix.to_string(), continuation_token) .await?; + if let Some(_) = objects.objects.last() { + return Err(ExecutionError::CannotRestart); + } + + #[cfg(FIXME_CHECKPOINTING)] if let Some(object) = objects.objects.last() { let object_name = AsRef::::as_ref(&object.key) .strip_prefix(&record_store_prefix) diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs index b709c4c791..0fc4d8ba9a 100644 --- a/dozer-core/src/errors.rs +++ b/dozer-core/src/errors.rs @@ -50,6 +50,8 @@ pub enum ExecutionError { source_name: NodeHandle, table_name: String, }, + #[error("Dozer cannot restart. You have to clean data from previous runs by running `dozer clean`")] + CannotRestart, #[error("Failed to create checkpoint: {0}")] FailedToCreateCheckpoint(BoxedError), } diff --git a/dozer-core/src/executor/processor_node.rs b/dozer-core/src/executor/processor_node.rs index c8ce1597e0..81cee7d39f 100644 --- a/dozer-core/src/executor/processor_node.rs +++ b/dozer-core/src/executor/processor_node.rs @@ -115,6 +115,7 @@ impl ReceiverLoop for ProcessorNode { self.error_manager.report(e); } + #[cfg(FIXME_CHECKPOINTING)] if let Some(checkpoint_writer) = &epoch.common_info.checkpoint_writer { let object = checkpoint_writer.create_processor_object(&self.node_handle)?; self.processor