Skip to content

Commit

Permalink
refactor(over window): rename buffer_ref to window to better refl…
Browse files Browse the repository at this point in the history
…ect the semantic (#17066)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Jun 3, 2024
1 parent aab3511 commit 8f63559
Showing 1 changed file with 46 additions and 47 deletions.
93 changes: 46 additions & 47 deletions src/expr/impl/src/window_function/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ impl<W: WindowImpl> WindowBuffer<W> {

/// Get the current window info.
pub fn curr_window(&self) -> CurrWindow<'_, W::Key> {
let buffer_ref = BufferRef {
let window = BufferRef {
buffer: &self.buffer,
curr_idx: self.curr_idx,
left_idx: self.left_idx,
right_excl_idx: self.right_excl_idx,
};
CurrWindow {
key: self.curr_key(),
preceding_saturated: self.window_impl.preceding_saturated(buffer_ref),
following_saturated: self.window_impl.following_saturated(buffer_ref),
preceding_saturated: self.window_impl.preceding_saturated(window),
following_saturated: self.window_impl.following_saturated(window),
}
}

Expand Down Expand Up @@ -190,13 +190,13 @@ impl<W: WindowImpl> WindowBuffer<W> {
}

fn recalculate_left_right(&mut self) {
let buffer_ref = BufferRefMut {
let window = BufferRefMut {
buffer: &self.buffer,
curr_idx: &mut self.curr_idx,
left_idx: &mut self.left_idx,
right_excl_idx: &mut self.right_excl_idx,
};
self.window_impl.recalculate_left_right(buffer_ref);
self.window_impl.recalculate_left_right(window);
}
}

Expand Down Expand Up @@ -227,14 +227,14 @@ pub(super) trait WindowImpl {
/// Whether the preceding half of the current window is saturated.
/// By "saturated" we mean that every row that is possible to be in the preceding half of the
/// current window is already in the buffer.
fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool;
fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;

/// Whether the following half of the current window is saturated.
fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool;
fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;

/// Recalculate the left and right indices of the current window, according to the latest
/// `curr_idx`. The indices are indices in the buffer vector.
fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>);
fn recalculate_left_right(&self, window: BufferRefMut<'_, Self::Key, Self::Value>);
}

/// The sliding window implementation for `ROWS` frames.
Expand All @@ -258,8 +258,8 @@ impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
type Key = K;
type Value = V;

fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool {
buffer_ref.curr_idx < buffer_ref.buffer.len() && {
fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
window.curr_idx < window.buffer.len() && {
let start_off = self.frame_bounds.start.to_offset();
if let Some(start_off) = start_off {
if start_off >= 0 {
Expand All @@ -269,18 +269,18 @@ impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
// the following can be simplified.
#[allow(clippy::nonminimal_bool)]
{
assert!(buffer_ref.curr_idx >= buffer_ref.left_idx);
assert!(window.curr_idx >= window.left_idx);
}
buffer_ref.curr_idx - buffer_ref.left_idx >= start_off.unsigned_abs()
window.curr_idx - window.left_idx >= start_off.unsigned_abs()
}
} else {
false // unbounded frame start, never preceding-saturated
}
}
}

fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool {
buffer_ref.curr_idx < buffer_ref.buffer.len() && {
fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
window.curr_idx < window.buffer.len() && {
let end_off = self.frame_bounds.end.to_offset();
if let Some(end_off) = end_off {
if end_off <= 0 {
Expand All @@ -289,49 +289,48 @@ impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
// FIXME(rc): Ditto.
#[allow(clippy::nonminimal_bool)]
{
assert!(buffer_ref.right_excl_idx > 0);
assert!(buffer_ref.right_excl_idx > buffer_ref.curr_idx);
assert!(buffer_ref.right_excl_idx <= buffer_ref.buffer.len());
assert!(window.right_excl_idx > 0);
assert!(window.right_excl_idx > window.curr_idx);
assert!(window.right_excl_idx <= window.buffer.len());
}
buffer_ref.right_excl_idx - 1 - buffer_ref.curr_idx >= end_off as usize
window.right_excl_idx - 1 - window.curr_idx >= end_off as usize
}
} else {
false // unbounded frame end, never following-saturated
}
}
}

fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>) {
if buffer_ref.buffer.is_empty() {
*buffer_ref.left_idx = 0;
*buffer_ref.right_excl_idx = 0;
fn recalculate_left_right(&self, window: BufferRefMut<'_, Self::Key, Self::Value>) {
if window.buffer.is_empty() {
*window.left_idx = 0;
*window.right_excl_idx = 0;
}

let start_off = self.frame_bounds.start.to_offset();
let end_off = self.frame_bounds.end.to_offset();
if let Some(start_off) = start_off {
let logical_left_idx = *buffer_ref.curr_idx as isize + start_off;
let logical_left_idx = *window.curr_idx as isize + start_off;
if logical_left_idx >= 0 {
*buffer_ref.left_idx =
std::cmp::min(logical_left_idx as usize, buffer_ref.buffer.len());
*window.left_idx = std::cmp::min(logical_left_idx as usize, window.buffer.len());
} else {
*buffer_ref.left_idx = 0;
*window.left_idx = 0;
}
} else {
// unbounded start
*buffer_ref.left_idx = 0;
*window.left_idx = 0;
}
if let Some(end_off) = end_off {
let logical_right_excl_idx = *buffer_ref.curr_idx as isize + end_off + 1;
let logical_right_excl_idx = *window.curr_idx as isize + end_off + 1;
if logical_right_excl_idx >= 0 {
*buffer_ref.right_excl_idx =
std::cmp::min(logical_right_excl_idx as usize, buffer_ref.buffer.len());
*window.right_excl_idx =
std::cmp::min(logical_right_excl_idx as usize, window.buffer.len());
} else {
*buffer_ref.right_excl_idx = 0;
*window.right_excl_idx = 0;
}
} else {
// unbounded end
*buffer_ref.right_excl_idx = buffer_ref.buffer.len();
*window.right_excl_idx = window.buffer.len();
}
}
}
Expand All @@ -355,36 +354,36 @@ impl<V: Clone> WindowImpl for RangeWindow<V> {
type Key = StateKey;
type Value = V;

fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool {
buffer_ref.curr_idx < buffer_ref.buffer.len() && {
fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
window.curr_idx < window.buffer.len() && {
// XXX(rc): It seems that preceding saturation is not important, may remove later.
true
}
}

fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool {
buffer_ref.curr_idx < buffer_ref.buffer.len()
fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
window.curr_idx < window.buffer.len()
&& {
// Left OK? (note that `left_idx` can be greater than `right_idx`)
// The following line checks whether the left value is the last one in the buffer.
// Here we adopt a conservative approach, which means we assume the next future value
// is likely to be the same as the last value in the current window, in which case
// we can't say the current window is saturated.
buffer_ref.left_idx < buffer_ref.buffer.len() /* non-zero */ - 1
window.left_idx < window.buffer.len() /* non-zero */ - 1
}
&& {
// Right OK? Ditto.
buffer_ref.right_excl_idx < buffer_ref.buffer.len()
window.right_excl_idx < window.buffer.len()
}
}

fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>) {
if buffer_ref.buffer.is_empty() {
*buffer_ref.left_idx = 0;
*buffer_ref.right_excl_idx = 0;
fn recalculate_left_right(&self, window: BufferRefMut<'_, Self::Key, Self::Value>) {
if window.buffer.is_empty() {
*window.left_idx = 0;
*window.right_excl_idx = 0;
}

let Some(entry) = buffer_ref.buffer.get(*buffer_ref.curr_idx) else {
let Some(entry) = window.buffer.get(*window.curr_idx) else {
// If the current index has been moved to a future position, we can't touch anything
// because the next coming key may equal to the previous one which means the left and
// right indices will be the same.
Expand All @@ -403,15 +402,15 @@ impl<V: Clone> WindowImpl for RangeWindow<V> {
Sentinelled::Smallest => {
// unbounded frame start
assert_eq!(
*buffer_ref.left_idx, 0,
*window.left_idx, 0,
"for unbounded start, left index should always be 0"
);
}
Sentinelled::Normal(value) => {
// bounded, find the start position
let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
.expect("no reason to fail here");
*buffer_ref.left_idx = buffer_ref
*window.left_idx = window
.buffer
.partition_point(|elem| elem.key.order_key < value_enc);
}
Expand All @@ -421,13 +420,13 @@ impl<V: Clone> WindowImpl for RangeWindow<V> {
match self.frame_bounds.frame_end_of(curr_order_value) {
Sentinelled::Largest => {
// unbounded frame end
*buffer_ref.right_excl_idx = buffer_ref.buffer.len();
*window.right_excl_idx = window.buffer.len();
}
Sentinelled::Normal(value) => {
// bounded, find the end position
let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
.expect("no reason to fail here");
*buffer_ref.right_excl_idx = buffer_ref
*window.right_excl_idx = window
.buffer
.partition_point(|elem| elem.key.order_key <= value_enc);
}
Expand Down

0 comments on commit 8f63559

Please sign in to comment.