Skip to content

Commit

Permalink
add iter_prefix_and_sub_range
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Sep 13, 2023
1 parent 60f8bcc commit 53808ba
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 117 deletions.
173 changes: 71 additions & 102 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use either::Either;
use bytes::{BufMut, Bytes, BytesMut};
use futures::{pin_mut, FutureExt, Stream, StreamExt};
use futures_async_stream::for_await;
use itertools::{izip, Itertools};
Expand Down Expand Up @@ -1179,87 +1178,25 @@ where
.await
}

// /// This function scans rows from the relational table with specific `pk_range` under the same
// /// `vnode`.
// pub async fn iter_row_with_pk_prefix_sub_range(
// &self,
// pk_prefix: impl Row,
// sub_range: &(Bound<impl Row>, Bound<impl Row>),
// prefetch_options: PrefetchOptions,
// ) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
// let (sub_range_start_bytes, sub_range_end_bytes) =
// prefix_range_to_memcomparable_v2(&self.pk_serde, sub_range);
// // let (sub_range_start_bytes, sub_range_end_bytes) =
// // sub_range;
// let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
// let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

// let pk_prefix_bytes = Bytes::copy_from_slice(&encoded_prefix);
// let start_range = match sub_range_start_bytes {
// Included(start_bytes) => {
// Bound::Included((encoded_prefix.chain(start_bytes)).into_iter().collect())
// }
// Excluded(start_bytes) => {
// Bound::Excluded(encoded_prefix.chain(start_bytes).into_iter().collect())
// }
// Unbounded => Bound::Included(encoded_prefix),
// };

// let end_range = match sub_range_end_bytes {
// Included(end_bytes) => {
// Bound::Included((pk_prefix_bytes.chain(end_bytes)).into_iter().collect())
// }
// Excluded(end_bytes) => {
// Bound::Excluded(pk_prefix_bytes.chain(end_bytes).into_iter().collect())
// }
// Unbounded => end_bound_of_prefix(&pk_prefix_bytes),
// };
// println!("这里{:?}", (start_range.clone(), end_range.clone()));
// Ok(deserialize_keyed_row_stream(
// self.iter_kv((start_range, end_range), None, prefetch_options)
// .await?,
// &self.row_serde,
// ))
// }



/// This function scans rows from the relational table with specific `pk_range` under the same
/// `vnode`.
pub async fn xxx(
pub async fn iter_row_with_pk_prefix_sub_range(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
let (start, end) = sub_range;
let pk_prefix = pk_prefix.to_owned_row();
let pk_prefix_v2 = pk_prefix.clone();
let pk_prefix_v3 = pk_prefix.clone();
let start_range = match start {
Included(start_bytes) => {
Bound::Included((&pk_prefix.chain(start_bytes)).to_owned_row())
}
Excluded(start_bytes) => {
Bound::Excluded((&pk_prefix.chain(start_bytes)).to_owned_row())
}
Unbounded => Bound::Included(pk_prefix),
};
let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes();

let end_range = match end {
Included(start_bytes) => {
Bound::Included((&pk_prefix_v2.chain(start_bytes)).to_owned_row())
}
Excluded(start_bytes) => {
Bound::Excluded((&pk_prefix_v2.chain(start_bytes)).to_owned_row())
}
Unbounded => Unbounded,
};
println!("这里range {:?}", (start_range.clone(), end_range.clone()));
let a = prefix_range_to_memcomparable_v2(&self.pk_serde,&(start_range, end_range), pk_prefix_v3);

let memcomparable_range = prefix_and_sub_range_to_memcomparable(
&self.pk_serde,
sub_range,
pk_prefix.to_owned_row(),
);

let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode);
Ok(deserialize_keyed_row_stream(
self.iter_kv(a, None, prefetch_options)
self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
.await?,
&self.row_serde,
))
Expand Down Expand Up @@ -1372,14 +1309,39 @@ pub fn prefix_range_to_memcomparable(
)
}

pub fn prefix_range_to_memcomparable_v2(
pub fn prefix_and_sub_range_to_memcomparable(
pk_serde: &OrderedRowSerde,
range: &(Bound<impl Row>, Bound<impl Row>),
pk_prefix: OwnedRow
sub_range: &(Bound<impl Row>, Bound<impl Row>),
pk_prefix: OwnedRow,
) -> (Bound<Bytes>, Bound<Bytes>) {
let (range_start, range_end) = sub_range;
let pk_prefix_calculate_start_range = pk_prefix.clone();
let pk_prefix_calculate_end_range = pk_prefix.clone();
let prefix_serializer = pk_serde.prefix(pk_prefix.len());
let serialized_pk_prefix = serialize_pk(pk_prefix, &prefix_serializer);

let start_range = match range_start {
Included(start_range) => {
Bound::Included((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row())
}
Excluded(start_range) => {
Bound::Excluded((pk_prefix_calculate_start_range.chain(start_range)).to_owned_row())
}
Unbounded => Bound::Included(pk_prefix_calculate_start_range),
};

let end_range = match range_end {
Included(end_range) => {
Bound::Included((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row())
}
Excluded(end_range) => {
Bound::Excluded((pk_prefix_calculate_end_range.chain(end_range)).to_owned_row())
}
Unbounded => Unbounded,
};
(
to_memcomparable_v2(pk_serde, &range.0, false, pk_prefix.clone()),
to_memcomparable_v2(pk_serde, &range.1, true, pk_prefix),
start_range_to_memcomparable(pk_serde, &start_range),
end_range_to_memcomparable(pk_serde, &end_range, serialized_pk_prefix),
)
}

Expand Down Expand Up @@ -1414,43 +1376,50 @@ fn to_memcomparable<R: Row>(
}
}


fn to_memcomparable_v2<R: Row>(
fn start_range_to_memcomparable<R: Row>(
pk_serde: &OrderedRowSerde,
bound: &Bound<R>,
is_upper: bool,
pk_prefix1: OwnedRow,
) -> Bound<Bytes> {
let serialize_pk_prefix = |pk_prefix: &R| {
let prefix_serializer = pk_serde.prefix(pk_prefix.len());
serialize_pk(pk_prefix, &prefix_serializer)
};
match bound {
Unbounded => {
if is_upper {
let prefix_serializer = pk_serde.prefix(pk_prefix1.len());
let serialized = serialize_pk(pk_prefix1, &prefix_serializer);
Included(serialized)
} else {
Unbounded
}
},
unreachable!()
}
Included(r) => {
let serialized = serialize_pk_prefix(r);
if is_upper {
end_bound_of_prefix(&serialized)
} else {
Included(serialized)
}

Included(serialized)
}
Excluded(r) => {
let serialized = serialize_pk_prefix(r);
if !is_upper {
// if lower
start_bound_of_excluded_prefix(&serialized)
} else {
Excluded(serialized)
}

start_bound_of_excluded_prefix(&serialized)
}
}
}

fn end_range_to_memcomparable<R: Row>(
pk_serde: &OrderedRowSerde,
bound: &Bound<R>,
serialized_pk_prefix: Bytes,
) -> Bound<Bytes> {
let serialize_pk_prefix = |pk_prefix: &R| {
let prefix_serializer = pk_serde.prefix(pk_prefix.len());
serialize_pk(pk_prefix, &prefix_serializer)
};
match bound {
Unbounded => end_bound_of_prefix(&serialized_pk_prefix),
Included(r) => {
let serialized = serialize_pk_prefix(r);

end_bound_of_prefix(&serialized)
}
Excluded(r) => {
let serialized = serialize_pk_prefix(r);
Excluded(serialized)
}
}
}
116 changes: 101 additions & 15 deletions src/stream/src/common/table/test_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1837,7 +1837,7 @@ async fn test_state_table_watermark_cache_refill() {
}

#[tokio::test]
async fn test_state_table_iter_prefix_sub_range() {
async fn test_state_table_iter_prefix_and_sub_range() {
const TEST_TABLE_ID: TableId = TableId { table_id: 233 };
let test_env = prepare_hummock_test_env().await;

Expand Down Expand Up @@ -1891,21 +1891,96 @@ async fn test_state_table_iter_prefix_sub_range() {
state_table.commit(epoch).await.unwrap();

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
// let pk_prefix = OwnedRow::empty();

let pk_range = (
let sub_range1 = (
std::ops::Bound::Included(OwnedRow::new(vec![Some(11_i32.into())])),
std::ops::Bound::Excluded(OwnedRow::new(vec![Some(33_i32.into())])),
);

let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default())
.await
.unwrap();

pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(11_i32.into()),
Some(111_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await;
assert!(res.is_none());

let sub_range2: (Bound<OwnedRow>, Bound<OwnedRow>) = (
std::ops::Bound::Excluded(OwnedRow::new(vec![Some(11_i32.into())])),
std::ops::Bound::Unbounded,
);

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.xxx(pk_prefix, &pk_range, Default::default())
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default())
.await
.unwrap();

pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(33_i32.into()),
Some(333_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await;
assert!(res.is_none());

let sub_range3: (Bound<OwnedRow>, Bound<OwnedRow>) = (
std::ops::Bound::Unbounded,
std::ops::Bound::Included(OwnedRow::new(vec![Some(33_i32.into())])),
);

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default())
.await
.unwrap();

pin_mut!(iter);
let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Expand All @@ -1917,15 +1992,26 @@ async fn test_state_table_iter_prefix_sub_range() {

let res = iter.next().await.unwrap().unwrap();

// assert_eq!(
// &OwnedRow::new(vec![
// Some("abc".into()),
// Some("bbb".into()),
// Some("bbbb".into()),
// ]),
// res.as_ref()
// );

// let res = iter.next().await;
// assert!(res.is_none());
assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(33_i32.into()),
Some(333_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await;
assert!(res.is_none());
}

0 comments on commit 53808ba

Please sign in to comment.