refactor(snapshot-backfill): extract common logic of consuming snapshot and log store #19936
2d2947f to 579b264
        self.stream.poll_next_unpin(cx)
    }
}
Can we adapt the docs from the PR and add them here? To my understanding, the reason we need a custom stream, rather than relying on the try_stream macro, is that we need to encapsulate some state inside the stream as well.
It is possible to do it with try_stream, but the internal state of the sink will be exposed.
It's hard to use the try_stream macro, because the vnode streams would be owned by the stream that try_stream generates, and then we can't get the state of the vnode streams.
Okay let's add some docs to provide this justification.
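The ownership point above can be illustrated with a minimal, dependency-free sketch. It is not the code from this PR: plain iterators stand in for the async vnode streams, and every name in it (VnodeSource, VnodeStreams, next_row, progress, the u16 vnode id) is hypothetical. Because a named struct owns each per-vnode source, the caller can still query per-vnode state between pulls, which would be hidden if a generated stream captured the sources.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for one vnode's row stream: an iterator plus counters.
struct VnodeSource<I> {
    rows: I,
    consumed: usize,
    finished: bool,
}

/// Owns all per-vnode sources, so their state stays reachable by the caller.
struct VnodeStreams<I> {
    sources: BTreeMap<u16, VnodeSource<I>>,
}

impl<I: Iterator<Item = String>> VnodeStreams<I> {
    fn new(inputs: impl IntoIterator<Item = (u16, I)>) -> Self {
        let sources = inputs
            .into_iter()
            .map(|(vnode, rows)| {
                (vnode, VnodeSource { rows, consumed: 0, finished: false })
            })
            .collect();
        Self { sources }
    }

    /// Yields the next row from the first unfinished vnode, in vnode order.
    /// (An async version would instead poll whichever vnode is ready.)
    fn next_row(&mut self) -> Option<(u16, String)> {
        for (vnode, source) in self.sources.iter_mut() {
            if source.finished {
                continue;
            }
            match source.rows.next() {
                Some(row) => {
                    source.consumed += 1;
                    return Some((*vnode, row));
                }
                None => source.finished = true,
            }
        }
        None
    }

    /// Per-vnode progress: (rows consumed so far, whether the vnode is exhausted).
    fn progress(&self, vnode: u16) -> Option<(usize, bool)> {
        self.sources.get(&vnode).map(|s| (s.consumed, s.finished))
    }
}
```

The design choice being sketched is only the ownership layout: the combinator is a struct with accessor methods rather than an opaque generated stream, so progress can be read without exposing the rest of the internal state.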
src/stream/src/executor/backfill/snapshot_backfill/vnode_stream.rs
match ready!(this.poll_next_row(cx)) {
    Ok(Some(((op, row), second))) => {
        let may_chunk = if let Some((second_op, second_row)) = second {
            if this.data_chunk_builder.can_append(2) {
Suggested change:
- if this.data_chunk_builder.can_append(2) {
+ if this.data_chunk_builder.can_append_update
Two rows will always correspond to an update. This will make it more readable.
    }
} else {
    this.ops.push(op);
    this.data_chunk_builder.append_one_row(row)
How do we ensure that the stream always drains any remaining rows in data_chunk_builder?
Oh, because when we hit Ok(None) below we always drain it.
It would be helpful to add this as a comment.
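For readers following this thread, here is a rough, synchronous sketch of the two behaviours discussed: keeping the two rows of an update in the same chunk, and draining the builder when the input ends (the Ok(None) case). The types and names (Op, ChunkBuilder, take_remaining, build_chunks, i64 rows) are simplified assumptions for illustration, not the actual data_chunk_builder API.

```rust
/// Hypothetical, simplified op type for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

type Row = i64;
type Chunk = Vec<(Op, Row)>;
/// One row, optionally followed by the second row of an update pair.
type RowItem = ((Op, Row), Option<(Op, Row)>);

/// Buffers rows and emits a chunk once `capacity` rows have accumulated.
struct ChunkBuilder {
    capacity: usize,
    buffer: Chunk,
}

impl ChunkBuilder {
    fn new(capacity: usize) -> Self {
        assert!(capacity >= 2, "an update pair must fit in one chunk");
        Self { capacity, buffer: Vec::new() }
    }

    fn can_append(&self, n: usize) -> bool {
        self.buffer.len() + n <= self.capacity
    }

    /// Appends one row; returns a full chunk if the buffer reached capacity.
    fn append_one_row(&mut self, op: Op, row: Row) -> Option<Chunk> {
        self.buffer.push((op, row));
        if self.buffer.len() == self.capacity {
            self.take_remaining()
        } else {
            None
        }
    }

    /// Takes whatever is buffered, or `None` if the buffer is empty.
    fn take_remaining(&mut self) -> Option<Chunk> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

fn build_chunks(rows: impl IntoIterator<Item = RowItem>, capacity: usize) -> Vec<Chunk> {
    let mut builder = ChunkBuilder::new(capacity);
    let mut chunks = Vec::new();
    for ((op, row), second) in rows {
        if let Some((second_op, second_row)) = second {
            // Two rows always form an update: flush first if both will not fit.
            if !builder.can_append(2) {
                chunks.extend(builder.take_remaining());
            }
            // Room for two rows is guaranteed, so the first append cannot emit.
            let first = builder.append_one_row(op, row);
            debug_assert!(first.is_none());
            chunks.extend(builder.append_one_row(second_op, second_row));
        } else {
            chunks.extend(builder.append_one_row(op, row));
        }
    }
    // End of input: drain rows still buffered so that none are lost.
    chunks.extend(builder.take_remaining());
    chunks
}
```

With a capacity of 3, an input of two inserts followed by an update pair flushes the two inserts early rather than splitting the pair, and a single trailing row is still emitted by the final drain.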
LGTM.
6e0ccbc to de3fef3
de3fef3 to 627c55a
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Part of #19720.
For snapshot backfill, both on the main branch and in the upcoming PR #19720, the stages of consuming the snapshot and consuming the log store share similar logic: each consumes a stream of rows with ops and converts it into a stream of stream chunks. This PR extracts that common logic so that both stages share the same code.
Previously, the row streams of multiple vnodes were combined inside the stream returned by the StorageTable method. This PR instead combines the per-vnode streams outside StorageTable with FuturesUnordered, so that we can access each vnode stream and get the progress of each vnode, which will be useful in #19720.
Checklist
Documentation
Release note