Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state_table): add iterator sub range under a certain pk prefix #12251

Merged
merged 9 commits into from
Sep 18, 2023
Merged
108 changes: 93 additions & 15 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,30 @@ where
.await
}

/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
/// `vnode`.
///
/// - `pk_prefix`: leading pk columns shared by every row of interest; it is also the input from
///   which the vnode is computed.
/// - `sub_range`: bounds that are combined with `pk_prefix` by
///   `prefix_and_sub_range_to_memcomparable` to form the scanned key range.
/// - `prefetch_options`: forwarded unchanged to `iter_kv`.
///
/// Returns a stream of keyed rows deserialized with `self.row_serde`. Any error returned by
/// `iter_kv` is propagated to the caller.
pub async fn iter_row_with_pk_prefix_sub_range(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
// The vnode is derived from the prefix alone and kept as big-endian bytes.
let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes();

// Turn `pk_prefix` + `sub_range` into a pair of serialized (memcomparable) key bounds.
// The helper takes the prefix by value, hence the conversion to an owned row.
let memcomparable_range = prefix_and_sub_range_to_memcomparable(
&self.pk_serde,
sub_range,
pk_prefix.to_owned_row(),
);
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved

// NOTE(review): `prefixed_range` presumably prepends the vnode bytes to both bounds so the
// scan stays inside this vnode -- confirm against its definition.
let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode);
// NOTE(review): the meaning of the `None` argument to `iter_kv` is not visible in this hunk.
Ok(deserialize_keyed_row_stream(
self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
.await?,
&self.row_serde,
))
}

/// This function scans raw key-values from the relational table with specific `pk_range` under
/// the same `vnode`.
async fn iter_kv_with_pk_range(
Expand Down Expand Up @@ -1297,15 +1321,50 @@ pub fn prefix_range_to_memcomparable(
range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
(
to_memcomparable(pk_serde, &range.0, false),
to_memcomparable(pk_serde, &range.1, true),
start_range_to_memcomparable(pk_serde, &range.0),
end_range_to_memcomparable(pk_serde, &range.1, None),
)
}

fn to_memcomparable<R: Row>(
/// Builds the memcomparable key range that covers `sub_range` underneath `pk_prefix`.
///
/// Every bounded side of `sub_range` is appended to `pk_prefix`, forming a longer pk prefix
/// that keeps the original bound kind (inclusive / exclusive). Unbounded sides are anchored
/// to the prefix instead of being left open:
///
/// - an unbounded start becomes an inclusive start at `pk_prefix` itself;
/// - an unbounded end stays unbounded, but the serialized `pk_prefix` is handed to
///   `end_range_to_memcomparable`, which derives the end bound from it.
///
/// Returns the `(start, end)` pair produced by `start_range_to_memcomparable` and
/// `end_range_to_memcomparable`.
pub fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: OwnedRow,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;

    // Serialize through a borrow so the owned prefix is still available below.
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

    // The end bound only ever needs to read the prefix, so it is built first ...
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range).to_owned_row()),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range).to_owned_row()),
        Unbounded => Unbounded,
    };

    // ... which lets the unbounded-start arm take ownership of the prefix without a clone.
    let start_range = match range_start {
        Included(start_range) => {
            Bound::Included((&pk_prefix).chain(start_range).to_owned_row())
        }
        Excluded(start_range) => {
            Bound::Excluded((&pk_prefix).chain(start_range).to_owned_row())
        }
        Unbounded => Bound::Included(pk_prefix),
    };

    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved

fn start_range_to_memcomparable<R: Row>(
pk_serde: &OrderedRowSerde,
bound: &Bound<R>,
is_upper: bool,
) -> Bound<Bytes> {
let serialize_pk_prefix = |pk_prefix: &R| {
let prefix_serializer = pk_serde.prefix(pk_prefix.len());
Expand All @@ -1315,20 +1374,39 @@ fn to_memcomparable<R: Row>(
Unbounded => Unbounded,
Included(r) => {
let serialized = serialize_pk_prefix(r);
if is_upper {
end_bound_of_prefix(&serialized)
} else {
Included(serialized)
}

Included(serialized)
}
Excluded(r) => {
let serialized = serialize_pk_prefix(r);
if !is_upper {
// if lower
start_bound_of_excluded_prefix(&serialized)
} else {
Excluded(serialized)
}

start_bound_of_excluded_prefix(&serialized)
}
}
}

/// Converts the end bound of a pk-prefix range into a bound over serialized key bytes.
///
/// - `Included(row)`: `row` is serialized as a pk prefix and the result of
///   `end_bound_of_prefix` on those bytes is returned.
/// - `Excluded(row)`: the serialized prefix is used directly as an exclusive bound.
/// - `Unbounded`: if `serialized_pk_prefix` is provided, the end bound is derived from it via
///   `end_bound_of_prefix`; otherwise the range stays unbounded.
fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    // Encode `row` with a serializer truncated to exactly as many columns as `row` has.
    let encode = |row: &R| serialize_pk(row, &pk_serde.prefix(row.len()));

    match bound {
        Included(row) => end_bound_of_prefix(&encode(row)),
        Excluded(row) => Excluded(encode(row)),
        Unbounded => {
            serialized_pk_prefix.map_or(Unbounded, |prefix| end_bound_of_prefix(&prefix))
        }
    }
}
182 changes: 182 additions & 0 deletions src/stream/src/common/table/test_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use futures::{pin_mut, StreamExt};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
Expand Down Expand Up @@ -1833,3 +1835,183 @@ async fn test_state_table_watermark_cache_refill() {
.as_scalar_ref_impl()
)
}

#[tokio::test]
async fn test_state_table_iter_prefix_and_sub_range() {
const TEST_TABLE_ID: TableId = TableId { table_id: 233 };
let test_env = prepare_hummock_test_env().await;

let order_types = vec![OrderType::ascending(), OrderType::ascending()];
let column_ids = [ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)];
let column_descs = vec![
ColumnDesc::unnamed(column_ids[0], DataType::Int32),
ColumnDesc::unnamed(column_ids[1], DataType::Int32),
ColumnDesc::unnamed(column_ids[2], DataType::Int32),
];
let pk_index = vec![0_usize, 1_usize];
let read_prefix_len_hint = 0;
let table = gen_prost_table(
TEST_TABLE_ID,
column_descs,
order_types,
pk_index,
read_prefix_len_hint,
);

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None)
.await;
let mut epoch = EpochPair::new_test_epoch(1);
state_table.init_epoch(epoch);

state_table.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Some(11_i32.into()),
Some(111_i32.into()),
]));
state_table.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]));
state_table.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Some(33_i32.into()),
Some(333_i32.into()),
]));

state_table.insert(OwnedRow::new(vec![
Some(4_i32.into()),
Some(44_i32.into()),
Some(444_i32.into()),
]));

epoch.inc();
state_table.commit(epoch).await.unwrap();

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);

let sub_range1 = (
std::ops::Bound::Included(OwnedRow::new(vec![Some(11_i32.into())])),
std::ops::Bound::Excluded(OwnedRow::new(vec![Some(33_i32.into())])),
);

let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default())
.await
.unwrap();

pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(11_i32.into()),
Some(111_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await;
assert!(res.is_none());

let sub_range2: (Bound<OwnedRow>, Bound<OwnedRow>) = (
std::ops::Bound::Excluded(OwnedRow::new(vec![Some(11_i32.into())])),
std::ops::Bound::Unbounded,
);

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default())
.await
.unwrap();

pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(33_i32.into()),
Some(333_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await;
assert!(res.is_none());

let sub_range3: (Bound<OwnedRow>, Bound<OwnedRow>) = (
std::ops::Bound::Unbounded,
std::ops::Bound::Included(OwnedRow::new(vec![Some(33_i32.into())])),
);

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default())
.await
.unwrap();

pin_mut!(iter);
let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(11_i32.into()),
Some(111_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await.unwrap().unwrap();

assert_eq!(
&OwnedRow::new(vec![
Some(1_i32.into()),
Some(33_i32.into()),
Some(333_i32.into()),
]),
res.as_ref()
);

let res = iter.next().await;
assert!(res.is_none());
}
Loading