Skip to content

Commit

Permalink
perf(over window): skip remaining affected rows when rank is not chan…
Browse files Browse the repository at this point in the history
…ged (#18950)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 8, 2024
1 parent 5dcfae5 commit df85a5b
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 7 deletions.
5 changes: 3 additions & 2 deletions src/expr/core/src/window_function/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use enum_as_inner::EnumAsInner;
use parse_display::{Display, FromStr};
use risingwave_common::bail;

use crate::aggregate::AggKind;
use crate::Result;

/// Kind of window functions.
#[derive(Debug, Display, FromStr, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Display, FromStr, Clone, PartialEq, Eq, Hash, EnumAsInner)]
#[display(style = "snake_case")]
pub enum WindowFuncKind {
// General-purpose window functions.
Expand Down Expand Up @@ -62,7 +63,7 @@ impl WindowFuncKind {
}

impl WindowFuncKind {
pub fn is_rank(&self) -> bool {
pub fn is_numbering(&self) -> bool {
matches!(self, Self::RowNumber | Self::Rank | Self::DenseRank)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl LogicalOverWindow {
let rewritten_selected_items = over_window_builder.rewrite_selected_items(select_exprs)?;

for window_func in &window_functions {
if window_func.kind.is_rank() && window_func.order_by.sort_exprs.is_empty() {
if window_func.kind.is_numbering() && window_func.order_by.sort_exprs.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(format!(
"window rank function without order by: {:?}",
window_func
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/rule/over_window_split_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Rule for OverWindowSplitRule {
.iter()
.enumerate()
.map(|(idx, func)| {
let func_seq = if func.kind.is_rank() {
let func_seq = if func.kind.is_numbering() {
rank_func_seq += 1;
rank_func_seq
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Rule for OverWindowToTopNRule {
return None;
}
let window_func = &over_window.window_functions()[0];
if !window_func.kind.is_rank() {
if !window_func.kind.is_numbering() {
// Only rank functions can be converted to TopN.
return None;
}
Expand Down
10 changes: 10 additions & 0 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ pub(super) struct Calls {
pub(super) end_is_unbounded: bool,
/// Deduplicated indices of all arguments of all calls.
pub(super) all_arg_indices: Vec<usize>,

// TODO(rc): The following flags are used to optimize for `row_number`, `rank` and `dense_rank`.
// We should try our best to remove these flags while maintaining the performance in the future.
pub(super) numbering_only: bool,
pub(super) has_rank: bool,
}

impl Calls {
Expand Down Expand Up @@ -180,13 +185,18 @@ impl Calls {
.dedup()
.collect();

let numbering_only = calls.iter().all(|call| call.kind.is_numbering());
let has_rank = calls.iter().any(|call| call.kind.is_rank());

Self {
calls,
super_rows_frame_bounds,
range_frames,
start_is_unbounded,
end_is_unbounded,
all_arg_indices,
numbering_only,
has_rank,
}
}

Expand Down
25 changes: 23 additions & 2 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,10 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
BTreeMap<StateKey, Record<OwnedRow>>,
Option<RangeInclusive<StateKey>>,
)> {
let input_schema_len = table.get_data_types().len() - self.calls.len();
let calls = self.calls;
let input_schema_len = table.get_data_types().len() - calls.len();
let numbering_only = calls.numbering_only;
let has_rank = calls.has_rank;

// return values
let mut part_changes = BTreeMap::new();
Expand All @@ -413,6 +415,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

let snapshot = part_with_delta.snapshot();
let delta = part_with_delta.delta();
let last_delta_key = delta.last_key_value().map(|(k, _)| k.as_normal_expect());

// Generate delete changes first, because deletes are skipped during iteration over
// `part_with_delta` in the next step.
Expand Down Expand Up @@ -442,6 +445,8 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
assert!(last_curr_key.is_normal());
assert!(last_frame_end.is_normal());

let last_delta_key = last_delta_key.unwrap();

if let Some(accessed_range) = accessed_range.as_mut() {
let min_start = first_frame_start
.as_normal_expect()
Expand Down Expand Up @@ -504,12 +509,28 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
let (key, row) = curr_key_cursor
.key_value()
.expect("cursor must be valid until `last_curr_key`");
let mut should_continue = true;

let output = states.slide_no_evict_hint()?;
compute_count += 1;

let old_output = &row.as_inner()[input_schema_len..];
if !old_output.is_empty() && old_output == output {
same_output_count += 1;

if numbering_only {
if has_rank {
// It's possible that an `Insert` doesn't affect it's ties but affects
// all the following rows, so we need to check the `order_key`.
if key.as_normal_expect().order_key > last_delta_key.order_key {
// there won't be any more changes after this point, we can stop early
should_continue = false;
}
} else if key.as_normal_expect() >= last_delta_key {
// there won't be any more changes after this point, we can stop early
should_continue = false;
}
}
}

let new_row = OwnedRow::new(
Expand Down Expand Up @@ -542,7 +563,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

curr_key_cursor.move_next();

key != last_curr_key
should_continue && key != last_curr_key
} {}
}

Expand Down

0 comments on commit df85a5b

Please sign in to comment.