diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 7beaef468cf61..37e788a3e7abd 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1136,42 +1136,61 @@ where /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`. /// `pk_prefix` is used to identify the exact vnode the scan should perform on. + + /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same + /// `vnode`. pub async fn iter_with_prefix( &self, pk_prefix: impl Row, sub_range: &(Bound, Bound), prefetch_options: PrefetchOptions, ) -> StreamExecutorResult> { + let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); + let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); + let encoded_key_range = range_of_prefix(&encoded_prefix); + + // We assume that all usages of iterating the state table only access a single vnode. + // If this assertion fails, then something must be wrong with the operator implementation or + // the distribution derivation from the optimizer. let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); + let encoded_key_range_with_vnode = prefixed_range(encoded_key_range, &vnode); - let memcomparable_range = - prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix); + // Construct prefix hint for prefix bloom filter. + let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()]; + if self.prefix_hint_len != 0 { + debug_assert_eq!(self.prefix_hint_len, pk_prefix.len()); + } + let prefix_hint = { + if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() { + None + } else { + let encoded_prefix_len = self + .pk_serde + .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?; - let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode); - Ok(deserialize_keyed_row_stream( - self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options) - .await?, - &self.row_serde, - )) - } + Some(Bytes::from(encoded_prefix[..encoded_prefix_len].to_vec())) + } + }; - /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same - /// `vnode`. - pub async fn iter_row_with_pk_prefix_sub_range( - &self, - pk_prefix: impl Row, - sub_range: &(Bound, Bound), - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { - let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); + trace!( + table_id = %self.table_id(), + ?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix, + ?pk_prefix_indices, + "storage_iter_with_prefix" + ); let memcomparable_range = prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix); let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode); + Ok(deserialize_keyed_row_stream( - self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options) - .await?, + self.iter_kv( + memcomparable_range_with_vnode, + prefix_hint, + prefetch_options, + ) + .await?, &self.row_serde, )) }