Disable checkpointing #2124

Closed
wants to merge 2 commits into from

7 changes: 6 additions & 1 deletion dozer-core/src/checkpoint/mod.rs
@@ -301,13 +301,18 @@ async fn read_record_store_slices(
let record_store = ProcessorRecordStoreDeserializer::new(record_store)?;
let record_store_prefix = record_store_prefix(factory_prefix);

- let mut last_checkpoint: Option<Checkpoint> = None;
+ let last_checkpoint: Option<Checkpoint> = None;
let mut continuation_token = None;
loop {
let objects = storage
.list_objects(record_store_prefix.to_string(), continuation_token)
.await?;

+ if objects.objects.last().is_some() {
+     return Err(ExecutionError::CannotRestart);
+ }
+
+ #[cfg(FIXME_CHECKPOINTING)]
Contributor

We might still write the checkpoints, which means this will fail to restart every time.

Contributor Author

Yes, we need to fix the restart issue separately. Once we settle on a solution, I will open a pull request for it.

Contributor Author

@abcpro1 Oct 6, 2023

I think I'd prefer to solve the restart issue in this pull request as well.
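To make the restart concern in this thread concrete, here is a minimal sketch of the failure mode. It assumes that a run still writes at least one record store object after the startup check passes. `FakeStorage`, `check_can_start`, and `run_once` are hypothetical names used only for illustration, and the `ExecutionError` enum here is a one-variant stand-in for the real one.

```rust
/// Hypothetical stand-in for the error variant added in this pull request.
#[derive(Debug, PartialEq, Eq)]
pub enum ExecutionError {
    CannotRestart,
}

/// Hypothetical in-memory stand-in for the object storage.
#[derive(Default)]
pub struct FakeStorage {
    pub objects: Vec<String>,
}

/// Mirrors the guard in the diff: any existing record store object blocks startup.
pub fn check_can_start(storage: &FakeStorage) -> Result<(), ExecutionError> {
    if storage.objects.last().is_some() {
        return Err(ExecutionError::CannotRestart);
    }
    Ok(())
}

/// Simulates one run. Assumption: a run that starts successfully still
/// writes a checkpoint object before it exits.
pub fn run_once(storage: &mut FakeStorage) -> Result<(), ExecutionError> {
    check_can_start(storage)?;
    let name = format!("record_store/{}", storage.objects.len());
    storage.objects.push(name);
    Ok(())
}
```

Under that assumption, the first `run_once` on an empty storage succeeds and every later one returns `CannotRestart` until the objects are removed, which is the role the error message assigns to `dozer clean`.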

if let Some(object) = objects.objects.last() {
let object_name = AsRef::<Utf8Path>::as_ref(&object.key)
.strip_prefix(&record_store_prefix)
4 changes: 4 additions & 0 deletions dozer-core/src/errors.rs
@@ -50,6 +50,10 @@ pub enum ExecutionError {
source_name: NodeHandle,
table_name: String,
},
+ #[error(
+     "Dozer cannot restart. You have to clean data from previous runs by running `dozer clean`"
+ )]
+ CannotRestart,
#[error("Failed to create checkpoint: {0}")]
FailedToCreateCheckpoint(BoxedError),
}
1 change: 1 addition & 0 deletions dozer-core/src/executor/processor_node.rs
@@ -115,6 +115,7 @@ impl ReceiverLoop for ProcessorNode {
self.error_manager.report(e);
}

+ #[cfg(FIXME_CHECKPOINTING)]
if let Some(checkpoint_writer) = &epoch.common_info.checkpoint_writer {
let object = checkpoint_writer.create_processor_object(&self.node_handle)?;
self.processor
3 changes: 2 additions & 1 deletion dozer-core/src/forwarder.rs
@@ -182,7 +182,8 @@ impl SourceChannelManager {

fn should_participate_in_commit(&self) -> bool {
self.num_uncommitted_ops >= self.commit_sz
- || self
+ || self.num_uncommitted_ops > 0
Contributor

We can tweak this already with a self.commit_sz > 0

Contributor Author

Here I targeted the else branch, which is time-based.
Basically, when the time since the last commit exceeds max_duration_between_commits, commit, but only if there is something to commit.
It's a minor fix, but I had it locally, so I thought I should push it.
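The predicate described in this comment could be sketched as follows. The struct `CommitPolicy`, its field types, and the final `>= self.max_duration_between_commits` comparison are assumptions: the diff is cut off before the end of the expression, so only the field names are taken from it. Because `&&` binds tighter than `||` in Rust, the expression in the diff groups the same way as the explicit parentheses below.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical container for the fields referenced in the diff.
pub struct CommitPolicy {
    pub num_uncommitted_ops: u64,
    pub commit_sz: u64,
    pub last_commit_instant: SystemTime,
    pub max_duration_between_commits: Duration,
}

impl CommitPolicy {
    pub fn should_participate_in_commit(&self) -> bool {
        // Size-based branch: enough operations have accumulated.
        let size_reached = self.num_uncommitted_ops >= self.commit_sz;

        // Time-based branch: too long since the last commit.
        // `SystemTime::elapsed` fails if the clock moved backwards; in that
        // case the interval is treated as fully elapsed.
        let time_reached = self
            .last_commit_instant
            .elapsed()
            .unwrap_or(self.max_duration_between_commits)
            >= self.max_duration_between_commits;

        // The proposed change: the time-based branch only fires when there
        // is something to commit.
        size_reached || (self.num_uncommitted_ops > 0 && time_reached)
    }
}
```

With this predicate, an idle source whose timer has expired reports `false`, whereas the original time-only condition would report `true`.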

Contributor

Ah, I see what it does now. Makes sense 👍

Contributor

This is really subtle. Let me try to explain.

To generate a Commit message, all the sources have to agree on the system's "state", which is represented with the SourceStates type.
The EpochManager is the synchronization point where all sources communicate. Every source calls EpochManager::wait_for_epoch_close with its own state, and the EpochManager aggregates all source states. It applies a logical AND to the termination requests (second parameter) and a logical OR to the commit requests (third parameter), so a Terminate message is sent if and only if all sources want to terminate, and a Commit message is sent if and only if any source wants to commit.

The EpochManager being a synchronization point implies that every source must call EpochManager::wait_for_epoch_close every now and then to unblock the progress of other sources. Imagine that we have two sources, A and B. A hasn't received any new ops, and B has received some. If A never calls EpochManager::wait_for_epoch_close, B will call it and then wait forever for A. The pipeline won't be able to make any progress.

So we shouldn't add this line. The information is not lost though. self.num_uncommitted_ops > 0 is passed as the third parameter of EpochManager::wait_for_epoch_close.
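The synchronization described above can be illustrated with a small stand-alone sketch. It is not Dozer's EpochManager: `SourceRequest`, `EpochDecision`, and `run_one_epoch` are hypothetical names, and a `std::sync::Barrier` stands in for the wait inside `wait_for_epoch_close`. Each simulated source publishes its two requests, waits for all other sources, and then reads the aggregated decision.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

/// What one source reports at the synchronization point (hypothetical type).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SourceRequest {
    pub wants_terminate: bool,
    pub wants_commit: bool,
}

/// What every source learns once all sources have reported (hypothetical type).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EpochDecision {
    pub terminate: bool,
    pub commit: bool,
}

/// Runs one epoch with one thread per source and returns each source's view.
pub fn run_one_epoch(requests: Vec<SourceRequest>) -> Vec<EpochDecision> {
    let barrier = Arc::new(Barrier::new(requests.len()));
    // AND starts from true, OR starts from false.
    let terminate = Arc::new(AtomicBool::new(true));
    let commit = Arc::new(AtomicBool::new(false));

    let handles: Vec<_> = requests
        .into_iter()
        .map(|request| {
            let barrier = Arc::clone(&barrier);
            let terminate = Arc::clone(&terminate);
            let commit = Arc::clone(&commit);
            thread::spawn(move || {
                // Terminate only if every source asks for it.
                terminate.fetch_and(request.wants_terminate, Ordering::SeqCst);
                // Commit if any source asks for it.
                commit.fetch_or(request.wants_commit, Ordering::SeqCst);
                // Blocks until all sources have arrived. A source that never
                // gets here would stall every other source.
                barrier.wait();
                EpochDecision {
                    terminate: terminate.load(Ordering::SeqCst),
                    commit: commit.load(Ordering::SeqCst),
                }
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("source thread panicked"))
        .collect()
}
```

In this sketch an idle source A still reaches the barrier and simply passes `wants_commit: false`, while B passes `true`. Both then observe `commit: true`, which matches the point that the "nothing to commit" information belongs in the commit request rather than in the decision to participate.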

+ && self
.last_commit_instant
.elapsed()
.unwrap_or(self.max_duration_between_commits) // In case of system time drift, we just commit