-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(stream): add approx percentile cache for global approx percentile #18026
Conversation
Seems like performance got worse....... |
Let me test it with fs storage rather than in-memory. |
38f075a
to
9e8bb23
Compare
9e8bb23
to
67cda7c
Compare
Performance improved 2x. I will polish up this PR and get it ready to be merged. |
Ready for review. |
#[for_await] | ||
for keyed_row in self | ||
.bucket_state_table | ||
.rev_iter_with_prefix(&[Datum::None; 0], &neg_bounds, PrefetchOptions::default()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I ask why we use rev iter here? IMO the self.cache.neg_buckets.insert
will work regardless of iter order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
// Update | ||
impl<S: StateStore> GlobalApproxPercentileState<S> { | ||
pub fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<()> { | ||
for (_op, row) in chunk.rows() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why ignoring op
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because from the upstream, the input is always an INSERT
. We only care about the delta
column of the row, which will update our counts accordingly. (e.g. -100, +10, +5, etc...)
I will add a comment on it.
It's a special way of processing stream chunks. The behaviour does not change from previous approx_percentile implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IC. May add a debug_assert
here to ensure the correctness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Just load the entire state into memory on bootstrap. When processing chunks, we will immediately persist the updates, rather than waiting for barrier to write to state tables, to avoid contributing to barrier latency. On barrier, we will just scan the cache and output the percentile agg.
Other improvements
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.