
feat(snapshot-backfill): control log store back pressure in backfill executor #18798

Merged
merged 31 commits into from
Oct 18, 2024

Conversation

wenym1
Contributor

@wenym1 wenym1 commented Oct 8, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

In snapshot backfill, implemented in #17735, we have a complicated control mechanism on the meta node for consuming the log store after the snapshot has been fully consumed. On the meta node, snapshot backfill is divided into 4 phases: ConsumingSnapshot, ConsumingLogStore, ConsumingUpstream and Finishing. After finishing consuming the snapshot, the job enters the ConsumingLogStore phase, in which all snapshot backfill actors keep consuming the log store until they receive a barrier with a special mutation from the meta node. After some pre-defined criteria are met, the job enters the ConsumingUpstream phase. In this phase, the barriers of the upstream and the creating job are still decoupled, but the backfill executors have already started consuming the upstream. On the meta side, we gradually reduce the number of barriers lagged behind. After the lag is small enough, the job enters the Finishing phase, after which the creating job joins as part of the upstream, and barriers are injected and collected together.

The current implementation has some drawbacks.

  • Before the barrier that turns ConsumingLogStore into ConsumingUpstream, backfill executors keep consuming the log store even after they have caught up with the upstream, unnecessarily reading log store data when the upstream data is already available.
  • Though there is a control mechanism on the meta node to ensure that the number of barriers lagging behind gradually decreases, there is no actual back pressure on the upstream from the streaming executor side.
  • The control state on the meta node is overly complicated and can be simplified.

In this PR, as discussed offline with @hzxa21 , we implement a much simpler mechanism for snapshot backfill control. On the meta node, there are only 3 phases, ConsumingSnapshot, ConsumingLogStore and Finishing; the original ConsumingUpstream phase is removed.
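As a rough illustration, the simplified meta-side lifecycle can be sketched as a three-state machine. The type and method names below are made up for illustration; they are not the actual RisingWave types.

```rust
/// Hypothetical sketch of the simplified meta-side status of a
/// snapshot-backfilling job (names assumed, not the real code).
#[derive(Debug, PartialEq)]
enum SnapshotBackfillStatus {
    ConsumingSnapshot,
    ConsumingLogStore,
    Finishing,
}

impl SnapshotBackfillStatus {
    /// Advance to the next phase; Finishing is terminal.
    fn advance(&self) -> SnapshotBackfillStatus {
        match self {
            SnapshotBackfillStatus::ConsumingSnapshot => SnapshotBackfillStatus::ConsumingLogStore,
            SnapshotBackfillStatus::ConsumingLogStore => SnapshotBackfillStatus::Finishing,
            SnapshotBackfillStatus::Finishing => SnapshotBackfillStatus::Finishing,
        }
    }
}
```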

For snapshot backfill executors, after consuming the snapshot and entering ConsumingLogStore, we start applying back pressure to the upstream by limiting the maximum number of barriers lagging behind in the barrier buffer. This maximum is decreased gradually to ensure that the ConsumingLogStore phase eventually finishes. Once there are no pending barriers any more, the executor has caught up with the upstream; it then starts reading data from the upstream and no longer reads from the log store. Note that this process is controlled entirely by the snapshot backfill executors themselves, without receiving any control information from the meta node. Instead, when they catch up with the upstream and start consuming it, they notify the meta node of the progress.
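The back-pressure scheme above can be sketched roughly as follows. The struct, names, and exact shrinking policy are simplified assumptions for illustration; the real executor caps pending checkpoint barriers in its barrier buffer as described in the PR.

```rust
use std::collections::VecDeque;

/// Illustrative sketch: upstream is polled only while the buffer is
/// below a cap, and the cap shrinks as barriers are consumed, so the
/// lag is forced to decrease until the executor catches up.
struct BarrierBuffer {
    pending: VecDeque<u64>, // barrier epochs buffered from upstream
    max_pending: usize,     // cap on lag; decreases over time
}

impl BarrierBuffer {
    fn new(initial_cap: usize) -> Self {
        Self { pending: VecDeque::new(), max_pending: initial_cap }
    }

    /// Back pressure: refuse new upstream barriers once the cap is hit,
    /// so the caller stops polling upstream.
    fn try_push(&mut self, epoch: u64) -> bool {
        if self.pending.len() >= self.max_pending {
            return false;
        }
        self.pending.push_back(epoch);
        true
    }

    /// Consume the oldest buffered barrier and shrink the cap so the
    /// allowed lag is monotonically decreasing (floor of 1).
    fn pop_and_shrink(&mut self) -> Option<u64> {
        let epoch = self.pending.pop_front()?;
        self.max_pending = self.max_pending.saturating_sub(1).max(1);
        Some(epoch)
    }

    /// An empty buffer means the executor has caught up with upstream.
    fn caught_up(&self) -> bool {
        self.pending.is_empty()
    }
}
```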

On the meta node, when all snapshot backfill executors have caught up with the upstream, the job enters the Finishing phase directly. This transition happens at a special barrier, which is the same barrier as in the previous implementation. For this barrier:

  • After this barrier, the creating job joins the upstream, and all later barriers are injected and collected together.
  • On receiving this barrier, the upstream mv executor stops writing the log store old value, the same as in the previous implementation.
  • The snapshot backfill executor is not aware of this barrier, because it no longer receives control information from the meta node after finishing consuming the snapshot.
  • This barrier is injected and collected independently between the upstream and the creating job. However, the upstream and the creating job commit this epoch together, which means the upstream waits for the creating job to collect this barrier before committing the epoch. After this epoch is committed, the creating job is marked as created, and the snapshot backfill is finished.
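The last point, committing the epoch only after both sides have collected the barrier, can be sketched as follows (names are assumed for illustration, not the actual meta-node types):

```rust
/// Sketch of the "commit together" rule on the special barrier: the
/// barrier is collected independently by the upstream and the creating
/// job, but the epoch may only be committed once BOTH have collected it.
#[derive(Default)]
struct EpochCollectState {
    upstream_collected: bool,
    creating_job_collected: bool,
}

impl EpochCollectState {
    fn collect_upstream(&mut self) {
        self.upstream_collected = true;
    }

    fn collect_creating_job(&mut self) {
        self.creating_job_collected = true;
    }

    /// The upstream must wait for the creating job before committing.
    fn can_commit(&self) -> bool {
        self.upstream_collected && self.creating_job_collected
    }
}
```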

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@kwannoel
Contributor

kwannoel commented Oct 8, 2024

  1. snapshot backfill executor will not be aware of this barrier, because it no longer receives control information from meta node after finishing consuming snapshot.
  2. For this barrier, it is injected and collected independently between upstream and the creating job. However, the upstream and the creating job will commit this epoch together, which means the upstream will wait for the creating job to collect this barrier before committing the epoch. After this epoch is committed, the creating job will be marked as created, and the snapshot backfill is finished.

I'm a little confused by the phrasing here. The first statement states that "snapshot backfill executor will not be aware of this barrier". But in the second statement, it is implied that this barrier will still be "injected and collected from the creating job", so it seems like it will still be received by the snapshot backfill executor.

Could you elaborate more on what you mean by "not be aware"? I suppose it does not mean "will not receive", but something else entirely. But I'm not quite sure what. Do you mean it will just be treated as a normal barrier in the snapshot backfill executor, and not anything special? And in contrast, the upstream mv executor will handle it specially, and stop writing to the logstore on receiving it?

@wenym1
Contributor Author

wenym1 commented Oct 8, 2024

Could you elaborate more on what you mean by "not be aware"? I suppose it does not mean "will not receive", but something else entirely. But I'm not quite sure what. Do you mean it will just be treated as a normal barrier in the snapshot backfill executor, and not anything special? And in contrast, the upstream mv executor will handle it specially, and stop writing to the logstore on receiving it?

By "not be aware", I actually meant not aware of the barrier mutation, which is the key difference from the previous implementation. It only cares about the epoch information in the barrier. In the previous implementation, it depends on the barrier mutation to transition from consuming log store to consuming upstream. Sorry for the confusing wording.

Collaborator

@hzxa21 hzxa21 left a comment


Left some early comments.

prev_epoch.value().0,
checkpoint,
)?
creating_job.may_inject_fake_barrier(&mut self.control_stream_manager, checkpoint)?
Collaborator


IIUC, may_inject_fake_barrier may do the following operations:

  • During backfilling, it will inject one fake barrier
  • On starting log store consumption, it will inject one or more real (but stale) barriers

Given that it does not always inject one "fake" barrier, how about renaming may_inject_fake_barrier to may_inject_barriers? It is also better to briefly document the behavior.

Contributor Author


Just updated the code.

In the latest code, we won't transition from ConsumingSnapshot to ConsumingLogStore when handling a new upstream barrier. Instead, we do the transition in the collect method when we see that the snapshot is fully consumed. The previously pending barriers are injected when we do the transition.

When handling a new upstream barrier, we only inject a fake barrier and push the upstream barrier to the buffer, without doing the transition.
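A minimal sketch of this flow, with made-up names and u64 epochs standing in for barriers (epoch 0 marks an injected fake barrier):

```rust
use std::mem;

// Illustrative only: not the actual CreatingStreamingJobStatus code.
enum Phase {
    ConsumingSnapshot { pending_upstream: Vec<u64> },
    ConsumingLogStore,
}

struct CreatingJob {
    phase: Phase,
    injected: Vec<u64>, // epochs injected so far, in order
}

impl CreatingJob {
    /// On a new upstream barrier, only inject a fake barrier and buffer
    /// the real one; no phase transition happens here.
    fn on_new_upstream_barrier(&mut self, epoch: u64) {
        match &mut self.phase {
            Phase::ConsumingSnapshot { pending_upstream } => {
                self.injected.push(0); // fake barrier
                pending_upstream.push(epoch);
            }
            Phase::ConsumingLogStore => self.injected.push(epoch),
        }
    }

    /// The transition happens in `collect`, once the snapshot is fully
    /// consumed: all buffered upstream barriers are injected at once.
    fn on_collect(&mut self, snapshot_fully_consumed: bool) {
        if !snapshot_fully_consumed {
            return;
        }
        if let Phase::ConsumingSnapshot { pending_upstream } = &mut self.phase {
            let buffered = mem::take(pending_upstream);
            self.injected.extend(buffered);
            self.phase = Phase::ConsumingLogStore;
        }
    }
}
```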

tables_to_commit,
is_first_time,
));
self.completing_command = CompletingCommand::CreatingStreamingJob {
Collaborator


IIUC, currently we are polling commands for the following barriers here:

  1. Collected fake barrier for creating jobs
  2. Collected normal barrier for creating jobs
  3. Collected normal barrier for created jobs

1 and 2 are prioritized over 3. Is it possible that this can cause starvation in commit_epoch for 3 and thus delay the regular checkpoint? For example, right before each time next_completed_barrier is called, there is a barrier from 1 or 2 finishing its collection. In the worst case, the regular checkpoint cannot be completed until the new jobs finish creation.

Contributor Author


Will not have starvation after #18819

if let Some(prev_barrier) = self.upstream_pending_barriers.front()
&& prev_barrier.kind.is_barrier()
{
// allow consuming upstream when the barrier is non-checkpoint barrier to avoid deadlock
Contributor


Why can a deadlock happen if prev_barrier is a checkpoint barrier?

Contributor Author


Let's say we have epochs 1, 2, 3, where 1 and 2 are non-checkpoint epochs and 3 is a checkpoint epoch. After receiving the barrier of epoch 1, the snapshot backfill executor will call iter_log on epoch 1 to replay the change log, and before that it must wait for the committed epoch of the upstream to bump above epoch 1. However, since epoch 1 is a non-checkpoint epoch, we actually need to wait for the upstream to commit epoch 3. But at this point we have only received barrier 1 from the upstream and then stopped polling, which exerts back pressure on the upstream; this back pressure may block the upstream from processing epoch 3 and cause the deadlock.

In short, the deadlock cycle is: downstream runs iter_log on epoch 1 -> downstream waits for upstream to finish epoch 3 -> upstream is blocked by back pressure because downstream is not polling further barriers -> downstream cannot poll barriers because it is blocked by iter_log on epoch 1.
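The four wait-for edges above form a cycle, which is the textbook deadlock condition. As a generic illustration (not RisingWave code), a wait-for graph with these edges can be checked for a cycle via DFS:

```rust
/// Generic DFS cycle detection on a directed wait-for graph.
/// Nodes are 0..n; an edge (a, b) means "a waits for b".
fn has_cycle(n: usize, edges: &[(usize, usize)]) -> bool {
    let mut adj = vec![Vec::new(); n];
    for &(a, b) in edges {
        adj[a].push(b);
    }

    // state: 0 = unvisited, 1 = on the current DFS path, 2 = finished
    fn dfs(u: usize, adj: &[Vec<usize>], state: &mut [u8]) -> bool {
        state[u] = 1;
        for &v in &adj[u] {
            if state[v] == 1 || (state[v] == 0 && dfs(v, adj, state)) {
                return true; // back edge: a wait-for cycle exists
            }
        }
        state[u] = 2;
        false
    }

    let mut state = vec![0u8; n];
    (0..n).any(|u| state[u] == 0 && dfs(u, &adj, &mut state))
}
```

With the four edges of the chain above (closing back on the first node), a cycle is reported, i.e. deadlock; dropping any one edge, e.g. by allowing the downstream to keep polling, breaks it.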

Contributor


Makes sense, thank you


gitguardian bot commented Oct 15, 2024

⚠️ GitGuardian has uncovered 7 secrets following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secrets in your pull request
GitGuardian id | Status | Secret | Commit | Filename
9425213 | Triggered | Generic Password | 604b5b6 | e2e_test/source/tvf/postgres_query.slt
9425213 | Triggered | Generic Password | 43c944c | e2e_test/source/tvf/postgres_query.slt
9425213 | Triggered | Generic Password | 1b617b3 | ci/scripts/e2e-source-test.sh
9425213 | Triggered | Generic Password | 43c944c | e2e_test/source/tvf/postgres_query.slt
9425213 | Triggered | Generic Password | 604b5b6 | e2e_test/source/tvf/postgres_query.slt
9425213 | Triggered | Generic Password | 1b013f1 | ci/scripts/e2e-source-test.sh
9425213 | Triggered | Generic Password | 0b01544 | ci/scripts/e2e-source-test.sh
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secrets safely. Learn here the best practices.
  3. Revoke and rotate these secrets.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.


@wenym1 wenym1 changed the base branch from main to yiming/commit-multi-graph-together October 15, 2024 10:37
Base automatically changed from yiming/commit-multi-graph-together to main October 16, 2024 06:43
Collaborator

@hzxa21 hzxa21 left a comment


No major comments. LGTM!

@@ -81,47 +148,40 @@ impl CreatingStreamingJobStatus {
pub(super) fn update_progress(
Collaborator


I think eventually we need more documentation on the state transitions, including the following information, to help others understand the code more easily:

  • What triggers the state transition. Example: backfill finish triggers ConsumingSnapshot -> ConsumingLogStore
  • What actions are fired on each state transition. Example: inject all upstream pending barriers on ConsumingSnapshot -> ConsumingLogStore

This can be done lazily in separate PRs given that there may be more changes here in the future.

@@ -290,179 +225,170 @@ impl CreatingStreamingJobControl {
.0
.saturating_sub(progress_epoch) as _,
);
let graph_to_finish = match &mut self.status {
match &mut self.status {
Collaborator


One thing that bothers me a little bit is that the state mutation/transition happens in two places: one here, outside of CreatingStreamingJobStatus, and the other inside CreatingStreamingJobStatus, in CreatingStreamingJobStatus::update_progress. I am wondering whether we can move the logic here into CreatingStreamingJobStatus.

Ok(
if let Some(barriers) = self.upstream_pending_barriers.pop_back() {
// sub(1) to ensure that the lag is monotonically decreasing.
self.max_pending_checkpoint_barrier_num = min(
Collaborator


very elegant implementation

Contributor

@kwannoel kwannoel left a comment


LGTM

@wenym1 wenym1 added this pull request to the merge queue Oct 18, 2024
Merged via the queue into main with commit 87e2ebf Oct 18, 2024
32 of 34 checks passed
@wenym1 wenym1 deleted the yiming/snapshot-backfill-executor-backpressure branch October 18, 2024 06:23
@xxchan
Member

xxchan commented Oct 18, 2024

Is it ok to cherry-pick this PR to 2.1? Since my ongoing PR #18925 (I want to cherry-pick it) also uses BackfillState 🤪

@hzxa21
Collaborator

hzxa21 commented Oct 18, 2024

@wenym1 I think we can, given that this PR doesn't affect the default backfill mechanism.

@wenym1
Contributor Author

wenym1 commented Oct 18, 2024

Is it ok to cherry-pick this PR to 2.1? Since my ongoing PR #18925 (I want to cherry-pick it) also uses BackfillState 🤪

Included in #19000
