feat(snapshot-backfill): control log store back pressure in backfill executor #18798
Conversation
I'm a little confused by the phrasing here. The first statement says that "snapshot backfill executor will not be aware of this barrier". But the second statement implies that this barrier will still be "injected and collected from the creating job", so it seems like it will still be received by the snapshot backfill executor. Could you elaborate on what you mean by "not be aware"? I suppose it does not mean "will not receive", but something else entirely, though I'm not quite sure what. Do you mean it will just be treated as a normal barrier in the snapshot backfill executor, and not anything special? And in contrast, the upstream mv executor will handle it specially, and stop writing to the log store on receiving it?
Left some early comments.
src/meta/src/barrier/mod.rs
Outdated
    prev_epoch.value().0,
    checkpoint,
)?
creating_job.may_inject_fake_barrier(&mut self.control_stream_manager, checkpoint)?
IIUC, `may_inject_fake_barrier` may do the following operations:
- During backfilling, it will inject one fake barrier
- On starting log store consumption, it will inject one or more real (but stale) barriers

Given that it does not always inject one "fake" barrier, how about renaming `may_inject_fake_barrier` to `may_inject_barriers`? It would also be good to briefly document the behavior.
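For reference, a minimal sketch of what the renamed and documented method could look like; the types, fields, and the `u64` barrier representation are hypothetical stand-ins, not the actual meta-node code:

```rust
/// Hypothetical simplified phase of a creating job (illustration only).
enum Phase {
    /// Snapshot is still being consumed; real upstream barriers are buffered.
    ConsumingSnapshot {
        pending_barriers: Vec<u64>,
        snapshot_finished: bool,
    },
    /// Replaying the log store; barriers flow through normally.
    ConsumingLogStore,
}

struct CreatingJob {
    phase: Phase,
}

impl CreatingJob {
    /// May inject barriers for the creating job:
    /// - While backfilling, injects exactly one fake barrier per call.
    /// - On starting log store consumption, injects all buffered real
    ///   (but stale) upstream barriers, i.e. possibly more than one.
    fn may_inject_barriers(&mut self) -> Vec<u64> {
        let (injected, transition) = match &mut self.phase {
            Phase::ConsumingSnapshot {
                snapshot_finished: false,
                ..
            } => (vec![u64::MAX], false), // u64::MAX stands in for one fake barrier
            Phase::ConsumingSnapshot {
                snapshot_finished: true,
                pending_barriers,
            } => (std::mem::take(pending_barriers), true),
            Phase::ConsumingLogStore => (Vec::new(), false),
        };
        if transition {
            self.phase = Phase::ConsumingLogStore;
        }
        injected
    }
}

fn main() {
    let mut job = CreatingJob {
        phase: Phase::ConsumingSnapshot {
            pending_barriers: vec![100, 101],
            snapshot_finished: true,
        },
    };
    // The snapshot just finished: the buffered real barriers are injected at once.
    assert_eq!(job.may_inject_barriers(), vec![100, 101]);
}
```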
Just updated the code.
In the latest code, we won't transition from ConsumingSnapshot to ConsumingLogStore when handling a new upstream barrier. Instead, we now do the transition in the `collect` method when we see that the snapshot is fully consumed. The previously pending barriers will be injected when we do the transition.
When handling a new upstream barrier, we will only inject a fake barrier and push the upstream barrier to the buffer, without doing the transition.
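Roughly, the flow is as follows; this is a sketch with hypothetical types and signatures, not the actual code:

```rust
// Illustrative only: epochs are plain u64s, and the real code tracks far more state.
struct SnapshotBackfillStatus {
    snapshot_done: bool,
    pending_upstream_barriers: Vec<u64>,
}

impl SnapshotBackfillStatus {
    /// On a new upstream barrier: only buffer it and emit one fake barrier.
    /// No phase transition happens here.
    fn on_new_upstream_barrier(&mut self, epoch: u64) -> u64 {
        self.pending_upstream_barriers.push(epoch);
        u64::MAX // stand-in for the injected fake barrier
    }

    /// On collecting a barrier: if we now see that the snapshot is fully
    /// consumed, do the transition and inject all previously pending barriers.
    fn collect(&mut self, snapshot_fully_consumed: bool) -> Option<Vec<u64>> {
        if snapshot_fully_consumed && !self.snapshot_done {
            self.snapshot_done = true;
            Some(std::mem::take(&mut self.pending_upstream_barriers))
        } else {
            None
        }
    }
}

fn main() {
    let mut status = SnapshotBackfillStatus {
        snapshot_done: false,
        pending_upstream_barriers: Vec::new(),
    };
    status.on_new_upstream_barrier(1);
    status.on_new_upstream_barrier(2);
    // The transition happens in `collect`, injecting barriers 1 and 2 together.
    assert_eq!(status.collect(true), Some(vec![1, 2]));
}
```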
src/meta/src/barrier/mod.rs
Outdated
    tables_to_commit,
    is_first_time,
));
self.completing_command = CompletingCommand::CreatingStreamingJob { |
IIUC, currently we are polling commands for the following barriers here:
1. Collected fake barriers for creating jobs
2. Collected normal barriers for creating jobs
3. Collected normal barriers for created jobs

1 and 2 are prioritized over 3. Is it possible that this causes starvation in commit_epoch for 3 and thus delays the regular checkpoint? For example, right before each time `next_completed_barrier` is called, there is a barrier from 1 or 2 finishing its collection. In the worst case, the regular checkpoint cannot complete until the new jobs finish creation.
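To make the worst case concrete, here is a toy model of the biased polling order described above; the queues and types are hypothetical, not the actual meta-node code:

```rust
use std::collections::VecDeque;

#[derive(Debug)]
enum Completed {
    CreatingJob(u64),
    CreatedJob(u64),
}

fn next_completed_barrier(
    creating: &mut VecDeque<u64>,
    created: &mut VecDeque<u64>,
) -> Option<Completed> {
    // Biased order: barriers of creating jobs (cases 1 and 2) always win over
    // barriers of created jobs (case 3). If `creating` is non-empty at every
    // poll, `created` is never drained and the regular checkpoint stalls.
    if let Some(epoch) = creating.pop_front() {
        Some(Completed::CreatingJob(epoch))
    } else {
        created.pop_front().map(Completed::CreatedJob)
    }
}

fn main() {
    let mut creating: VecDeque<u64> = (0..3).collect();
    let mut created: VecDeque<u64> = VecDeque::from([100]);
    // The created-job barrier is only returned once `creating` is exhausted.
    while let Some(done) = next_completed_barrier(&mut creating, &mut created) {
        println!("{done:?}");
    }
}
```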
Starvation will not happen after #18819.
if let Some(prev_barrier) = self.upstream_pending_barriers.front()
    && prev_barrier.kind.is_barrier()
{
    // allow consuming upstream when the barrier is non-checkpoint barrier to avoid deadlock
Why can a deadlock happen if prev_barrier is a checkpoint barrier?
Let's say we have epochs 1, 2 and 3, where 1 and 2 are non-checkpoint epochs, and 3 is a checkpoint epoch. After receiving the barrier of epoch 1, the snapshot backfill executor will do `iter_log` on epoch 1 to replay the change log, before which it should wait for the committed epoch of the upstream to bump above epoch 1. However, since epoch 1 is a non-checkpoint epoch, we actually need to wait until the upstream commits epoch 3. But by then we have only received barrier 1 from the upstream and stopped polling, which back-pressures the upstream, and this back pressure may block the upstream from processing epoch 3, causing the deadlock.
In short, the deadlock looks like: downstream `iter_log` on epoch 1
-> downstream waits for upstream to finish epoch 3
-> upstream blocked by back pressure caused by downstream not polling further barriers
-> downstream not polling barriers because it is blocked by `iter_log` on epoch 1
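Spelled out as a sketch: the `PendingBarrier` type and helper below are illustrative, and only the non-checkpoint check mirrors the quoted code.

```rust
use std::collections::VecDeque;

// Illustrative stand-in for an upstream barrier buffered by the backfill executor.
struct PendingBarrier {
    is_checkpoint: bool,
}

// `iter_log(epoch)` must wait until the upstream *committed* epoch passes
// `epoch`. For non-checkpoint epoch 1, that only happens once the next
// checkpoint epoch (3 in the example) commits. If the executor stopped
// polling upstream barriers while waiting, back pressure could prevent the
// upstream from ever reaching epoch 3:
//
//   downstream iter_log on epoch 1
//     -> downstream waits for upstream to commit epoch 3
//     -> upstream blocked by back pressure (downstream not polling)
//     -> downstream cannot poll, blocked on iter_log of epoch 1
//
// Hence the guard: while the barrier at the front of the buffer is a
// non-checkpoint barrier, keep consuming upstream instead of back-pressuring.
fn may_consume_upstream(upstream_pending_barriers: &VecDeque<PendingBarrier>) -> bool {
    upstream_pending_barriers
        .front()
        .map(|b| !b.is_checkpoint)
        .unwrap_or(true)
}

fn main() {
    let buffered = VecDeque::from([PendingBarrier { is_checkpoint: false }]);
    // Epoch 1 is non-checkpoint, so we must keep polling to let epoch 3 through.
    assert!(may_consume_upstream(&buffered));
}
```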
Makes sense, thank you
| GitGuardian id | GitGuardian status | Secret | Commit | Filename |
|---|---|---|---|---|
| 9425213 | Triggered | Generic Password | 604b5b6 | e2e_test/source/tvf/postgres_query.slt |
| 9425213 | Triggered | Generic Password | 43c944c | e2e_test/source/tvf/postgres_query.slt |
| 9425213 | Triggered | Generic Password | 1b617b3 | ci/scripts/e2e-source-test.sh |
| 9425213 | Triggered | Generic Password | 1b013f1 | ci/scripts/e2e-source-test.sh |
| 9425213 | Triggered | Generic Password | 0b01544 | ci/scripts/e2e-source-test.sh |

🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act: you might completely break other contributing developers' workflows, and you risk accidentally deleting legitimate data.

To avoid such incidents in the future, consider:
- following best practices for managing and storing secrets, including API keys and other credentials;
- installing secret detection on pre-commit to catch secrets before they leave your machine and ease remediation.

🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
No major comments. LGTM!
@@ -81,47 +148,40 @@ impl CreatingStreamingJobStatus {
pub(super) fn update_progress( |
I think eventually we need more documentation on the state transitions, including the following information, to help others understand the code more easily:
- What triggers each state transition. Example: backfill finishing triggers ConsumingSnapshot -> ConsumingLogStore.
- What actions are fired on each state transition. Example: inject all upstream pending barriers on ConsumingSnapshot -> ConsumingLogStore.

This can be done lazily in separate PRs, given that there may be more changes here in the future.
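As one possible shape, such documentation could live on the status enum itself; the sketch below uses simplified, data-free variants rather than the real ones:

```rust
/// A simplified sketch of the real enum, showing the suggested documentation
/// style rather than the actual attached data.
#[allow(dead_code)]
enum CreatingStreamingJobStatus {
    /// Backfilling the upstream snapshot.
    ///
    /// Transition: backfill finishing triggers
    /// `ConsumingSnapshot -> ConsumingLogStore`.
    /// Action: inject all buffered upstream pending barriers.
    ConsumingSnapshot,
    /// Replaying the log store under gradually tightened back pressure.
    ///
    /// Transition: all executors catching up with the upstream triggers
    /// `ConsumingLogStore -> Finishing`, carried by a special barrier.
    ConsumingLogStore,
    /// The creating job joins the upstream; barriers are injected and
    /// collected together from here on.
    Finishing,
}

fn main() {
    // Nothing to run; this sketch only demonstrates the documentation layout.
    let _ = CreatingStreamingJobStatus::ConsumingSnapshot;
}
```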
@@ -290,179 +225,170 @@ impl CreatingStreamingJobControl {
        .0
        .saturating_sub(progress_epoch) as _,
);
let graph_to_finish = match &mut self.status { | |||
match &mut self.status { |
One thing that bothers me a little bit is that the state mutation/transition happens in two places: one here, outside of CreatingStreamingJobStatus, and the other inside CreatingStreamingJobStatus with CreatingStreamingJobStatus::update_progress. I am thinking about whether we can move the logic here inside CreatingStreamingJobStatus.
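One possible shape for that refactor, with hypothetical simplified signatures: make the status type own every transition, so callers never match on it and mutate it directly.

```rust
/// Simplified stand-in for the real status type.
enum CreatingStreamingJobStatus {
    ConsumingSnapshot { pending_barriers: Vec<u64> },
    ConsumingLogStore,
}

impl CreatingStreamingJobStatus {
    /// Single entry point for the progress-driven transition; the caller
    /// only passes in what it observed and receives the barriers to inject.
    fn update_progress(&mut self, backfill_finished: bool) -> Vec<u64> {
        if backfill_finished && matches!(self, Self::ConsumingSnapshot { .. }) {
            // Transition and hand back the buffered barriers in one place.
            match std::mem::replace(self, Self::ConsumingLogStore) {
                Self::ConsumingSnapshot { pending_barriers } => pending_barriers,
                Self::ConsumingLogStore => unreachable!(),
            }
        } else {
            Vec::new()
        }
    }
}

fn main() {
    let mut status = CreatingStreamingJobStatus::ConsumingSnapshot {
        pending_barriers: vec![7, 8],
    };
    assert_eq!(status.update_progress(true), vec![7, 8]);
    assert!(matches!(status, CreatingStreamingJobStatus::ConsumingLogStore));
}
```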
Ok(
    if let Some(barriers) = self.upstream_pending_barriers.pop_back() {
        // sub(1) to ensure that the lag is monotonically decreasing.
        self.max_pending_checkpoint_barrier_num = min(
very elegant implementation
LGTM
Is it ok to cherry-pick this PR to 2.1? Since my ongoing PR #18925 (which I also want to cherry-pick) also uses
@wenym1 I think we can, given that this PR doesn't affect the default backfill mechanism.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In snapshot backfill implemented in #17735, we have a complicated control mechanism on the meta node for consuming the log store after the snapshot has been fully consumed. On the meta node, snapshot backfill is divided into 4 phases: `ConsumingSnapshot`, `ConsumingLogStore`, `ConsumingUpstream` and `Finishing`. After finishing consuming the snapshot, it enters the `ConsumingLogStore` phase. All snapshot backfill actors keep consuming the log store until they receive a barrier with a special mutation from the meta node. After hitting some pre-defined criterion, it enters the `ConsumingUpstream` phase. In this phase, the barriers of the upstream and the creating job are still decoupled, but the backfill executors have already started consuming the upstream. On the meta side, we gradually reduce the number of barriers lagging behind. After the lag is small enough, it enters the `Finishing` phase, after which the creating job joins as part of the upstream, where barriers are injected and collected together.

The current implementation has some drawbacks. For example, when transiting from `ConsumingLogStore` to `ConsumingUpstream`, backfill executors still consume the log store even though they have caught up with the upstream, which unnecessarily reads log store data even though the upstream data are ready.

In this PR, as discussed offline with @hzxa21, we implement a much simpler mechanism for snapshot backfill control. On the meta node, there are only 3 phases: `ConsumingSnapshot`, `ConsumingLogStore` and `Finishing`; the original `ConsumingUpstream` phase is removed.
For snapshot backfill executors, after consuming the snapshot and entering `ConsumingLogStore`, we start applying back pressure to the upstream by limiting the maximum number of barriers lagging behind in the barrier buffer. This maximum number is decreased gradually to ensure that the `ConsumingLogStore` phase eventually finishes. Once there is no pending barrier any more, the executor has caught up with the upstream; it then starts reading data from the upstream and no longer reads from the log store. Note that this process is controlled entirely by the snapshot backfill executors themselves, and they do not receive any control information from the meta node. On the contrary, when they catch up with the upstream and start consuming it, they notify the meta node about the progress.
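The rule can be summarized with a short sketch; the field names and the exact cap formula below are illustrative guesses rather than the PR's actual code:

```rust
use std::cmp::min;
use std::collections::VecDeque;

// Hypothetical simplified view of the backfill executor's barrier buffer.
struct BarrierBuffer {
    upstream_pending_barriers: VecDeque<u64>,
    max_pending_barrier_num: usize,
}

impl BarrierBuffer {
    /// Back pressure: a new upstream barrier is accepted (i.e. the upstream
    /// is polled) only while the buffer is under the current cap.
    fn can_consume_upstream(&self) -> bool {
        self.upstream_pending_barriers.len() < self.max_pending_barrier_num
    }

    fn on_upstream_barrier(&mut self, epoch: u64) {
        debug_assert!(self.can_consume_upstream());
        self.upstream_pending_barriers.push_front(epoch);
    }

    /// Each time a pending barrier is consumed, tighten the cap. The final
    /// `saturating_sub(1)` makes the allowed lag strictly decrease, so the
    /// buffer must eventually drain, i.e. the executor catches up.
    fn consume_pending_barrier(&mut self) -> Option<u64> {
        let barrier = self.upstream_pending_barriers.pop_back()?;
        self.max_pending_barrier_num = min(
            self.max_pending_barrier_num,
            self.upstream_pending_barriers.len() + 1,
        )
        .saturating_sub(1);
        Some(barrier)
    }

    /// No pending barrier left: switch from reading the log store to
    /// reading the upstream directly.
    fn caught_up(&self) -> bool {
        self.upstream_pending_barriers.is_empty()
    }
}

fn main() {
    let mut buf = BarrierBuffer {
        upstream_pending_barriers: VecDeque::from([1, 2]),
        max_pending_barrier_num: 3,
    };
    let mut next_epoch = 3;
    // Even if the upstream keeps producing, the shrinking cap forces catch-up.
    loop {
        while buf.can_consume_upstream() {
            buf.on_upstream_barrier(next_epoch);
            next_epoch += 1;
        }
        if buf.consume_pending_barrier().is_none() {
            break;
        }
    }
    assert!(buf.caught_up());
}
```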
On the meta node, when all snapshot backfill executors have caught up with the upstream, it enters the `Finishing` phase directly. This transition happens with a special barrier, which is the same barrier as in the previous implementation. On this barrier,

Checklist
`./risedev check` (or alias, `./risedev c`)
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.