Skip to content

Commit

Permalink
skip remaining affected curr keys when rank has no change
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Oct 25, 2024
1 parent ca48be1 commit cdc8c0c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub(super) struct Calls {
pub(super) end_is_unbounded: bool,
/// Deduplicated indices of all arguments of all calls.
pub(super) all_arg_indices: Vec<usize>,
pub(super) rank_funcs_only: bool,
}

impl Calls {
Expand Down Expand Up @@ -180,13 +181,16 @@ impl Calls {
.dedup()
.collect();

let rank_funcs_only = calls.iter().all(|call| call.kind.is_rank());

Self {
calls,
super_rows_frame_bounds,
range_frames,
start_is_unbounded,
end_is_unbounded,
all_arg_indices,
rank_funcs_only,
}
}

Expand Down
15 changes: 13 additions & 2 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,9 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
BTreeMap<StateKey, Record<OwnedRow>>,
Option<RangeInclusive<StateKey>>,
)> {
let input_schema_len = table.get_data_types().len() - self.calls.len();
let calls = self.calls;
let input_schema_len = table.get_data_types().len() - calls.len();
let rank_funcs_only = calls.rank_funcs_only;

// return values
let mut part_changes = BTreeMap::new();
Expand All @@ -413,6 +414,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

let snapshot = part_with_delta.snapshot();
let delta = part_with_delta.delta();
let last_delta_key = delta.last_key_value().map(|(k, _)| k);

// Generate delete changes first, because deletes are skipped during iteration over
// `part_with_delta` in the next step.
Expand Down Expand Up @@ -442,6 +444,8 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
assert!(last_curr_key.is_normal());
assert!(last_frame_end.is_normal());

let last_delta_key = last_delta_key.unwrap();

if let Some(accessed_range) = accessed_range.as_mut() {
let min_start = first_frame_start
.as_normal_expect()
Expand Down Expand Up @@ -504,12 +508,19 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
let (key, row) = curr_key_cursor
.key_value()
.expect("cursor must be valid until `last_curr_key`");
let mut should_continue = true;

let output = states.slide_no_evict_hint()?;
compute_count += 1;

let old_output = &row.as_inner()[input_schema_len..];
if !old_output.is_empty() && old_output == output {
same_output_count += 1;

if rank_funcs_only && key >= last_delta_key {
// there won't be any more changes after this point, we can stop early
should_continue = false;
}
}

let new_row = OwnedRow::new(
Expand Down Expand Up @@ -542,7 +553,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

curr_key_cursor.move_next();

key != last_curr_key
should_continue && key != last_curr_key
} {}
}

Expand Down

0 comments on commit cdc8c0c

Please sign in to comment.