feat: concurrent batch scan #12825

Merged: 7 commits from sts/concurrent_scan into main on Oct 20, 2023

Conversation

@st1page (Contributor) commented Oct 12, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

#12783

It will still not be concurrent when:

  1. the pk prefix determines a specific distribution key and the scan range falls within a single vnode, or
  2. the optimizer generates a plan that depends on the scan's order property.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features: Sqlsmith SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@st1page requested a review from BugenZhao on October 13, 2023 02:37
(start, end)
}))
} else {
// let raw_key_ranges = if !ordered
Member:

👀 Please either remove this code or add an option to enable/disable it.

@fuyufjh (Member) left a comment:

LGTM, but it needs to be proven to work with a benchmark.

codecov bot commented Oct 13, 2023

Codecov Report

Merging #12825 (38f0304) into main (527a276) will increase coverage by 0.01%.
The diff coverage is 40.00%.

@@            Coverage Diff             @@
##             main   #12825      +/-   ##
==========================================
+ Coverage   69.17%   69.19%   +0.01%     
==========================================
  Files        1489     1489              
  Lines      246069   246061       -8     
==========================================
+ Hits       170230   170257      +27     
+ Misses      75839    75804      -35     
Flag Coverage Δ
rust 69.19% <40.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Files Coverage Δ
src/storage/src/table/batch_table/storage_table.rs 85.87% <40.00%> (-0.72%) ⬇️

... and 11 files with indirect coverage changes

@liurenjie1024 (Contributor) left a comment:

Generally LGTM; I think we should make the limit larger.

_ if !ordered => futures::stream::iter(iterators).flatten(),
_ if !ordered => {
futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
.flatten_unordered(10)
Contributor:

I think 10 may be too small; maybe 1024 would be better?
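
For context, a minimal self-contained sketch (not the PR's actual code; the toy sub-streams below stand in for the per-vnode row iterators) of how flatten and flatten_unordered(limit) from the futures crate differ: the former drains the sub-streams one after another, while the latter keeps up to limit sub-streams in flight and yields rows as soon as any of them is ready.

use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    block_on(async {
        // Hypothetical stand-ins for per-vnode row streams: three sub-streams,
        // each yielding (vnode, row_index) pairs.
        let make_substreams =
            || (0..3u32).map(|vnode| stream::iter((0..4u32).map(move |i| (vnode, i))));

        // `flatten` polls the sub-streams sequentially, preserving their order.
        let ordered: Vec<_> = stream::iter(make_substreams()).flatten().collect().await;

        // `flatten_unordered(limit)` polls up to `limit` sub-streams concurrently
        // and yields items as soon as any sub-stream is ready, so the relative
        // order across sub-streams is not preserved. The limit bounds how many
        // underlying scans are in flight at once (the 10 vs. 1024 discussed here).
        let unordered: Vec<_> =
            stream::iter(make_substreams().map(Box::pin).collect::<Vec<_>>())
                .flatten_unordered(10)
                .collect()
                .await;

        assert_eq!(ordered.len(), unordered.len());
    });
}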

@BugenZhao (Member) left a comment:

I'm not sure whether it is good practice for the upper-level application to be responsible for improving iteration performance by manually scheduling it concurrently. Does any other system do this?

If so, we may have to handle the ordered case as well; otherwise it's hard to tell whether sorting with a separate executor can be faster.

@@ -493,7 +473,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
0 => unreachable!(),
1 => iterators.into_iter().next().unwrap(),
// Concat all iterators if not to preserve order.
_ if !ordered => futures::stream::iter(iterators).flatten(),
_ if !ordered => {
futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
Member:

I'm wondering if we can pin it after buffering.
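
As background for the pinning question, a hedged sketch (a simplified, hypothetical helper, not RisingWave code) of why the sub-streams are boxed and pinned before flatten_unordered: the combinator requires its inner streams to be Unpin, and Pin<Box<_>> always is.

use futures::stream::{self, Stream, StreamExt};

// Hypothetical helper: merge a vector of streams, polling up to 10 of them
// concurrently.
fn merge_unordered<S: Stream>(substreams: Vec<S>) -> impl Stream<Item = S::Item> {
    stream::iter(
        substreams
            .into_iter()
            // `Box::pin` turns each `S` into `Pin<Box<S>>`, which is always
            // `Unpin`, satisfying `flatten_unordered`'s `Stream + Unpin` bound
            // even when `S` itself (e.g. an async-generator stream) is not.
            .map(Box::pin)
            .collect::<Vec<_>>(),
    )
    .flatten_unordered(10)
}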

Comment on lines -421 to -426
let raw_key_ranges = if !ordered
&& matches!(encoded_key_range.start_bound(), Unbounded)
&& matches!(encoded_key_range.end_bound(), Unbounded)
{
// If the range is unbounded and order is not required, we can create a single iterator
// for each continuous vnode range.
Member:

I guess removing this can significantly hurt the performance for a full scan on a small table (like Top-10 or simple aggregation).
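
For readers unfamiliar with the removed fast path, a small self-contained sketch (a hypothetical helper, not the actual RisingWave implementation) of the idea: when the key range is unbounded and order does not matter, the vnodes owned by a node can be merged into a few continuous ranges, so a full scan of a small table needs far fewer iterators than one per vnode.

// Merge a list of owned vnodes into continuous (start, end) ranges.
fn continuous_vnode_ranges(mut vnodes: Vec<u32>) -> Vec<(u32, u32)> {
    vnodes.sort_unstable();
    let mut ranges: Vec<(u32, u32)> = Vec::new();
    for v in vnodes {
        match ranges.last_mut() {
            // Extend the current range if this vnode directly follows its end.
            Some((_, end)) if *end + 1 == v => *end = v,
            // Otherwise start a new continuous range.
            _ => ranges.push((v, v)),
        }
    }
    ranges
}

fn main() {
    // Vnodes 0..=3 and 7..=8 collapse into two continuous ranges, i.e. two
    // iterators instead of six for an unbounded, unordered scan.
    assert_eq!(
        continuous_vnode_ranges(vec![0, 1, 2, 3, 7, 8]),
        vec![(0, 3), (7, 8)]
    );
}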

Contributor Author:

🤔 I will add a session variable to control it, but I am not sure what the default value should be.

Contributor Author:

Or can we get the table's cardinality statistics here or in the optimizer? cc @hzxa21 @wcy-fdu

Collaborator:

We maintain the physical key count in meta. Currently the CN doesn't have a way to get the key count stats, but we can add one if needed.

@st1page (Contributor Author) commented Oct 16, 2023

> I'm not sure whether it is good practice for the upper-level application to be responsible for improving iteration performance by manually scheduling it concurrently. Does any other system do this?

After an offline discussion with @liurenjie1024: in OLAP systems, they do concurrent scans more aggressively, using the file as the unit of division.

@lmatz (Contributor) commented Oct 17, 2023

The image we use is: concurrent-batch-scan

CREATE TABLE s (
  dummy int, -- to control the number of rows generated in total, 1M rows in this case
  i int,
  c varchar,
  t1 timestamp,
  t2 timestamp
) with (
  connector = 'datagen',
  fields.dummy.kind = 'sequence',
  fields.dummy.start = '1',
  fields.dummy.end = '1000000',
  fields.i.kind = 'random',
  fields.i.min = 1,
  fields.i.max = 10,
  fields.i.seed = 1,
  fields.c.kind = 'random',
  fields.c.length = 1,
  fields.c.seed = 1,
  datagen.rows.per.second = '100000',
  datagen.split.num = '4'
);

-- When data is generated in memory, everything in cache

dev=> select i, c, max(t1), max(t2) from s group by i, c;
Time: 949.184 ms
dev=> select i, c, max(t1), max(t2) from s group by i, c;
Time: 941.612 ms
dev=> select i, c, max(t1), max(t2) from s group by i, c;
Time: 1080.106 ms (00:01.080)

-- We restarted the cluster on purpose to clear the cache

dev=> select i, c, max(t1), max(t2) from s group by i, c;
Time: 2964.502 ms (00:02.965)

If I do the same with image v1.2.0, i.e. the one your cluster was running when we were on the call:

-- When the cache is empty.

dev=> select i, c, max(t1), max(t2) from s group by i, c;
Time: 6516.271 ms (00:06.516)

-- When everything is in cache:
dev=> select i, c, max(t1), max(t2) from s group by i, c;
Time: 961.800 ms

The time is measured by turning on \timing in psql. It includes the time spent on network communication between my laptop and the RW cluster at US-east-1.
We measure the network latency by

dev=> select 1;
 ?column? 
----------
        1
(1 row)

Time: 255.276 ms

So the optimization improves the latency from 6.5s - 0.25s ≈ 6.2s to 2.9s - 0.25s ≈ 2.7s, a reduction of about 56%.

@lmatz (Contributor) commented Oct 17, 2023

Regarding the concern about a performance regression on small tables, we use the image with the concurrency changed to 1024:

dev=> select i, c, max(t1), max(t2) from s group by i, c;                                                                  
Time: 1824.229 ms (00:01.824)

dev=> select * from s2;
(20 rows)

Time: 244.970 ms

s2 is a table of 20 rows.

@BugenZhao (Member):

> performance regression on small table

I guess QPS might be a better indicator, as the network latency is wavy, and we're actually utilizing more CPU but no more I/O, considering the single-flight optimization in the block cache.

@BugenZhao (Member):

> In OLAP systems, they do concurrent scans more aggressively, using the file as the unit of division.

Then does more aggressive block pre-fetch help in this case?

@lmatz (Contributor) commented Oct 17, 2023

> I guess QPS might be a better indicator, as the network latency is wavy

Sure, how was it tested last time, when it was changed to the current behavior? Can I reuse the testbed to compare?

Or we can wait for the sysbench results.

@st1page (Contributor Author) commented Oct 17, 2023

> > In OLAP systems, they do concurrent scans more aggressively, using the file as the unit of division.
>
> Then does more aggressive block pre-fetch help in this case?

Yes, we can pre-fetch the blocks concurrently, but it will wait to return until all pre-fetch IO is finished.
I think the question here is how deep we can push down the information that "order does not matter". Only when the state store provides an unordered range scan interface can it read multiple blocks concurrently and return as soon as any of them is finished.
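
A purely hypothetical sketch of what such an "order does not matter" scan interface on the state store could look like (none of these names exist in RisingWave; the error type is a placeholder):

use futures::stream::BoxStream;
use std::ops::RangeBounds;

type Key = Vec<u8>;
type Value = Vec<u8>;
type StorageResult<T> = Result<T, std::io::Error>; // placeholder error type

trait UnorderedRangeScan {
    /// Yields key-value pairs within `range` in no particular order, so the
    /// implementation is free to fetch multiple blocks concurrently and emit
    /// whichever block's rows become available first.
    fn iter_unordered<R: RangeBounds<Key> + Send + 'static>(
        &self,
        range: R,
    ) -> BoxStream<'_, StorageResult<(Key, Value)>>;
}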

@st1page (Contributor Author) commented Oct 17, 2023

> I guess removing this can significantly hurt the performance for a full scan on a small table (like Top-10 or simple aggregation).

That looks like a very special case; the query must:

  1. be a serving query requiring high QPS,
  2. scan the whole table without any range predicate on the pk column, and
  3. require no order on the scan.

@BugenZhao (Member):

> But it will wait to return until all pre-fetch IO is finished.

I'm unsure about the implementation. However, in my mind, isn't pre-fetching a background task? Why do we have to make it synchronous? 👀

@BugenZhao (Member):

> Sure, how was it tested last time, when it was changed to the current behavior?

I apologize, but if I remember correctly, I only conducted basic tests on my local machine, because those changes had to be an improvement given that there were no concurrent iterations at the time.

@lmatz (Contributor) commented Oct 17, 2023

It's fine; then we'll use sysbench or manually construct a test case, and I will add this setting as a weekly test.

@BugenZhao (Member):

> That looks like a very special case; the query must ...

You're probably correct. Also the table must be a distributed one.

@st1page (Contributor Author) commented Oct 17, 2023

> > But it will wait to return until all pre-fetch IO is finished.
>
> I'm unsure about the implementation. However, in my mind, isn't pre-fetching a background task? Why do we have to make it synchronous? 👀

So we have two methods here:

  1. divide the scan range into pieces and make the Scan concurrent in the executor, or
  2. make the Scan's performance depend entirely on the block cache, and optimize the cache's prefetch logic with concurrent IO.

I am not sure which one is better... @hzxa21, any idea?

@hzxa21 (Collaborator) commented Oct 17, 2023

> So we have two methods here:
>
>   1. divide the scan range into pieces and make the Scan concurrent in the executor, or
>   2. make the Scan's performance depend entirely on the block cache, and optimize the cache's prefetch logic with concurrent IO.
>
> I am not sure which one is better... @hzxa21, any idea?

Correct me if I am wrong, but I think these are two non-conflicting optimizations. Although prefetch is asynchronous, it is better driven by the caller's poll. For example, it is unrealistic to prefetch all blocks required by a per-vnode iter if iter.next is rarely called. Also, prefetch for a full table scan query can pollute the block cache, so the amount of data to be prefetched should be bounded as well. That being said, I agree we can provide more information to storage to do more aggressive prefetch instead of prefetching only one block ahead, but I think 1 is still needed.
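
To illustrate the shape of "prefetch driven by the caller's poll, with a bound", a minimal sketch; fetch_block and the surrounding types are hypothetical placeholders, not RisingWave's block cache API:

use futures::executor::block_on;
use futures::stream::{self, Stream, StreamExt};

// Placeholder for an actual block-cache / object-store read.
async fn fetch_block(block_id: u64) -> Vec<u8> {
    vec![block_id as u8]
}

// Turn a sequence of block ids into a stream of block contents. `buffered(n)`
// keeps at most `n` block reads in flight, results come back in order, and no
// fetch is started unless the consumer keeps polling the stream.
fn prefetching_blocks(block_ids: Vec<u64>, prefetch_depth: usize) -> impl Stream<Item = Vec<u8>> {
    stream::iter(block_ids).map(fetch_block).buffered(prefetch_depth)
}

fn main() {
    let blocks: Vec<Vec<u8>> = block_on(prefetching_blocks(vec![1, 2, 3], 2).collect());
    assert_eq!(blocks.len(), 3);
}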

@liurenjie1024 (Contributor):

> So we have two methods here:
>
>   1. divide the scan range into pieces and make the Scan concurrent in the executor, or
>   2. make the Scan's performance depend entirely on the block cache, and optimize the cache's prefetch logic with concurrent IO.
>
> I am not sure which one is better... @hzxa21, any idea?

I don't think the scan executor should think about the block cache or IO. It should delegate the request to the storage layer, which is supposed to have an IO manager to schedule IO requests.

@hzxa21 (Collaborator) commented Oct 17, 2023

> I don't think the scan executor should think about the block cache or IO. It should delegate the request to the storage layer, which is supposed to have an IO manager to schedule IO requests.

The scan executor shouldn't worry about that. IMO, under the current implementation, the storage table should be responsible for concurrently polling the iterators.

@lmatz (Contributor) left a comment:

More sysbench results have been posted in the Slack channel; the numbers LGTM.

@st1page added this pull request to the merge queue Oct 20, 2023
Merged via the queue into main with commit 37d0a79, Oct 20, 2023
@st1page deleted the sts/concurrent_scan branch on October 20, 2023 at 11:45