Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(over window): fix table iter pk prefix to use deduped partition key #19081

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct ExecutorInner<S: StateStore> {

schema: Schema,
calls: Vec<WindowFuncCall>,
partition_key_indices: Vec<usize>,
deduped_part_key_indices: Vec<usize>,
order_key_indices: Vec<usize>,
order_key_data_types: Vec<DataType>,
order_key_order_types: Vec<OrderType>,
Expand Down Expand Up @@ -88,9 +88,10 @@ impl<S: StateStore> Execute for OverWindowExecutor<S> {
}

impl<S: StateStore> ExecutorInner<S> {
/// Get deduplicated partition key from a full row, which happened to be the prefix of table PK.
fn get_partition_key(&self, full_row: impl Row) -> OwnedRow {
full_row
.project(&self.partition_key_indices)
.project(&self.deduped_part_key_indices)
.into_owned_row()
}

Expand Down Expand Up @@ -162,13 +163,22 @@ impl<S: StateStore> OverWindowExecutor<S> {
&input_info.pk_indices,
);

let deduped_part_key_indices = {
let mut dedup = HashSet::new();
args.partition_key_indices
.iter()
.filter(|i| dedup.insert(**i))
.copied()
.collect()
};

Self {
input: args.input,
inner: ExecutorInner {
actor_ctx: args.actor_ctx,
schema: args.schema,
calls: args.calls,
partition_key_indices: args.partition_key_indices,
deduped_part_key_indices,
order_key_indices: args.order_key_indices,
order_key_data_types,
order_key_order_types: args.order_key_order_types,
Expand Down Expand Up @@ -262,7 +272,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
chunk: StreamChunk,
metrics: &'a OverWindowMetrics,
) {
// partition key => changes happened in the partition.
// (deduped) partition key => changes happened in the partition.
let mut deltas: BTreeMap<DefaultOrdered<OwnedRow>, PartitionDelta> = BTreeMap::new();
// input pk of update records of which the order key is changed.
let mut key_change_updated_pks = HashSet::new();
Expand Down
40 changes: 18 additions & 22 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ const MAGIC_CACHE_SIZE: usize = 1024;
const MAGIC_JITTER_PREVENTION: usize = MAGIC_CACHE_SIZE / 8;

pub(super) fn shrink_partition_cache(
this_partition_key: &OwnedRow,
deduped_part_key: &OwnedRow,
range_cache: &mut PartitionCache,
cache_policy: CachePolicy,
recently_accessed_range: RangeInclusive<StateKey>,
) {
tracing::trace!(
this_partition_key=?this_partition_key,
partition=?deduped_part_key,
cache_policy=?cache_policy,
recently_accessed_range=?recently_accessed_range,
"find the range to retain in the range cache"
Expand Down Expand Up @@ -218,7 +218,7 @@ pub(super) fn shrink_partition_cache(
};

tracing::trace!(
this_partition_key=?this_partition_key,
partition=?deduped_part_key,
retain_range=?(&start..=&end),
"retain range in the range cache"
);
Expand Down Expand Up @@ -290,7 +290,7 @@ impl<'a> AffectedRange<'a> {
/// By putting this type inside `private` module, we can avoid misuse of the internal fields and
/// methods.
pub(super) struct OverPartition<'a, S: StateStore> {
this_partition_key: &'a OwnedRow,
deduped_part_key: &'a OwnedRow,
range_cache: &'a mut PartitionCache,
cache_policy: CachePolicy,

Expand All @@ -312,7 +312,7 @@ const MAGIC_BATCH_SIZE: usize = 512;
impl<'a, S: StateStore> OverPartition<'a, S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
this_partition_key: &'a OwnedRow,
deduped_part_key: &'a OwnedRow,
cache: &'a mut PartitionCache,
cache_policy: CachePolicy,
calls: &'a [WindowFuncCall],
Expand All @@ -337,7 +337,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.any(|call| call.frame.bounds.end_is_unbounded());

Self {
this_partition_key,
deduped_part_key,
range_cache: cache,
cache_policy,

Expand Down Expand Up @@ -658,17 +658,17 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

if need_extend_leftward {
self.stats.left_miss_count += 1;
tracing::trace!(partition=?self.this_partition_key, "partition cache left extension triggered");
tracing::trace!(partition=?self.deduped_part_key, "partition cache left extension triggered");
let left_most = self.cache_real_first_key().unwrap_or(delta_first).clone();
self.extend_cache_leftward_by_n(table, &left_most).await?;
}
if need_extend_rightward {
self.stats.right_miss_count += 1;
tracing::trace!(partition=?self.this_partition_key, "partition cache right extension triggered");
tracing::trace!(partition=?self.deduped_part_key, "partition cache right extension triggered");
let right_most = self.cache_real_last_key().unwrap_or(delta_last).clone();
self.extend_cache_rightward_by_n(table, &right_most).await?;
}
tracing::trace!(partition=?self.this_partition_key, "partition cache extended");
tracing::trace!(partition=?self.deduped_part_key, "partition cache extended");
}
}

Expand Down Expand Up @@ -925,16 +925,12 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
return Ok(());
}

tracing::trace!(partition=?self.this_partition_key, "loading the whole partition into cache");
tracing::trace!(partition=?self.deduped_part_key, "loading the whole partition into cache");

let mut new_cache = PartitionCache::new(); // shouldn't use `new_empty_partition_cache` here because we don't want sentinels
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
let table_iter = table
.iter_with_prefix(
self.this_partition_key,
sub_range,
PrefetchOptions::default(),
)
.iter_with_prefix(self.deduped_part_key, sub_range, PrefetchOptions::default())
.await?;

#[for_await]
Expand Down Expand Up @@ -969,7 +965,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
{
// completely not overlapping, for the sake of simplicity, we re-init the cache
tracing::debug!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
cache_first=?cache_real_first_key,
cache_last=?cache_real_last_key,
range=?range,
Expand All @@ -985,7 +981,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
);
tracing::debug!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
table_sub_range=?table_sub_range,
"cache is empty, just loading the given range"
);
Expand All @@ -1007,7 +1003,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
),
);
tracing::trace!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
table_sub_range=?table_sub_range,
"loading the left half of given range"
);
Expand All @@ -1026,7 +1022,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
);
tracing::trace!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
table_sub_range=?table_sub_range,
"loading the right half of given range"
);
Expand Down Expand Up @@ -1139,7 +1135,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
) -> StreamExecutorResult<()> {
let stream = table
.iter_with_prefix(
self.this_partition_key,
self.deduped_part_key,
&table_sub_range,
PrefetchOptions::default(),
)
Expand Down Expand Up @@ -1171,7 +1167,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
);
let rev_stream = table
.rev_iter_with_prefix(
self.this_partition_key,
self.deduped_part_key,
&sub_range,
PrefetchOptions::default(),
)
Expand Down Expand Up @@ -1215,7 +1211,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
);
let stream = table
.iter_with_prefix(
self.this_partition_key,
self.deduped_part_key,
&sub_range,
PrefetchOptions::default(),
)
Expand Down
Loading