Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc-backfill): support pause and resume #14590

Merged
merged 13 commits into from
Jan 17, 2024
Merged

Conversation

TennyZhuang
Copy link
Contributor

@TennyZhuang TennyZhuang commented Jan 16, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

#14555

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will it be resumed?

Comment on lines 226 to 231
#[auto_enum(futures03::Stream)]
let right_snapshot = if !paused {
upstream_table_reader.snapshot_read(args).map(Either::Right)
} else {
futures::stream::pending()
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about

/// [`StreamReaderWithPause`] merges two streams, with one receiving barriers (and maybe other types
/// of messages) and the other receiving data only (no barrier). The merged stream can be paused
/// (`StreamReaderWithPause::pause_stream`) and resumed (`StreamReaderWithPause::resume_stream`).
/// A paused stream will not receive any data from either original stream until a barrier arrives
/// and the stream is resumed.
///
/// ## Priority
///
/// If `BIASED` is `true`, the left-hand stream (the one receiving barriers) will get a higher
/// priority over the right-hand one. Otherwise, the two streams will be polled in a round robin
/// fashion.
pub(super) struct StreamReaderWithPause<const BIASED: bool, M> {
inner: StreamReaderWithPauseInner<M>,
/// Whether the source stream is paused.
paused: bool,
}

@BugenZhao BugenZhao disabled auto-merge January 16, 2024 05:45
@StrikeW
Copy link
Contributor

StrikeW commented Jan 16, 2024

How will it be resumed?

IIRC, pause_on_startup flag will be reset to false after meta restart. So in the second time we restart meta, it will be resumed. Anything I missed? @BugenZhao

@BugenZhao
Copy link
Member

How will it be resumed?

IIRC, pause_on_startup flag will be reset to false after meta restart. So in the second time we restart meta, it will be resumed. Anything I missed? @BugenZhao

It can also be resumed by issuing a Resume barrier with risectl meta resume.

@StrikeW
Copy link
Contributor

StrikeW commented Jan 16, 2024

It can also be resumed by issuing a Resume barrier with risectl meta resume.

Yep, this pr doesn't support this resume path.

Copy link
Contributor

@kwannoel kwannoel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, can support resume barrier as well, so no need to restart cluster to resume.

@TennyZhuang
Copy link
Contributor Author

Reimplemented and supported resume, please review again.

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 4727 files.

Valid Invalid Ignored Fixed
2065 1 2661 0
Click to see the invalid file list
  • src/utils/futures_util/src/pausable.rs

src/utils/futures_util/src/pausable.rs Show resolved Hide resolved
TennyZhuang and others added 2 commits January 16, 2024 17:36
Signed-off-by: TennyZhuang <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Copy link
Contributor

@StrikeW StrikeW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.

src/utils/futures_util/src/pausable.rs Show resolved Hide resolved
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if this.paused.load(Ordering::Relaxed) {
Poll::Pending
Copy link
Member

@BugenZhao BugenZhao Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning Pending without doing anything with the waker could potentially lead to deadlock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give me more explanation? I can only find https://docs.rs/futures-util/0.3.30/src/futures_util/stream/pending.rs.html#32

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found the article, but I can't find the solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, should I register the paused as a signal?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@BugenZhao BugenZhao Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, should I register the paused as a signal?

Yeah. I guess using anything else with a wait-queue mechanism should work.

Although I guess the deadlock won't occur with the usage of this PR, but since it's made a general utility, we should be more careful about it.

Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]>
@TennyZhuang TennyZhuang changed the title feat(cdc-backfill): support pause_on_startup feat(cdc-backfill): support pause and resume Jan 17, 2024
let this = self.project();
if this.paused.load(Ordering::Relaxed) {
let mut waker = this.waker.lock().unwrap();
waker.get_or_insert(cx.waker().clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we always insert the new waker? Is it possible that the stream is moved to another task and the previous waker does not work anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure, but insert seems not bad here. Updated

@TennyZhuang TennyZhuang added this pull request to the merge queue Jan 17, 2024
Merged via the queue into main with commit 1674a7c Jan 17, 2024
25 of 26 checks passed
@TennyZhuang TennyZhuang deleted the cdc-backfill-pause branch January 17, 2024 08:38
Little-Wallace pushed a commit that referenced this pull request Jan 20, 2024
Signed-off-by: TennyZhuang <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants