Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent batch scan #12825

Merged
merged 7 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions e2e_test/streaming/watermark.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ sleep 10s

skipif in-memory
query TI
select * from mv1;
select * from mv1 order by 2;
----
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3

skipif in-memory
query TI
select * from mv2;
select * from mv2 order by 2;
----
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 2
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 3
2023-05-06 16:51:00 3

statement ok
Expand Down
31 changes: 7 additions & 24 deletions src/storage/src/table/batch_table/storage_table.rs
st1page marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::{Index, RangeBounds};
use std::sync::Arc;
Expand Down Expand Up @@ -46,7 +45,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{find_columns_by_ids, ColumnMapping};
use crate::store::{PrefetchOptions, ReadOptions};
use crate::table::merge_sort::merge_sort;
use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter, DEFAULT_VNODE};
use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter};
use crate::StateStore;

/// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with
Expand Down Expand Up @@ -418,34 +417,15 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
_ => CachePolicy::Fill(CachePriority::High),
};

let raw_key_ranges = if !ordered
&& matches!(encoded_key_range.start_bound(), Unbounded)
&& matches!(encoded_key_range.end_bound(), Unbounded)
{
// If the range is unbounded and order is not required, we can create a single iterator
// for each continuous vnode range.
Comment on lines -421 to -426
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess removing this can significantly hurt the performance for a full scan on a small table (like Top-10 or simple aggregation).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I will add a session varible to control it but I am not sure about which default value should be

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or can we get some table's cardinarity statics here or in optimizer? c.c. @hzxa21 @wcy-fdu

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have maintained the physical key count in meta. Currently CN doesn't have a way to get the key count stats but we can add it if needed.


// In this case, the `vnode_hint` must be default for singletons and `None` for
// distributed tables.
assert_eq!(vnode_hint.unwrap_or(DEFAULT_VNODE), DEFAULT_VNODE);

Either::Left(self.vnodes.vnode_ranges().map(|r| {
let start = Included(Bytes::copy_from_slice(&r.start().to_be_bytes()[..]));
let end = end_bound_of_prefix(&r.end().to_be_bytes());
assert_matches!(end, Excluded(_) | Unbounded);
(start, end)
}))
} else {
let raw_key_ranges = {
// Vnodes that are set and should be accessed.
let vnodes = match vnode_hint {
// If `vnode_hint` is set, we can only access this single vnode.
Some(vnode) => Either::Left(std::iter::once(vnode)),
// Otherwise, we need to access all vnodes of this table.
None => Either::Right(self.vnodes.iter_vnodes()),
};
Either::Right(
vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes())),
)
vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes()))
};

// For each key range, construct an iterator.
Expand Down Expand Up @@ -493,7 +473,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
0 => unreachable!(),
1 => iterators.into_iter().next().unwrap(),
// Concat all iterators if not to preserve order.
_ if !ordered => futures::stream::iter(iterators).flatten(),
_ if !ordered => {
futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we can pin it after buffer.

.flatten_unordered(10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 10 maybe two small, 1024 maybe better?

st1page marked this conversation as resolved.
Show resolved Hide resolved
}
// Merge all iterators if to preserve order.
_ => merge_sort(iterators.into_iter().map(Box::pin).collect()),
};
Expand Down