Skip to content

Commit

Permalink
fix(state table): add the accidentally deleted prefix_hint back (#12817)
Browse files Browse the repository at this point in the history
Co-authored-by: stonepage <[email protected]>
  • Loading branch information
wcy-fdu and st1page authored Oct 12, 2023
1 parent 5919269 commit f27182f
Showing 1 changed file with 39 additions and 20 deletions.
59 changes: 39 additions & 20 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,42 +1136,61 @@ where
/// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
/// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
/// `vnode`.
pub async fn iter_with_prefix(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
let encoded_key_range = range_of_prefix(&encoded_prefix);

// We assume that all usages of iterating the state table only access a single vnode.
// If this assertion fails, then something must be wrong with the operator implementation or
// the distribution derivation from the optimizer.
let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes();
let encoded_key_range_with_vnode = prefixed_range(encoded_key_range, &vnode);

let memcomparable_range =
prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
// Construct prefix hint for prefix bloom filter.
let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
if self.prefix_hint_len != 0 {
debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
}
let prefix_hint = {
if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
None
} else {
let encoded_prefix_len = self
.pk_serde
.deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode);
Ok(deserialize_keyed_row_stream(
self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
.await?,
&self.row_serde,
))
}
Some(Bytes::from(encoded_prefix[..encoded_prefix_len].to_vec()))
}
};

/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
/// `vnode`.
pub async fn iter_row_with_pk_prefix_sub_range(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes();
trace!(
table_id = %self.table_id(),
?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix,
?pk_prefix_indices,
"storage_iter_with_prefix"
);

let memcomparable_range =
prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode);

Ok(deserialize_keyed_row_stream(
self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
.await?,
self.iter_kv(
memcomparable_range_with_vnode,
prefix_hint,
prefetch_options,
)
.await?,
&self.row_serde,
))
}
Expand Down

0 comments on commit f27182f

Please sign in to comment.