diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 3eb10fbe34d72..d97ea51b5360d 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -51,7 +51,7 @@ struct ExecutorInner { schema: Schema, calls: Vec, - partition_key_indices: Vec, + deduped_part_key_indices: Vec, order_key_indices: Vec, order_key_data_types: Vec, order_key_order_types: Vec, @@ -88,9 +88,10 @@ impl Execute for OverWindowExecutor { } impl ExecutorInner { + /// Get deduplicated partition key from a full row, which happened to be the prefix of table PK. fn get_partition_key(&self, full_row: impl Row) -> OwnedRow { full_row - .project(&self.partition_key_indices) + .project(&self.deduped_part_key_indices) .into_owned_row() } @@ -162,13 +163,22 @@ impl OverWindowExecutor { &input_info.pk_indices, ); + let deduped_part_key_indices = { + let mut dedup = HashSet::new(); + args.partition_key_indices + .iter() + .filter(|i| dedup.insert(**i)) + .copied() + .collect() + }; + Self { input: args.input, inner: ExecutorInner { actor_ctx: args.actor_ctx, schema: args.schema, calls: args.calls, - partition_key_indices: args.partition_key_indices, + deduped_part_key_indices, order_key_indices: args.order_key_indices, order_key_data_types, order_key_order_types: args.order_key_order_types, @@ -262,7 +272,7 @@ impl OverWindowExecutor { chunk: StreamChunk, metrics: &'a OverWindowMetrics, ) { - // partition key => changes happened in the partition. + // (deduped) partition key => changes happened in the partition. let mut deltas: BTreeMap, PartitionDelta> = BTreeMap::new(); // input pk of update records of which the order key is changed. let mut key_change_updated_pks = HashSet::new(); diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index a0fbf32ef563f..2ce9ecad38763 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -70,13 +70,13 @@ const MAGIC_CACHE_SIZE: usize = 1024; const MAGIC_JITTER_PREVENTION: usize = MAGIC_CACHE_SIZE / 8; pub(super) fn shrink_partition_cache( - this_partition_key: &OwnedRow, + deduped_part_key: &OwnedRow, range_cache: &mut PartitionCache, cache_policy: CachePolicy, recently_accessed_range: RangeInclusive, ) { tracing::trace!( - this_partition_key=?this_partition_key, + partition=?deduped_part_key, cache_policy=?cache_policy, recently_accessed_range=?recently_accessed_range, "find the range to retain in the range cache" @@ -218,7 +218,7 @@ pub(super) fn shrink_partition_cache( }; tracing::trace!( - this_partition_key=?this_partition_key, + partition=?deduped_part_key, retain_range=?(&start..=&end), "retain range in the range cache" ); @@ -290,7 +290,7 @@ impl<'a> AffectedRange<'a> { /// By putting this type inside `private` module, we can avoid misuse of the internal fields and /// methods. pub(super) struct OverPartition<'a, S: StateStore> { - this_partition_key: &'a OwnedRow, + deduped_part_key: &'a OwnedRow, range_cache: &'a mut PartitionCache, cache_policy: CachePolicy, @@ -312,7 +312,7 @@ const MAGIC_BATCH_SIZE: usize = 512; impl<'a, S: StateStore> OverPartition<'a, S> { #[allow(clippy::too_many_arguments)] pub fn new( - this_partition_key: &'a OwnedRow, + deduped_part_key: &'a OwnedRow, cache: &'a mut PartitionCache, cache_policy: CachePolicy, calls: &'a [WindowFuncCall], @@ -337,7 +337,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { .any(|call| call.frame.bounds.end_is_unbounded()); Self { - this_partition_key, + deduped_part_key, range_cache: cache, cache_policy, @@ -658,17 +658,17 @@ impl<'a, S: StateStore> OverPartition<'a, S> { if need_extend_leftward { self.stats.left_miss_count += 1; - tracing::trace!(partition=?self.this_partition_key, "partition cache left extension triggered"); + tracing::trace!(partition=?self.deduped_part_key, "partition cache left extension triggered"); let left_most = self.cache_real_first_key().unwrap_or(delta_first).clone(); self.extend_cache_leftward_by_n(table, &left_most).await?; } if need_extend_rightward { self.stats.right_miss_count += 1; - tracing::trace!(partition=?self.this_partition_key, "partition cache right extension triggered"); + tracing::trace!(partition=?self.deduped_part_key, "partition cache right extension triggered"); let right_most = self.cache_real_last_key().unwrap_or(delta_last).clone(); self.extend_cache_rightward_by_n(table, &right_most).await?; } - tracing::trace!(partition=?self.this_partition_key, "partition cache extended"); + tracing::trace!(partition=?self.deduped_part_key, "partition cache extended"); } } @@ -925,16 +925,12 @@ impl<'a, S: StateStore> OverPartition<'a, S> { return Ok(()); } - tracing::trace!(partition=?self.this_partition_key, "loading the whole partition into cache"); + tracing::trace!(partition=?self.deduped_part_key, "loading the whole partition into cache"); let mut new_cache = PartitionCache::new(); // shouldn't use `new_empty_partition_cache` here because we don't want sentinels let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let table_iter = table - .iter_with_prefix( - self.this_partition_key, - sub_range, - PrefetchOptions::default(), - ) + .iter_with_prefix(self.deduped_part_key, sub_range, PrefetchOptions::default()) .await?; #[for_await] @@ -969,7 +965,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { { // completely not overlapping, for the sake of simplicity, we re-init the cache tracing::debug!( - partition=?self.this_partition_key, + partition=?self.deduped_part_key, cache_first=?cache_real_first_key, cache_last=?cache_real_last_key, range=?range, @@ -985,7 +981,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?), ); tracing::debug!( - partition=?self.this_partition_key, + partition=?self.deduped_part_key, table_sub_range=?table_sub_range, "cache is empty, just loading the given range" ); @@ -1007,7 +1003,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ), ); tracing::trace!( - partition=?self.this_partition_key, + partition=?self.deduped_part_key, table_sub_range=?table_sub_range, "loading the left half of given range" ); @@ -1026,7 +1022,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?), ); tracing::trace!( - partition=?self.this_partition_key, + partition=?self.deduped_part_key, table_sub_range=?table_sub_range, "loading the right half of given range" ); @@ -1139,7 +1135,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ) -> StreamExecutorResult<()> { let stream = table .iter_with_prefix( - self.this_partition_key, + self.deduped_part_key, &table_sub_range, PrefetchOptions::default(), ) @@ -1171,7 +1167,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ); let rev_stream = table .rev_iter_with_prefix( - self.this_partition_key, + self.deduped_part_key, &sub_range, PrefetchOptions::default(), ) @@ -1215,7 +1211,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ); let stream = table .iter_with_prefix( - self.this_partition_key, + self.deduped_part_key, &sub_range, PrefetchOptions::default(), )