diff --git a/e2e_test/streaming/watermark.slt b/e2e_test/streaming/watermark.slt index 4bbb5e1d8388..d1a29b88188c 100644 --- a/e2e_test/streaming/watermark.slt +++ b/e2e_test/streaming/watermark.slt @@ -25,7 +25,7 @@ sleep 10s skipif in-memory query TI -select * from mv1; +select * from mv1 order by 2; ---- 2023-05-06 16:51:00 1 2023-05-06 16:51:00 2 @@ -33,16 +33,16 @@ select * from mv1; skipif in-memory query TI -select * from mv2; +select * from mv2 order by 2; ---- 2023-05-06 16:51:00 1 -2023-05-06 16:51:00 2 -2023-05-06 16:51:00 3 2023-05-06 16:51:00 1 -2023-05-06 16:51:00 2 -2023-05-06 16:51:00 3 2023-05-06 16:51:00 1 2023-05-06 16:51:00 2 +2023-05-06 16:51:00 2 +2023-05-06 16:51:00 2 +2023-05-06 16:51:00 3 +2023-05-06 16:51:00 3 2023-05-06 16:51:00 3 statement ok diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index e4eb65b8b9f4..f784c4a5e7ca 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::ops::Bound::{self, Excluded, Included, Unbounded}; use std::ops::{Index, RangeBounds}; use std::sync::Arc; @@ -46,7 +45,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; use crate::store::{PrefetchOptions, ReadOptions}; use crate::table::merge_sort::merge_sort; -use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter, DEFAULT_VNODE}; +use crate::table::{compute_vnode, Distribution, KeyedRow, TableIter}; use crate::StateStore; /// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with @@ -418,24 +417,7 @@ impl StorageTableInner { _ => CachePolicy::Fill(CachePriority::High), }; - let raw_key_ranges = if !ordered - && matches!(encoded_key_range.start_bound(), Unbounded) - && matches!(encoded_key_range.end_bound(), Unbounded) - { - // If the range is unbounded and order is not required, we can create a single iterator - // for each continuous vnode range. - - // In this case, the `vnode_hint` must be default for singletons and `None` for - // distributed tables. - assert_eq!(vnode_hint.unwrap_or(DEFAULT_VNODE), DEFAULT_VNODE); - - Either::Left(self.vnodes.vnode_ranges().map(|r| { - let start = Included(Bytes::copy_from_slice(&r.start().to_be_bytes()[..])); - let end = end_bound_of_prefix(&r.end().to_be_bytes()); - assert_matches!(end, Excluded(_) | Unbounded); - (start, end) - })) - } else { + let raw_key_ranges = { // Vnodes that are set and should be accessed. let vnodes = match vnode_hint { // If `vnode_hint` is set, we can only access this single vnode. @@ -443,9 +425,7 @@ impl StorageTableInner { // Otherwise, we need to access all vnodes of this table. None => Either::Right(self.vnodes.iter_vnodes()), }; - Either::Right( - vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes())), - ) + vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes())) }; // For each key range, construct an iterator. @@ -493,7 +473,10 @@ impl StorageTableInner { 0 => unreachable!(), 1 => iterators.into_iter().next().unwrap(), // Concat all iterators if not to preserve order. - _ if !ordered => futures::stream::iter(iterators).flatten(), + _ if !ordered => { + futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec()) + .flatten_unordered(1024) + } // Merge all iterators if to preserve order. _ => merge_sort(iterators.into_iter().map(Box::pin).collect()), };