feat: concurrent batch scan #12825
Conversation
(start, end)
}))
} else {
    // let raw_key_ranges = if !ordered
👀 Please either remove this code or add an option for enabling/disabling it.
LGTM, but it needs to be proven to work by a benchmark.
Codecov Report
@@ Coverage Diff @@
## main #12825 +/- ##
==========================================
+ Coverage 69.17% 69.19% +0.01%
==========================================
Files 1489 1489
Lines 246069 246061 -8
==========================================
+ Hits 170230 170257 +27
+ Misses 75839 75804 -35
... and 11 files with indirect coverage changes
Generally LGTM. I think we should make the limit larger.
_ if !ordered => futures::stream::iter(iterators).flatten(),
_ if !ordered => {
    futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
        .flatten_unordered(10)
I think 10 may be too small; maybe 1024 is better?
I'm not sure whether it is good practice for the upper-level application to be responsible for improving iteration performance by manually scheduling it concurrently. Does any other system do this?
If so, we may also have to handle the ordered case. Otherwise it's hard to tell whether sorting with a separate executor can be faster.
@@ -493,7 +473,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
0 => unreachable!(),
1 => iterators.into_iter().next().unwrap(),
// Concat all iterators if not to preserve order.
_ if !ordered => futures::stream::iter(iterators).flatten(),
_ if !ordered => {
    futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
I'm wondering if we can pin it after buffering.
let raw_key_ranges = if !ordered
    && matches!(encoded_key_range.start_bound(), Unbounded)
    && matches!(encoded_key_range.end_bound(), Unbounded)
{
    // If the range is unbounded and order is not required, we can create a single iterator
    // for each continuous vnode range.
I guess removing this can significantly hurt the performance for a full scan on a small table (like Top-10 or simple aggregation).
🤔 I will add a session variable to control it, but I am not sure what the default value should be.
We have maintained the physical key count in meta. Currently CN doesn't have a way to get the key count stats but we can add it if needed.
Offline discussion with @liurenjie1024: in OLAP systems, the concurrent scan is done more aggressively, using the file as the unit of division.
The image we use is:
If I do the same with image v1.2.0, i.e. the one your cluster was running when we were having the call:
The time is measured by turning on \timing in psql. It includes the time spent on network communication between my laptop and the RW cluster at US-east-1.
So the optimization improves the latency from 6.5s - 0.25s = 6.25s to 2.9s - 0.25s = 2.65s, which is about 58% faster.
In terms of the concern about performance regression on small tables, we use the image with the concurrency changed to 1024:
s2 is a table of 20 rows.
I guess QPS might be a better indicator, as the network latency is wavy and we're actually utilizing more CPU but no more I/O, considering the single-flight optimization in the block cache.
Then does more aggressive block pre-fetch help in this case?
Sure. How was it tested last time, when it was changed to the current status? Can I reuse the testbed to compare, or should we wait for the sysbench results?
Yes, we can pre-fetch the blocks concurrently. But it will wait to return until all pre-fetch IO is finished.
Looks like a very special case that the query must
I'm unsure about the implementation. However, in my mind, isn't pre-fetching a background task? Why do we have to make it synchronous? 👀
I apologize, but if I remember correctly, I only conducted basic tests on my local machine, because those changes had to be an improvement, as there were no concurrent iterations before.
It's fine. Then we can use sysbench or manually construct a test case; I will add this setting as a weekly test.
You're probably correct. Also, the table must be a distributed one.
So we have two methods here.
I am not sure which one is better... @hzxa21, any ideas?
Correct me if I am wrong, but I think these are two non-conflicting optimizations. Although prefetch is asynchronous, it is better driven by the caller's poll. For example, it is unrealistic to prefetch all blocks required by a per-vnode iter if
I don't think the scan executor should think about the block cache or IO. It should delegate the request to the storage layer, which is supposed to have an IO manager to schedule IO requests.
The scan executor shouldn't worry about that. IMO, under the current implementation, the storage table should be responsible for concurrently polling the iterators.
More sysbench results have been posted in the Slack channel; the numbers LGTM.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#12783
It still will not be concurrent when
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.