From c3660e005cdb2b6f110fb351f6e9e25d20e2014b Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Sep 2023 13:24:11 +0800 Subject: [PATCH 01/13] save work --- src/stream/src/common/table/state_table.rs | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index dfdd11732c942..afdc19a3c7fea 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1178,6 +1178,35 @@ where .await } + /// This function scans rows from the relational table with specific `pk_range` under the same + /// `vnode`. + pub async fn iter_row_with_pk_prefix_sub_range( + &self, + pk_prefix: impl Row, + sub_range: &(Bound, Bound), + prefetch_options: PrefetchOptions, + ) -> StreamExecutorResult> { + // let (sub_range_start, sub_range_end) = prefix_range_to_memcomparable(&self.pk_serde, sub_range); + let (sub_range_start, sub_range_end) = sub_range; + let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); + let start_range = match sub_range_end { + Included(sub_range_start) => { + Bound::Included(pk_prefix.chain(*sub_range_start)) + }, + Excluded(start) => Bound::Excluded(pk_prefix.chain(*start)), + Unbounded => Bound::Included(pk_prefix.chain()), + }; + + // let end_range = match sub_range_end { + // Included(sub_range_end) => todo!(), + // Excluded(sub_range_end) => todo!(), + // Unbounded => todo!(), + // }; + + todo!() + } + + /// This function scans raw key-values from the relational table with specific `pk_range` under /// the same `vnode`. async fn iter_kv_with_pk_range( From 60f8bcc0ec90b066861c8c3436e0eb54750859f9 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Sep 2023 17:19:44 +0800 Subject: [PATCH 02/13] save work --- src/stream/src/common/table/state_table.rs | 148 +++++++++++++++--- .../src/common/table/test_state_table.rs | 96 ++++++++++++ 2 files changed, 225 insertions(+), 19 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index afdc19a3c7fea..6a0bcad67344f 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -16,7 +16,8 @@ use std::ops::Bound; use std::ops::Bound::*; use std::sync::Arc; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use either::Either; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; @@ -1178,35 +1179,92 @@ where .await } + // /// This function scans rows from the relational table with specific `pk_range` under the same + // /// `vnode`. + // pub async fn iter_row_with_pk_prefix_sub_range( + // &self, + // pk_prefix: impl Row, + // sub_range: &(Bound, Bound), + // prefetch_options: PrefetchOptions, + // ) -> StreamExecutorResult> { + // let (sub_range_start_bytes, sub_range_end_bytes) = + // prefix_range_to_memcomparable_v2(&self.pk_serde, sub_range); + // // let (sub_range_start_bytes, sub_range_end_bytes) = + // // sub_range; + // let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); + // let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); + + // let pk_prefix_bytes = Bytes::copy_from_slice(&encoded_prefix); + // let start_range = match sub_range_start_bytes { + // Included(start_bytes) => { + // Bound::Included((encoded_prefix.chain(start_bytes)).into_iter().collect()) + // } + // Excluded(start_bytes) => { + // Bound::Excluded(encoded_prefix.chain(start_bytes).into_iter().collect()) + // } + // Unbounded => Bound::Included(encoded_prefix), + // }; + + // let end_range = match sub_range_end_bytes { + // Included(end_bytes) => { + // Bound::Included((pk_prefix_bytes.chain(end_bytes)).into_iter().collect()) + // } + // Excluded(end_bytes) => { + // Bound::Excluded(pk_prefix_bytes.chain(end_bytes).into_iter().collect()) + // } + // Unbounded => end_bound_of_prefix(&pk_prefix_bytes), + // }; + // println!("这里{:?}", (start_range.clone(), end_range.clone())); + // Ok(deserialize_keyed_row_stream( + // self.iter_kv((start_range, end_range), None, prefetch_options) + // .await?, + // &self.row_serde, + // )) + // } + + + /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. - pub async fn iter_row_with_pk_prefix_sub_range( + pub async fn xxx( &self, pk_prefix: impl Row, sub_range: &(Bound, Bound), prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { - // let (sub_range_start, sub_range_end) = prefix_range_to_memcomparable(&self.pk_serde, sub_range); - let (sub_range_start, sub_range_end) = sub_range; - let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); - let start_range = match sub_range_end { - Included(sub_range_start) => { - Bound::Included(pk_prefix.chain(*sub_range_start)) - }, - Excluded(start) => Bound::Excluded(pk_prefix.chain(*start)), - Unbounded => Bound::Included(pk_prefix.chain()), + let (start, end) = sub_range; + let pk_prefix = pk_prefix.to_owned_row(); + let pk_prefix_v2 = pk_prefix.clone(); + let pk_prefix_v3 = pk_prefix.clone(); + let start_range = match start { + Included(start_bytes) => { + Bound::Included((&pk_prefix.chain(start_bytes)).to_owned_row()) + } + Excluded(start_bytes) => { + Bound::Excluded((&pk_prefix.chain(start_bytes)).to_owned_row()) + } + Unbounded => Bound::Included(pk_prefix), }; - // let end_range = match sub_range_end { - // Included(sub_range_end) => todo!(), - // Excluded(sub_range_end) => todo!(), - // Unbounded => todo!(), - // }; - - todo!() + let end_range = match end { + Included(start_bytes) => { + Bound::Included((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) + } + Excluded(start_bytes) => { + Bound::Excluded((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) + } + Unbounded => Unbounded, + }; + println!("这里range {:?}", (start_range.clone(), end_range.clone())); + let a = prefix_range_to_memcomparable_v2(&self.pk_serde,&(start_range, end_range), pk_prefix_v3); + + Ok(deserialize_keyed_row_stream( + self.iter_kv(a, None, prefetch_options) + .await?, + &self.row_serde, + )) } - /// This function scans raw key-values from the relational table with specific `pk_range` under /// the same `vnode`. async fn iter_kv_with_pk_range( @@ -1314,6 +1372,17 @@ pub fn prefix_range_to_memcomparable( ) } +pub fn prefix_range_to_memcomparable_v2( + pk_serde: &OrderedRowSerde, + range: &(Bound, Bound), + pk_prefix: OwnedRow +) -> (Bound, Bound) { + ( + to_memcomparable_v2(pk_serde, &range.0, false, pk_prefix.clone()), + to_memcomparable_v2(pk_serde, &range.1, true, pk_prefix), + ) +} + fn to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, @@ -1344,3 +1413,44 @@ fn to_memcomparable( } } } + + +fn to_memcomparable_v2( + pk_serde: &OrderedRowSerde, + bound: &Bound, + is_upper: bool, + pk_prefix1: OwnedRow, +) -> Bound { + let serialize_pk_prefix = |pk_prefix: &R| { + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + serialize_pk(pk_prefix, &prefix_serializer) + }; + match bound { + Unbounded => { + if is_upper { + let prefix_serializer = pk_serde.prefix(pk_prefix1.len()); + let serialized = serialize_pk(pk_prefix1, &prefix_serializer); + Included(serialized) + } else { + Unbounded + } + }, + Included(r) => { + let serialized = serialize_pk_prefix(r); + if is_upper { + end_bound_of_prefix(&serialized) + } else { + Included(serialized) + } + } + Excluded(r) => { + let serialized = serialize_pk_prefix(r); + if !is_upper { + // if lower + start_bound_of_excluded_prefix(&serialized) + } else { + Excluded(serialized) + } + } + } +} diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index c3e5759a47ae6..fb761b7ad0936 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; + use futures::{pin_mut, StreamExt}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; @@ -1833,3 +1835,97 @@ async fn test_state_table_watermark_cache_refill() { .as_scalar_ref_impl() ) } + +#[tokio::test] +async fn test_state_table_iter_prefix_sub_range() { + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let test_env = prepare_hummock_test_env().await; + + let order_types = vec![OrderType::ascending(), OrderType::ascending()]; + let column_ids = [ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)]; + let column_descs = vec![ + ColumnDesc::unnamed(column_ids[0], DataType::Int32), + ColumnDesc::unnamed(column_ids[1], DataType::Int32), + ColumnDesc::unnamed(column_ids[2], DataType::Int32), + ]; + let pk_index = vec![0_usize, 1_usize]; + let read_prefix_len_hint = 0; + let table = gen_prost_table( + TEST_TABLE_ID, + column_descs, + order_types, + pk_index, + read_prefix_len_hint, + ); + + test_env.register_table(table.clone()).await; + let mut state_table = + StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None) + .await; + let mut epoch = EpochPair::new_test_epoch(1); + state_table.init_epoch(epoch); + + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ])); + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ])); + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ])); + + state_table.insert(OwnedRow::new(vec![ + Some(4_i32.into()), + Some(44_i32.into()), + Some(444_i32.into()), + ])); + + epoch.inc(); + state_table.commit(epoch).await.unwrap(); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + // let pk_prefix = OwnedRow::empty(); + + let pk_range = ( + std::ops::Bound::Included(OwnedRow::new(vec![Some(11_i32.into())])), + std::ops::Bound::Excluded(OwnedRow::new(vec![Some(33_i32.into())])), + ); + let iter = state_table + .xxx(pk_prefix, &pk_range, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + // assert_eq!( + // &OwnedRow::new(vec![ + // Some("abc".into()), + // Some("bbb".into()), + // Some("bbbb".into()), + // ]), + // res.as_ref() + // ); + + // let res = iter.next().await; + // assert!(res.is_none()); +} From 53808ba52963defad75ed50026788afd7ac636d3 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 13 Sep 2023 11:10:11 +0800 Subject: [PATCH 03/13] add iter_prefix_and_sub_range --- src/stream/src/common/table/state_table.rs | 173 +++++++----------- .../src/common/table/test_state_table.rs | 116 ++++++++++-- 2 files changed, 172 insertions(+), 117 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 6a0bcad67344f..341b915fa53df 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -16,8 +16,7 @@ use std::ops::Bound; use std::ops::Bound::*; use std::sync::Arc; -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use either::Either; +use bytes::{BufMut, Bytes, BytesMut}; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; @@ -1179,87 +1178,25 @@ where .await } - // /// This function scans rows from the relational table with specific `pk_range` under the same - // /// `vnode`. - // pub async fn iter_row_with_pk_prefix_sub_range( - // &self, - // pk_prefix: impl Row, - // sub_range: &(Bound, Bound), - // prefetch_options: PrefetchOptions, - // ) -> StreamExecutorResult> { - // let (sub_range_start_bytes, sub_range_end_bytes) = - // prefix_range_to_memcomparable_v2(&self.pk_serde, sub_range); - // // let (sub_range_start_bytes, sub_range_end_bytes) = - // // sub_range; - // let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); - // let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); - - // let pk_prefix_bytes = Bytes::copy_from_slice(&encoded_prefix); - // let start_range = match sub_range_start_bytes { - // Included(start_bytes) => { - // Bound::Included((encoded_prefix.chain(start_bytes)).into_iter().collect()) - // } - // Excluded(start_bytes) => { - // Bound::Excluded(encoded_prefix.chain(start_bytes).into_iter().collect()) - // } - // Unbounded => Bound::Included(encoded_prefix), - // }; - - // let end_range = match sub_range_end_bytes { - // Included(end_bytes) => { - // Bound::Included((pk_prefix_bytes.chain(end_bytes)).into_iter().collect()) - // } - // Excluded(end_bytes) => { - // Bound::Excluded(pk_prefix_bytes.chain(end_bytes).into_iter().collect()) - // } - // Unbounded => end_bound_of_prefix(&pk_prefix_bytes), - // }; - // println!("这里{:?}", (start_range.clone(), end_range.clone())); - // Ok(deserialize_keyed_row_stream( - // self.iter_kv((start_range, end_range), None, prefetch_options) - // .await?, - // &self.row_serde, - // )) - // } - - - /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. - pub async fn xxx( + pub async fn iter_row_with_pk_prefix_sub_range( &self, pk_prefix: impl Row, sub_range: &(Bound, Bound), prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { - let (start, end) = sub_range; - let pk_prefix = pk_prefix.to_owned_row(); - let pk_prefix_v2 = pk_prefix.clone(); - let pk_prefix_v3 = pk_prefix.clone(); - let start_range = match start { - Included(start_bytes) => { - Bound::Included((&pk_prefix.chain(start_bytes)).to_owned_row()) - } - Excluded(start_bytes) => { - Bound::Excluded((&pk_prefix.chain(start_bytes)).to_owned_row()) - } - Unbounded => Bound::Included(pk_prefix), - }; + let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); - let end_range = match end { - Included(start_bytes) => { - Bound::Included((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) - } - Excluded(start_bytes) => { - Bound::Excluded((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) - } - Unbounded => Unbounded, - }; - println!("这里range {:?}", (start_range.clone(), end_range.clone())); - let a = prefix_range_to_memcomparable_v2(&self.pk_serde,&(start_range, end_range), pk_prefix_v3); - + let memcomparable_range = prefix_and_sub_range_to_memcomparable( + &self.pk_serde, + sub_range, + pk_prefix.to_owned_row(), + ); + + let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode); Ok(deserialize_keyed_row_stream( - self.iter_kv(a, None, prefetch_options) + self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options) .await?, &self.row_serde, )) @@ -1372,14 +1309,39 @@ pub fn prefix_range_to_memcomparable( ) } -pub fn prefix_range_to_memcomparable_v2( +pub fn prefix_and_sub_range_to_memcomparable( pk_serde: &OrderedRowSerde, - range: &(Bound, Bound), - pk_prefix: OwnedRow + sub_range: &(Bound, Bound), + pk_prefix: OwnedRow, ) -> (Bound, Bound) { + let (range_start, range_end) = sub_range; + let pk_prefix_calculate_start_range = pk_prefix.clone(); + let pk_prefix_calculate_end_range = pk_prefix.clone(); + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + let serialized_pk_prefix = serialize_pk(pk_prefix, &prefix_serializer); + + let start_range = match range_start { + Included(start_range) => { + Bound::Included((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) + } + Excluded(start_range) => { + Bound::Excluded((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) + } + Unbounded => Bound::Included(pk_prefix_calculate_start_range), + }; + + let end_range = match range_end { + Included(end_range) => { + Bound::Included((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) + } + Excluded(end_range) => { + Bound::Excluded((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) + } + Unbounded => Unbounded, + }; ( - to_memcomparable_v2(pk_serde, &range.0, false, pk_prefix.clone()), - to_memcomparable_v2(pk_serde, &range.1, true, pk_prefix), + start_range_to_memcomparable(pk_serde, &start_range), + end_range_to_memcomparable(pk_serde, &end_range, serialized_pk_prefix), ) } @@ -1414,12 +1376,9 @@ fn to_memcomparable( } } - -fn to_memcomparable_v2( +fn start_range_to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, - is_upper: bool, - pk_prefix1: OwnedRow, ) -> Bound { let serialize_pk_prefix = |pk_prefix: &R| { let prefix_serializer = pk_serde.prefix(pk_prefix.len()); @@ -1427,30 +1386,40 @@ fn to_memcomparable_v2( }; match bound { Unbounded => { - if is_upper { - let prefix_serializer = pk_serde.prefix(pk_prefix1.len()); - let serialized = serialize_pk(pk_prefix1, &prefix_serializer); - Included(serialized) - } else { - Unbounded - } - }, + unreachable!() + } Included(r) => { let serialized = serialize_pk_prefix(r); - if is_upper { - end_bound_of_prefix(&serialized) - } else { - Included(serialized) - } + + Included(serialized) } Excluded(r) => { let serialized = serialize_pk_prefix(r); - if !is_upper { - // if lower - start_bound_of_excluded_prefix(&serialized) - } else { - Excluded(serialized) - } + + start_bound_of_excluded_prefix(&serialized) + } + } +} + +fn end_range_to_memcomparable( + pk_serde: &OrderedRowSerde, + bound: &Bound, + serialized_pk_prefix: Bytes, +) -> Bound { + let serialize_pk_prefix = |pk_prefix: &R| { + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + serialize_pk(pk_prefix, &prefix_serializer) + }; + match bound { + Unbounded => end_bound_of_prefix(&serialized_pk_prefix), + Included(r) => { + let serialized = serialize_pk_prefix(r); + + end_bound_of_prefix(&serialized) + } + Excluded(r) => { + let serialized = serialize_pk_prefix(r); + Excluded(serialized) } } } diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index fb761b7ad0936..2f5dc3202adb9 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -1837,7 +1837,7 @@ async fn test_state_table_watermark_cache_refill() { } #[tokio::test] -async fn test_state_table_iter_prefix_sub_range() { +async fn test_state_table_iter_prefix_and_sub_range() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; let test_env = prepare_hummock_test_env().await; @@ -1891,14 +1891,52 @@ async fn test_state_table_iter_prefix_sub_range() { state_table.commit(epoch).await.unwrap(); let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); - // let pk_prefix = OwnedRow::empty(); - let pk_range = ( + let sub_range1 = ( std::ops::Bound::Included(OwnedRow::new(vec![Some(11_i32.into())])), std::ops::Bound::Excluded(OwnedRow::new(vec![Some(33_i32.into())])), ); + + let iter = state_table + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); + + let sub_range2: (Bound, Bound) = ( + std::ops::Bound::Excluded(OwnedRow::new(vec![Some(11_i32.into())])), + std::ops::Bound::Unbounded, + ); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); let iter = state_table - .xxx(pk_prefix, &pk_range, Default::default()) + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default()) .await .unwrap(); @@ -1906,6 +1944,43 @@ async fn test_state_table_iter_prefix_sub_range() { let res = iter.next().await.unwrap().unwrap(); + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); + + let sub_range3: (Bound, Bound) = ( + std::ops::Bound::Unbounded, + std::ops::Bound::Included(OwnedRow::new(vec![Some(33_i32.into())])), + ); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + let iter = state_table + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + let res = iter.next().await.unwrap().unwrap(); + assert_eq!( &OwnedRow::new(vec![ Some(1_i32.into()), @@ -1917,15 +1992,26 @@ async fn test_state_table_iter_prefix_sub_range() { let res = iter.next().await.unwrap().unwrap(); - // assert_eq!( - // &OwnedRow::new(vec![ - // Some("abc".into()), - // Some("bbb".into()), - // Some("bbbb".into()), - // ]), - // res.as_ref() - // ); - - // let res = iter.next().await; - // assert!(res.is_none()); + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); } From 2529231c9184f31821073131e22f638d9ed4d463 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 13 Sep 2023 11:13:52 +0800 Subject: [PATCH 04/13] add comments --- src/stream/src/common/table/state_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 341b915fa53df..c3615ded2ea5c 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1178,7 +1178,7 @@ where .await } - /// This function scans rows from the relational table with specific `pk_range` under the same + /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same /// `vnode`. pub async fn iter_row_with_pk_prefix_sub_range( &self, From c64444c33b0317f09e3b80f76a0fd98917c5b5d9 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 14 Sep 2023 18:05:09 +0800 Subject: [PATCH 05/13] unify some code --- src/stream/src/common/table/state_table.rs | 48 ++++------------------ 1 file changed, 9 insertions(+), 39 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index c3615ded2ea5c..c1a7de2f03b87 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1304,8 +1304,8 @@ pub fn prefix_range_to_memcomparable( range: &(Bound, Bound), ) -> (Bound, Bound) { ( - to_memcomparable(pk_serde, &range.0, false), - to_memcomparable(pk_serde, &range.1, true), + start_range_to_memcomparable(pk_serde, &range.0), + end_range_to_memcomparable(pk_serde, &range.1, None), ) } @@ -1341,41 +1341,10 @@ pub fn prefix_and_sub_range_to_memcomparable( }; ( start_range_to_memcomparable(pk_serde, &start_range), - end_range_to_memcomparable(pk_serde, &end_range, serialized_pk_prefix), + end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)), ) } -fn to_memcomparable( - pk_serde: &OrderedRowSerde, - bound: &Bound, - is_upper: bool, -) -> Bound { - let serialize_pk_prefix = |pk_prefix: &R| { - let prefix_serializer = pk_serde.prefix(pk_prefix.len()); - serialize_pk(pk_prefix, &prefix_serializer) - }; - match bound { - Unbounded => Unbounded, - Included(r) => { - let serialized = serialize_pk_prefix(r); - if is_upper { - end_bound_of_prefix(&serialized) - } else { - Included(serialized) - } - } - Excluded(r) => { - let serialized = serialize_pk_prefix(r); - if !is_upper { - // if lower - start_bound_of_excluded_prefix(&serialized) - } else { - Excluded(serialized) - } - } - } -} - fn start_range_to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, @@ -1385,9 +1354,7 @@ fn start_range_to_memcomparable( serialize_pk(pk_prefix, &prefix_serializer) }; match bound { - Unbounded => { - unreachable!() - } + Unbounded => Unbounded, Included(r) => { let serialized = serialize_pk_prefix(r); @@ -1404,14 +1371,17 @@ fn start_range_to_memcomparable( fn end_range_to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, - serialized_pk_prefix: Bytes, + serialized_pk_prefix: Option, ) -> Bound { let serialize_pk_prefix = |pk_prefix: &R| { let prefix_serializer = pk_serde.prefix(pk_prefix.len()); serialize_pk(pk_prefix, &prefix_serializer) }; match bound { - Unbounded => end_bound_of_prefix(&serialized_pk_prefix), + Unbounded => match serialized_pk_prefix { + Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix), + None => Unbounded, + }, Included(r) => { let serialized = serialize_pk_prefix(r); From 7981a646cdc9c7386427c0d7cfbbb9d66423b1d0 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 14 Sep 2023 20:03:06 +0800 Subject: [PATCH 06/13] todo: refactor executor side, remove iter_row_with_pk_prefix interface --- e2e_test/streaming/aggregate/boolean.slt | 67 ------------- src/stream/src/common/table/state_table.rs | 99 +++++++++---------- .../src/common/table/test_state_table.rs | 4 +- 3 files changed, 52 insertions(+), 118 deletions(-) diff --git a/e2e_test/streaming/aggregate/boolean.slt b/e2e_test/streaming/aggregate/boolean.slt index 86cf018fda4e1..e69de29bb2d1d 100644 --- a/e2e_test/streaming/aggregate/boolean.slt +++ b/e2e_test/streaming/aggregate/boolean.slt @@ -1,67 +0,0 @@ -statement ok -SET RW_IMPLICIT_FLUSH TO true; - -statement ok -create table t (v boolean); - -statement ok -create materialized view mv as select - bool_and(v), - bool_or(v) -from t; - -query BB -select * from mv; ----- -NULL NULL - - -statement ok -insert into t values (true); - -# table values: true - -query BB -select * from mv; ----- -t t - - -statement ok -insert into t values (false); - -# table values: true, false - -query BB -select * from mv; ----- -f t - - -statement ok -delete from t where v = true; - -# table values: false - -query BB -select * from mv; ----- -f f - - -statement ok -delete from t; - -# table values: empty - -query BB -select * from mv; ----- -NULL NULL - - -statement ok -drop materialized view mv; - -statement ok -drop table t; diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 86c4f9d1e7624..07cf443a829d0 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1095,7 +1095,8 @@ where &self, prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { - self.iter_row_with_pk_prefix(row::empty(), prefetch_options) + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); + self.iter_row_with_pk_prefix_sub_range(row::empty(), sub_range, prefetch_options) .await } @@ -1106,11 +1107,9 @@ where pk_prefix: impl Row, prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { - Ok(deserialize_keyed_row_stream( - self.iter_kv_with_pk_prefix(pk_prefix, prefetch_options) - .await?, - &self.row_serde, - )) + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); + self.iter_row_with_pk_prefix_sub_range(pk_prefix, sub_range, prefetch_options) + .await } /// This function scans rows from the relational table with specific `pk_range` under the same @@ -1150,50 +1149,50 @@ where Ok(self.local_store.iter(key_range, read_options).await?) } - /// This function scans raw key-values from the relational table with specific `pk_prefix`. - /// `pk_prefix` is used to identify the exact vnode the scan should perform on. - async fn iter_kv_with_pk_prefix( - &self, - pk_prefix: impl Row, - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult<::IterStream<'_>> { - let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); - let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); - let encoded_key_range = range_of_prefix(&encoded_prefix); - - // We assume that all usages of iterating the state table only access a single vnode. - // If this assertion fails, then something must be wrong with the operator implementation or - // the distribution derivation from the optimizer. - let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); - let encoded_key_range_with_vnode = prefixed_range(encoded_key_range, &vnode); - - // Construct prefix hint for prefix bloom filter. - let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()]; - if self.prefix_hint_len != 0 { - debug_assert_eq!(self.prefix_hint_len, pk_prefix.len()); - } - let prefix_hint = { - if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() { - None - } else { - let encoded_prefix_len = self - .pk_serde - .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?; - - Some(Bytes::from(encoded_prefix[..encoded_prefix_len].to_vec())) - } - }; - - trace!( - table_id = %self.table_id(), - ?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix, - ?pk_prefix_indices, - "storage_iter_with_prefix" - ); - - self.iter_kv(encoded_key_range_with_vnode, prefix_hint, prefetch_options) - .await - } + // /// This function scans raw key-values from the relational table with specific `pk_prefix`. + // /// `pk_prefix` is used to identify the exact vnode the scan should perform on. + // async fn iter_kv_with_pk_prefix( + // &self, + // pk_prefix: impl Row, + // prefetch_options: PrefetchOptions, + // ) -> StreamExecutorResult<::IterStream<'_>> { + // let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); + // let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); + // let encoded_key_range = range_of_prefix(&encoded_prefix); + + // // We assume that all usages of iterating the state table only access a single vnode. + // // If this assertion fails, then something must be wrong with the operator implementation or + // // the distribution derivation from the optimizer. + // let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); + // let encoded_key_range_with_vnode = prefixed_range(encoded_key_range, &vnode); + + // // Construct prefix hint for prefix bloom filter. + // let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()]; + // if self.prefix_hint_len != 0 { + // debug_assert_eq!(self.prefix_hint_len, pk_prefix.len()); + // } + // let prefix_hint = { + // if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() { + // None + // } else { + // let encoded_prefix_len = self + // .pk_serde + // .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?; + + // Some(Bytes::from(encoded_prefix[..encoded_prefix_len].to_vec())) + // } + // }; + + // trace!( + // table_id = %self.table_id(), + // ?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix, + // ?pk_prefix_indices, + // "storage_iter_with_prefix" + // ); + + // self.iter_kv(encoded_key_range_with_vnode, prefix_hint, prefetch_options) + // .await + // } /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same /// `vnode`. diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 2f5dc3202adb9..f4328b5ebd461 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::ops::Bound; +use std::ops::Bound::Unbounded; use futures::{pin_mut, StreamExt}; use risingwave_common::array::{Op, StreamChunk}; @@ -282,8 +283,9 @@ async fn test_state_table_iter_with_prefix() { ])); let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let iter = state_table - .iter_row_with_pk_prefix(&pk_prefix, Default::default()) + .iter_row_with_pk_prefix_sub_range(&pk_prefix, sub_range, Default::default()) .await .unwrap(); pin_mut!(iter); From 0337172adfa060c8b98c11f0dfadd50cce648042 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 15 Sep 2023 14:29:51 +0800 Subject: [PATCH 07/13] remove iter_prefix --- src/stream/src/common/table/state_table.rs | 63 +------------------ src/stream/src/executor/aggregation/minput.rs | 10 ++- .../src/executor/managed_state/join/mod.rs | 29 ++++++--- src/stream/src/executor/over_window/eowc.rs | 8 ++- .../executor/over_window/over_partition.rs | 4 +- src/stream/src/executor/top_n/top_n_state.rs | 14 +++-- 6 files changed, 49 insertions(+), 79 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 07cf443a829d0..871698bf225f6 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1100,18 +1100,6 @@ where .await } - /// This function scans rows from the relational table with specific `pk_prefix`. - /// `pk_prefix` is used to identify the exact vnode the scan should perform on. - pub async fn iter_row_with_pk_prefix( - &self, - pk_prefix: impl Row, - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { - let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); - self.iter_row_with_pk_prefix_sub_range(pk_prefix, sub_range, prefetch_options) - .await - } - /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. pub async fn iter_row_with_pk_range( @@ -1149,53 +1137,9 @@ where Ok(self.local_store.iter(key_range, read_options).await?) } - // /// This function scans raw key-values from the relational table with specific `pk_prefix`. - // /// `pk_prefix` is used to identify the exact vnode the scan should perform on. - // async fn iter_kv_with_pk_prefix( - // &self, - // pk_prefix: impl Row, - // prefetch_options: PrefetchOptions, - // ) -> StreamExecutorResult<::IterStream<'_>> { - // let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); - // let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); - // let encoded_key_range = range_of_prefix(&encoded_prefix); - - // // We assume that all usages of iterating the state table only access a single vnode. - // // If this assertion fails, then something must be wrong with the operator implementation or - // // the distribution derivation from the optimizer. - // let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); - // let encoded_key_range_with_vnode = prefixed_range(encoded_key_range, &vnode); - - // // Construct prefix hint for prefix bloom filter. - // let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()]; - // if self.prefix_hint_len != 0 { - // debug_assert_eq!(self.prefix_hint_len, pk_prefix.len()); - // } - // let prefix_hint = { - // if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() { - // None - // } else { - // let encoded_prefix_len = self - // .pk_serde - // .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?; - - // Some(Bytes::from(encoded_prefix[..encoded_prefix_len].to_vec())) - // } - // }; - - // trace!( - // table_id = %self.table_id(), - // ?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix, - // ?pk_prefix_indices, - // "storage_iter_with_prefix" - // ); - - // self.iter_kv(encoded_key_range_with_vnode, prefix_hint, prefetch_options) - // .await - // } - - /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same - /// `vnode`. + /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same + /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`. + /// `pk_prefix` is used to identify the exact vnode the scan should perform on. pub async fn iter_row_with_pk_prefix_sub_range( &self, pk_prefix: impl Row, @@ -1203,7 +1147,6 @@ where prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); - let memcomparable_range = prefix_and_sub_range_to_memcomparable( &self.pk_serde, sub_range, diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 0b50875adf847..7c863561d5c13 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound::{self}; + use futures::{pin_mut, StreamExt}; use futures_async_stream::for_await; use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::EstimateSize; -use risingwave_common::row::RowExt; +use risingwave_common::row::{OwnedRow, RowExt}; use risingwave_common::types::Datum; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; @@ -182,10 +184,12 @@ impl MaterializedInputState { ) -> StreamExecutorResult { if !self.cache.is_synced() { let mut cache_filler = self.cache.begin_syncing(); - + let sub_range: &(Bound, Bound) = + &(Bound::Unbounded, Bound::Unbounded); let all_data_iter = state_table - .iter_row_with_pk_prefix( + .iter_row_with_pk_prefix_sub_range( group_key.map(GroupKey::table_pk), + sub_range, PrefetchOptions { exhaust_iter: cache_filler.capacity().is_none(), }, diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs index 7ee23c06a5631..d3890171ee5cd 100644 --- a/src/stream/src/executor/managed_state/join/mod.rs +++ b/src/stream/src/executor/managed_state/join/mod.rs @@ -15,7 +15,8 @@ mod join_entry_state; use std::alloc::Global; -use std::ops::{Deref, DerefMut}; +use std::ops::Bound::Unbounded; +use std::ops::{Bound, Deref, DerefMut}; use std::sync::Arc; use futures::future::try_join; @@ -402,14 +403,17 @@ impl JoinHashMap { let mut entry_state = JoinEntryState::default(); if self.need_degree_table { - let table_iter_fut = self - .state - .table - .iter_row_with_pk_prefix(&key, PrefetchOptions::new_for_exhaust_iter()); - let degree_table_iter_fut = self - .degree_state - .table - .iter_row_with_pk_prefix(&key, PrefetchOptions::new_for_exhaust_iter()); + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); + let table_iter_fut = self.state.table.iter_row_with_pk_prefix_sub_range( + &key, + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ); + let degree_table_iter_fut = self.degree_state.table.iter_row_with_pk_prefix_sub_range( + &key, + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ); let (table_iter, degree_table_iter) = try_join(table_iter_fut, degree_table_iter_fut).await?; @@ -437,10 +441,15 @@ impl JoinHashMap { ); } } else { + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let table_iter = self .state .table - .iter_row_with_pk_prefix(&key, PrefetchOptions::new_for_exhaust_iter()) + .iter_row_with_pk_prefix_sub_range( + &key, + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await?; #[for_await] diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 22553641369c1..92d17a886645e 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::marker::PhantomData; +use std::ops::Bound; use futures::StreamExt; use futures_async_stream::{for_await, try_stream}; @@ -195,10 +196,15 @@ impl EowcOverWindowExecutor { curr_row_buffer: Default::default(), }; + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); // Recover states from state table. let table_iter = this .state_table - .iter_row_with_pk_prefix(partition_key, PrefetchOptions::new_for_exhaust_iter()) + .iter_row_with_pk_prefix_sub_range( + partition_key, + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await?; #[for_await] diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index ab785acd9b681..10a38aec4c9ba 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -456,9 +456,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> { tracing::debug!(partition=?self.this_partition_key, "loading the whole partition into cache"); let mut new_cache = PartitionCache::new(); // shouldn't use `new_empty_partition_cache` here because we don't want sentinels + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let table_iter = table - .iter_row_with_pk_prefix( + .iter_row_with_pk_prefix_sub_range( self.this_partition_key, + sub_range, PrefetchOptions::new_for_exhaust_iter(), ) .await?; diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 87d19e8550861..ea1705affa703 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; + use futures::{pin_mut, StreamExt}; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::util::epoch::EpochPair; @@ -81,9 +83,10 @@ impl ManagedTopNState { offset: usize, limit: Option, ) -> StreamExecutorResult> { + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .iter_row_with_pk_prefix(&group_key, Default::default()) + .iter_row_with_pk_prefix_sub_range(&group_key, sub_range, Default::default()) .await?; pin_mut!(state_table_iter); @@ -118,10 +121,12 @@ impl ManagedTopNState { cache_size_limit: usize, ) -> StreamExecutorResult<()> { let cache = &mut topn_cache.high; + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .iter_row_with_pk_prefix( + .iter_row_with_pk_prefix_sub_range( &group_key, + sub_range, PrefetchOptions { exhaust_iter: cache_size_limit == usize::MAX, }, @@ -165,11 +170,12 @@ impl ManagedTopNState { assert!(topn_cache.low.is_empty()); assert!(topn_cache.middle.is_empty()); assert!(topn_cache.high.is_empty()); - + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .iter_row_with_pk_prefix( + .iter_row_with_pk_prefix_sub_range( &group_key, + sub_range, PrefetchOptions { exhaust_iter: topn_cache.limit == usize::MAX, }, From f7df04fa0f12977c7aee73f3543927e123c72de8 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 15 Sep 2023 15:51:13 +0800 Subject: [PATCH 08/13] resolve comments --- src/stream/src/common/table/state_table.rs | 36 +++++++--------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 86c4f9d1e7624..045fb1fdaeba9 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -17,6 +17,7 @@ use std::ops::Bound::*; use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; +use either::Either; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; @@ -1205,11 +1206,8 @@ where ) -> StreamExecutorResult> { let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); - let memcomparable_range = prefix_and_sub_range_to_memcomparable( - &self.pk_serde, - sub_range, - pk_prefix.to_owned_row(), - ); + let memcomparable_range = + prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix); let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode); Ok(deserialize_keyed_row_stream( @@ -1326,34 +1324,22 @@ pub fn prefix_range_to_memcomparable( ) } -pub fn prefix_and_sub_range_to_memcomparable( +fn prefix_and_sub_range_to_memcomparable( pk_serde: &OrderedRowSerde, sub_range: &(Bound, Bound), - pk_prefix: OwnedRow, + pk_prefix: impl Row, ) -> (Bound, Bound) { let (range_start, range_end) = sub_range; - let pk_prefix_calculate_start_range = pk_prefix.clone(); - let pk_prefix_calculate_end_range = pk_prefix.clone(); let prefix_serializer = pk_serde.prefix(pk_prefix.len()); - let serialized_pk_prefix = serialize_pk(pk_prefix, &prefix_serializer); - + let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer); let start_range = match range_start { - Included(start_range) => { - Bound::Included((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) - } - Excluded(start_range) => { - Bound::Excluded((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) - } - Unbounded => Bound::Included(pk_prefix_calculate_start_range), + Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))), + Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))), + Unbounded => Bound::Included(Either::Right(&pk_prefix)), }; - let end_range = match range_end { - Included(end_range) => { - Bound::Included((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) - } - Excluded(end_range) => { - Bound::Excluded((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) - } + Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)), + Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)), Unbounded => Unbounded, }; ( From 2e78075bb16a1f7860c6a6ae6753e561076d8bbe Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 18 Sep 2023 10:20:41 +0800 Subject: [PATCH 09/13] revert e2e test --- e2e_test/streaming/aggregate/boolean.slt | 67 ++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/e2e_test/streaming/aggregate/boolean.slt b/e2e_test/streaming/aggregate/boolean.slt index e69de29bb2d1d..c7e37afc463eb 100644 --- a/e2e_test/streaming/aggregate/boolean.slt +++ b/e2e_test/streaming/aggregate/boolean.slt @@ -0,0 +1,67 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t (v boolean); + +statement ok +create materialized view mv as select + bool_and(v), + bool_or(v) +from t; + +query BB +select * from mv; +---- +NULL NULL + + +statement ok +insert into t values (true); + +# table values: true + +query BB +select * from mv; +---- +t t + + +statement ok +insert into t values (false); + +# table values: true, false + +query BB +select * from mv; +---- +f t + + +statement ok +delete from t where v = true; + +# table values: false + +query BB +select * from mv; +---- +f f + + +statement ok +delete from t; + +# table values: empty + +query BB +select * from mv; +---- +NULL NULL + + +statement ok +drop materialized view mv; + +statement ok +drop table t; \ No newline at end of file From de3ffb35c62dd36365c79bf0c53922de5c649bdf Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 18 Sep 2023 10:21:44 +0800 Subject: [PATCH 10/13] revert e2e test --- e2e_test/streaming/aggregate/boolean.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/streaming/aggregate/boolean.slt b/e2e_test/streaming/aggregate/boolean.slt index c7e37afc463eb..86cf018fda4e1 100644 --- a/e2e_test/streaming/aggregate/boolean.slt +++ b/e2e_test/streaming/aggregate/boolean.slt @@ -64,4 +64,4 @@ statement ok drop materialized view mv; statement ok -drop table t; \ No newline at end of file +drop table t; From 9dcf2e5fc949d8b29ef858c62847df07676f1d6a Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 18 Sep 2023 14:34:15 +0800 Subject: [PATCH 11/13] rename --- src/stream/src/common/table/state_table.rs | 8 ++++---- src/stream/src/common/table/test_state_table.rs | 15 +++++++-------- src/stream/src/executor/aggregation/minput.rs | 2 +- .../src/executor/backfill/arrangement_backfill.rs | 2 +- src/stream/src/executor/dynamic_filter.rs | 10 +++------- src/stream/src/executor/managed_state/join/mod.rs | 10 +++------- src/stream/src/executor/now.rs | 13 +------------ src/stream/src/executor/over_window/eowc.rs | 2 +- .../src/executor/over_window/over_partition.rs | 12 ++++-------- src/stream/src/executor/sort_buffer.rs | 2 +- .../src/executor/source/state_table_handler.rs | 2 +- src/stream/src/executor/top_n/top_n_state.rs | 6 +++--- 12 files changed, 30 insertions(+), 54 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index bd07eba676dda..a426587dba1c6 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -933,7 +933,7 @@ where let mut streams = vec![]; for vnode in self.vnodes().iter_vnodes() { let stream = self - .iter_row_with_pk_range(&range, vnode, PrefetchOptions::default()) + .vnode_iter_row(&range, vnode, PrefetchOptions::default()) .await?; streams.push(Box::pin(stream)); } @@ -1097,13 +1097,13 @@ where prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); - self.iter_row_with_pk_prefix_sub_range(row::empty(), sub_range, prefetch_options) + self.prefix_iter_row(row::empty(), sub_range, prefetch_options) .await } /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. - pub async fn iter_row_with_pk_range( + pub async fn vnode_iter_row( &self, pk_range: &(Bound, Bound), // Optional vnode that returns an iterator only over the given range under that vnode. @@ -1141,7 +1141,7 @@ where /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`. /// `pk_prefix` is used to identify the exact vnode the scan should perform on. - pub async fn iter_row_with_pk_prefix_sub_range( + pub async fn prefix_iter_row( &self, pk_prefix: impl Row, sub_range: &(Bound, Bound), diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index f4328b5ebd461..ddc20ae7719d0 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::ops::Bound; -use std::ops::Bound::Unbounded; use futures::{pin_mut, StreamExt}; use risingwave_common::array::{Op, StreamChunk}; @@ -283,9 +282,9 @@ async fn test_state_table_iter_with_prefix() { ])); let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); - let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let iter = state_table - .iter_row_with_pk_prefix_sub_range(&pk_prefix, sub_range, Default::default()) + .prefix_iter_row(&pk_prefix, sub_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -416,7 +415,7 @@ async fn test_state_table_iter_with_pk_range() { std::ops::Bound::Included(OwnedRow::new(vec![Some(4_i32.into())])), ); let iter = state_table - .iter_row_with_pk_range(&pk_range, DEFAULT_VNODE, Default::default()) + .vnode_iter_row(&pk_range, DEFAULT_VNODE, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -441,7 +440,7 @@ async fn test_state_table_iter_with_pk_range() { std::ops::Bound::::Unbounded, ); let iter = state_table - .iter_row_with_pk_range(&pk_range, DEFAULT_VNODE, Default::default()) + .vnode_iter_row(&pk_range, DEFAULT_VNODE, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -1900,7 +1899,7 @@ async fn test_state_table_iter_prefix_and_sub_range() { ); let iter = state_table - .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default()) + .prefix_iter_row(pk_prefix, &sub_range1, Default::default()) .await .unwrap(); @@ -1938,7 +1937,7 @@ async fn test_state_table_iter_prefix_and_sub_range() { let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); let iter = state_table - .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default()) + .prefix_iter_row(pk_prefix, &sub_range2, Default::default()) .await .unwrap(); @@ -1976,7 +1975,7 @@ async fn test_state_table_iter_prefix_and_sub_range() { let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); let iter = state_table - .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default()) + .prefix_iter_row(pk_prefix, &sub_range3, Default::default()) .await .unwrap(); diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 7c863561d5c13..964173ec8cdae 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -187,7 +187,7 @@ impl MaterializedInputState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let all_data_iter = state_table - .iter_row_with_pk_prefix_sub_range( + .prefix_iter_row( group_key.map(GroupKey::table_pk), sub_range, PrefetchOptions { diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index d33aed6d6c441..f28f4f5deb3e1 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -548,7 +548,7 @@ where let range_bounds = range_bounds.unwrap(); let vnode_row_iter = upstream_table - .iter_row_with_pk_range(&range_bounds, vnode, Default::default()) + .vnode_iter_row(&range_bounds, vnode, Default::default()) .await?; // TODO: Is there some way to avoid double-pin here? diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 3c4d36871bfe8..d2c01ad5e7109 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -22,7 +22,7 @@ use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; use risingwave_common::hash::VnodeBitmapExt; -use risingwave_common::row::{once, OwnedRow as RowData, OwnedRow, Row}; +use risingwave_common::row::{once, OwnedRow as RowData, Row}; use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl, ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::{build_func, BoxedExpression, InputRefExpression, LiteralExpression}; @@ -231,12 +231,8 @@ impl DynamicFilterExecutor Result, StreamExecutorError> { - let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); // Recover value for RHS if available - let rhs_stream = self - .right_table - .iter_row_with_pk_prefix_sub_range(OwnedRow::empty(), sub_range, Default::default()) - .await?; + let rhs_stream = self.right_table.iter_row(Default::default()).await?; pin_mut!(rhs_stream); if let Some(res) = rhs_stream.next().await { @@ -385,7 +381,7 @@ impl DynamicFilterExecutor JoinHashMap { if self.need_degree_table { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); - let table_iter_fut = self.state.table.iter_row_with_pk_prefix_sub_range( + let table_iter_fut = self.state.table.prefix_iter_row( &key, sub_range, PrefetchOptions::new_for_exhaust_iter(), ); - let degree_table_iter_fut = self.degree_state.table.iter_row_with_pk_prefix_sub_range( + let degree_table_iter_fut = self.degree_state.table.prefix_iter_row( &key, sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -446,11 +446,7 @@ impl JoinHashMap { let table_iter = self .state .table - .iter_row_with_pk_prefix_sub_range( - &key, - sub_range, - PrefetchOptions::new_for_exhaust_iter(), - ) + .prefix_iter_row(&key, sub_range, PrefetchOptions::new_for_exhaust_iter()) .await?; #[for_await] diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index d0539fe1c7712..9748fffc59c5b 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Bound; - use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row; -use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; use risingwave_storage::StateStore; use tokio::sync::mpsc::UnboundedReceiver; @@ -82,16 +79,8 @@ impl NowExecutor { if !initialized { // Handle the first barrier. state_table.init_epoch(barrier.epoch); - let sub_range: &(Bound, Bound) = - &(Bound::Unbounded, Bound::Unbounded); let state_row = { - let data_iter = state_table - .iter_row_with_pk_prefix_sub_range( - OwnedRow::empty(), - sub_range, - Default::default(), - ) - .await?; + let data_iter = state_table.iter_row(Default::default()).await?; pin_mut!(data_iter); if let Some(keyed_row) = data_iter.next().await { Some(keyed_row?) diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 92d17a886645e..1b48e189b6c9d 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -200,7 +200,7 @@ impl EowcOverWindowExecutor { // Recover states from state table. let table_iter = this .state_table - .iter_row_with_pk_prefix_sub_range( + .prefix_iter_row( partition_key, sub_range, PrefetchOptions::new_for_exhaust_iter(), diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 10a38aec4c9ba..681d01c9d1ba5 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -458,7 +458,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let mut new_cache = PartitionCache::new(); // shouldn't use `new_empty_partition_cache` here because we don't want sentinels let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let table_iter = table - .iter_row_with_pk_prefix_sub_range( + .prefix_iter_row( self.this_partition_key, sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -573,7 +573,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ) -> StreamExecutorResult<()> { let streams = stream::iter(table.vnode_bitmap().iter_vnodes()) .map(|vnode| { - table.iter_row_with_pk_range( + table.vnode_iter_row( &table_pk_range, vnode, PrefetchOptions::new_for_exhaust_iter(), @@ -653,11 +653,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ); let streams: Vec<_> = futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| { - table.iter_row_with_pk_range( - &pk_range, - vnode, - PrefetchOptions::new_for_exhaust_iter(), - ) + table.vnode_iter_row(&pk_range, vnode, PrefetchOptions::new_for_exhaust_iter()) })) .await? .into_iter() @@ -751,7 +747,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ); let streams: Vec<_> = futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| { - table.iter_row_with_pk_range(&pk_range, vnode, PrefetchOptions::default()) + table.vnode_iter_row(&pk_range, vnode, PrefetchOptions::default()) })) .await? .into_iter() diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index 709597109af14..1da8a041041c2 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -213,7 +213,7 @@ impl SortBuffer { let streams: Vec<_> = futures::future::try_join_all(buffer_table.vnode_bitmap().iter_vnodes().map(|vnode| { - buffer_table.iter_row_with_pk_range( + buffer_table.vnode_iter_row( &pk_range, vnode, PrefetchOptions::new_with_exhaust_iter(filler.capacity().is_none()), diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index f1ee9f0c90d4b..96c74a90ef40d 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -84,7 +84,7 @@ impl SourceStateTableHandler { // all source executor has vnode id zero let iter = self .state_store - .iter_row_with_pk_range( + .vnode_iter_row( &(start, end), VirtualNode::ZERO, PrefetchOptions::new_for_exhaust_iter(), diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index ea1705affa703..3d29f1e0709f6 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -86,7 +86,7 @@ impl ManagedTopNState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .iter_row_with_pk_prefix_sub_range(&group_key, sub_range, Default::default()) + .prefix_iter_row(&group_key, sub_range, Default::default()) .await?; pin_mut!(state_table_iter); @@ -124,7 +124,7 @@ impl ManagedTopNState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .iter_row_with_pk_prefix_sub_range( + .prefix_iter_row( &group_key, sub_range, PrefetchOptions { @@ -173,7 +173,7 @@ impl ManagedTopNState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .iter_row_with_pk_prefix_sub_range( + .prefix_iter_row( &group_key, sub_range, PrefetchOptions { From 9c339da3aafde07e8b58bb09fc25ee7a4a113c52 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 10 Oct 2023 15:34:31 +0800 Subject: [PATCH 12/13] remove iter_row --- src/ctl/src/cmd_impl/bench.rs | 11 +++- src/stream/src/common/table/state_table.rs | 10 --- .../src/common/table/test_state_table.rs | 66 ++++++++++++++----- src/stream/src/executor/dynamic_filter.rs | 8 ++- src/stream/src/executor/now.rs | 10 ++- 5 files changed, 73 insertions(+), 32 deletions(-) diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs index 823febed50a43..6d689764f5987 100644 --- a/src/ctl/src/cmd_impl/bench.rs +++ b/src/ctl/src/cmd_impl/bench.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; +use std::ops::Bound::Unbounded; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; @@ -20,6 +22,7 @@ use anyhow::Result; use clap::Subcommand; use futures::future::try_join_all; use futures::{pin_mut, Future, StreamExt}; +use risingwave_common::row::{self, OwnedRow}; use risingwave_common::util::epoch::EpochPair; use risingwave_storage::store::PrefetchOptions; use size::Size; @@ -102,8 +105,14 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { tb }; loop { + let sub_range: &(Bound, Bound) = + &(Unbounded, Unbounded); let stream = state_table - .iter_row(PrefetchOptions::new_for_exhaust_iter()) + .prefix_iter_row( + row::empty(), + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await?; pin_mut!(stream); iter_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed); diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 66e77f57b3c8a..24695470c9299 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1088,16 +1088,6 @@ where S: StateStore, SD: ValueRowSerde, { - /// This function scans rows from the relational table. - pub async fn iter_row( - &self, - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { - let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); - self.prefix_iter_row(row::empty(), sub_range, prefetch_options) - .await - } - /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. pub async fn vnode_iter_row( diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 5346d18b33f98..d75a1bcd8ed04 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Bound; +use std::ops::Bound::{self, *}; use futures::{pin_mut, StreamExt}; use risingwave_common::array::{Op, StreamChunk}; @@ -577,9 +577,12 @@ async fn test_state_table_iter_with_value_indices() { Some(99_i32.into()), Some(999_i32.into()), ])); - + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); { - let iter = state_table.iter_row(Default::default()).await.unwrap(); + let iter = state_table + .prefix_iter_row(row::empty(), sub_range, Default::default()) + .await + .unwrap(); pin_mut!(iter); let res = iter.next().await.unwrap().unwrap(); @@ -634,7 +637,10 @@ async fn test_state_table_iter_with_value_indices() { Some(888_i32.into()), ])); - let iter = state_table.iter_row(Default::default()).await.unwrap(); + let iter = state_table + .prefix_iter_row(row::empty(), sub_range, Default::default()) + .await + .unwrap(); pin_mut!(iter); let res = iter.next().await.unwrap().unwrap(); @@ -738,9 +744,12 @@ async fn test_state_table_iter_with_shuffle_value_indices() { Some(99_i32.into()), Some(999_i32.into()), ])); - + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); { - let iter = state_table.iter_row(Default::default()).await.unwrap(); + let iter = state_table + .prefix_iter_row(row::empty(), sub_range, Default::default()) + .await + .unwrap(); pin_mut!(iter); let res = iter.next().await.unwrap().unwrap(); @@ -816,7 +825,10 @@ async fn test_state_table_iter_with_shuffle_value_indices() { Some(888_i32.into()), ])); - let iter = state_table.iter_row(Default::default()).await.unwrap(); + let iter = state_table + .prefix_iter_row(row::empty(), sub_range, Default::default()) + .await + .unwrap(); pin_mut!(iter); let res = iter.next().await.unwrap().unwrap(); @@ -1001,9 +1013,13 @@ async fn test_state_table_write_chunk() { ); state_table.write_chunk(chunk); - + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let rows: Vec<_> = state_table - .iter_row(PrefetchOptions::new_for_exhaust_iter()) + .prefix_iter_row( + row::empty(), + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await .unwrap() .collect::>() @@ -1115,9 +1131,13 @@ async fn test_state_table_write_chunk_visibility() { StreamChunk::with_visibility(ops, columns, Bitmap::from_iter([true, true, true, false])); state_table.write_chunk(chunk); - + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let rows: Vec<_> = state_table - .iter_row(PrefetchOptions::new_for_exhaust_iter()) + .prefix_iter_row( + row::empty(), + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await .unwrap() .collect::>() @@ -1227,9 +1247,13 @@ async fn test_state_table_write_chunk_value_indices() { ); state_table.write_chunk(chunk); - + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let rows: Vec<_> = state_table - .iter_row(PrefetchOptions::new_for_exhaust_iter()) + .prefix_iter_row( + row::empty(), + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await .unwrap() .collect::>() @@ -1509,9 +1533,13 @@ async fn test_state_table_watermark_cache_ignore_null() { let chunk = StreamChunk::from_rows(&rows, &data_types); state_table.write_chunk(chunk); - + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let inserted_rows: Vec<_> = state_table - .iter_row(PrefetchOptions::new_for_exhaust_iter()) + .prefix_iter_row( + row::empty(), + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await .unwrap() .collect::>() @@ -1796,9 +1824,13 @@ async fn test_state_table_watermark_cache_refill() { for row in &rows { state_table.insert(row); } - + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let inserted_rows: Vec<_> = state_table - .iter_row(PrefetchOptions::new_for_exhaust_iter()) + .prefix_iter_row( + row::empty(), + sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) .await .unwrap() .collect::>() diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index f4a976ae6371d..b58c60168665d 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -22,7 +22,7 @@ use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; use risingwave_common::hash::VnodeBitmapExt; -use risingwave_common::row::{once, OwnedRow as RowData, Row}; +use risingwave_common::row::{self, once, OwnedRow, OwnedRow as RowData, Row}; use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl, ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::{build_func, BoxedExpression, InputRefExpression, LiteralExpression}; @@ -232,7 +232,11 @@ impl DynamicFilterExecutor Result, StreamExecutorError> { // Recover value for RHS if available - let rhs_stream = self.right_table.iter_row(Default::default()).await?; + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); + let rhs_stream = self + .right_table + .prefix_iter_row(row::empty(), sub_range, Default::default()) + .await?; pin_mut!(rhs_stream); if let Some(res) = rhs_stream.next().await { diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 9748fffc59c5b..c15624a6054be 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; +use std::ops::Bound::Unbounded; + use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::row; +use risingwave_common::row::{self, OwnedRow}; use risingwave_common::types::{DataType, Datum}; use risingwave_storage::StateStore; use tokio::sync::mpsc::UnboundedReceiver; @@ -80,7 +83,10 @@ impl NowExecutor { // Handle the first barrier. state_table.init_epoch(barrier.epoch); let state_row = { - let data_iter = state_table.iter_row(Default::default()).await?; + let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); + let data_iter = state_table + .prefix_iter_row(row::empty(), sub_range, Default::default()) + .await?; pin_mut!(data_iter); if let Some(keyed_row) = data_iter.next().await { Some(keyed_row?) From 598922a92dad1cc3c46f2903ea48283576e3de65 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 11 Oct 2023 18:56:58 +0800 Subject: [PATCH 13/13] rename --- src/ctl/src/cmd_impl/bench.rs | 2 +- src/stream/src/common/table/state_table.rs | 9 +++--- .../src/common/table/test_state_table.rs | 30 +++++++++---------- src/stream/src/executor/aggregation/minput.rs | 2 +- .../executor/backfill/arrangement_backfill.rs | 2 +- src/stream/src/executor/dynamic_filter.rs | 6 ++-- .../src/executor/managed_state/join/mod.rs | 6 ++-- src/stream/src/executor/now.rs | 2 +- src/stream/src/executor/over_window/eowc.rs | 2 +- .../executor/over_window/over_partition.rs | 10 +++---- src/stream/src/executor/sort_buffer.rs | 4 +-- .../executor/source/state_table_handler.rs | 4 +-- src/stream/src/executor/top_n/top_n_state.rs | 6 ++-- 13 files changed, 43 insertions(+), 42 deletions(-) diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs index 6d689764f5987..7dfd798a2b5be 100644 --- a/src/ctl/src/cmd_impl/bench.rs +++ b/src/ctl/src/cmd_impl/bench.rs @@ -108,7 +108,7 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let stream = state_table - .prefix_iter_row( + .iter_with_prefix( row::empty(), sub_range, PrefetchOptions::new_for_exhaust_iter(), diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 24695470c9299..fa222062a7ac8 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -930,7 +930,7 @@ where let mut streams = vec![]; for vnode in self.vnodes().iter_vnodes() { let stream = self - .vnode_iter_row(&range, vnode, PrefetchOptions::default()) + .iter_with_vnode(vnode, &range, PrefetchOptions::default()) .await?; streams.push(Box::pin(stream)); } @@ -1090,13 +1090,14 @@ where { /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. - pub async fn vnode_iter_row( + pub async fn iter_with_vnode( &self, - pk_range: &(Bound, Bound), + // Optional vnode that returns an iterator only over the given range under that vnode. // For now, we require this parameter, and will panic. In the future, when `None`, we can // iterate over each vnode that the `StateTableInner` owns. vnode: VirtualNode, + pk_range: &(Bound, Bound), prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { Ok(deserialize_keyed_row_stream( @@ -1129,7 +1130,7 @@ where /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`. /// `pk_prefix` is used to identify the exact vnode the scan should perform on. - pub async fn prefix_iter_row( + pub async fn iter_with_prefix( &self, pk_prefix: impl Row, sub_range: &(Bound, Bound), diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index d75a1bcd8ed04..7b6d1dce99f21 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -284,7 +284,7 @@ async fn test_state_table_iter_with_prefix() { let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let iter = state_table - .prefix_iter_row(&pk_prefix, sub_range, Default::default()) + .iter_with_prefix(&pk_prefix, sub_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -415,7 +415,7 @@ async fn test_state_table_iter_with_pk_range() { std::ops::Bound::Included(OwnedRow::new(vec![Some(4_i32.into())])), ); let iter = state_table - .vnode_iter_row(&pk_range, DEFAULT_VNODE, Default::default()) + .iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -440,7 +440,7 @@ async fn test_state_table_iter_with_pk_range() { std::ops::Bound::::Unbounded, ); let iter = state_table - .vnode_iter_row(&pk_range, DEFAULT_VNODE, Default::default()) + .iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -580,7 +580,7 @@ async fn test_state_table_iter_with_value_indices() { let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); { let iter = state_table - .prefix_iter_row(row::empty(), sub_range, Default::default()) + .iter_with_prefix(row::empty(), sub_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -638,7 +638,7 @@ async fn test_state_table_iter_with_value_indices() { ])); let iter = state_table - .prefix_iter_row(row::empty(), sub_range, Default::default()) + .iter_with_prefix(row::empty(), sub_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -747,7 +747,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() { let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); { let iter = state_table - .prefix_iter_row(row::empty(), sub_range, Default::default()) + .iter_with_prefix(row::empty(), sub_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -826,7 +826,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() { ])); let iter = state_table - .prefix_iter_row(row::empty(), sub_range, Default::default()) + .iter_with_prefix(row::empty(), sub_range, Default::default()) .await .unwrap(); pin_mut!(iter); @@ -1015,7 +1015,7 @@ async fn test_state_table_write_chunk() { state_table.write_chunk(chunk); let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let rows: Vec<_> = state_table - .prefix_iter_row( + .iter_with_prefix( row::empty(), sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -1133,7 +1133,7 @@ async fn test_state_table_write_chunk_visibility() { state_table.write_chunk(chunk); let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let rows: Vec<_> = state_table - .prefix_iter_row( + .iter_with_prefix( row::empty(), sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -1249,7 +1249,7 @@ async fn test_state_table_write_chunk_value_indices() { state_table.write_chunk(chunk); let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let rows: Vec<_> = state_table - .prefix_iter_row( + .iter_with_prefix( row::empty(), sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -1535,7 +1535,7 @@ async fn test_state_table_watermark_cache_ignore_null() { state_table.write_chunk(chunk); let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let inserted_rows: Vec<_> = state_table - .prefix_iter_row( + .iter_with_prefix( row::empty(), sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -1826,7 +1826,7 @@ async fn test_state_table_watermark_cache_refill() { } let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let inserted_rows: Vec<_> = state_table - .prefix_iter_row( + .iter_with_prefix( row::empty(), sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -1928,7 +1928,7 @@ async fn test_state_table_iter_prefix_and_sub_range() { ); let iter = state_table - .prefix_iter_row(pk_prefix, &sub_range1, Default::default()) + .iter_with_prefix(pk_prefix, &sub_range1, Default::default()) .await .unwrap(); @@ -1966,7 +1966,7 @@ async fn test_state_table_iter_prefix_and_sub_range() { let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); let iter = state_table - .prefix_iter_row(pk_prefix, &sub_range2, Default::default()) + .iter_with_prefix(pk_prefix, &sub_range2, Default::default()) .await .unwrap(); @@ -2004,7 +2004,7 @@ async fn test_state_table_iter_prefix_and_sub_range() { let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); let iter = state_table - .prefix_iter_row(pk_prefix, &sub_range3, Default::default()) + .iter_with_prefix(pk_prefix, &sub_range3, Default::default()) .await .unwrap(); diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 964173ec8cdae..ced270110f8f4 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -187,7 +187,7 @@ impl MaterializedInputState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let all_data_iter = state_table - .prefix_iter_row( + .iter_with_prefix( group_key.map(GroupKey::table_pk), sub_range, PrefetchOptions { diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index f28f4f5deb3e1..02c38781c8140 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -548,7 +548,7 @@ where let range_bounds = range_bounds.unwrap(); let vnode_row_iter = upstream_table - .vnode_iter_row(&range_bounds, vnode, Default::default()) + .iter_with_vnode(vnode, &range_bounds, Default::default()) .await?; // TODO: Is there some way to avoid double-pin here? diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index b58c60168665d..505804560aa89 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -235,7 +235,7 @@ impl DynamicFilterExecutor, Bound) = &(Unbounded, Unbounded); let rhs_stream = self .right_table - .prefix_iter_row(row::empty(), sub_range, Default::default()) + .iter_with_prefix(row::empty(), sub_range, Default::default()) .await?; pin_mut!(rhs_stream); @@ -386,9 +386,9 @@ impl DynamicFilterExecutor JoinHashMap { if self.need_degree_table { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); - let table_iter_fut = self.state.table.prefix_iter_row( + let table_iter_fut = self.state.table.iter_with_prefix( &key, sub_range, PrefetchOptions::new_for_exhaust_iter(), ); - let degree_table_iter_fut = self.degree_state.table.prefix_iter_row( + let degree_table_iter_fut = self.degree_state.table.iter_with_prefix( &key, sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -446,7 +446,7 @@ impl JoinHashMap { let table_iter = self .state .table - .prefix_iter_row(&key, sub_range, PrefetchOptions::new_for_exhaust_iter()) + .iter_with_prefix(&key, sub_range, PrefetchOptions::new_for_exhaust_iter()) .await?; #[for_await] diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index c15624a6054be..e4bba69924a42 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -85,7 +85,7 @@ impl NowExecutor { let state_row = { let sub_range: &(Bound, Bound) = &(Unbounded, Unbounded); let data_iter = state_table - .prefix_iter_row(row::empty(), sub_range, Default::default()) + .iter_with_prefix(row::empty(), sub_range, Default::default()) .await?; pin_mut!(data_iter); if let Some(keyed_row) = data_iter.next().await { diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index df8e32b25f6b3..b5da45edd47e5 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -200,7 +200,7 @@ impl EowcOverWindowExecutor { // Recover states from state table. let table_iter = this .state_table - .prefix_iter_row( + .iter_with_prefix( partition_key, sub_range, PrefetchOptions::new_for_exhaust_iter(), diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 681d01c9d1ba5..a58950e001882 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -458,7 +458,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let mut new_cache = PartitionCache::new(); // shouldn't use `new_empty_partition_cache` here because we don't want sentinels let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let table_iter = table - .prefix_iter_row( + .iter_with_prefix( self.this_partition_key, sub_range, PrefetchOptions::new_for_exhaust_iter(), @@ -573,9 +573,9 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ) -> StreamExecutorResult<()> { let streams = stream::iter(table.vnode_bitmap().iter_vnodes()) .map(|vnode| { - table.vnode_iter_row( - &table_pk_range, + table.iter_with_vnode( vnode, + &table_pk_range, PrefetchOptions::new_for_exhaust_iter(), ) }) @@ -653,7 +653,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ); let streams: Vec<_> = futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| { - table.vnode_iter_row(&pk_range, vnode, PrefetchOptions::new_for_exhaust_iter()) + table.iter_with_vnode(vnode, &pk_range, PrefetchOptions::new_for_exhaust_iter()) })) .await? .into_iter() @@ -747,7 +747,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ); let streams: Vec<_> = futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| { - table.vnode_iter_row(&pk_range, vnode, PrefetchOptions::default()) + table.iter_with_vnode(vnode, &pk_range, PrefetchOptions::default()) })) .await? .into_iter() diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index 1da8a041041c2..a1d6e3286ed5f 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -213,9 +213,9 @@ impl SortBuffer { let streams: Vec<_> = futures::future::try_join_all(buffer_table.vnode_bitmap().iter_vnodes().map(|vnode| { - buffer_table.vnode_iter_row( - &pk_range, + buffer_table.iter_with_vnode( vnode, + &pk_range, PrefetchOptions::new_with_exhaust_iter(filler.capacity().is_none()), ) })) diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index 96c74a90ef40d..85c6d2d780d60 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -84,9 +84,9 @@ impl SourceStateTableHandler { // all source executor has vnode id zero let iter = self .state_store - .vnode_iter_row( - &(start, end), + .iter_with_vnode( VirtualNode::ZERO, + &(start, end), PrefetchOptions::new_for_exhaust_iter(), ) .await?; diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 3d29f1e0709f6..841e7f5bb50d7 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -86,7 +86,7 @@ impl ManagedTopNState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .prefix_iter_row(&group_key, sub_range, Default::default()) + .iter_with_prefix(&group_key, sub_range, Default::default()) .await?; pin_mut!(state_table_iter); @@ -124,7 +124,7 @@ impl ManagedTopNState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .prefix_iter_row( + .iter_with_prefix( &group_key, sub_range, PrefetchOptions { @@ -173,7 +173,7 @@ impl ManagedTopNState { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table - .prefix_iter_row( + .iter_with_prefix( &group_key, sub_range, PrefetchOptions {