Disable checkpointing #2124
Persisting pipeline processors to disk currently uses a lot of disk space, especially when the checkpointing frequency is high, for example when `max_num_records_before_persist` is set to a low value.
@@ -182,7 +182,8 @@ impl SourceChannelManager {

    fn should_participate_in_commit(&self) -> bool {
        self.num_uncommitted_ops >= self.commit_sz
            || self
            || self.num_uncommitted_ops > 0
We can already tweak this with a `self.commit_sz > 0` check.
Here I targeted the else branch, which is time-based.
Basically, when the time since the last commit is greater than `max_duration_between_commits`, commit, but only if there is something to commit.
It's a minor fix, but I had it locally, and I thought I should push it.
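A minimal sketch of the rule described in this comment, not the actual `SourceChannelManager` code. The `CommitPolicy` struct and the `should_commit` method are hypothetical names introduced for illustration; only `commit_sz`, `max_duration_between_commits`, and `num_uncommitted_ops` come from the discussion.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the commit-related settings of a source.
pub struct CommitPolicy {
    /// Number of uncommitted ops that triggers a size-based commit.
    pub commit_sz: u64,
    /// Maximum time allowed between two commits.
    pub max_duration_between_commits: Duration,
}

impl CommitPolicy {
    /// Returns true when the source should ask for a commit.
    pub fn should_commit(&self, num_uncommitted_ops: u64, since_last_commit: Duration) -> bool {
        // Size-based branch: enough ops have accumulated.
        let size_reached = num_uncommitted_ops >= self.commit_sz;
        // Time-based branch: too long since the last commit.
        let timed_out = since_last_commit > self.max_duration_between_commits;
        // The time-based branch only fires if there is something to commit.
        size_reached || (timed_out && num_uncommitted_ops > 0)
    }
}
```

With this shape, a source that has been idle for a long time and has no pending ops does not ask for a commit on its own.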
Ah, I see what it does now. Makes sense 👍
This is really subtle. Let me try to explain.
To generate a `Commit` message, all the sources have to agree on the system's "state", which is represented with the `SourceStates` type.
The `EpochManager` is the synchronization point where all sources communicate. Every source calls `EpochManager::wait_for_epoch_close` with its own state, and the `EpochManager` aggregates all source states, performing a logical `and` on the termination request (second parameter) and a logical `or` on the commit request (third parameter). This means that a `Terminate` message is sent if and only if all sources want to terminate, and a `Commit` message is sent if and only if any source wants to commit.
The `EpochManager` being a synchronization point implies that every source must call `EpochManager::wait_for_epoch_close` every now and then to unblock the progress of other sources. Imagine that we have two sources, `A` and `B`. `A` hasn't received any new ops, and `B` has received some. If `A` doesn't call `EpochManager::wait_for_epoch_close`, `B` will call it and then wait forever for `A` to call it. The pipeline won't be able to make any progress.
So we shouldn't add this line. The information is not lost though: `self.num_uncommitted_ops > 0` is passed as the third parameter of `EpochManager::wait_for_epoch_close`.
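The aggregation rule above can be sketched as a pure function. This is only an illustration under stated assumptions: `SourceRequest`, `EpochDecision`, and `close_epoch` are hypothetical names, and the real `EpochManager::wait_for_epoch_close` additionally blocks until every source has called it, which this sketch does not model.

```rust
/// Hypothetical view of what one source reports at the end of an epoch.
#[derive(Debug, Clone, Copy)]
pub struct SourceRequest {
    /// The source asks to terminate (the "second parameter" in the comment).
    pub wants_terminate: bool,
    /// The source has something to commit (the "third parameter").
    pub wants_commit: bool,
}

/// Hypothetical result of closing an epoch.
#[derive(Debug, PartialEq, Eq)]
pub struct EpochDecision {
    pub terminate: bool,
    pub commit: bool,
}

/// Combines the requests of all sources into one decision.
pub fn close_epoch(requests: &[SourceRequest]) -> EpochDecision {
    EpochDecision {
        // AND: terminate only if every source wants to terminate.
        // The emptiness check avoids `all` returning true for no sources.
        terminate: !requests.is_empty() && requests.iter().all(|r| r.wants_terminate),
        // OR: commit as soon as any source wants to commit.
        commit: requests.iter().any(|r| r.wants_commit),
    }
}
```

Because the commit flag is combined with OR, an idle source can report `wants_commit: false` and still take part in the epoch close, so the other sources are not blocked.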
        return Err(ExecutionError::CannotRestart);
    }

    #[cfg(FIXME_CHECKPOINTING)]
We might still write the checkpoints, which means this will fail to restart every time
Yes, we need to fix the restart issue separately. Once we settle on a solution, I will open a pull request for it.
I think I'd prefer to solve the restart issue in this pull request as well.
Not needed anymore
Disable checkpointing for now until we make it use less disk space.