Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable shuffle for snapshot backfill #18063

Merged
merged 6 commits into from
Aug 16, 2024

Conversation

wenym1
Copy link
Contributor

@wenym1 wenym1 commented Aug 16, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

In #17735, we didn't enable shuffled backfill for snapshot backfill yet, because when running the backfill test in CI, if shuffled backfill enabled, the stream got stuck and never finished.

After investigating the log, we figured out that there was deadlock happening. When the downstream creating job is consuming the upstream log store, it will wait for the upstream epoch to be committed. However, in the current code, this waiting will cause back-pressure to the upstream, and then block the upstream from handling data, and then the upstream epoch won't be finished and the committed until unblocked. The deadlock didn't happen when shuffled backfill is not enabled because, if not shuffled, the dispatcher between upstream mv executors and downstream backfill executor is all local exchange, which has large buffer, and is less likely to get back-pressured. However, when shuffled backfill is enabled, for remote exchange that has smaller buffer, we are likely to hit the back-pressure and enter further deadlock.

In this PR, we resolve it by also concurrently polling upstream while waiting for upstream epoch to be committed, and the shuffled backfill is enabled for snapshot backfill.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@wenym1 wenym1 requested review from hzxa21 and kwannoel August 16, 2024 08:27
@wenym1 wenym1 changed the title feat: (wip) enable shuffle for snapshot backfill feat: enable shuffle for snapshot backfill Aug 16, 2024
Copy link
Contributor

@kwannoel kwannoel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.
Btw for with_consuming_upstream, I took a look at the macro definition. It's a little unreadable to me:

                async fn with_consuming_upstream<'a, 'b, T>(
                    future: impl Future<Output = StreamExecutorResult<T>>,
                    upstream_buffer: &'a mut UpstreamBuffer<'b>,
                ) -> StreamExecutorResult<T> {
                    select! {
                        biased;
                        e = upstream_buffer.consume_upstream() => {
                            Err(e)
                        }
                        result = future => {
                            result
                        }
                    }
                }

I think most people would expect the following select arm yields some values. But it only yields errors. Once we look into the internal implementation of upstream_buffer the reason becomes clear, which is that we implicitly buffer the barriers inside the UpstreamBuffer and error is only returned if there's an error in reading from upstream.

                        e = upstream_buffer.consume_upstream() => {
                            Err(e)
                        }

Could we add a comment here to explain this behaviour? We can do that in a separate PR.

@wenym1 wenym1 added this pull request to the merge queue Aug 16, 2024
Merged via the queue into main with commit f5f09a6 Aug 16, 2024
31 of 32 checks passed
@wenym1 wenym1 deleted the yiming/shuffled-snapshot-backfill branch August 16, 2024 10:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants