Skip to content

Commit

Permalink
perf(over window): O(1) impl for `delta_btree_map::CursorWithDelta::m…
Browse files Browse the repository at this point in the history
…ove_next ` (#19214)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 4, 2024
1 parent f5360b3 commit 229d235
Show file tree
Hide file tree
Showing 3 changed files with 447 additions and 451 deletions.
87 changes: 44 additions & 43 deletions src/stream/src/executor/over_window/frame_finder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::ops::Bound;

use delta_btree_map::DeltaBTreeMap;
use delta_btree_map::{CursorWithDelta, DeltaBTreeMap};
use itertools::Itertools;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Datum, Sentinelled, ToDatumRef};
Expand Down Expand Up @@ -284,15 +284,22 @@ fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
} else {
part_with_delta.upper_bound(Bound::Included(delta_key))
};
let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| {
if LEFT {
cursor.peek_next().map(|(k, _)| k)
} else {
cursor.peek_prev().map(|(k, _)| k)
}
};

let n_rows_to_move = if LEFT {
frame_bounds.n_following_rows().unwrap()
} else {
frame_bounds.n_preceding_rows().unwrap()
};

if n_rows_to_move == 0 {
return cursor
.key()
return pointed_key(cursor)
.or_else(|| {
if LEFT {
part_with_delta.last_key()
Expand All @@ -304,28 +311,16 @@ fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
}

for _ in 0..n_rows_to_move {
// Note that we have to move before check, to handle situation where the
// cursor is at ghost position at first.
if LEFT {
cursor.move_prev();
} else {
cursor.move_next();
}
if cursor.position().is_ghost() {
let res = if LEFT { cursor.prev() } else { cursor.next() };
if res.is_none() {
// we reach the end
break;
}
}
cursor
.key()
.or_else(|| {
// Note the difference between this with the `n_rows_to_move == 0` case.
if LEFT {
part_with_delta.first_key()
} else {
part_with_delta.last_key()
}
})
.unwrap()

// We always have a valid key here, because `part_with_delta` must not be empty,
// and `n_rows_to_move` is always larger than 0 when we reach here.
pointed_key(cursor).unwrap()
}

fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
Expand All @@ -350,8 +345,18 @@ fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
// have `curr_key` which definitely exists in the `part_with_delta`. We just find
// the cursor pointing to it and move the cursor to frame boundary.

let mut cursor = part_with_delta.find(curr_key).unwrap();
assert!(!cursor.position().is_ghost());
let mut cursor = if LEFT {
part_with_delta.before(curr_key).unwrap()
} else {
part_with_delta.after(curr_key).unwrap()
};
let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| {
if LEFT {
cursor.peek_next().map(|(k, _)| k)
} else {
cursor.peek_prev().map(|(k, _)| k)
}
};

let n_rows_to_move = if LEFT {
frame_bounds.n_preceding_rows().unwrap()
Expand All @@ -360,25 +365,16 @@ fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
};

for _ in 0..n_rows_to_move {
if LEFT {
cursor.move_prev();
} else {
cursor.move_next();
}
if cursor.position().is_ghost() {
let res = if LEFT { cursor.prev() } else { cursor.next() };
if res.is_none() {
// we reach the end
break;
}
}
cursor
.key()
.or_else(|| {
if LEFT {
part_with_delta.first_key()
} else {
part_with_delta.last_key()
}
})
.unwrap()

// We always have a valid key here, because `cursor` must point to a valid key
// at the beginning.
pointed_key(cursor).unwrap()
}

/// Given a pair of left and right state keys, calculate the leftmost (smallest) and rightmost
Expand Down Expand Up @@ -497,11 +493,15 @@ fn find_for_range_frames<'cache, const LEFT: bool>(
// the curr key.
prev_key
} else {
// If cursor is in ghost position, it simply means that the search key is larger
// If there's nothing on the left, it simply means that the search key is larger
// than any existing key. Returning the last key in this case does no harm. Especially,
// if the last key is largest sentinel, the caller should extend the cache rightward
// to get possible entries with the same order value into the cache.
cursor.key().or_else(|| part_with_delta.last_key()).unwrap()
cursor
.peek_next()
.map(|(k, _)| k)
.or_else(|| part_with_delta.last_key())
.unwrap()
}
} else {
let cursor = part_with_delta.upper_bound(Bound::Included(&search_key));
Expand All @@ -511,7 +511,8 @@ fn find_for_range_frames<'cache, const LEFT: bool>(
next_key
} else {
cursor
.key()
.peek_prev()
.map(|(k, _)| k)
.or_else(|| part_with_delta.first_key())
.unwrap()
}
Expand Down
71 changes: 32 additions & 39 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

use delta_btree_map::{Change, DeltaBTreeMap, PositionType};
use delta_btree_map::{Change, DeltaBTreeMap};
use educe::Educe;
use futures_async_stream::for_await;
use risingwave_common::array::stream_record::Record;
Expand Down Expand Up @@ -470,12 +470,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
// Populate window states with the affected range of rows.
{
let mut cursor = part_with_delta
.find(first_frame_start)
.before(first_frame_start)
.expect("first frame start key must exist");
while {
let (key, row) = cursor
.key_value()
.expect("cursor must be valid until `last_frame_end`");

while let Some((key, row)) = cursor.next() {
accessed_entry_count += 1;

for (call, state) in calls.iter().zip_eq_fast(states.iter_mut()) {
// TODO(rc): batch appending
Expand All @@ -488,28 +487,28 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.into(),
);
}
accessed_entry_count += 1;
cursor.move_next();

key != last_frame_end
} {}
if key == last_frame_end {
break;
}
}
}

// Slide to the first affected key. We can safely pass in `first_curr_key` here
// because it definitely exists in the states by the definition of affected range.
states.just_slide_to(first_curr_key.as_normal_expect())?;
let mut curr_key_cursor = part_with_delta.find(first_curr_key).unwrap();
let mut curr_key_cursor = part_with_delta.before(first_curr_key).unwrap();
assert_eq!(
states.curr_key(),
curr_key_cursor.key().map(CacheKey::as_normal_expect)
curr_key_cursor
.peek_next()
.map(|(k, _)| k)
.map(CacheKey::as_normal_expect)
);

// Slide and generate changes.
while {
let (key, row) = curr_key_cursor
.key_value()
.expect("cursor must be valid until `last_curr_key`");
let mut should_continue = true;
while let Some((key, row)) = curr_key_cursor.next() {
let mut should_stop = false;

let output = states.slide_no_evict_hint()?;
compute_count += 1;
Expand All @@ -524,11 +523,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
// all the following rows, so we need to check the `order_key`.
if key.as_normal_expect().order_key > last_delta_key.order_key {
// there won't be any more changes after this point, we can stop early
should_continue = false;
should_stop = true;
}
} else if key.as_normal_expect() >= last_delta_key {
// there won't be any more changes after this point, we can stop early
should_continue = false;
should_stop = true;
}
}
}
Expand All @@ -542,29 +541,23 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.collect(),
);

match curr_key_cursor.position() {
PositionType::Ghost => unreachable!(),
PositionType::Snapshot | PositionType::DeltaUpdate => {
// update
let old_row = snapshot.get(key).unwrap().clone();
if old_row != new_row {
part_changes.insert(
key.as_normal_expect().clone(),
Record::Update { old_row, new_row },
);
}
}
PositionType::DeltaInsert => {
// insert
part_changes
.insert(key.as_normal_expect().clone(), Record::Insert { new_row });
if let Some(old_row) = snapshot.get(key).cloned() {
// update
if old_row != new_row {
part_changes.insert(
key.as_normal_expect().clone(),
Record::Update { old_row, new_row },
);
}
} else {
// insert
part_changes.insert(key.as_normal_expect().clone(), Record::Insert { new_row });
}

curr_key_cursor.move_next();

should_continue && key != last_curr_key
} {}
if should_stop || key == last_curr_key {
break;
}
}
}

self.stats.accessed_entry_count += accessed_entry_count;
Expand Down
Loading

0 comments on commit 229d235

Please sign in to comment.