From c3660e005cdb2b6f110fb351f6e9e25d20e2014b Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Sep 2023 13:24:11 +0800 Subject: [PATCH 1/6] save work --- src/stream/src/common/table/state_table.rs | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index dfdd11732c942..afdc19a3c7fea 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1178,6 +1178,35 @@ where .await } + /// This function scans rows from the relational table with specific `pk_range` under the same + /// `vnode`. + pub async fn iter_row_with_pk_prefix_sub_range( + &self, + pk_prefix: impl Row, + sub_range: &(Bound, Bound), + prefetch_options: PrefetchOptions, + ) -> StreamExecutorResult> { + // let (sub_range_start, sub_range_end) = prefix_range_to_memcomparable(&self.pk_serde, sub_range); + let (sub_range_start, sub_range_end) = sub_range; + let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); + let start_range = match sub_range_end { + Included(sub_range_start) => { + Bound::Included(pk_prefix.chain(*sub_range_start)) + }, + Excluded(start) => Bound::Excluded(pk_prefix.chain(*start)), + Unbounded => Bound::Included(pk_prefix.chain()), + }; + + // let end_range = match sub_range_end { + // Included(sub_range_end) => todo!(), + // Excluded(sub_range_end) => todo!(), + // Unbounded => todo!(), + // }; + + todo!() + } + + /// This function scans raw key-values from the relational table with specific `pk_range` under /// the same `vnode`. async fn iter_kv_with_pk_range( From 60f8bcc0ec90b066861c8c3436e0eb54750859f9 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Sep 2023 17:19:44 +0800 Subject: [PATCH 2/6] save work --- src/stream/src/common/table/state_table.rs | 148 +++++++++++++++--- .../src/common/table/test_state_table.rs | 96 ++++++++++++ 2 files changed, 225 insertions(+), 19 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index afdc19a3c7fea..6a0bcad67344f 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -16,7 +16,8 @@ use std::ops::Bound; use std::ops::Bound::*; use std::sync::Arc; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use either::Either; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; @@ -1178,35 +1179,92 @@ where .await } + // /// This function scans rows from the relational table with specific `pk_range` under the same + // /// `vnode`. + // pub async fn iter_row_with_pk_prefix_sub_range( + // &self, + // pk_prefix: impl Row, + // sub_range: &(Bound, Bound), + // prefetch_options: PrefetchOptions, + // ) -> StreamExecutorResult> { + // let (sub_range_start_bytes, sub_range_end_bytes) = + // prefix_range_to_memcomparable_v2(&self.pk_serde, sub_range); + // // let (sub_range_start_bytes, sub_range_end_bytes) = + // // sub_range; + // let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); + // let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); + + // let pk_prefix_bytes = Bytes::copy_from_slice(&encoded_prefix); + // let start_range = match sub_range_start_bytes { + // Included(start_bytes) => { + // Bound::Included((encoded_prefix.chain(start_bytes)).into_iter().collect()) + // } + // Excluded(start_bytes) => { + // Bound::Excluded(encoded_prefix.chain(start_bytes).into_iter().collect()) + // } + // Unbounded => Bound::Included(encoded_prefix), + // }; + + // let end_range = match sub_range_end_bytes { + // Included(end_bytes) => { + // Bound::Included((pk_prefix_bytes.chain(end_bytes)).into_iter().collect()) + // } + // Excluded(end_bytes) => { + // Bound::Excluded(pk_prefix_bytes.chain(end_bytes).into_iter().collect()) + // } + // Unbounded => end_bound_of_prefix(&pk_prefix_bytes), + // }; + // println!("这里{:?}", (start_range.clone(), end_range.clone())); + // Ok(deserialize_keyed_row_stream( + // self.iter_kv((start_range, end_range), None, prefetch_options) + // .await?, + // &self.row_serde, + // )) + // } + + + /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. - pub async fn iter_row_with_pk_prefix_sub_range( + pub async fn xxx( &self, pk_prefix: impl Row, sub_range: &(Bound, Bound), prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { - // let (sub_range_start, sub_range_end) = prefix_range_to_memcomparable(&self.pk_serde, sub_range); - let (sub_range_start, sub_range_end) = sub_range; - let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); - let start_range = match sub_range_end { - Included(sub_range_start) => { - Bound::Included(pk_prefix.chain(*sub_range_start)) - }, - Excluded(start) => Bound::Excluded(pk_prefix.chain(*start)), - Unbounded => Bound::Included(pk_prefix.chain()), + let (start, end) = sub_range; + let pk_prefix = pk_prefix.to_owned_row(); + let pk_prefix_v2 = pk_prefix.clone(); + let pk_prefix_v3 = pk_prefix.clone(); + let start_range = match start { + Included(start_bytes) => { + Bound::Included((&pk_prefix.chain(start_bytes)).to_owned_row()) + } + Excluded(start_bytes) => { + Bound::Excluded((&pk_prefix.chain(start_bytes)).to_owned_row()) + } + Unbounded => Bound::Included(pk_prefix), }; - // let end_range = match sub_range_end { - // Included(sub_range_end) => todo!(), - // Excluded(sub_range_end) => todo!(), - // Unbounded => todo!(), - // }; - - todo!() + let end_range = match end { + Included(start_bytes) => { + Bound::Included((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) + } + Excluded(start_bytes) => { + Bound::Excluded((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) + } + Unbounded => Unbounded, + }; + println!("这里range {:?}", (start_range.clone(), end_range.clone())); + let a = prefix_range_to_memcomparable_v2(&self.pk_serde,&(start_range, end_range), pk_prefix_v3); + + Ok(deserialize_keyed_row_stream( + self.iter_kv(a, None, prefetch_options) + .await?, + &self.row_serde, + )) } - /// This function scans raw key-values from the relational table with specific `pk_range` under /// the same `vnode`. async fn iter_kv_with_pk_range( @@ -1314,6 +1372,17 @@ pub fn prefix_range_to_memcomparable( ) } +pub fn prefix_range_to_memcomparable_v2( + pk_serde: &OrderedRowSerde, + range: &(Bound, Bound), + pk_prefix: OwnedRow +) -> (Bound, Bound) { + ( + to_memcomparable_v2(pk_serde, &range.0, false, pk_prefix.clone()), + to_memcomparable_v2(pk_serde, &range.1, true, pk_prefix), + ) +} + fn to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, @@ -1344,3 +1413,44 @@ fn to_memcomparable( } } } + + +fn to_memcomparable_v2( + pk_serde: &OrderedRowSerde, + bound: &Bound, + is_upper: bool, + pk_prefix1: OwnedRow, +) -> Bound { + let serialize_pk_prefix = |pk_prefix: &R| { + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + serialize_pk(pk_prefix, &prefix_serializer) + }; + match bound { + Unbounded => { + if is_upper { + let prefix_serializer = pk_serde.prefix(pk_prefix1.len()); + let serialized = serialize_pk(pk_prefix1, &prefix_serializer); + Included(serialized) + } else { + Unbounded + } + }, + Included(r) => { + let serialized = serialize_pk_prefix(r); + if is_upper { + end_bound_of_prefix(&serialized) + } else { + Included(serialized) + } + } + Excluded(r) => { + let serialized = serialize_pk_prefix(r); + if !is_upper { + // if lower + start_bound_of_excluded_prefix(&serialized) + } else { + Excluded(serialized) + } + } + } +} diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index c3e5759a47ae6..fb761b7ad0936 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; + use futures::{pin_mut, StreamExt}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; @@ -1833,3 +1835,97 @@ async fn test_state_table_watermark_cache_refill() { .as_scalar_ref_impl() ) } + +#[tokio::test] +async fn test_state_table_iter_prefix_sub_range() { + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let test_env = prepare_hummock_test_env().await; + + let order_types = vec![OrderType::ascending(), OrderType::ascending()]; + let column_ids = [ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)]; + let column_descs = vec![ + ColumnDesc::unnamed(column_ids[0], DataType::Int32), + ColumnDesc::unnamed(column_ids[1], DataType::Int32), + ColumnDesc::unnamed(column_ids[2], DataType::Int32), + ]; + let pk_index = vec![0_usize, 1_usize]; + let read_prefix_len_hint = 0; + let table = gen_prost_table( + TEST_TABLE_ID, + column_descs, + order_types, + pk_index, + read_prefix_len_hint, + ); + + test_env.register_table(table.clone()).await; + let mut state_table = + StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None) + .await; + let mut epoch = EpochPair::new_test_epoch(1); + state_table.init_epoch(epoch); + + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ])); + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ])); + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ])); + + state_table.insert(OwnedRow::new(vec![ + Some(4_i32.into()), + Some(44_i32.into()), + Some(444_i32.into()), + ])); + + epoch.inc(); + state_table.commit(epoch).await.unwrap(); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + // let pk_prefix = OwnedRow::empty(); + + let pk_range = ( + std::ops::Bound::Included(OwnedRow::new(vec![Some(11_i32.into())])), + std::ops::Bound::Excluded(OwnedRow::new(vec![Some(33_i32.into())])), + ); + let iter = state_table + .xxx(pk_prefix, &pk_range, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + // assert_eq!( + // &OwnedRow::new(vec![ + // Some("abc".into()), + // Some("bbb".into()), + // Some("bbbb".into()), + // ]), + // res.as_ref() + // ); + + // let res = iter.next().await; + // assert!(res.is_none()); +} From 53808ba52963defad75ed50026788afd7ac636d3 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 13 Sep 2023 11:10:11 +0800 Subject: [PATCH 3/6] add iter_prefix_and_sub_range --- src/stream/src/common/table/state_table.rs | 173 +++++++----------- .../src/common/table/test_state_table.rs | 116 ++++++++++-- 2 files changed, 172 insertions(+), 117 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 6a0bcad67344f..341b915fa53df 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -16,8 +16,7 @@ use std::ops::Bound; use std::ops::Bound::*; use std::sync::Arc; -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use either::Either; +use bytes::{BufMut, Bytes, BytesMut}; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; @@ -1179,87 +1178,25 @@ where .await } - // /// This function scans rows from the relational table with specific `pk_range` under the same - // /// `vnode`. - // pub async fn iter_row_with_pk_prefix_sub_range( - // &self, - // pk_prefix: impl Row, - // sub_range: &(Bound, Bound), - // prefetch_options: PrefetchOptions, - // ) -> StreamExecutorResult> { - // let (sub_range_start_bytes, sub_range_end_bytes) = - // prefix_range_to_memcomparable_v2(&self.pk_serde, sub_range); - // // let (sub_range_start_bytes, sub_range_end_bytes) = - // // sub_range; - // let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); - // let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); - - // let pk_prefix_bytes = Bytes::copy_from_slice(&encoded_prefix); - // let start_range = match sub_range_start_bytes { - // Included(start_bytes) => { - // Bound::Included((encoded_prefix.chain(start_bytes)).into_iter().collect()) - // } - // Excluded(start_bytes) => { - // Bound::Excluded(encoded_prefix.chain(start_bytes).into_iter().collect()) - // } - // Unbounded => Bound::Included(encoded_prefix), - // }; - - // let end_range = match sub_range_end_bytes { - // Included(end_bytes) => { - // Bound::Included((pk_prefix_bytes.chain(end_bytes)).into_iter().collect()) - // } - // Excluded(end_bytes) => { - // Bound::Excluded(pk_prefix_bytes.chain(end_bytes).into_iter().collect()) - // } - // Unbounded => end_bound_of_prefix(&pk_prefix_bytes), - // }; - // println!("这里{:?}", (start_range.clone(), end_range.clone())); - // Ok(deserialize_keyed_row_stream( - // self.iter_kv((start_range, end_range), None, prefetch_options) - // .await?, - // &self.row_serde, - // )) - // } - - - /// This function scans rows from the relational table with specific `pk_range` under the same /// `vnode`. - pub async fn xxx( + pub async fn iter_row_with_pk_prefix_sub_range( &self, pk_prefix: impl Row, sub_range: &(Bound, Bound), prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { - let (start, end) = sub_range; - let pk_prefix = pk_prefix.to_owned_row(); - let pk_prefix_v2 = pk_prefix.clone(); - let pk_prefix_v3 = pk_prefix.clone(); - let start_range = match start { - Included(start_bytes) => { - Bound::Included((&pk_prefix.chain(start_bytes)).to_owned_row()) - } - Excluded(start_bytes) => { - Bound::Excluded((&pk_prefix.chain(start_bytes)).to_owned_row()) - } - Unbounded => Bound::Included(pk_prefix), - }; + let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); - let end_range = match end { - Included(start_bytes) => { - Bound::Included((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) - } - Excluded(start_bytes) => { - Bound::Excluded((&pk_prefix_v2.chain(start_bytes)).to_owned_row()) - } - Unbounded => Unbounded, - }; - println!("这里range {:?}", (start_range.clone(), end_range.clone())); - let a = prefix_range_to_memcomparable_v2(&self.pk_serde,&(start_range, end_range), pk_prefix_v3); - + let memcomparable_range = prefix_and_sub_range_to_memcomparable( + &self.pk_serde, + sub_range, + pk_prefix.to_owned_row(), + ); + + let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode); Ok(deserialize_keyed_row_stream( - self.iter_kv(a, None, prefetch_options) + self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options) .await?, &self.row_serde, )) @@ -1372,14 +1309,39 @@ pub fn prefix_range_to_memcomparable( ) } -pub fn prefix_range_to_memcomparable_v2( +pub fn prefix_and_sub_range_to_memcomparable( pk_serde: &OrderedRowSerde, - range: &(Bound, Bound), - pk_prefix: OwnedRow + sub_range: &(Bound, Bound), + pk_prefix: OwnedRow, ) -> (Bound, Bound) { + let (range_start, range_end) = sub_range; + let pk_prefix_calculate_start_range = pk_prefix.clone(); + let pk_prefix_calculate_end_range = pk_prefix.clone(); + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + let serialized_pk_prefix = serialize_pk(pk_prefix, &prefix_serializer); + + let start_range = match range_start { + Included(start_range) => { + Bound::Included((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) + } + Excluded(start_range) => { + Bound::Excluded((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) + } + Unbounded => Bound::Included(pk_prefix_calculate_start_range), + }; + + let end_range = match range_end { + Included(end_range) => { + Bound::Included((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) + } + Excluded(end_range) => { + Bound::Excluded((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) + } + Unbounded => Unbounded, + }; ( - to_memcomparable_v2(pk_serde, &range.0, false, pk_prefix.clone()), - to_memcomparable_v2(pk_serde, &range.1, true, pk_prefix), + start_range_to_memcomparable(pk_serde, &start_range), + end_range_to_memcomparable(pk_serde, &end_range, serialized_pk_prefix), ) } @@ -1414,12 +1376,9 @@ fn to_memcomparable( } } - -fn to_memcomparable_v2( +fn start_range_to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, - is_upper: bool, - pk_prefix1: OwnedRow, ) -> Bound { let serialize_pk_prefix = |pk_prefix: &R| { let prefix_serializer = pk_serde.prefix(pk_prefix.len()); @@ -1427,30 +1386,40 @@ fn to_memcomparable_v2( }; match bound { Unbounded => { - if is_upper { - let prefix_serializer = pk_serde.prefix(pk_prefix1.len()); - let serialized = serialize_pk(pk_prefix1, &prefix_serializer); - Included(serialized) - } else { - Unbounded - } - }, + unreachable!() + } Included(r) => { let serialized = serialize_pk_prefix(r); - if is_upper { - end_bound_of_prefix(&serialized) - } else { - Included(serialized) - } + + Included(serialized) } Excluded(r) => { let serialized = serialize_pk_prefix(r); - if !is_upper { - // if lower - start_bound_of_excluded_prefix(&serialized) - } else { - Excluded(serialized) - } + + start_bound_of_excluded_prefix(&serialized) + } + } +} + +fn end_range_to_memcomparable( + pk_serde: &OrderedRowSerde, + bound: &Bound, + serialized_pk_prefix: Bytes, +) -> Bound { + let serialize_pk_prefix = |pk_prefix: &R| { + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + serialize_pk(pk_prefix, &prefix_serializer) + }; + match bound { + Unbounded => end_bound_of_prefix(&serialized_pk_prefix), + Included(r) => { + let serialized = serialize_pk_prefix(r); + + end_bound_of_prefix(&serialized) + } + Excluded(r) => { + let serialized = serialize_pk_prefix(r); + Excluded(serialized) } } } diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index fb761b7ad0936..2f5dc3202adb9 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -1837,7 +1837,7 @@ async fn test_state_table_watermark_cache_refill() { } #[tokio::test] -async fn test_state_table_iter_prefix_sub_range() { +async fn test_state_table_iter_prefix_and_sub_range() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; let test_env = prepare_hummock_test_env().await; @@ -1891,14 +1891,52 @@ async fn test_state_table_iter_prefix_sub_range() { state_table.commit(epoch).await.unwrap(); let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); - // let pk_prefix = OwnedRow::empty(); - let pk_range = ( + let sub_range1 = ( std::ops::Bound::Included(OwnedRow::new(vec![Some(11_i32.into())])), std::ops::Bound::Excluded(OwnedRow::new(vec![Some(33_i32.into())])), ); + + let iter = state_table + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); + + let sub_range2: (Bound, Bound) = ( + std::ops::Bound::Excluded(OwnedRow::new(vec![Some(11_i32.into())])), + std::ops::Bound::Unbounded, + ); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); let iter = state_table - .xxx(pk_prefix, &pk_range, Default::default()) + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default()) .await .unwrap(); @@ -1906,6 +1944,43 @@ async fn test_state_table_iter_prefix_sub_range() { let res = iter.next().await.unwrap().unwrap(); + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); + + let sub_range3: (Bound, Bound) = ( + std::ops::Bound::Unbounded, + std::ops::Bound::Included(OwnedRow::new(vec![Some(33_i32.into())])), + ); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + let iter = state_table + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + let res = iter.next().await.unwrap().unwrap(); + assert_eq!( &OwnedRow::new(vec![ Some(1_i32.into()), @@ -1917,15 +1992,26 @@ async fn test_state_table_iter_prefix_sub_range() { let res = iter.next().await.unwrap().unwrap(); - // assert_eq!( - // &OwnedRow::new(vec![ - // Some("abc".into()), - // Some("bbb".into()), - // Some("bbbb".into()), - // ]), - // res.as_ref() - // ); - - // let res = iter.next().await; - // assert!(res.is_none()); + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); } From 2529231c9184f31821073131e22f638d9ed4d463 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 13 Sep 2023 11:13:52 +0800 Subject: [PATCH 4/6] add comments --- src/stream/src/common/table/state_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 341b915fa53df..c3615ded2ea5c 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1178,7 +1178,7 @@ where .await } - /// This function scans rows from the relational table with specific `pk_range` under the same + /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same /// `vnode`. pub async fn iter_row_with_pk_prefix_sub_range( &self, From c64444c33b0317f09e3b80f76a0fd98917c5b5d9 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 14 Sep 2023 18:05:09 +0800 Subject: [PATCH 5/6] unify some code --- src/stream/src/common/table/state_table.rs | 48 ++++------------------ 1 file changed, 9 insertions(+), 39 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index c3615ded2ea5c..c1a7de2f03b87 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1304,8 +1304,8 @@ pub fn prefix_range_to_memcomparable( range: &(Bound, Bound), ) -> (Bound, Bound) { ( - to_memcomparable(pk_serde, &range.0, false), - to_memcomparable(pk_serde, &range.1, true), + start_range_to_memcomparable(pk_serde, &range.0), + end_range_to_memcomparable(pk_serde, &range.1, None), ) } @@ -1341,41 +1341,10 @@ pub fn prefix_and_sub_range_to_memcomparable( }; ( start_range_to_memcomparable(pk_serde, &start_range), - end_range_to_memcomparable(pk_serde, &end_range, serialized_pk_prefix), + end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)), ) } -fn to_memcomparable( - pk_serde: &OrderedRowSerde, - bound: &Bound, - is_upper: bool, -) -> Bound { - let serialize_pk_prefix = |pk_prefix: &R| { - let prefix_serializer = pk_serde.prefix(pk_prefix.len()); - serialize_pk(pk_prefix, &prefix_serializer) - }; - match bound { - Unbounded => Unbounded, - Included(r) => { - let serialized = serialize_pk_prefix(r); - if is_upper { - end_bound_of_prefix(&serialized) - } else { - Included(serialized) - } - } - Excluded(r) => { - let serialized = serialize_pk_prefix(r); - if !is_upper { - // if lower - start_bound_of_excluded_prefix(&serialized) - } else { - Excluded(serialized) - } - } - } -} - fn start_range_to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, @@ -1385,9 +1354,7 @@ fn start_range_to_memcomparable( serialize_pk(pk_prefix, &prefix_serializer) }; match bound { - Unbounded => { - unreachable!() - } + Unbounded => Unbounded, Included(r) => { let serialized = serialize_pk_prefix(r); @@ -1404,14 +1371,17 @@ fn start_range_to_memcomparable( fn end_range_to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, - serialized_pk_prefix: Bytes, + serialized_pk_prefix: Option, ) -> Bound { let serialize_pk_prefix = |pk_prefix: &R| { let prefix_serializer = pk_serde.prefix(pk_prefix.len()); serialize_pk(pk_prefix, &prefix_serializer) }; match bound { - Unbounded => end_bound_of_prefix(&serialized_pk_prefix), + Unbounded => match serialized_pk_prefix { + Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix), + None => Unbounded, + }, Included(r) => { let serialized = serialize_pk_prefix(r); From f7df04fa0f12977c7aee73f3543927e123c72de8 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 15 Sep 2023 15:51:13 +0800 Subject: [PATCH 6/6] resolve comments --- src/stream/src/common/table/state_table.rs | 36 +++++++--------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 86c4f9d1e7624..045fb1fdaeba9 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -17,6 +17,7 @@ use std::ops::Bound::*; use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; +use either::Either; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; @@ -1205,11 +1206,8 @@ where ) -> StreamExecutorResult> { let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); - let memcomparable_range = prefix_and_sub_range_to_memcomparable( - &self.pk_serde, - sub_range, - pk_prefix.to_owned_row(), - ); + let memcomparable_range = + prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix); let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode); Ok(deserialize_keyed_row_stream( @@ -1326,34 +1324,22 @@ pub fn prefix_range_to_memcomparable( ) } -pub fn prefix_and_sub_range_to_memcomparable( +fn prefix_and_sub_range_to_memcomparable( pk_serde: &OrderedRowSerde, sub_range: &(Bound, Bound), - pk_prefix: OwnedRow, + pk_prefix: impl Row, ) -> (Bound, Bound) { let (range_start, range_end) = sub_range; - let pk_prefix_calculate_start_range = pk_prefix.clone(); - let pk_prefix_calculate_end_range = pk_prefix.clone(); let prefix_serializer = pk_serde.prefix(pk_prefix.len()); - let serialized_pk_prefix = serialize_pk(pk_prefix, &prefix_serializer); - + let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer); let start_range = match range_start { - Included(start_range) => { - Bound::Included((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) - } - Excluded(start_range) => { - Bound::Excluded((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row()) - } - Unbounded => Bound::Included(pk_prefix_calculate_start_range), + Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))), + Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))), + Unbounded => Bound::Included(Either::Right(&pk_prefix)), }; - let end_range = match range_end { - Included(end_range) => { - Bound::Included((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) - } - Excluded(end_range) => { - Bound::Excluded((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row()) - } + Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)), + Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)), Unbounded => Unbounded, }; (