Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable checkpointing #2124

Closed
wants to merge 2 commits into from
Closed

Conversation

abcpro1
Copy link
Contributor

@abcpro1 abcpro1 commented Oct 4, 2023

Disable checkpointing for now until we make it use less disk space.

@abcpro1 abcpro1 requested a review from Jesse-Bakker October 4, 2023 19:40
@abcpro1 abcpro1 force-pushed the disable-checkpointing branch from 2a8004a to eaa320a Compare October 4, 2023 22:03
Persisting pipeline processors to disk currently uses a lot of disk space, especially when checkpointing frequency is high; for example when `max_num_records_before_persist` is set to a low value.
@abcpro1 abcpro1 force-pushed the disable-checkpointing branch from eaa320a to 5f3ccf0 Compare October 4, 2023 22:12
@@ -182,7 +182,8 @@ impl SourceChannelManager {

fn should_participate_in_commit(&self) -> bool {
self.num_uncommitted_ops >= self.commit_sz
|| self
|| self.num_uncommitted_ops > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can tweak this already with a self.commit_sz > 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I targeted the else branch, which is time based.
Basically, when time since last commit is greater than max_duration_between_commits; commit, but only if there is something to commit.
It's a minor fix, but I had it locally, and I thought I should push it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see what it does now. Makes sense 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really subtle. Let me try to explain.

To generate a Commit message, all the sources have to agree on the system's "state", which is represented with the SourceStates type.
The EpochManager is the synchronization point where all sources communicate. Every source calls EpochManager::wait_for_epoch_close with its own state, and the EpochManager aggregates all source states, performing and on the termination request (second parameter), performing or on the commit request (third parameter), meaning that a Terminate message is sent iif all sources want to terminate, and a Commit message is sent iif any source wants to commit.

The EpochManager being a synchronization point implies that every source must call EpochManager::wait_for_epoch_close every now and then to unblock the progress of other sources. Imagine that we have two sources, A and B. A hasn't received any new ops, and B received some. If A doesn't call EpochManager::wait_for_epoch_close, B will call it and wait for A to call it forever. The pipeline won't be able to make any progress.

So we shouldn't add this line. The information is not lost though. self.num_uncommitted_ops > 0 is passed as the third parameter of EpochManager::wait_for_epoch_close.

return Err(ExecutionError::CannotRestart);
}

#[cfg(FIXME_CHECKPOINTING)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might still write the checkpoints, which means this will fail to restart every time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to fix the restart issue separately. Once we settle on a solution, I will open a pull request for it.

Copy link
Contributor Author

@abcpro1 abcpro1 Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer to solve the restart issue in this pull request as well.

@@ -182,7 +182,8 @@ impl SourceChannelManager {

fn should_participate_in_commit(&self) -> bool {
self.num_uncommitted_ops >= self.commit_sz
|| self
|| self.num_uncommitted_ops > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see what it does now. Makes sense 👍

@chubei
Copy link
Contributor

chubei commented Oct 18, 2023

Not needed anymore

@chubei chubei closed this Oct 18, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants