Skip to content

Commit

Permalink
feat: concurrent batch scan (#12825)
Browse files Browse the repository at this point in the history
Co-authored-by: lmatz <[email protected]>
  • Loading branch information
st1page and lmatz authored Oct 20, 2023
1 parent 852b427 commit 37d0a79
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 30 deletions.
12 changes: 6 additions & 6 deletions e2e_test/streaming/watermark.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ sleep 10s

skipif in-memory
query TI
select * from mv1;
select * from mv1 order by 2;
----
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3

skipif in-memory
query TI
select * from mv2;
select * from mv2 order by 2;
----
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 2
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 3
2023-05-06 16:51:00 3

statement ok
Expand Down
31 changes: 7 additions & 24 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::{Index, RangeBounds};
use std::sync::Arc;
Expand Down Expand Up @@ -46,7 +45,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{find_columns_by_ids, ColumnMapping};
use crate::store::{PrefetchOptions, ReadOptions};
use crate::table::merge_sort::merge_sort;
use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter, DEFAULT_VNODE};
use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter};
use crate::StateStore;

/// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with
Expand Down Expand Up @@ -418,34 +417,15 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
_ => CachePolicy::Fill(CachePriority::High),
};

let raw_key_ranges = if !ordered
&& matches!(encoded_key_range.start_bound(), Unbounded)
&& matches!(encoded_key_range.end_bound(), Unbounded)
{
// If the range is unbounded and order is not required, we can create a single iterator
// for each continuous vnode range.

// In this case, the `vnode_hint` must be default for singletons and `None` for
// distributed tables.
assert_eq!(vnode_hint.unwrap_or(DEFAULT_VNODE), DEFAULT_VNODE);

Either::Left(self.vnodes.vnode_ranges().map(|r| {
let start = Included(Bytes::copy_from_slice(&r.start().to_be_bytes()[..]));
let end = end_bound_of_prefix(&r.end().to_be_bytes());
assert_matches!(end, Excluded(_) | Unbounded);
(start, end)
}))
} else {
let raw_key_ranges = {
// Vnodes that are set and should be accessed.
let vnodes = match vnode_hint {
// If `vnode_hint` is set, we can only access this single vnode.
Some(vnode) => Either::Left(std::iter::once(vnode)),
// Otherwise, we need to access all vnodes of this table.
None => Either::Right(self.vnodes.iter_vnodes()),
};
Either::Right(
vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes())),
)
vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes()))
};

// For each key range, construct an iterator.
Expand Down Expand Up @@ -493,7 +473,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
0 => unreachable!(),
1 => iterators.into_iter().next().unwrap(),
// Concat all iterators if not to preserve order.
_ if !ordered => futures::stream::iter(iterators).flatten(),
_ if !ordered => {
futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
.flatten_unordered(1024)
}
// Merge all iterators if to preserve order.
_ => merge_sort(iterators.into_iter().map(Box::pin).collect()),
};
Expand Down

0 comments on commit 37d0a79

Please sign in to comment.