feat(stream): support row merge (a.k.a keyed merge) #17930
Conversation
… use reverse iterator
Seems this PR has a lot of duplication with the 2-phase approx percentile one; what about trying Graphite to construct a PR stack?
4485c01 to 3d92790
It's rebased on top of that PR. I have already changed the target to make it easier for reviewers.
src/stream/src/executor/row_merge.rs
Outdated
AlignedMessage::Left(chunk) => {
    lhs_buffer = Some(chunk);
}
AlignedMessage::Right(chunk) => {
    rhs_buffer = Some(chunk);
}
This executor looks like a very special executor which accepts one chunk from each input per barrier. In my mind, it should be a general merge executor that can process any number of chunks from the input and needs to maintain a hash map to match rows from both sides.
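Roughly what I have in mind (a hypothetical sketch with made-up names, not a concrete proposal):

use std::collections::HashMap;

// Hypothetical sketch of the generalized "keyed merge": rows from each side are
// matched by key via hash maps, and a merged pair is emitted once both sides
// have arrived. All names here are for illustration only.
struct KeyedMergeState<K, R> {
    lhs_pending: HashMap<K, R>,
    rhs_pending: HashMap<K, R>,
}

impl<K: std::hash::Hash + Eq, R> KeyedMergeState<K, R> {
    // Returns a matched (lhs, rhs) pair if the other side has already arrived,
    // otherwise buffers the row and waits.
    fn on_lhs(&mut self, key: K, row: R) -> Option<(R, R)> {
        match self.rhs_pending.remove(&key) {
            Some(rhs) => Some((row, rhs)),
            None => {
                self.lhs_pending.insert(key, row);
                None
            }
        }
    }

    fn on_rhs(&mut self, key: K, row: R) -> Option<(R, R)> {
        match self.lhs_pending.remove(&key) {
            Some(lhs) => Some((lhs, row)),
            None => {
                self.rhs_pending.insert(key, row);
                None
            }
        }
    }
}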
I think it's similar to the difference between hash agg and simple agg. Once we decide to implement 'keyed merge', we have to consider whether the number of rows coming from both sides will exceed the memory budget, and then consider whether to introduce a separate state table or a spill mechanism. Therefore, introducing a simple implementation that assumes the cardinality of both sides of the relation is 1 lets us quickly support percentile aggregation.
Prefer a simpler executor compared to a more generalized one. We can generalize it if the need arises.
Got it. The row merge is the same as the simple agg, and the generalized one should be called keyed merge. BTW, can we add an assertion to ensure lhs_buffer and rhs_buffer are None before assigning?
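For example, on the existing Left arm (just a sketch of the idea, not final code):

AlignedMessage::Left(chunk) => {
    // Sketch of the suggested assertion: the slot must still be empty before
    // we overwrite it within this epoch.
    assert!(lhs_buffer.is_none(), "lhs_buffer should be None before assigning");
    lhs_buffer = Some(chunk);
}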
Hmm, I think the normal agg executor may output multiple chunks in one epoch. I will instead parse the chunks to ensure consistent operations.
Edit: Never mind, this will add more complexity. I will instead just buffer everything in one epoch and flush them.
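A minimal sketch of what I mean by buffering per epoch (placeholder types and names, not the actual executor code):

// Sketch of buffering within one epoch and flushing on barrier; `Chunk` is a
// placeholder, not the real stream chunk type, and the structure is illustrative.
struct EpochBuffer<Chunk> {
    lhs: Vec<Chunk>,
    rhs: Vec<Chunk>,
}

impl<Chunk> EpochBuffer<Chunk> {
    fn push_lhs(&mut self, chunk: Chunk) {
        self.lhs.push(chunk);
    }

    fn push_rhs(&mut self, chunk: Chunk) {
        self.rhs.push(chunk);
    }

    // On barrier: pair up the chunks buffered in this epoch, hand each pair to
    // the merge routine, and clear the buffers for the next epoch.
    fn flush(&mut self, mut merge: impl FnMut(Chunk, Chunk)) {
        for (lhs, rhs) in self.lhs.drain(..).zip(self.rhs.drain(..)) {
            merge(lhs, rhs);
        }
    }
}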
Done in 677edab
src/stream/src/executor/row_merge.rs
Outdated
if !(1..=2).contains(&lhs_chunk.cardinality()) {
    bail!("lhs chunk cardinality should be 1 or 2");
}
if !(1..=2).contains(&rhs_chunk.cardinality()) {
    bail!("rhs chunk cardinality should be 1 or 2");
}
if lhs_chunk.cardinality() != rhs_chunk.cardinality() {
    bail!("lhs and rhs chunk cardinality should be the same");
}
Is this assumption a bit too strong? What we know at this operator is that, logically, the relations it inputs have only one row each, but we do not assume that only one chunk of changes will come during a barrier period, or that this chunk contains only one or two rows. Can we still parse each row of every incoming chunk when we receive it to get the current values on both sides? This way, we can handle chunks with ops like -+-+-+.
The semantics of row merge are such that each input chunk should contain only one operation, e.g. an update, insert, or delete.
An update will be a chunk with cardinality 2.
An insert will be a chunk with cardinality 1.
It's specific to simple normal agg and simple approx percentile agg, where the output cardinality of both sides should be at most 1 (insert or delete) or 2 (update).
We can generalize it later if needed.
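A minimal sketch of the merge step under this assumption (placeholder types; the row-by-row op agreement is my assumption here, not something enforced elsewhere):

// Sketch: within one epoch each side produces at most one logical change, so
// the chunks line up row by row (1 row for insert/delete, 2 rows for update),
// and the output row is the lhs columns followed by the rhs columns.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Op { Insert, Delete, UpdateDelete, UpdateInsert }

fn merge_rows(
    lhs: &[(Op, Vec<i64>)],
    rhs: &[(Op, Vec<i64>)],
) -> Vec<(Op, Vec<i64>)> {
    assert_eq!(lhs.len(), rhs.len(), "both sides must have the same cardinality");
    lhs.iter()
        .zip(rhs)
        .map(|((op_l, row_l), (op_r, row_r))| {
            assert_eq!(op_l, op_r, "ops must agree row by row");
            let mut merged = row_l.clone();
            merged.extend_from_slice(row_r);
            (*op_l, merged)
        })
        .collect()
}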
Oh, I got what you mean. We can't simply overwrite the chunks. I thought you meant one chunk with 3 ops inside; I guess you mean 3 different update chunks.
Handled by 677edab
305c48d to 51cdb17
src/stream/src/executor/row_merge.rs
Outdated
self.ctx.id,
self.ctx.fragment_id,
self.ctx.streaming_metrics.clone(),
"Join",
Should be RowMerge?
Handled: ebf0545
src/stream/src/executor/row_merge.rs
Outdated
AlignedMessage::WatermarkLeft(watermark) => {
    yield Message::Watermark(watermark);
}
AlignedMessage::WatermarkRight(watermark) => {
    yield Message::Watermark(watermark);
}
Since we buffer the chunks, we can't emit watermark messages that bypass the buffered chunks; otherwise, the watermark guarantee cannot be maintained. It seems necessary to buffer the watermark messages as well. Refer to this PR for a solution to this issue.
I don't think we should see watermarks at all, actually. Agg with approx percentile should not propagate watermarks. I suggest just absorbing and ignoring any watermark, leaving a tracing::warn if we see unexpected watermarks.
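Something along these lines on top of the existing watermark arms (just a sketch, assuming the tracing crate is available here):

AlignedMessage::WatermarkLeft(_) | AlignedMessage::WatermarkRight(_) => {
    // Sketch: absorb the watermark instead of forwarding it, and leave a
    // warning since we don't expect to see one here at all.
    tracing::warn!("unexpected watermark in row merge executor, ignoring");
}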
Handled: ebf0545
LGTM!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Subsequently, we will also need to modify the simple agg executor to always output, as long as there are some input chunks in the current epoch.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.