Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): support row merge (a.k.a keyed merge) #17930

Merged
merged 33 commits into from
Aug 8, 2024
Merged

Conversation

kwannoel
Copy link
Contributor

@kwannoel kwannoel commented Aug 5, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Subsequently, we will also need to modify simple agg executor to always output, as long as there's some input chunks in the current epoch.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@kwannoel kwannoel marked this pull request as ready for review August 6, 2024 02:23
@graphite-app graphite-app bot requested a review from a team August 6, 2024 03:29
Copy link
Member

@stdrc stdrc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this PR has many duplication with the 2-phase approx percentile one, what about trying Graphite to construct a PR stack?

@kwannoel kwannoel changed the base branch from main to kwannoel/approx-percentile-simple-two-phase August 6, 2024 06:12
@kwannoel kwannoel force-pushed the kwannoel/keyed-merge branch from 4485c01 to 3d92790 Compare August 6, 2024 06:18
@kwannoel
Copy link
Contributor Author

kwannoel commented Aug 6, 2024

Seems this PR has many duplication with the 2-phase approx percentile one, what about trying Graphite to construct a PR stack?

It's rebased ontop of that PR. I have already changed the target to make it easier for reviewers.

@kwannoel kwannoel changed the title feat(stream): support keyed merge feat(stream): support row merge Aug 6, 2024
@kwannoel kwannoel requested a review from chenzl25 August 7, 2024 09:17
@kwannoel kwannoel changed the title feat(stream): support row merge feat(stream): support row merge (a.k.a keyed merge) Aug 7, 2024
@kwannoel kwannoel requested review from fuyufjh and st1page August 7, 2024 09:17
Comment on lines 80 to 85
AlignedMessage::Left(chunk) => {
lhs_buffer = Some(chunk);
}
AlignedMessage::Right(chunk) => {
rhs_buffer = Some(chunk);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This executor looks like a very special executor which accepts one chunk from each input per barrier. In my mind, it should be a general merge executor that can process any number of chunks from input and need to maintain a hash map to match rows for both side.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's similar to the difference between hash agg and simple agg. Once we decide to implement 'keyed merge', we have to consider whether the number of rows coming from both sides will exceed memory usage, and then consider whether to introduce a separate state table or spill mechanism. Therefore, introducing a simple implementation to determine the cardinality of both sides of the relation as 1 can quickly support percentile aggregation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer a simpler executor compared to a more generalized one. We can generalize it if the need arises.

Copy link
Contributor

@chenzl25 chenzl25 Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The row merge is the same as the simple agg and the generalized one should be called key merge. BTW, can we add an assertion to ensure lhs_buffer and rhs_buffer is None before assigning?

Copy link
Contributor Author

@kwannoel kwannoel Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The row merge is the same as the simple agg and the generalized one should be called key merge. BTW, can we add an assertion to ensure lhs_buffer and rhs_buffer is None before assigning?

Hmm I think the normal agg executor may output multiple chunks in one epoch. I will instead parse the chunks to ensure consistent operations.

Edit: Nevermind this will add more complexity. I will instead just buffer everything in one epoch and flush them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 677edab

Comment on lines 98 to 106
if !(1..=2).contains(&lhs_chunk.cardinality()) {
bail!("lhs chunk cardinality should be 1 or 2");
}
if !(1..=2).contains(&rhs_chunk.cardinality()) {
bail!("rhs chunk cardinality should be 1 or 2");
}
if lhs_chunk.cardinality() != rhs_chunk.cardinality() {
bail!("lhs and rhs chunk cardinality should be the same");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assumption a bit too strong? What we know at this operator is that logically, the relations it inputs have only one row each, but we do not assume that only one chunk of changes will come during a barrier period, or that this chunk contains only one or two rows. Can we still parse each row of every incoming chunk when we receive it to get the current values on both sides? This way, we can handle chunks like (-+-+-+).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of row merge is such that the input chunks should only have 1 operation, e.g. update, insert or delete.

Update will be for chunks with cardinality 2.
Insert will be for chunks with cardinality 1.

It's specific to simple normal agg and simple approx percentile agg, where the output cardinality of both sides should be 1 (insert or delete) or 2 (update) at most.

We can generalize it later if needed.

Copy link
Contributor Author

@kwannoel kwannoel Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I got what you mean. We can't simply overwrite the chunks. I thought you meant 1 chunk with 3 ops inside. I guess you mean 3 different update chunks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled by 677edab

@kwannoel kwannoel force-pushed the kwannoel/keyed-merge branch from 305c48d to 51cdb17 Compare August 8, 2024 05:14
Base automatically changed from kwannoel/approx-percentile-simple-two-phase to main August 8, 2024 05:25
@graphite-app graphite-app bot requested a review from a team August 8, 2024 05:39
self.ctx.id,
self.ctx.fragment_id,
self.ctx.streaming_metrics.clone(),
"Join",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be RowMerge?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled: ebf0545

Comment on lines 103 to 108
AlignedMessage::WatermarkLeft(watermark) => {
yield Message::Watermark(watermark);
}
AlignedMessage::WatermarkRight(watermark) => {
yield Message::Watermark(watermark);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we buffer the chunks, we can't emit watermark messages bypassing the chunk; otherwise, the watermark guarantee cannot be maintained. It seems necessary to buffer the watermark messages as well. Refer to this PR for a solution to this issue.

Copy link
Contributor Author

@kwannoel kwannoel Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should see watermark at all actually. Agg with approx percentile should not propagate watermark. I suggest just absorbing and ignoring any watermark, leaving a trace::warn if we see unexpected watermarks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled: ebf0545

Copy link
Contributor

@chenzl25 chenzl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@kwannoel kwannoel added this pull request to the merge queue Aug 8, 2024
Merged via the queue into main with commit f5f5701 Aug 8, 2024
31 of 32 checks passed
@kwannoel kwannoel deleted the kwannoel/keyed-merge branch August 8, 2024 11:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants