-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(cdc-backfill): support pause and resume #14590
Conversation
Signed-off-by: TennyZhuang <[email protected]>
2a2bf40
to
0000002
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will it be resumed?
#[auto_enum(futures03::Stream)] | ||
let right_snapshot = if !paused { | ||
upstream_table_reader.snapshot_read(args).map(Either::Right) | ||
} else { | ||
futures::stream::pending() | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about
risingwave/src/stream/src/executor/stream_reader.rs
Lines 33 to 48 in a5e510c
/// [`StreamReaderWithPause`] merges two streams, with one receiving barriers (and maybe other types | |
/// of messages) and the other receiving data only (no barrier). The merged stream can be paused | |
/// (`StreamReaderWithPause::pause_stream`) and resumed (`StreamReaderWithPause::resume_stream`). | |
/// A paused stream will not receive any data from either original stream until a barrier arrives | |
/// and the stream is resumed. | |
/// | |
/// ## Priority | |
/// | |
/// If `BIASED` is `true`, the left-hand stream (the one receiving barriers) will get a higher | |
/// priority over the right-hand one. Otherwise, the two streams will be polled in a round robin | |
/// fashion. | |
pub(super) struct StreamReaderWithPause<const BIASED: bool, M> { | |
inner: StreamReaderWithPauseInner<M>, | |
/// Whether the source stream is paused. | |
paused: bool, | |
} |
IIRC, pause_on_startup flag will be reset to false after meta restart. So in the second time we restart meta, it will be resumed. Anything I missed? @BugenZhao |
It can also be resumed by issuing a |
Yep, this pr doesn't support this resume path. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, can support resume barrier as well, so no need to restart cluster to resume.
Reimplemented and supported resume, please review again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 4727 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2065 | 1 | 2661 | 0 |
Click to see the invalid file list
- src/utils/futures_util/src/pausable.rs
Signed-off-by: TennyZhuang <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let this = self.project(); | ||
if this.paused.load(Ordering::Relaxed) { | ||
Poll::Pending |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning Pending
without doing anything with the waker could potentially lead to deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you give me more explanation? I can only find https://docs.rs/futures-util/0.3.30/src/futures_util/stream/pending.rs.html#32
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found the article, but I can't find the solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, should I register the paused as a signal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a simple reproduction. 🥹
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, should I register the paused as a signal?
Yeah. I guess using anything else with a wait-queue mechanism should work.
Although I guess the deadlock won't occur with the usage of this PR, but since it's made a general utility, we should be more careful about it.
Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]>
let this = self.project(); | ||
if this.paused.load(Ordering::Relaxed) { | ||
let mut waker = this.waker.lock().unwrap(); | ||
waker.get_or_insert(cx.waker().clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we always insert the new waker? Is it possible that the stream is moved to another task and the previous waker does not work anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm unsure, but insert
seems not bad here. Updated
Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]>
Signed-off-by: TennyZhuang <[email protected]> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#14555
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.